1 /* -*-pgsql-c-*- */
2 /*
3 * pgpool: a language independent connection pool server for PostgreSQL
4 * written by Tatsuo Ishii
5 *
6 * Copyright (c) 2003-2021 PgPool Global Development Group
7 *
8 * Permission to use, copy, modify, and distribute this software and
9 * its documentation for any purpose and without fee is hereby
10 * granted, provided that the above copyright notice appear in all
11 * copies and that both that copyright notice and this permission
12 * notice appear in supporting documentation, and that the name of the
13 * author not be used in advertising or publicity pertaining to
14 * distribution of the software without specific, written prior
15 * permission. The author makes no representations about the
16 * suitability of this software for any purpose. It is provided "as
17 * is" without express or implied warranty.
18 *
19 *---------------------------------------------------------------------
20 * pool_proto_modules.c: modules corresponding to message protocols.
21 * used by pool_process_query()
22 *---------------------------------------------------------------------
23 */
24 #include "config.h"
25 #include <errno.h>
26
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30 #ifdef HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #ifdef HAVE_SYS_SELECT_H
34 #include <sys/select.h>
35 #endif
36
37
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <string.h>
41 #include <netinet/in.h>
42 #include <ctype.h>
43
44 #include "pool.h"
45 #include "rewrite/pool_timestamp.h"
46 #include "rewrite/pool_lobj.h"
47 #include "protocol/pool_proto_modules.h"
48 #include "protocol/pool_process_query.h"
49 #include "protocol/pool_pg_utils.h"
50 #include "pool_config.h"
51 #include "parser/pool_string.h"
52 #include "context/pool_session_context.h"
53 #include "context/pool_query_context.h"
54 #include "utils/elog.h"
55 #include "utils/pool_select_walker.h"
56 #include "utils/pool_relcache.h"
57 #include "utils/pool_stream.h"
58 #include "utils/ps_status.h"
59 #include "utils/pool_signal.h"
60 #include "utils/pool_ssl.h"
61 #include "utils/palloc.h"
62 #include "utils/memutils.h"
63 #include "query_cache/pool_memqcache.h"
64 #include "main/pool_internal_comms.h"
65 #include "pool_config_variables.h"
66
/*
 * COPY FROM STDIN state.  These are set by check_copy_from_stdin() when a
 * COPY command is seen in SimpleQuery() and consumed while the COPY data
 * stream is being relayed.
 */
char *copy_table = NULL; /* copy table name */
char *copy_schema = NULL; /* copy schema name */
char copy_delimiter; /* copy delimiter char */
char *copy_null = NULL; /* copy null string */

/*
 * Non 0 if allow to close internal transaction. This variable was
 * introduced on 2008/4/3 not to close an internal transaction when
 * Sync message is received after receiving Parse message. This hack
 * is for PHP-PDO.
 */
static int allow_close_transaction = 1;

int is_select_pgcatalog = 0;
int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR
* UPDATE */

/*
 * last query string sent to simpleQuery()
 */
char query_string_buffer[QUERY_STRING_BUFFER_LEN];

/* forward declarations for helpers private to this file */
static int check_errors(POOL_CONNECTION_POOL * backend, int backend_id);
static void generate_error_message(char *prefix, int specific_error, char *query);
static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
POOL_CONNECTION_POOL * backend,
POOL_SENT_MESSAGE * message,
POOL_SENT_MESSAGE * bind_message);
static int *find_victim_nodes(int *ntuples, int nmembers, int main_node, int *number_of_nodes);
static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
POOL_CONNECTION_POOL * backend);

static char *flatten_set_variable_args(const char *name, List *args);
static bool
process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context);
static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
POOL_CONNECTION_POOL * backend);
static void si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node, bool tstate_check);
105
106 /*
107 * This is the workhorse of processing the pg_terminate_backend function to
108 * make sure that the use of function should not trigger the backend node failover.
109 *
110 * The function searches for the pg_terminate_backend() function call in the
111 * query parse tree and if the search comes out to be successful,
112 * the next step is to locate the pgpool-II child process and a backend node
113 * of that connection whose PID is specified in pg_terminate_backend argument.
114 *
115 * Once the connection is identified, we set the swallow_termination flag of
116 * that connection (in shared memory) and also sets the query destination to
117 * the same backend that hosts the connection.
118 *
119 * The function returns true on success, i.e. when the query contains the
120 * pg_terminate_backend call and that call refers to the backend
121 * connection that belongs to pgpool-II.
122 *
123 * Note: Since upon successful return this function has already
124 * set the destination backend node for the current query,
125 * so for that case pool_where_to_send() should not be called.
126 *
127 */
128 static bool
process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)129 process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)
130 {
131 /*
132 * locate pg_terminate_backend and get the pid argument, if
133 * pg_terminate_backend is present in the query
134 */
135 int backend_pid = pool_get_terminate_backend_pid(query_context->parse_tree);
136
137 if (backend_pid > 0)
138 {
139 int backend_node = 0;
140 ConnectionInfo *conn = pool_coninfo_backend_pid(backend_pid, &backend_node);
141
142 if (conn == NULL)
143 {
144 ereport(LOG,
145 (errmsg("found the pg_terminate_backend request for backend pid:%d, but the backend connection does not belong to pgpool-II", backend_pid)));
146
147 /*
148 * we are not able to find the backend connection with the pid so
149 * there is not much left for us to do here
150 */
151 return false;
152 }
153 ereport(LOG,
154 (errmsg("found the pg_terminate_backend request for backend pid:%d on backend node:%d", backend_pid, backend_node),
155 errdetail("setting the connection flag")));
156
157 pool_set_connection_will_be_terminated(conn);
158
159 /*
160 * It was the pg_terminate_backend call so send the query to
161 * appropriate backend
162 */
163 query_context->pg_terminate_backend_conn = conn;
164 pool_force_query_node_to_backend(query_context, backend_node);
165 return true;
166 }
167 return false;
168 }
169
170 /*
171 * Process Query('Q') message
172 * Query messages include an SQL string.
173 */
/*
 * Handle a frontend Query ('Q') message carrying the SQL text in
 * "contents" (length "len", including the terminating NUL).
 *
 * Responsibilities, in order: query-cache lookup, parsing (with dummy
 * read/write fallback trees on empty/unparsable input), interception of
 * pgpool-specific SHOW/SET commands, cache-safety registration, snapshot
 * isolation bookkeeping, node routing, insert locking (replication mode),
 * and finally sending the query to the backends with the main-node-first
 * ordering required for error detection.
 *
 * Returns POOL_CONTINUE on success, otherwise the error status from the
 * failing send/fetch step.
 */
POOL_STATUS
SimpleQuery(POOL_CONNECTION * frontend,
			POOL_CONNECTION_POOL * backend, int len, char *contents)
{
	/* Names of the pgpool-II specific SHOW commands handled locally. */
	static char *sq_config = "pool_status";
	static char *sq_pools = "pool_pools";
	static char *sq_processes = "pool_processes";
	static char *sq_nodes = "pool_nodes";
	static char *sq_version = "pool_version";
	static char *sq_cache = "pool_cache";
	static char *sq_health_check_stats = "pool_health_check_stats";
	static char *sq_backend_stats = "pool_backend_stats";
	int			commit;			/* set (and used) only inside the !RAW_MODE branch */
	List	   *parse_tree_list;
	Node	   *node = NULL;
	POOL_STATUS status;
	int			lock_kind;
	bool		is_likely_select = false;
	int			specific_error = 0;

	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	bool		error;			/* set by raw_parser(): true on parse error */

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* save last query string for logging purpose */
	strlcpy(query_string_buffer, contents, sizeof(query_string_buffer));

	/* show ps status */
	query_ps_status(contents, backend);

	/* log query to log file if necessary */
	if (pool_config->log_statement)
		ereport(LOG, (errmsg("statement: %s", contents)));

	/*
	 * Fetch memory cache if possible.  pool_is_likely_select() is a cheap
	 * textual check done before the full parse.
	 */
	if (pool_config->memory_cache_enabled)
		is_likely_select = pool_is_likely_select(contents);

	/*
	 * If memory query cache enabled and the query seems to be a SELECT use
	 * query cache if possible. However if we are in an explicit transaction
	 * and we had writing query before, we should not use query cache. This
	 * means that even the writing query is not anything related to the table
	 * which is used the SELECT, we do not use cache. Of course we could
	 * analyze the SELECT to see if it uses the table modified in the
	 * transaction, but it will need parsing query and accessing to system
	 * catalog, which will add significant overhead. Moreover if we are in
	 * aborted transaction, commands should be ignored, so we should not use
	 * query cache.
	 */
	if (pool_config->memory_cache_enabled && is_likely_select &&
		!pool_is_writing_transaction() &&
		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
	{
		bool		foundp;

		/*
		 * If the query is SELECT from table to cache, try to fetch cached
		 * result.
		 */
		status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp);

		if (status != POOL_CONTINUE)
			return status;

		if (foundp)
		{
			/* Cache hit: the reply came from the cache, skip the backends. */
			pool_ps_idle_display(backend);
			pool_set_skip_reading_from_backends();
			pool_stats_count_up_num_cache_hits();
			return POOL_CONTINUE;
		}
	}

	/* Create query context */
	query_context = pool_init_query_context();
	/* Parse-tree allocations must live in the query context's memory. */
	MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

	/* parse SQL string */
	parse_tree_list = raw_parser(contents, RAW_PARSE_DEFAULT, len, &error, !REPLICATION);

	if (parse_tree_list == NIL)
	{
		/* is the query empty? */
		if (*contents == '\0' || *contents == ';' || error == false)
		{
			/*
			 * JBoss sends empty queries for checking connections. We replace
			 * the empty query with SELECT command not to affect load balance.
			 * [Pgpool-general] Confused about JDBC and load balancing
			 */
			parse_tree_list = get_dummy_read_query_tree();
		}
		else
		{
			/*
			 * Unable to parse the query. Probably syntax error or the query
			 * is too new and our parser cannot understand. Treat as if it
			 * were an DELETE command. Note that the DELETE command does not
			 * execute, instead the original query will be sent to backends,
			 * which may or may not cause an actual syntax errors. The command
			 * will be sent to all backends in replication mode or
			 * primary in native replication mode.
			 */
			if (!strcmp(remote_host, "[local]"))
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from local client", contents)));
			}
			else
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from client %s(%s)", contents, remote_host, remote_port)));
			}
			parse_tree_list = get_dummy_write_query_tree();
			query_context->is_parse_error = true;
		}
	}
	MemoryContextSwitchTo(old_context);

	if (parse_tree_list != NIL)
	{
		/* Take the single (first) statement out of the raw parse list. */
		node = raw_parser2(parse_tree_list);

		/*
		 * Start query context
		 */
		pool_start_query(query_context, contents, len, node);

		/*
		 * Create PostgreSQL version cache. Since the provided query might
		 * cause a syntax error, we want to issue "SELECT version()" which is
		 * called inside Pgversion() here.
		 */
		Pgversion(backend);

		/*
		 * If the query is DROP DATABASE, after executing it, cache files
		 * directory must be discarded. So we have to get the DB's oid before
		 * it will be DROPped.
		 */
		if (pool_config->memory_cache_enabled && is_drop_database(node))
		{
			DropdbStmt *stmt = (DropdbStmt *) node;

			query_context->dboid = pool_get_database_oid_from_dbname(stmt->dbname);
			if (query_context->dboid != 0)
			{
				ereport(DEBUG1,
						(errmsg("DB's oid to discard its cache directory: dboid = %d", query_context->dboid)));
			}
		}

		/*
		 * Check if multi statement query
		 */
		if (parse_tree_list && list_length(parse_tree_list) > 1)
		{
			query_context->is_multi_statement = true;
		}
		else
		{
			query_context->is_multi_statement = false;
		}

		/*
		 * check COPY FROM STDIN if true, set copy_* variable
		 */
		check_copy_from_stdin(node);

		/*
		 * "SHOW <pgpool variable>" is answered locally without touching the
		 * backends; the query context is destroyed and we return early.
		 */
		if (IsA(node, PgpoolVariableShowStmt))
		{
			VariableShowStmt *vnode = (VariableShowStmt *) node;

			report_config_variable(frontend, backend, vnode->name);

			pool_ps_idle_display(backend);
			pool_query_context_destroy(query_context);
			pool_set_skip_reading_from_backends();
			return POOL_CONTINUE;
		}

		/* "SET <pgpool variable>" is likewise handled locally. */
		if (IsA(node, PgpoolVariableSetStmt))
		{
			VariableSetStmt *vnode = (VariableSetStmt *) node;
			const char *value = NULL;

			if (vnode->kind == VAR_SET_VALUE)
			{
				value = flatten_set_variable_args("name", vnode->args);
			}
			/*
			 * NOTE(review): this is an independent "if", not "else if" — for
			 * VAR_SET_VALUE the set_config_option_for_session() branch below
			 * also runs, which appears intentional (value was just computed).
			 */
			if (vnode->kind == VAR_RESET_ALL)
			{
				reset_all_variables(frontend, backend);
			}
			else
			{
				set_config_option_for_session(frontend, backend, vnode->name, value);
			}

			pool_ps_idle_display(backend);
			pool_query_context_destroy(query_context);
			pool_set_skip_reading_from_backends();
			return POOL_CONTINUE;
		}

		/* status reporting? */
		if (IsA(node, VariableShowStmt))
		{
			bool		is_valid_show_command = false;
			VariableShowStmt *vnode = (VariableShowStmt *) node;

			if (!strcmp(sq_config, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("config reporting")));
				config_reporting(frontend, backend);
			}
			else if (!strcmp(sq_pools, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("pools reporting")));

				pools_reporting(frontend, backend);
			}
			else if (!strcmp(sq_processes, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("processes reporting")));
				processes_reporting(frontend, backend);
			}
			else if (!strcmp(sq_nodes, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("nodes reporting")));
				nodes_reporting(frontend, backend);
			}
			else if (!strcmp(sq_version, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("version reporting")));
				version_reporting(frontend, backend);
			}
			else if (!strcmp(sq_cache, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("cache reporting")));
				cache_reporting(frontend, backend);
			}
			else if (!strcmp(sq_health_check_stats, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("health check stats")));
				show_health_check_stats(frontend, backend);
			}

			else if (!strcmp(sq_backend_stats, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("backend stats")));
				show_backend_stats(frontend, backend);
			}

			if (is_valid_show_command)
			{
				pool_ps_idle_display(backend);
				pool_query_context_destroy(query_context);
				pool_set_skip_reading_from_backends();
				return POOL_CONTINUE;
			}
		}

		/*
		 * If the table is to be cached, set is_cache_safe TRUE and register
		 * table oids.
		 */
		if (pool_config->memory_cache_enabled && query_context->is_multi_statement == false)
		{
			bool		is_select_query = false;
			int			num_oids;
			int		   *oids;
			int			i;

			/* Check if the query is actually SELECT */
			if (is_likely_select && IsA(node, SelectStmt))
			{
				is_select_query = true;
			}

			/* Switch the flag of is_cache_safe in session_context */
			if (is_select_query && !query_context->is_parse_error &&
				pool_is_allow_to_cache(query_context->parse_tree,
									   query_context->original_query))
			{
				pool_set_cache_safe();
			}
			else
			{
				pool_unset_cache_safe();
			}

			/*
			 * If table is to be cached and the query is DML, save the table
			 * oid
			 */
			if (!query_context->is_parse_error)
			{
				num_oids = pool_extract_table_oids(node, &oids);

				if (num_oids > 0)
				{
					/* Save to oid buffer */
					for (i = 0; i < num_oids; i++)
					{
						pool_add_dml_table_oid(oids[i]);
					}
				}
			}
		}

		/*
		 * If we are in SI mode, start an internal transaction if needed.
		 */
		if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
		{
			start_internal_transaction(frontend, backend, node);
		}

		/*
		 * From now on it is possible that query is actually sent to backend.
		 * So we need to acquire snapshot while there's no committing backend
		 * in snapshot isolation mode except while processing reset queries.
		 * For this purpose, we send a query to know whether the transaction
		 * is READ ONLY or not. Sending actual user's query is not possible
		 * because it might cause rw-conflict, which in turn causes a
		 * deadlock.
		 */
		si_get_snapshot(frontend, backend, node,
						!INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID));

		/*
		 * pg_terminate function needs special handling, process it if the
		 * query contains one, otherwise use pool_where_to_send() to decide
		 * destination backend node for the query
		 */
		if (process_pg_terminate_backend_func(query_context) == false)
		{
			/*
			 * Decide where to send query
			 */
			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);
		}

		/*
		 * if this is DROP DATABASE command, send USR1 signal to parent and
		 * ask it to close all idle connections. XXX This is overkill. It
		 * would be better to close the idle connection for the database which
		 * DROP DATABASE command tries to drop. This is impossible at this
		 * point, since we have no way to pass such info to other processes.
		 */
		if (is_drop_database(node))
		{
			struct timeval stime;

			stime.tv_usec = 0;
			stime.tv_sec = 5;	/* XXX give arbitrary time to allow
								 * closing idle connections */

			ereport(DEBUG1,
					(errmsg("Query: sending SIGUSR1 signal to parent")));

			ignore_sigusr1 = 1;	/* disable SIGUSR1 handler */
			close_idle_connections();

			/*
			 * We need to loop over here since we might get some signals while
			 * sleeping
			 */
			for (;;)
			{
				int			sts;

				errno = 0;
				/*
				 * select() with no fds is used as an interruptible sleep;
				 * on Linux the remaining time is written back into stime,
				 * so the loop resumes the wait after EINTR.
				 */
				sts = select(0, NULL, NULL, NULL, &stime);
				if (stime.tv_usec == 0 && stime.tv_sec == 0)
					break;
				if (sts != 0 && errno != EINTR)
				{
					elog(DEBUG1, "select(2) returns error: %s", strerror(errno));
					break;
				}
			}

			ignore_sigusr1 = 0;
		}

		/*---------------------------------------------------------------------------
		 * determine if we need to lock the table
		 * to keep SERIAL data consistency among servers
		 * conditions:
		 * - replication is enabled
		 * - protocol is V3
		 * - statement is INSERT
		 * - either "INSERT LOCK" comment exists or insert_lock directive specified
		 *---------------------------------------------------------------------------
		 */
		if (!RAW_MODE && !SL_MODE)
		{
			/*
			 * If there's only one node to send the command, there's no point
			 * to start a transaction.
			 */
			if (pool_multi_node_to_be_sent(query_context))
			{
				/* start a transaction if needed */
				start_internal_transaction(frontend, backend, (Node *) node);

				/* check if need lock */
				lock_kind = need_insert_lock(backend, contents, node);
				if (lock_kind)
				{
					/* if so, issue lock command */
					status = insert_lock(frontend, backend, contents, (InsertStmt *) node, lock_kind);
					if (status != POOL_CONTINUE)
					{
						pool_query_context_destroy(query_context);
						return status;
					}
				}
			}
		}
	}

	/*
	 * Protocol V2 has no per-backend transaction state reporting, so mark
	 * all valid backends as in-transaction on BEGIN ourselves.
	 */
	if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
	{
		int			i;

		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
				TSTATE(backend, i) = 'T';
		}
	}

	/*
	 * PREPARE must be remembered as an uncompleted sent message so that a
	 * later EXECUTE can find its query context.
	 */
	if (node)
	{
		POOL_SENT_MESSAGE *msg = NULL;

		if (IsA(node, PrepareStmt))
		{
			msg = pool_create_sent_message('Q', len, contents, 0,
										   ((PrepareStmt *) node)->name,
										   query_context);
			session_context->uncompleted_message = msg;
		}
		else
			session_context->uncompleted_message = NULL;
	}


	if (!RAW_MODE)
	{
		/* check if query is "COMMIT" or "ROLLBACK" */
		commit = is_commit_or_rollback_query(node);

		/*
		 * Query is not commit/rollback
		 */
		if (!commit)
		{
			char	   *rewrite_query = NULL;

			if (node)
			{
				POOL_SENT_MESSAGE *msg = NULL;

				if (IsA(node, PrepareStmt))
				{
					msg = session_context->uncompleted_message;
				}
				else if (IsA(node, ExecuteStmt))
				{
					msg = pool_get_sent_message('Q', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
					if (!msg)
						msg = pool_get_sent_message('P', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
				}

				/* rewrite `now()' to timestamp literal */
				if (!is_select_query(node, query_context->original_query) ||
					pool_has_function_call(node) || pool_config->replicate_select)
					rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, msg);

				/*
				 * If the query is BEGIN READ WRITE or BEGIN ... SERIALIZABLE
				 * in native replication mode, we send BEGIN to standbys
				 * instead. original_query which is BEGIN READ WRITE is sent
				 * to primary. rewritten_query which is BEGIN is sent to
				 * standbys.
				 */
				if (pool_need_to_treat_as_if_default_transaction(query_context))
				{
					rewrite_query = pstrdup("BEGIN");
				}

				if (rewrite_query != NULL)
				{
					query_context->rewritten_query = rewrite_query;
					query_context->rewritten_length = strlen(rewrite_query) + 1;
				}
			}

			/*
			 * Optimization effort: If there's only one session, we do not
			 * need to wait for the main node's response, and could execute
			 * the query concurrently. In snapshot isolation mode we cannot
			 * do this optimization because we need to wait for main node's
			 * response first.
			 */
			if (pool_config->num_init_children == 1 &&
				pool_config->backend_clustering_mode != CM_SNAPSHOT_ISOLATION)
			{
				/* Send query to all DB nodes at once */
				status = pool_send_and_wait(query_context, 0, 0);
				/* free_parser(); */
				/*
				 * NOTE(review): this early return does not call
				 * pool_query_context_destroy(); presumably the context is
				 * reclaimed elsewhere (e.g. at ready-for-query) — confirm.
				 */
				return status;
			}

			/* Send the query to main node */
			pool_send_and_wait(query_context, 1, MAIN_NODE_ID);

			/* Check specific errors */
			specific_error = check_errors(backend, MAIN_NODE_ID);
			if (specific_error)
			{
				/* log error message */
				generate_error_message("SimpleQuery: ", specific_error, contents);
			}
		}

		if (specific_error)
		{
			/*
			 * The main node failed with a pgpool-specific error; send a
			 * deliberately failing dummy query to the other nodes so all
			 * backends stay in step.
			 */
			char		msg[1024] = POOL_ERROR_QUERY;	/* large enough */
			int			len = strlen(msg) + 1;	/* shadows parameter len on purpose */

			memset(msg + len, 0, sizeof(int));

			/* send query to other nodes */
			query_context->rewritten_query = msg;
			query_context->rewritten_length = len;
			pool_send_and_wait(query_context, -1, MAIN_NODE_ID);
		}
		else
		{
			/*
			 * If commit command and Snapshot Isolation mode, wait for until
			 * snapshot prepared.
			 */
			if (commit && pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
			{
				si_commit_request();
			}

			/*
			 * Send the query to other than main node.
			 */
			pool_send_and_wait(query_context, -1, MAIN_NODE_ID);

		}

		/*
		 * Send "COMMIT" or "ROLLBACK" to only main node if query is
		 * "COMMIT" or "ROLLBACK".  Note the main node gets commit/rollback
		 * LAST so its transaction closes after the others.
		 */
		if (commit)
		{
			pool_send_and_wait(query_context, 1, MAIN_NODE_ID);

			/*
			 * If we are in the snapshot isolation mode, we need to declare
			 * that commit has been done. This would wake up other children
			 * waiting for acquiring snapshot.
			 */
			if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
			{
				si_commit_done();
				session_context->transaction_read_only = false;
			}
		}

	}
	else
	{
		/* RAW_MODE: single backend, just send and wait. */
		pool_send_and_wait(query_context, 1, MAIN_NODE_ID);
	}

	return POOL_CONTINUE;
}
794
795 /*
796 * process EXECUTE (V3 only)
797 */
/*
 * Handle a frontend Execute ('E') message (extended query protocol, V3
 * only).  "contents" is the portal name (length "len").
 *
 * The query text and parse tree are recovered from the Bind sent-message
 * for the portal.  Like SimpleQuery(), this tries the in-memory query
 * cache first (the cache key is the query text plus a hex dump of the
 * bind parameters), then forwards the Execute with main-node-first
 * ordering in non-streaming modes, or records a pending message in
 * streaming-replication mode.
 *
 * Raises FATAL (terminating this child) if the portal's bind message or
 * its query context cannot be found.  Returns POOL_CONTINUE on success.
 */
POOL_STATUS
Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
		int len, char *contents)
{
	int			commit = 0;
	char	   *query = NULL;
	Node	   *node;
	int			specific_error = 0;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;
	POOL_SENT_MESSAGE *bind_msg;
	bool		foundp = false;	/* true if the result came from query cache */

	/* Get session context */
	session_context = pool_get_session_context(false);

	if (pool_config->log_client_messages)
		ereport(LOG,
				(errmsg("Execute message from frontend."),
				 errdetail("portal: \"%s\"", contents)));
	ereport(DEBUG2,
			(errmsg("Execute: portal name <%s>", contents)));

	/*
	 * Look up the Bind message for this portal; it carries the query
	 * context created at Parse time.  All four lookups below are fatal
	 * because we cannot route an Execute without them.
	 */
	bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
	if (!bind_msg)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to Execute"),
				 errdetail("unable to get bind message")));

	if (!bind_msg->query_context)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to Execute"),
				 errdetail("unable to get query context")));

	if (!bind_msg->query_context->original_query)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to Execute"),
				 errdetail("unable to get original query")));

	if (!bind_msg->query_context->parse_tree)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to Execute"),
				 errdetail("unable to get parse tree")));

	query_context = bind_msg->query_context;
	node = bind_msg->query_context->parse_tree;
	query = bind_msg->query_context->original_query;

	/* save last query string for logging purpose */
	strlcpy(query_string_buffer, query, sizeof(query_string_buffer));

	ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query)));

	ereport(DEBUG1, (errmsg("Execute: pool_is_writing_transaction: %d TSTATE: %c",
							pool_is_writing_transaction(),
							TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))));

	/* log query to log file if necessary */
	if (pool_config->log_statement)
		ereport(LOG, (errmsg("statement: %s", query)));

	/*
	 * Fetch memory cache if possible.  Same preconditions as in
	 * SimpleQuery(): no prior write in this transaction and not in an
	 * aborted ('E') transaction.
	 */
	if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() &&
		(TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
		&& pool_is_likely_select(query))
	{
		POOL_STATUS status;
		char	   *search_query = NULL;
		int			len;		/* NOTE(review): shadows the parameter "len" */

#define STR_ALLOC_SIZE 1024
		ereport(DEBUG1, (errmsg("Execute: pool_is_likely_select: true pool_is_writing_transaction: %d TSTATE: %c",
								pool_is_writing_transaction(),
								TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))));

		len = strlen(query) + 1;
		search_query = MemoryContextStrdup(query_context->memory_context, query);

		ereport(DEBUG1, (errmsg("Execute: checking cache fetch condition")));

		/*
		 * Add bind message's info to query to search.  The bind parameter
		 * bytes are appended as hex so that different parameter values map
		 * to different cache entries.
		 */
		if (query_context->is_cache_safe && bind_msg->param_offset && bind_msg->contents)
		{
			/* Extract binary contents from bind message */
			char	   *query_in_bind_msg = bind_msg->contents + bind_msg->param_offset;
			char		hex_str[4];		/* 02X chars + white space + null end */
			int			i;
			int			alloc_len;

			/* Round the initial allocation up to a STR_ALLOC_SIZE multiple. */
			alloc_len = (len / STR_ALLOC_SIZE + 1) * STR_ALLOC_SIZE;
			search_query = repalloc(search_query, alloc_len);

			for (i = 0; i < bind_msg->len - bind_msg->param_offset; i++)
			{
				int			hexlen;

				/* First byte is preceded by a space separating query text and hex. */
				snprintf(hex_str, sizeof(hex_str), (i == 0) ? " %02X" : "%02X", 0xff & query_in_bind_msg[i]);
				hexlen = strlen(hex_str);

				if ((len + hexlen) >= alloc_len)
				{
					alloc_len += STR_ALLOC_SIZE;
					search_query = repalloc(search_query, alloc_len);
				}
				strcat(search_query, hex_str);
				len += hexlen;
			}

			/*
			 * If bind message is sent again to an existing prepared statement,
			 * it is possible that query_w_hex remains. Before setting newly
			 * allocated query_w_hex's pointer to the query context, free the
			 * previously allocated memory.
			 */
			if (query_context->query_w_hex)
			{
				pfree(query_context->query_w_hex);
			}
			query_context->query_w_hex = search_query;

			/*
			 * When a transaction is committed,
			 * query_context->temp_cache->query is used to create md5 hash to
			 * search for query cache. So overwrite the query text in temp
			 * cache to the one with the hex of bind message. If not, md5 hash
			 * will be created by the query text without bind message, and it
			 * will happen to find cache never or to get a wrong result.
			 *
			 * However, It is possible that temp_cache does not exist.
			 * Consider following scenario: - In the previous execute cache is
			 * overflowed, and temp_cache discarded. - In the subsequent
			 * bind/execute uses the same portal
			 */
			if (query_context->temp_cache)
			{
				pfree(query_context->temp_cache->query);
				query_context->temp_cache->query = MemoryContextStrdup(session_context->memory_context, search_query);
			}
		}

		/*
		 * If the query is SELECT from table to cache, try to fetch cached
		 * result.
		 */
		status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp);

		if (status != POOL_CONTINUE)
			return status;

		if (foundp)
		{
#ifdef DEBUG
			extern bool stop_now;
#endif
			pool_stats_count_up_num_cache_hits();
			query_context->skip_cache_commit = true;
#ifdef DEBUG
			stop_now = true;
#endif

			if (!SL_MODE || !pool_is_doing_extended_query_message())
			{
				pool_set_skip_reading_from_backends();
				/*
				 * NOTE(review): num_cache_hits is incremented a second time
				 * on this path (also counted just above) — looks like a
				 * double count; confirm whether intentional.
				 */
				pool_stats_count_up_num_cache_hits();
				pool_unset_query_in_progress();
				return POOL_CONTINUE;
			}
		}
		else
			query_context->skip_cache_commit = false;
	}

	/* show ps status */
	query_ps_status(query, backend);

	session_context->query_context = query_context;

	/*
	 * Calling pool_where_to_send here is dangerous because the node
	 * parse/bind has been sent could be change by pool_where_to_send() and it
	 * leads to "portal not found" etc. errors.
	 */

	/* check if query is "COMMIT" or "ROLLBACK" */
	commit = is_commit_or_rollback_query(node);

	if (!SL_MODE)
	{
		/*
		 * Query is not commit/rollback
		 */
		if (!commit)
		{
			/* Send the query to main node */
			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MAIN_NODE_ID, false);

			/* Check specific errors */
			specific_error = check_errors(backend, MAIN_NODE_ID);
			if (specific_error)
			{
				/* log error message */
				generate_error_message("Execute: ", specific_error, contents);
			}
		}

		if (specific_error)
		{
			/*
			 * Main node failed; execute a dummy (nonexistent) portal on the
			 * other nodes so they fail too and all backends stay in step.
			 */
			char		msg[1024] = "pgpool_error_portal";	/* large enough */
			int			len = strlen(msg);	/* shadows parameter len on purpose */

			memset(msg + len, 0, sizeof(int));

			/* send query to other nodes */
			pool_extended_send_and_wait(query_context, "E", len, msg, -1, MAIN_NODE_ID, false);
		}
		else
		{
			/*
			 * If commit command and Snapshot Isolation mode, wait for until
			 * snapshot prepared.
			 */
			if (commit && pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
			{
				si_commit_request();
			}

			pool_extended_send_and_wait(query_context, "E", len, contents, -1, MAIN_NODE_ID, false);
		}

		/*
		 * send "COMMIT" or "ROLLBACK" to only main node if query is
		 * "COMMIT" or "ROLLBACK" (main node commits last).
		 */
		if (commit)
		{
			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MAIN_NODE_ID, false);

			/*
			 * If we are in the snapshot isolation mode, we need to declare
			 * that commit has been done. This would wake up other children
			 * waiting for acquiring snapshot.
			 */
			if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
			{
				si_commit_done();
				session_context->transaction_read_only = false;
			}
		}
	}
	else						/* streaming replication mode */
	{
		POOL_PENDING_MESSAGE *pmsg;

		/* On a cache hit the Execute is not forwarded to any backend. */
		if (!foundp)
		{
			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MAIN_NODE_ID, true);
			pool_extended_send_and_wait(query_context, "E", len, contents, -1, MAIN_NODE_ID, true);
		}

		/* Add pending message */
		pmsg = pool_pending_message_create('E', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);

		/* Various take care at the transaction start */
		handle_query_context(backend);

		/*
		 * Take care of "writing transaction" flag.
		 */
		if ((!is_select_query(node, query) || pool_has_function_call(node)) &&
			!is_start_transaction_query(node) &&
			!is_commit_or_rollback_query(node))
		{
			ereport(DEBUG1,
					(errmsg("Execute: TSTATE:%c",
							TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))));

			/*
			 * If the query was not READ SELECT, and we are in an explicit
			 * transaction, remember that we had a write query in this
			 * transaction.
			 */
			if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T' ||
				pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
			{
				/*
				 * However, if the query is "SET TRANSACTION READ ONLY" or its
				 * variant, don't set it.
				 */
				if (!pool_is_transaction_read_only(node))
				{
					pool_set_writing_transaction();
				}
			}
		}
		pool_unset_query_in_progress();
	}

	return POOL_CONTINUE;
}
1108
1109 /*
1110 * process Parse (V3 only)
1111 */
1112 POOL_STATUS
Parse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1113 Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
1114 int len, char *contents)
1115 {
1116 int deadlock_detected = 0;
1117 int insert_stmt_with_lock = 0;
1118 char *name;
1119 char *stmt;
1120 List *parse_tree_list;
1121 Node *node = NULL;
1122 POOL_SENT_MESSAGE *msg;
1123 POOL_STATUS status;
1124 POOL_SESSION_CONTEXT *session_context;
1125 POOL_QUERY_CONTEXT *query_context;
1126
1127 bool error;
1128
1129 /* Get session context */
1130 session_context = pool_get_session_context(false);
1131
1132 /* Create query context */
1133 query_context = pool_init_query_context();
1134
1135 ereport(DEBUG1,
1136 (errmsg("Parse: statement name <%s>", contents)));
1137
1138 name = contents;
1139 stmt = contents + strlen(contents) + 1;
1140
1141 if (pool_config->log_client_messages)
1142 ereport(LOG,
1143 (errmsg("Parse message from frontend."),
1144 errdetail("statement: \"%s\", query: \"%s\"", name, stmt)));
1145
1146 /* parse SQL string */
1147 MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);
1148
1149 parse_tree_list = raw_parser(stmt, RAW_PARSE_DEFAULT, strlen(stmt),&error,!REPLICATION);
1150
1151 if (parse_tree_list == NIL)
1152 {
1153 /* is the query empty? */
1154 if (*stmt == '\0' || *stmt == ';' || error == false)
1155 {
1156 /*
1157 * JBoss sends empty queries for checking connections. We replace
1158 * the empty query with SELECT command not to affect load balance.
1159 * [Pgpool-general] Confused about JDBC and load balancing
1160 */
1161 parse_tree_list = get_dummy_read_query_tree();
1162 }
1163 else
1164 {
1165 /*
1166 * Unable to parse the query. Probably syntax error or the query
1167 * is too new and our parser cannot understand. Treat as if it
1168 * were an DELETE command. Note that the DELETE command does not
1169 * execute, instead the original query will be sent to backends,
1170 * which may or may not cause an actual syntax errors. The command
1171 * will be sent to all backends in replication mode or
1172 * primary in native replication mode.
1173 */
1174 if (!strcmp(remote_host, "[local]"))
1175 {
1176 ereport(LOG,
1177 (errmsg("Unable to parse the query: \"%s\" from local client", stmt)));
1178 }
1179 else
1180 {
1181 ereport(LOG,
1182 (errmsg("Unable to parse the query: \"%s\" from client %s(%s)", stmt, remote_host, remote_port)));
1183 }
1184 parse_tree_list = get_dummy_write_query_tree();
1185 query_context->is_parse_error = true;
1186 }
1187 }
1188 MemoryContextSwitchTo(old_context);
1189
1190 if (parse_tree_list != NIL)
1191 {
1192 /* Save last query string for logging purpose */
1193 snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);
1194
1195 node = raw_parser2(parse_tree_list);
1196
1197 /*
1198 * If replication mode, check to see what kind of insert lock is
1199 * necessary.
1200 */
1201 if (REPLICATION)
1202 insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
1203
1204 /*
1205 * Start query context
1206 */
1207 pool_start_query(query_context, stmt, strlen(stmt) + 1, node);
1208
1209 /*
1210 * Create PostgreSQL version cache. Since the provided query might
1211 * cause a syntax error, we want to issue "SELECT version()" which is
1212 * called inside Pgversion() here.
1213 */
1214 Pgversion(backend);
1215
1216 msg = pool_create_sent_message('P', len, contents, 0, name, query_context);
1217
1218 session_context->uncompleted_message = msg;
1219
1220 /*
1221 * If the table is to be cached, set is_cache_safe TRUE and register
1222 * table oids.
1223 */
1224 if (pool_config->memory_cache_enabled)
1225 {
1226 bool is_likely_select = false;
1227 bool is_select_query = false;
1228 int num_oids;
1229 int *oids;
1230 int i;
1231
1232 /* Check if the query is actually SELECT */
1233 is_likely_select = pool_is_likely_select(query_context->original_query);
1234 if (is_likely_select && IsA(node, SelectStmt))
1235 {
1236 is_select_query = true;
1237 }
1238
1239 /* Switch the flag of is_cache_safe in session_context */
1240 if (is_select_query && !query_context->is_parse_error &&
1241 pool_is_allow_to_cache(query_context->parse_tree,
1242 query_context->original_query))
1243 {
1244 pool_set_cache_safe();
1245 }
1246 else
1247 {
1248 pool_unset_cache_safe();
1249 }
1250
1251 /*
1252 * If table is to be cached and the query is DML, save the table
1253 * oid
1254 */
1255 if (!is_select_query && !query_context->is_parse_error)
1256 {
1257 num_oids = pool_extract_table_oids(node, &oids);
1258
1259 if (num_oids > 0)
1260 {
1261 /* Save to oid buffer */
1262 for (i = 0; i < num_oids; i++)
1263 {
1264 pool_add_dml_table_oid(oids[i]);
1265 }
1266 }
1267 }
1268 }
1269
1270 /*
1271 * If we are in SI mode, start an internal transaction if needed.
1272 */
1273 if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
1274 {
1275 start_internal_transaction(frontend, backend, node);
1276 }
1277
1278 /*
1279 * Get snapshot if needed.
1280 */
1281 si_get_snapshot(frontend, backend, node,
1282 !INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID));
1283
1284 /*
1285 * Decide where to send query
1286 */
1287 pool_where_to_send(query_context, query_context->original_query,
1288 query_context->parse_tree);
1289
1290 if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0)
1291 pool_setall_node_to_be_sent(query_context);
1292
1293 if (REPLICATION)
1294 {
1295 char *rewrite_query;
1296 bool rewrite_to_params = true;
1297
1298 /*
1299 * rewrite `now()'. if stmt is unnamed, we rewrite `now()' to
1300 * timestamp constant. else we rewrite `now()' to params and
1301 * expand that at Bind message.
1302 */
1303 if (*name == '\0')
1304 rewrite_to_params = false;
1305 msg->num_tsparams = 0;
1306 rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, msg);
1307 if (rewrite_query != NULL)
1308 {
1309 int alloc_len = len - strlen(stmt) + strlen(rewrite_query);
1310
1311 /* switch memory context */
1312 MemoryContext oldcxt = MemoryContextSwitchTo(session_context->memory_context);
1313
1314 contents = repalloc(msg->contents, alloc_len);
1315 MemoryContextSwitchTo(oldcxt);
1316
1317 strcpy(contents, name);
1318 strcpy(contents + strlen(name) + 1, rewrite_query);
1319 memcpy(contents + strlen(name) + strlen(rewrite_query) + 2,
1320 stmt + strlen(stmt) + 1,
1321 len - (strlen(name) + strlen(stmt) + 2));
1322
1323 len = alloc_len;
1324 name = contents;
1325 stmt = contents + strlen(name) + 1;
1326 ereport(DEBUG1,
1327 (errmsg("Parse: rewrite query name:\"%s\" statement:\"%s\" len=%d", name, stmt, len)));
1328
1329 msg->len = len;
1330 msg->contents = contents;
1331
1332 query_context->rewritten_query = rewrite_query;
1333 }
1334 }
1335
1336 /*
1337 * If the query is BEGIN READ WRITE in main replica mode, we send
1338 * BEGIN instead of it to standbys. original_query which is
1339 * BEGIN READ WRITE is sent to primary. rewritten_query which is BEGIN
1340 * is sent to standbys.
1341 */
1342 if (is_start_transaction_query(query_context->parse_tree) &&
1343 is_read_write((TransactionStmt *) query_context->parse_tree) &&
1344 MAIN_REPLICA)
1345 {
1346 query_context->rewritten_query = pstrdup("BEGIN");
1347 }
1348 }
1349
1350 /*
1351 * If not in streaming or logical replication mode, send "SYNC" message if not in a transaction.
1352 */
1353 if (!SL_MODE)
1354 {
1355 char kind;
1356
1357 if (TSTATE(backend, MAIN_NODE_ID) != 'T')
1358 {
1359 int i;
1360
1361 /* synchronize transaction state */
1362 for (i = 0; i < NUM_BACKENDS; i++)
1363 {
1364 if (!VALID_BACKEND(i))
1365 continue;
1366 /* send sync message */
1367 send_extended_protocol_message(backend, i, "S", 0, "");
1368 }
1369
1370 kind = pool_read_kind(backend);
1371 if (kind != 'Z')
1372 ereport(ERROR,
1373 (errmsg("unable to parse the query"),
1374 errdetail("invalid read kind")));
1375
1376 /*
1377 * SYNC message returns "Ready for Query" message.
1378 */
1379 if (ReadyForQuery(frontend, backend, 0, false) != POOL_CONTINUE)
1380 {
1381 pool_query_context_destroy(query_context);
1382 return POOL_END;
1383 }
1384
1385 /*
1386 * set in_progress flag, because ReadyForQuery unset it.
1387 * in_progress flag influences VALID_BACKEND.
1388 */
1389 if (!pool_is_query_in_progress())
1390 pool_set_query_in_progress();
1391 }
1392
1393 if (is_strict_query(query_context->parse_tree))
1394 {
1395 start_internal_transaction(frontend, backend, query_context->parse_tree);
1396 allow_close_transaction = 1;
1397 }
1398
1399 if (insert_stmt_with_lock)
1400 {
1401 /* start a transaction if needed and lock the table */
1402 status = insert_lock(frontend, backend, stmt, (InsertStmt *) query_context->parse_tree, insert_stmt_with_lock);
1403 if (status != POOL_CONTINUE)
1404 ereport(ERROR,
1405 (errmsg("unable to parse the query"),
1406 errdetail("unable to get insert lock")));
1407
1408 }
1409 }
1410
1411 if (REPLICATION || SLONY)
1412 {
1413 /*
1414 * We must synchronize because Parse message acquires table locks.
1415 */
1416 ereport(DEBUG1,
1417 (errmsg("Parse: waiting for main node completing the query")));
1418 pool_extended_send_and_wait(query_context, "P", len, contents, 1, MAIN_NODE_ID, false);
1419
1420
1421 /*
1422 * We must check deadlock error because a aborted transaction by
1423 * detecting deadlock isn't same on all nodes. If a transaction is
1424 * aborted on main node, pgpool send a error query to another nodes.
1425 */
1426 deadlock_detected = detect_deadlock_error(MAIN(backend), MAJOR(backend));
1427
1428 /*
1429 * Check if other than deadlock error detected. If so, emit log. This
1430 * is useful when invalid encoding error occurs. In this case,
1431 * PostgreSQL does not report what statement caused that error and
1432 * make users confused.
1433 */
1434 per_node_error_log(backend, MAIN_NODE_ID, stmt, "Parse: Error or notice message from backend: ", true);
1435
1436 if (deadlock_detected)
1437 {
1438 POOL_QUERY_CONTEXT *error_qc;
1439
1440 error_qc = pool_init_query_context();
1441
1442
1443 pool_start_query(error_qc, POOL_ERROR_QUERY, strlen(POOL_ERROR_QUERY) + 1, node);
1444 pool_copy_prep_where(query_context->where_to_send, error_qc->where_to_send);
1445
1446 ereport(LOG,
1447 (errmsg("Parse: received deadlock error message from main node")));
1448
1449 pool_send_and_wait(error_qc, -1, MAIN_NODE_ID);
1450
1451 pool_query_context_destroy(error_qc);
1452
1453
1454 pool_set_query_in_progress();
1455 session_context->query_context = query_context;
1456 }
1457 else
1458 {
1459 pool_extended_send_and_wait(query_context, "P", len, contents, -1, MAIN_NODE_ID, false);
1460 }
1461 }
1462 else if (SL_MODE)
1463 {
1464 POOL_PENDING_MESSAGE *pmsg;
1465
1466 /*
1467 * XXX fix me:even with streaming replication mode, couldn't we have a
1468 * deadlock
1469 */
1470 pool_set_query_in_progress();
1471 #ifdef NOT_USED
1472 pool_clear_sync_map();
1473 #endif
1474 pool_extended_send_and_wait(query_context, "P", len, contents, 1, MAIN_NODE_ID, true);
1475 pool_extended_send_and_wait(query_context, "P", len, contents, -1, MAIN_NODE_ID, true);
1476 pool_add_sent_message(session_context->uncompleted_message);
1477
1478 /* Add pending message */
1479 pmsg = pool_pending_message_create('P', len, contents);
1480 pool_pending_message_dest_set(pmsg, query_context);
1481 pool_pending_message_add(pmsg);
1482 pool_pending_message_free_pending_message(pmsg);
1483
1484 pool_unset_query_in_progress();
1485 }
1486 else
1487 {
1488 pool_extended_send_and_wait(query_context, "P", len, contents, 1, MAIN_NODE_ID, false);
1489 }
1490
1491 return POOL_CONTINUE;
1492
1493 }
1494
/*
 * Process a frontend Bind ('B') message (extended query protocol, V3 only).
 *
 * Looks up the sent Parse message the bind refers to, creates a sent-message
 * entry for the portal, optionally rewrites timestamp parameters (replication
 * mode), re-sends the Parse to the primary if the original went elsewhere
 * (parse_before_bind), and forwards the Bind to the chosen backend node(s).
 * In SL_MODE the Bind is also registered as a pending message.
 *
 * contents layout: portal name, NUL, prepared statement name, NUL, ...
 *
 * Returns POOL_CONTINUE normally, POOL_END on session-fatal errors.
 */
POOL_STATUS
Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
	 int len, char *contents)
{
	char	   *pstmt_name;		/* prepared statement name */
	char	   *portal_name;	/* portal name (may be empty) */
	char	   *rewrite_msg = NULL; /* non-NULL if timestamp rewrite happened */
	POOL_SENT_MESSAGE *parse_msg;
	POOL_SENT_MESSAGE *bind_msg;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;
	int			insert_stmt_with_lock = 0;
	bool		nowait;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/*
	 * Rewrite message
	 */
	portal_name = contents;
	pstmt_name = contents + strlen(portal_name) + 1;

	if (pool_config->log_client_messages)
		ereport(LOG,
				(errmsg("Bind message from frontend."),
				 errdetail("portal: \"%s\", statement: \"%s\"", portal_name, pstmt_name)));
	/* The statement may have been created by simple query ('Q') or Parse ('P') */
	parse_msg = pool_get_sent_message('Q', pstmt_name, POOL_SENT_MESSAGE_CREATED);
	if (!parse_msg)
		parse_msg = pool_get_sent_message('P', pstmt_name, POOL_SENT_MESSAGE_CREATED);
	if (!parse_msg)
	{
		ereport(FATAL,
				(errmsg("unable to bind"),
				 errdetail("cannot get parse message \"%s\"", pstmt_name)));
	}

	/* Register the portal, inheriting the parse message's query context */
	bind_msg = pool_create_sent_message('B', len, contents,
										parse_msg->num_tsparams, portal_name,
										parse_msg->query_context);

	query_context = parse_msg->query_context;
	if (!query_context)
	{
		ereport(FATAL,
				(errmsg("unable to bind"),
				 errdetail("cannot get the query context")));
	}

	/*
	 * If the query can be cached, save its offset of query text in bind
	 * message's content.
	 */
	if (query_context->is_cache_safe)
	{
		bind_msg->param_offset = sizeof(char) * (strlen(portal_name) + strlen(pstmt_name) + 2);
	}

	session_context->uncompleted_message = bind_msg;

	/* rewrite bind message (expand timestamp params registered at Parse) */
	if (REPLICATION && bind_msg->num_tsparams > 0)
	{
		rewrite_msg = bind_rewrite_timestamp(backend, bind_msg, contents, &len);
		if (rewrite_msg != NULL)
			contents = rewrite_msg;
	}

	session_context->query_context = query_context;

	/*
	 * Take care the case when the previous parse message has been sent to
	 * other than primary node. In this case, we send a parse message to the
	 * primary node.
	 */
	if (pool_config->load_balance_mode && pool_is_writing_transaction() &&
		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T' &&
		pool_config->disable_load_balance_on_write != DLBOW_OFF)
	{
		if (!SL_MODE)
		{
			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);
		}

		if (parse_before_bind(frontend, backend, parse_msg, bind_msg) != POOL_CONTINUE)
			return POOL_END;
	}

	/* DML-adaptive mode: re-evaluate destination while in a transaction */
	if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
	{
		pool_where_to_send(query_context, query_context->original_query,
						   query_context->parse_tree);
	}

	/*
	 * Start a transaction if necessary in replication mode
	 */
	if (REPLICATION)
	{
		ereport(DEBUG1,
				(errmsg("Bind: checking strict query")));
		if (is_strict_query(query_context->parse_tree))
		{
			ereport(DEBUG1,
					(errmsg("Bind: strict query")));
			start_internal_transaction(frontend, backend, query_context->parse_tree);
			allow_close_transaction = 1;
		}

		ereport(DEBUG1,
				(errmsg("Bind: checking insert lock")));
		insert_stmt_with_lock = need_insert_lock(backend, query_context->original_query, query_context->parse_tree);
		if (insert_stmt_with_lock)
		{
			ereport(DEBUG1,
					(errmsg("Bind: issuing insert lock")));
			/* issue a LOCK command to keep consistency */
			if (insert_lock(frontend, backend, query_context->original_query, (InsertStmt *) query_context->parse_tree, insert_stmt_with_lock) != POOL_CONTINUE)
			{
				pool_query_context_destroy(query_context);
				return POOL_END;
			}
		}
	}

	ereport(DEBUG1,
			(errmsg("Bind: waiting for main node completing the query")));

	pool_set_query_in_progress();

	if (SL_MODE)
	{
		/* in SL mode, forward without waiting for the backend reply */
		nowait = true;
		session_context->query_context = query_context = bind_msg->query_context;
	}
	else
		nowait = false;

	/* send to main node first, then the rest */
	pool_extended_send_and_wait(query_context, "B", len, contents, 1, MAIN_NODE_ID, nowait);
	pool_extended_send_and_wait(query_context, "B", len, contents, -1, MAIN_NODE_ID, nowait);

	if (SL_MODE)
	{
		POOL_PENDING_MESSAGE *pmsg;

		pool_unset_query_in_progress();
		pool_add_sent_message(session_context->uncompleted_message);

		/* Add pending message */
		pmsg = pool_pending_message_create('B', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);
	}

	/* free the timestamp-rewritten copy, if any */
	if (rewrite_msg)
		pfree(rewrite_msg);
	return POOL_CONTINUE;
}
1657
1658 POOL_STATUS
Describe(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1659 Describe(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
1660 int len, char *contents)
1661 {
1662 POOL_SENT_MESSAGE *msg;
1663 POOL_SESSION_CONTEXT *session_context;
1664 POOL_QUERY_CONTEXT *query_context;
1665
1666 bool nowait;
1667
1668 /* Get session context */
1669 session_context = pool_get_session_context(false);
1670
1671 /* Prepared Statement */
1672 if (*contents == 'S')
1673 {
1674 if (pool_config->log_client_messages)
1675 ereport(LOG,
1676 (errmsg("Describe message from frontend."),
1677 errdetail("statement: \"%s\"", contents + 1)));
1678 msg = pool_get_sent_message('Q', contents + 1, POOL_SENT_MESSAGE_CREATED);
1679 if (!msg)
1680 msg = pool_get_sent_message('P', contents + 1, POOL_SENT_MESSAGE_CREATED);
1681 if (!msg)
1682 ereport(FATAL,
1683 (return_code(2),
1684 errmsg("unable to execute Describe"),
1685 errdetail("unable to get the parse message")));
1686 }
1687 /* Portal */
1688 else
1689 {
1690 if (pool_config->log_client_messages)
1691 ereport(LOG,
1692 (errmsg("Describe message from frontend."),
1693 errdetail("portal: \"%s\"", contents + 1)));
1694 msg = pool_get_sent_message('B', contents + 1, POOL_SENT_MESSAGE_CREATED);
1695 if (!msg)
1696 ereport(FATAL,
1697 (return_code(2),
1698 errmsg("unable to execute Describe"),
1699 errdetail("unable to get the bind message")));
1700 }
1701
1702 query_context = msg->query_context;
1703
1704 if (query_context == NULL)
1705 ereport(FATAL,
1706 (return_code(2),
1707 errmsg("unable to execute Describe"),
1708 errdetail("unable to get the query context")));
1709
1710 session_context->query_context = query_context;
1711
1712 /*
1713 * Calling pool_where_to_send here is dangerous because the node
1714 * parse/bind has been sent could be change by pool_where_to_send() and it
1715 * leads to "portal not found" etc. errors.
1716 */
1717 ereport(DEBUG1,
1718 (errmsg("Describe: waiting for main node completing the query")));
1719
1720 nowait = SL_MODE;
1721
1722 pool_set_query_in_progress();
1723 pool_extended_send_and_wait(query_context, "D", len, contents, 1, MAIN_NODE_ID, nowait);
1724 pool_extended_send_and_wait(query_context, "D", len, contents, -1, MAIN_NODE_ID, nowait);
1725
1726 if (SL_MODE)
1727 {
1728 POOL_PENDING_MESSAGE *pmsg;
1729
1730 /* Add pending message */
1731 pmsg = pool_pending_message_create('D', len, contents);
1732 pool_pending_message_dest_set(pmsg, query_context);
1733 pool_pending_message_query_set(pmsg, query_context);
1734 pool_pending_message_add(pmsg);
1735 pool_pending_message_free_pending_message(pmsg);
1736
1737 pool_unset_query_in_progress();
1738 }
1739
1740 return POOL_CONTINUE;
1741 }
1742
1743
/*
 * Process a frontend Close ('C') message (extended query protocol).
 *
 * The first byte of contents selects the target kind: 'S' closes a prepared
 * statement, 'P' closes a portal; anything else is a protocol violation.
 * If the referenced statement/portal is unknown, a CloseComplete ('3') is
 * returned to the client directly without contacting the backends (matching
 * PostgreSQL, which treats closing a nonexistent object as a no-op).
 * Otherwise the Close is forwarded to the backend node(s) involved; in
 * SL_MODE it is also registered as a pending message.
 *
 * Returns POOL_CONTINUE; raises FATAL on an invalid message or a missing
 * query context.
 */
POOL_STATUS
Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
	  int len, char *contents)
{
	POOL_SENT_MESSAGE *msg;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* Prepared Statement */
	if (*contents == 'S')
	{
		/* statement may originate from simple query ('Q') or Parse ('P') */
		msg = pool_get_sent_message('Q', contents + 1, POOL_SENT_MESSAGE_CREATED);
		if (pool_config->log_client_messages)
			ereport(LOG,
					(errmsg("Close message from frontend."),
					 errdetail("statement: \"%s\"", contents + 1)));
		if (!msg)
			msg = pool_get_sent_message('P', contents + 1, POOL_SENT_MESSAGE_CREATED);
	}
	/* Portal */
	else if (*contents == 'P')
	{
		if (pool_config->log_client_messages)
			ereport(LOG,
					(errmsg("Close message from frontend."),
					 errdetail("portal: \"%s\"", contents + 1)));
		msg = pool_get_sent_message('B', contents + 1, POOL_SENT_MESSAGE_CREATED);
	}
	else
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to execute close, invalid message")));

	/*
	 * As per the postgresql, calling close on non existing portals or
	 * statements is not an error. So on the same footings we will ignore all
	 * such calls and return the close complete message to clients with out
	 * going to backend
	 */
	if (!msg)
	{
		int			len = htonl(4);	/* message length field only (no body) */

		pool_set_command_success();
		pool_unset_query_in_progress();

		/* send CloseComplete ('3') directly to the frontend */
		pool_write(frontend, "3", 1);
		pool_write_and_flush(frontend, &len, sizeof(len));

		return POOL_CONTINUE;
	}

	session_context->uncompleted_message = msg;
	query_context = msg->query_context;

	if (!query_context)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to execute close"),
				 errdetail("unable to get the query context")));

	session_context->query_context = query_context;

	/*
	 * pool_where_to_send(query_context, query_context->original_query,
	 * query_context->parse_tree);
	 */

	ereport(DEBUG1,
			(errmsg("Close: waiting for main node completing the query")));

	pool_set_query_in_progress();

	if (!SL_MODE)
	{
		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MAIN_NODE_ID, false);
		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MAIN_NODE_ID, false);
	}
	else
	{
		POOL_PENDING_MESSAGE *pmsg;
		bool		where_to_send_save[MAX_NUM_BACKENDS];

		/*
		 * Parse_before_bind() may have sent a bind message to the primary
		 * node id. So send the close message to the primary node as well.
		 * Even if not, sending a close message for non existing
		 * statement/portal is harmless. No error will happen.
		 */
		if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
		{
			/* save where_to_send map (restored after the sends below) */
			memcpy(where_to_send_save, query_context->where_to_send, sizeof(where_to_send_save));

			query_context->where_to_send[PRIMARY_NODE_ID] = true;
			query_context->where_to_send[session_context->load_balance_node_id] = true;
		}

		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MAIN_NODE_ID, true);
		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MAIN_NODE_ID, true);

		/* Add pending message */
		pmsg = pool_pending_message_create('C', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);

		if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
		{
			/* Restore where_to_send map */
			memcpy(query_context->where_to_send, where_to_send_save, sizeof(where_to_send_save));
		}

#ifdef NOT_USED
		dump_pending_message();
#endif
		pool_unset_query_in_progress();

		/*
		 * Remove sent message
		 */
		ereport(DEBUG1,
				(errmsg("Close: removing sent message %c %s", *contents, contents + 1)));
		pool_set_sent_message_state(msg);
	}

	return POOL_CONTINUE;
}
1876
1877
1878 POOL_STATUS
FunctionCall3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1879 FunctionCall3(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
1880 int len, char *contents)
1881 {
1882 /*
1883 * If Function call message for lo_creat, rewrite it
1884 */
1885 char *rewrite_lo;
1886 int rewrite_len;
1887
1888 rewrite_lo = pool_rewrite_lo_creat('F', contents, len, frontend,
1889 backend, &rewrite_len);
1890
1891 if (rewrite_lo != NULL)
1892 {
1893 contents = rewrite_lo;
1894 len = rewrite_len;
1895 }
1896 return SimpleForwardToBackend('F', frontend, backend, len, contents);
1897 }
1898
1899 /*
1900 * Process ReadyForQuery('Z') message.
1901 * If send_ready is true, send 'Z' message to frontend.
1902 * If cache_commit is true, commit or discard query cache according to
1903 * transaction state.
1904 *
1905 * - if the error status "mismatch_ntuples" is set, send an error query
1906 * to all DB nodes to abort transaction or do failover.
1907 * - internal transaction is closed
1908 */
1909 POOL_STATUS
ReadyForQuery(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,bool send_ready,bool cache_commit)1910 ReadyForQuery(POOL_CONNECTION * frontend,
1911 POOL_CONNECTION_POOL * backend, bool send_ready, bool cache_commit)
1912 {
1913 int i;
1914 int len;
1915 signed char kind;
1916 signed char state = 0;
1917 POOL_SESSION_CONTEXT *session_context;
1918 Node *node = NULL;
1919 char *query = NULL;
1920 bool got_estate = false;
1921
1922 /*
1923 * It is possible that the "ignore until sync is received" flag was set if
1924 * we send sync to backend and the backend returns error. Let's reset the
1925 * flag unconditionally because we apparently have received a "ready for
1926 * query" message from backend.
1927 */
1928 pool_unset_ignore_till_sync();
1929
1930 /* Reset previous message */
1931 pool_pending_message_reset_previous_message();
1932
1933 /* Get session context */
1934 session_context = pool_get_session_context(false);
1935
1936 /*
1937 * If the numbers of update tuples are differ and
1938 * failover_if_affected_tuples_mismatch is false, we abort transactions by
1939 * using do_error_command. If failover_if_affected_tuples_mismatch is
1940 * true, trigger failover. This only works with PROTO_MAJOR_V3.
1941 */
1942 if (session_context->mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
1943 {
1944 int i;
1945 char kind;
1946
1947 /*
1948 * If failover_if_affected_tuples_mismatch, is true, then decide
1949 * victim nodes by using find_victim_nodes and degenerate them.
1950 */
1951 if (pool_config->failover_if_affected_tuples_mismatch)
1952 {
1953 int *victim_nodes;
1954 int number_of_nodes;
1955 char msgbuf[128];
1956
1957 victim_nodes = find_victim_nodes(session_context->ntuples, NUM_BACKENDS,
1958 MAIN_NODE_ID, &number_of_nodes);
1959 if (victim_nodes)
1960 {
1961 int i;
1962 String *msg;
1963
1964 msg = init_string("ReadyForQuery: Degenerate backends:");
1965
1966 for (i = 0; i < number_of_nodes; i++)
1967 {
1968 snprintf(msgbuf, sizeof(msgbuf), " %d", victim_nodes[i]);
1969 string_append_char(msg, msgbuf);
1970 }
1971 ereport(LOG,
1972 (errmsg("processing ready for query message"),
1973 errdetail("%s", msg->data)));
1974
1975 free_string(msg);
1976
1977 msg = init_string("ReadyForQuery: Number of affected tuples are:");
1978
1979 for (i = 0; i < NUM_BACKENDS; i++)
1980 {
1981 snprintf(msgbuf, sizeof(msgbuf), " %d", session_context->ntuples[i]);
1982 string_append_char(msg, msgbuf);
1983 }
1984 ereport(LOG,
1985 (errmsg("processing ready for query message"),
1986 errdetail("%s", msg->data)));
1987
1988 free_string(msg);
1989
1990 degenerate_backend_set(victim_nodes, number_of_nodes, REQ_DETAIL_CONFIRMED | REQ_DETAIL_SWITCHOVER);
1991 child_exit(POOL_EXIT_AND_RESTART);
1992 }
1993 else
1994 {
1995 ereport(LOG,
1996 (errmsg("processing ready for query message"),
1997 errdetail("find_victim_nodes returned no victim node")));
1998 }
1999 }
2000
2001 /*
2002 * XXX: discard rest of ReadyForQuery packet
2003 */
2004
2005 if (pool_read_message_length(backend) < 0)
2006 return POOL_END;
2007
2008 state = pool_read_kind(backend);
2009 if (state < 0)
2010 return POOL_END;
2011
2012 for (i = 0; i < NUM_BACKENDS; i++)
2013 {
2014 if (VALID_BACKEND(i))
2015 {
2016 /* abort transaction on all nodes. */
2017 do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
2018 }
2019 }
2020
2021 /* loop through until we get ReadyForQuery */
2022 for (;;)
2023 {
2024 kind = pool_read_kind(backend);
2025 if (kind < 0)
2026 return POOL_END;
2027
2028 if (kind == 'Z')
2029 break;
2030
2031
2032 /* put the message back to read buffer */
2033 for (i = 0; i < NUM_BACKENDS; i++)
2034 {
2035 if (VALID_BACKEND(i))
2036 {
2037 pool_unread(CONNECTION(backend, i), &kind, 1);
2038 }
2039 }
2040
2041 /* discard rest of the packet */
2042 if (pool_discard_packet(backend) != POOL_CONTINUE)
2043 return POOL_END;
2044 }
2045 session_context->mismatch_ntuples = false;
2046 }
2047
2048 /*
2049 * if a transaction is started for insert lock, we need to close the
2050 * transaction.
2051 */
2052 /* if (pool_is_query_in_progress() && allow_close_transaction) */
2053 if (allow_close_transaction)
2054 {
2055 bool internal_transaction_started = INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID);
2056
2057 /*
2058 * If we are running in snapshot isolation mode and started an
2059 * internal transaction, wait until snapshot is prepared.
2060 */
2061 if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
2062 internal_transaction_started)
2063 {
2064 si_commit_request();
2065 }
2066
2067 /* close an internal transaction */
2068 if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
2069 return POOL_END;
2070
2071 /*
2072 * If we are running in snapshot isolation mode and started an
2073 * internal transaction, notice that commit is done.
2074 */
2075 if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
2076 internal_transaction_started)
2077 {
2078 si_commit_done();
2079 /* reset transaction readonly flag */
2080 session_context->transaction_read_only = false;
2081 }
2082 }
2083
2084 if (MAJOR(backend) == PROTO_MAJOR_V3)
2085 {
2086 if ((len = pool_read_message_length(backend)) < 0)
2087 return POOL_END;
2088
2089 /*
2090 * Set transaction state for each node
2091 */
2092 state = TSTATE(backend,
2093 MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID);
2094
2095 for (i = 0; i < NUM_BACKENDS; i++)
2096 {
2097 if (!VALID_BACKEND(i))
2098 continue;
2099
2100 if (pool_read(CONNECTION(backend, i), &kind, sizeof(kind)))
2101 return POOL_END;
2102
2103 TSTATE(backend, i) = kind;
2104 ereport(DEBUG5,
2105 (errmsg("processing ReadyForQuery"),
2106 errdetail("transaction state '%c'(%02x)", state, state)));
2107
2108 /*
2109 * The transaction state to be returned to frontend is main node's.
2110 */
2111 if (i == (MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))
2112 {
2113 state = kind;
2114 }
2115
		/*
		 * However, if the state is 'E', an ERROR has already been reported
		 * to the frontend.  To stay consistent with that, make 'E' the
		 * state returned to the frontend.
		 */
2121 if (kind == 'E')
2122 got_estate = true;
2123 }
2124 }
2125
2126 /*
2127 * Make sure that no message remains in the backend buffer. If something
2128 * remains, it could be an "out of band" ERROR or FATAL error, or a NOTICE
2129 * message, which was generated by backend itself for some reasons like
2130 * recovery conflict or SIGTERM received. If so, let's consume it and emit
2131 * a log message so that next read_kind_from_backend() will not hang in
2132 * trying to read from backend which may have not produced such a message.
2133 */
2134 if (pool_is_query_in_progress())
2135 {
2136 for (i = 0; i < NUM_BACKENDS; i++)
2137 {
2138 if (!VALID_BACKEND(i))
2139 continue;
2140 if (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
2141 per_node_error_log(backend, i,
2142 "(out of band message)",
2143 "ReadyForQuery: Error or notice message from backend: ", false);
2144 }
2145 }
2146
2147 if (send_ready)
2148 {
2149 pool_write(frontend, "Z", 1);
2150
2151 if (MAJOR(backend) == PROTO_MAJOR_V3)
2152 {
2153 len = htonl(len);
2154 pool_write(frontend, &len, sizeof(len));
2155 if (got_estate)
2156 state = 'E';
2157 pool_write(frontend, &state, 1);
2158 }
2159 pool_flush(frontend);
2160 }
2161
2162 if (pool_is_query_in_progress())
2163 {
2164 node = pool_get_parse_tree();
2165 query = pool_get_query_string();
2166
2167 if (pool_is_command_success())
2168 {
2169 if (node)
2170 pool_at_command_success(frontend, backend);
2171
2172 /* Memory cache enabled? */
2173 if (cache_commit && pool_config->memory_cache_enabled)
2174 {
2175
2176 /*
2177 * If we are doing extended query and the state is after
2178 * EXECUTE, then we can commit cache. We check latter
2179 * condition by looking at query_context->query_w_hex. This
2180 * check is necessary for certain frame work such as PHP PDO.
2181 * It sends Sync message right after PARSE and it produces
2182 * "Ready for query" message from backend.
2183 */
2184 if (pool_is_doing_extended_query_message())
2185 {
2186 if (session_context->query_context &&
2187 session_context->query_context->query_state[MAIN_NODE_ID] == POOL_EXECUTE_COMPLETE)
2188 {
2189 pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state);
2190 if (session_context->query_context->query_w_hex)
2191 pfree(session_context->query_context->query_w_hex);
2192 session_context->query_context->query_w_hex = NULL;
2193 }
2194 }
2195 else
2196 {
2197 if (MAJOR(backend) != PROTO_MAJOR_V3)
2198 {
2199 state = 'I'; /* XXX I don't think query cache works
2200 * with PROTO2 protocol */
2201 }
2202 pool_handle_query_cache(backend, query, node, state);
2203 }
2204 }
2205 }
2206
2207 /*
2208 * If PREPARE or extended query protocol commands caused error, remove
2209 * the temporary saved message. (except when ReadyForQuery() is called
2210 * during Parse() of extended queries)
2211 */
2212 else
2213 {
2214 if ((pool_is_doing_extended_query_message() &&
2215 session_context->query_context &&
2216 session_context->query_context->query_state[MAIN_NODE_ID] != POOL_UNPARSED &&
2217 session_context->uncompleted_message) ||
2218 (!pool_is_doing_extended_query_message() && session_context->uncompleted_message &&
2219 session_context->uncompleted_message->kind != 0))
2220 {
2221 pool_add_sent_message(session_context->uncompleted_message);
2222 pool_remove_sent_message(session_context->uncompleted_message->kind,
2223 session_context->uncompleted_message->name);
2224 session_context->uncompleted_message = NULL;
2225 }
2226 }
2227
2228 pool_unset_query_in_progress();
2229 }
2230 if (!pool_is_doing_extended_query_message())
2231 {
2232 if (!(node && IsA(node, PrepareStmt)))
2233 {
2234 pool_query_context_destroy(pool_get_session_context(false)->query_context);
2235 }
2236 }
2237
2238 /*
2239 * Show ps idle status
2240 */
2241 pool_ps_idle_display(backend);
2242
2243 return POOL_CONTINUE;
2244 }
2245
2246 /*
2247 * Close running transactions on standbys.
2248 */
close_standby_transactions(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2249 static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
2250 POOL_CONNECTION_POOL * backend)
2251 {
2252 int i;
2253
2254 for (i = 0; i < NUM_BACKENDS; i++)
2255 {
2256 if (CONNECTION_SLOT(backend, i) &&
2257 TSTATE(backend, i) == 'T' &&
2258 BACKEND_INFO(i).backend_status == CON_UP &&
2259 (MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != i)
2260 {
2261 per_node_statement_log(backend, i, "COMMIT");
2262 if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend),
2263 MAIN_CONNECTION(backend)->pid,
2264 MAIN_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
2265 ereport(ERROR,
2266 (errmsg("unable to close standby transactions"),
2267 errdetail("do_command returned DEADLOCK status")));
2268 }
2269 }
2270 return POOL_CONTINUE;
2271 }
2272
2273 POOL_STATUS
ParseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2274 ParseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
2275 {
2276 POOL_SESSION_CONTEXT *session_context;
2277
2278 /* Get session context */
2279 session_context = pool_get_session_context(false);
2280
2281 if (!SL_MODE && session_context->uncompleted_message)
2282 {
2283 POOL_QUERY_CONTEXT *qc;
2284
2285 pool_add_sent_message(session_context->uncompleted_message);
2286
2287 qc = session_context->uncompleted_message->query_context;
2288 if (qc)
2289 pool_set_query_state(qc, POOL_PARSE_COMPLETE);
2290
2291 session_context->uncompleted_message = NULL;
2292 }
2293
2294 return SimpleForwardToFrontend('1', frontend, backend);
2295 }
2296
2297 POOL_STATUS
BindComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2298 BindComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
2299 {
2300 POOL_SESSION_CONTEXT *session_context;
2301
2302 /* Get session context */
2303 session_context = pool_get_session_context(false);
2304
2305 if (!SL_MODE && session_context->uncompleted_message)
2306 {
2307 POOL_QUERY_CONTEXT *qc;
2308
2309 pool_add_sent_message(session_context->uncompleted_message);
2310
2311 qc = session_context->uncompleted_message->query_context;
2312 if (qc)
2313 pool_set_query_state(qc, POOL_BIND_COMPLETE);
2314
2315 session_context->uncompleted_message = NULL;
2316 }
2317
2318 return SimpleForwardToFrontend('2', frontend, backend);
2319 }
2320
2321 POOL_STATUS
CloseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2322 CloseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
2323 {
2324 POOL_SESSION_CONTEXT *session_context;
2325 POOL_STATUS status;
2326 char kind = ' ';
2327 char *name = "";
2328
2329 /* Get session context */
2330 session_context = pool_get_session_context(false);
2331
2332 /* Send CloseComplete(3) to frontend before removing the target message */
2333 status = SimpleForwardToFrontend('3', frontend, backend);
2334
2335 /* Remove the target message */
2336 if (SL_MODE)
2337 {
2338 POOL_PENDING_MESSAGE *pmsg;
2339
2340 pmsg = pool_pending_message_pull_out();
2341
2342 if (pmsg)
2343 {
2344 /* Sanity check */
2345 if (pmsg->type != POOL_CLOSE)
2346 {
2347 ereport(LOG,
2348 (errmsg("CloseComplete: pending message was not Close request: %s", pool_pending_message_type_to_string(pmsg->type))));
2349 }
2350 else
2351 {
2352 kind = pool_get_close_message_spec(pmsg);
2353 name = pool_get_close_message_name(pmsg);
2354 kind = kind == 'S' ? 'P' : 'B';
2355 }
2356 pool_pending_message_free_pending_message(pmsg);
2357 }
2358 }
2359 else
2360 {
2361 if (session_context->uncompleted_message)
2362 {
2363 kind = session_context->uncompleted_message->kind;
2364 name = session_context->uncompleted_message->name;
2365 session_context->uncompleted_message = NULL;
2366 }
2367 else
2368 {
2369 ereport(ERROR,
2370 (errmsg("processing CloseComplete, uncompleted message not found")));
2371 }
2372 }
2373
2374 if (kind != ' ')
2375 {
2376 pool_remove_sent_message(kind, name);
2377 ereport(DEBUG1,
2378 (errmsg("CloseComplete: remove sent message. kind:%c, name:%s",
2379 kind, name)));
2380 if (pool_config->memory_cache_enabled)
2381 {
2382 /*
2383 * Discard current temp query cache. This is necessary for the
2384 * case of clustering mode is not either streaming or logical
2385 * replication. Because in the cases the cache has been already
2386 * discarded upon receiving CommandComplete.
2387 */
2388 pool_discard_current_temp_query_cache();
2389 }
2390 }
2391
2392 return status;
2393 }
2394
2395 POOL_STATUS
ParameterDescription(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2396 ParameterDescription(POOL_CONNECTION * frontend,
2397 POOL_CONNECTION_POOL * backend)
2398 {
2399 int len,
2400 len1 = 0;
2401 char *p = NULL;
2402 char *p1 = NULL;
2403 int sendlen;
2404 int i;
2405
2406 POOL_SESSION_CONTEXT *session_context;
2407 int num_params,
2408 send_num_params,
2409 num_dmy;
2410 char kind = 't';
2411
2412 session_context = pool_get_session_context(false);
2413
2414 /* only in replication mode and rewritten query */
2415 if (!REPLICATION || !session_context->query_context->rewritten_query)
2416 return SimpleForwardToFrontend('t', frontend, backend);
2417
2418 /* get number of parameters in original query */
2419 num_params = session_context->query_context->num_original_params;
2420
2421 pool_read(MAIN(backend), &len, sizeof(len));
2422
2423 len = ntohl(len);
2424 len -= sizeof(int32);
2425 len1 = len;
2426
2427 /* number of parameters in rewritten query is just discarded */
2428 pool_read(MAIN(backend), &num_dmy, sizeof(int16));
2429 len -= sizeof(int16);
2430
2431 p = pool_read2(MAIN(backend), len);
2432 if (p == NULL)
2433 ereport(ERROR,
2434 (errmsg("ParameterDescription. connection error"),
2435 errdetail("read from backend failed")));
2436
2437
2438 p1 = palloc(len);
2439 memcpy(p1, p, len);
2440
2441 for (i = 0; i < NUM_BACKENDS; i++)
2442 {
2443 if (VALID_BACKEND(i) && !IS_MAIN_NODE_ID(i))
2444 {
2445 pool_read(CONNECTION(backend, i), &len, sizeof(len));
2446
2447 len = ntohl(len);
2448 len -= sizeof(int32);
2449
2450 p = pool_read2(CONNECTION(backend, i), len);
2451 if (p == NULL)
2452 ereport(ERROR,
2453 (errmsg("ParameterDescription. connection error"),
2454 errdetail("read from backend no %d failed", i)));
2455
2456 if (len != len1)
2457 ereport(DEBUG1,
2458 (errmsg("ParameterDescription. backends does not match"),
2459 errdetail("length does not match between backends main(%d) %d th backend(%d) kind:(%c)", len, i, len1, kind)));
2460 }
2461 }
2462
2463 pool_write(frontend, &kind, 1);
2464
2465 /* send back OIDs of parameters in original query and left are discarded */
2466 len = sizeof(int16) + num_params * sizeof(int32);
2467 sendlen = htonl(len + sizeof(int32));
2468 pool_write(frontend, &sendlen, sizeof(int32));
2469
2470 send_num_params = htons(num_params);
2471 pool_write(frontend, &send_num_params, sizeof(int16));
2472
2473 pool_write_and_flush(frontend, p1, num_params * sizeof(int32));
2474
2475 pfree(p1);
2476 return POOL_CONTINUE;
2477 }
2478
2479 POOL_STATUS
ErrorResponse3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2480 ErrorResponse3(POOL_CONNECTION * frontend,
2481 POOL_CONNECTION_POOL * backend)
2482 {
2483 POOL_STATUS ret;
2484
2485 ret = SimpleForwardToFrontend('E', frontend, backend);
2486 if (ret != POOL_CONTINUE)
2487 return ret;
2488
2489 if (!SL_MODE)
2490 raise_intentional_error_if_need(backend);
2491
2492 return POOL_CONTINUE;
2493 }
2494
2495 POOL_STATUS
FunctionCall(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2496 FunctionCall(POOL_CONNECTION * frontend,
2497 POOL_CONNECTION_POOL * backend)
2498 {
2499 char dummy[2];
2500 int oid;
2501 int argn;
2502 int i;
2503
2504 for (i = 0; i < NUM_BACKENDS; i++)
2505 {
2506 if (VALID_BACKEND(i))
2507 {
2508 pool_write(CONNECTION(backend, i), "F", 1);
2509 }
2510 }
2511
2512 /* dummy */
2513 pool_read(frontend, dummy, sizeof(dummy));
2514
2515 for (i = 0; i < NUM_BACKENDS; i++)
2516 {
2517 if (VALID_BACKEND(i))
2518 {
2519 pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
2520 }
2521 }
2522
2523 /* function object id */
2524 pool_read(frontend, &oid, sizeof(oid));
2525
2526 for (i = 0; i < NUM_BACKENDS; i++)
2527 {
2528 if (VALID_BACKEND(i))
2529 {
2530 pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
2531 }
2532 }
2533
2534 /* number of arguments */
2535 pool_read(frontend, &argn, sizeof(argn));
2536
2537 for (i = 0; i < NUM_BACKENDS; i++)
2538 {
2539 if (VALID_BACKEND(i))
2540 {
2541 pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
2542 }
2543 }
2544
2545 argn = ntohl(argn);
2546
2547 for (i = 0; i < argn; i++)
2548 {
2549 int len;
2550 char *arg;
2551
2552 /* length of each argument in bytes */
2553 pool_read(frontend, &len, sizeof(len));
2554
2555 for (i = 0; i < NUM_BACKENDS; i++)
2556 {
2557 if (VALID_BACKEND(i))
2558 {
2559 pool_write(CONNECTION(backend, i), &len, sizeof(len));
2560 }
2561 }
2562
2563 len = ntohl(len);
2564
2565 /* argument value itself */
2566 if ((arg = pool_read2(frontend, len)) == NULL)
2567 ereport(FATAL,
2568 (return_code(2),
2569 errmsg("failed to process function call"),
2570 errdetail("read from frontend failed")));
2571
2572 for (i = 0; i < NUM_BACKENDS; i++)
2573 {
2574 if (VALID_BACKEND(i))
2575 {
2576 pool_write(CONNECTION(backend, i), arg, len);
2577 }
2578 }
2579 }
2580
2581 for (i = 0; i < NUM_BACKENDS; i++)
2582 {
2583 if (VALID_BACKEND(i))
2584 {
2585 pool_flush(CONNECTION(backend, i));
2586 }
2587 }
2588 return POOL_CONTINUE;
2589 }
2590
/*
 * Read one message from the frontend and dispatch it to the matching
 * protocol handler (SimpleQuery, Parse, Bind, Execute, ...).
 *
 * Returns POOL_CONTINUE on success, POOL_END when the frontend sent
 * Terminate ('X').  Any handler failure raises FATAL.
 */
POOL_STATUS
ProcessFrontendResponse(POOL_CONNECTION * frontend,
						POOL_CONNECTION_POOL * backend)
{
	char		fkind;			/* message kind byte from frontend */
	char	   *bufp = NULL;	/* payload as read from the stream buffer */
	char	   *contents;		/* palloc'd private copy of the payload */
	POOL_STATUS status;
	int			len = 0;

	/* Get session context */
	pool_get_session_context(false);

	/*
	 * Nothing buffered from the frontend while forwarding is disabled:
	 * nothing to do (no_forward is also used while processing reset
	 * queries; see ProcessBackendResponse).
	 */
	if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
		return POOL_CONTINUE;

	/* Are we suspending reading from frontend? */
	if (pool_is_suspend_reading_from_frontend())
		return POOL_CONTINUE;

	/* Read the one-byte message kind */
	pool_read(frontend, &fkind, 1);

	ereport(DEBUG5,
			(errmsg("processing frontend response"),
			 errdetail("received kind '%c'(%02x) from frontend", fkind, fkind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		/* V3: 4-byte length (includes itself) follows the kind byte */
		if (pool_read(frontend, &len, sizeof(len)) < 0)
			ereport(ERROR,
					(errmsg("unable to process frontend response"),
					 errdetail("failed to read message length from frontend. frontend abnormally exited")));

		len = ntohl(len) - 4;
		if (len > 0)
			bufp = pool_read2(frontend, len);
		else if (len < 0)
			ereport(ERROR,
					(errmsg("frontend message length is less than 4 (kind: %c)", fkind)));
	}
	else
	{
		/* V2: payload is a NUL-terminated string (except FunctionCall) */
		if (fkind != 'F')
			bufp = pool_read_string(frontend, &len, 0);
	}

	if (len > 0 && bufp == NULL)
		ereport(ERROR,
				(errmsg("unable to process frontend response"),
				 errdetail("failed to read message from frontend. frontend abnormally exited")));

	/*
	 * While ignoring messages until Sync, swallow everything except Sync
	 * itself.
	 */
	if (fkind != 'S' && pool_is_ignore_till_sync())
	{
		/*
		 * Flag setting for calling ProcessBackendResponse() in
		 * pool_process_query().
		 */
		if (!pool_is_query_in_progress())
			pool_set_query_in_progress();

		return POOL_CONTINUE;
	}

	pool_unset_doing_extended_query_message();

	/*
	 * Allocate buffer and copy the packet contents. Because inside these
	 * protocol modules, pool_read2 maybe called and modify its buffer
	 * contents.
	 */
	if (len > 0)
	{
		contents = palloc(len);
		memcpy(contents, bufp, len);
	}
	else
	{
		/*
		 * Set dummy content if len <= 0. this happens only when protocol
		 * version is 2.
		 */
		contents = palloc(1);
		memcpy(contents, "", 1);
	}

	switch (fkind)
	{
			POOL_QUERY_CONTEXT *query_context;
			char	   *query;
			Node	   *node;

		case 'X':				/* Terminate */
			if (contents)
				pfree(contents);
			if (pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Terminate message from frontend.")));
			ereport(DEBUG5,
					(errmsg("Frontend terminated"),
					 errdetail("received message kind 'X' from frontend")));
			return POOL_END;

		case 'Q':				/* Query */
			allow_close_transaction = 1;
			if (pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Query message from frontend."),
						 errdetail("query: \"%s\"", contents)));
			status = SimpleQuery(frontend, backend, len, contents);
			break;

		case 'E':				/* Execute */
			allow_close_transaction = 1;
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Execute(frontend, backend, len, contents);
			break;

		case 'P':				/* Parse */
			/* Parse must not close the transaction prematurely */
			allow_close_transaction = 0;
			pool_set_doing_extended_query_message();
			status = Parse(frontend, backend, len, contents);
			break;

		case 'B':				/* Bind */
			pool_set_doing_extended_query_message();
			status = Bind(frontend, backend, len, contents);
			break;

		case 'C':				/* Close */
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Close(frontend, backend, len, contents);
			break;

		case 'D':				/* Describe */
			pool_set_doing_extended_query_message();
			status = Describe(frontend, backend, len, contents);
			break;

		case 'S':				/* Sync */
			if (pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Sync message from frontend.")));
			pool_set_doing_extended_query_message();
			if (pool_is_ignore_till_sync())
				pool_unset_ignore_till_sync();

			if (SL_MODE)
			{
				POOL_PENDING_MESSAGE *msg;

				/* Track the Sync as a pending message */
				pool_unset_query_in_progress();
				msg = pool_pending_message_create('S', 0, NULL);
				pool_pending_message_add(msg);
				pool_pending_message_free_pending_message(msg);
			}
			else if (!pool_is_query_in_progress())
				pool_set_query_in_progress();
			status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

			if (SL_MODE)
			{
				/*
				 * From now on suspend to read from frontend until we receive
				 * ready for query message from backend.
				 */
				pool_set_suspend_reading_from_frontend();
			}
			break;

		case 'F':				/* FunctionCall */
			if (pool_config->log_client_messages)
			{
				int			oid;

				memcpy(&oid, contents, sizeof(int));
				ereport(LOG,
						(errmsg("FunctionCall message from frontend."),
						 errdetail("oid: \"%d\"", ntohl(oid))));
			}

			/*
			 * Create dummy query context as if it were an INSERT.
			 */
			query_context = pool_init_query_context();
			query = "INSERT INTO foo VALUES(1)";
			MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

			/*
			 * The dummy INSERT node feeds pool_where_to_send() —
			 * presumably so the function call is routed like a write;
			 * TODO confirm against pool_where_to_send's logic.
			 */
			node = get_dummy_insert_query_node();
			pool_start_query(query_context, query, strlen(query) + 1, node);

			MemoryContextSwitchTo(old_context);

			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);

			if (MAJOR(backend) == PROTO_MAJOR_V3)
				status = FunctionCall3(frontend, backend, len, contents);
			else
				status = FunctionCall(frontend, backend);

			break;

		case 'c':				/* CopyDone */
		case 'd':				/* CopyData */
		case 'f':				/* CopyFail */
		case 'H':				/* Flush */
			if (fkind == 'H' && pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Flush message from frontend.")));
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				if (fkind == 'H')
				{
					pool_set_doing_extended_query_message();
				}
				status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

				/*
				 * After flush message received, extended query mode should be
				 * continued.
				 */
				if (fkind != 'H' && pool_is_doing_extended_query_message())
				{
					pool_unset_doing_extended_query_message();
				}

				break;
			}

			/*
			 * NOTE(review): under protocol V2 these kinds fall through to
			 * the default FATAL below — presumably intentional since V2
			 * has no such frontend messages here; confirm.
			 */
		default:
			ereport(FATAL,
					(return_code(2),
					 errmsg("unable to process frontend response"),
					 errdetail("unknown message type %c(%02x)", fkind, fkind)));

	}
	if (contents)
		pfree(contents);

	if (status != POOL_CONTINUE)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process frontend response")));

	return status;
}
2842
/*
 * Read one message kind from the backend(s) and dispatch it to the
 * matching handler, maintaining session flags (query-in-progress,
 * command-success, ignore-till-sync, ...) along the way.
 *
 * state/num_fields carry reset-query progress and V2 row-description
 * field counts between calls.  Any handler failure raises FATAL.
 */
POOL_STATUS
ProcessBackendResponse(POOL_CONNECTION * frontend,
					   POOL_CONNECTION_POOL * backend,
					   int *state, short *num_fields)
{
	int			status = POOL_CONTINUE;
	char		kind;

	/* Get session context */
	pool_get_session_context(false);

	if (pool_is_ignore_till_sync())
	{
		if (pool_is_query_in_progress())
			pool_unset_query_in_progress();

		/*
		 * Check if we have pending data in backend connection cache. If we
		 * do, it is likely that a sync message has been sent to backend and
		 * the backend replied back to us. So we need to process it.
		 */
		if (is_backend_cache_empty(backend))
		{
			return POOL_CONTINUE;
		}
	}

	if (pool_is_skip_reading_from_backends())
	{
		pool_unset_skip_reading_from_backends();
		return POOL_CONTINUE;
	}

	read_kind_from_backend(frontend, backend, &kind);

	/*
	 * Sanity check
	 */
	if (kind == 0)
	{
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process backend response"),
				 errdetail("invalid message kind sent by backend connection")));
	}
	ereport(DEBUG5,
			(errmsg("processing backend response"),
			 errdetail("received kind '%c'(%02x) from backend", kind, kind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		switch (kind)
		{
			case 'G':			/* CopyInResponse */
				status = CopyInResponse(frontend, backend);
				break;

			case 'S':			/* ParameterStatus */
				status = ParameterStatus(frontend, backend);
				break;

			case 'Z':			/* ReadyForQuery */
				ereport(DEBUG5,
						(errmsg("processing backend response"),
						 errdetail("Ready For Query received")));
				/* Frontend reads were suspended since Sync; resume them */
				pool_unset_suspend_reading_from_frontend();
				status = ReadyForQuery(frontend, backend, true, true);
#ifdef DEBUG
				extern bool stop_now;

				if (stop_now)
					exit(0);
#endif
				break;

			case '1':			/* ParseComplete */
				if (SL_MODE)
				{
					POOL_PENDING_MESSAGE *pmsg;

					pmsg = pool_pending_message_get_previous_message();
					if (pmsg && pmsg->not_forward_to_frontend)
					{
						/*
						 * parse_before_bind() was called. Do not forward the
						 * parse complete message to frontend.
						 */
						ereport(DEBUG5,
								(errmsg("processing backend response"),
								 errdetail("do not forward parse complete message to frontend")));
						pool_discard_packet_contents(backend);
						pool_unset_query_in_progress();
						pool_set_command_success();
						status = POOL_CONTINUE;
						break;
					}
				}
				status = ParseComplete(frontend, backend);
				pool_set_command_success();
				pool_unset_query_in_progress();
				break;

			case '2':			/* BindComplete */
				status = BindComplete(frontend, backend);
				pool_set_command_success();
				pool_unset_query_in_progress();
				break;

			case '3':			/* CloseComplete */
				if (SL_MODE)
				{
					POOL_PENDING_MESSAGE *pmsg;

					pmsg = pool_pending_message_get_previous_message();
					if (pmsg && pmsg->not_forward_to_frontend)
					{
						/*
						 * parse_before_bind() was called. Do not forward the
						 * close complete message to frontend.
						 */
						ereport(DEBUG5,
								(errmsg("processing backend response"),
								 errdetail("do not forward close complete message to frontend")));
						pool_discard_packet_contents(backend);

						/*
						 * Remove pending message here because
						 * read_kind_from_backend() did not do it.
						 */
						pmsg = pool_pending_message_pull_out();
						pool_pending_message_free_pending_message(pmsg);

						if (pool_is_doing_extended_query_message())
							pool_unset_query_in_progress();
						pool_set_command_success();
						status = POOL_CONTINUE;
						break;
					}
				}

				status = CloseComplete(frontend, backend);
				pool_set_command_success();
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'E':			/* ErrorResponse */
				if (pool_is_doing_extended_query_message())
				{
					char	   *message;

					/* Log the error message which was possibly missed till
					 * a sync message was sent */
					if (pool_extract_error_message(false, MAIN(backend), PROTO_MAJOR_V3,
												   true, &message) == 1)
					{
						ereport(LOG,
								(errmsg("Error message from backend: DB node id: %d message: \"%s\"",
										MAIN_NODE_ID, message)));
						pfree(message);
					}
				}

				/* Forward the error message to frontend */
				status = ErrorResponse3(frontend, backend);
				pool_unset_command_success();
				/* Mark the transaction failed unless we are idle ('I') */
				if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID :
						   REAL_MAIN_NODE_ID) != 'I')
				{
					pool_set_failed_transaction();

					/* Remove ongoing CREATE/DROP temp tables */
					pool_temp_tables_remove_pending();
				}
				if (pool_is_doing_extended_query_message())
				{
					/* Skip frontend messages until the next Sync */
					pool_set_ignore_till_sync();
					pool_unset_query_in_progress();
					pool_unset_suspend_reading_from_frontend();
					if (SL_MODE)
						pool_discard_except_sync_and_ready_for_query(frontend, backend);
				}
				break;

			case 'C':			/* CommandComplete */
				status = CommandComplete(frontend, backend, true);
				pool_set_command_success();
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 't':			/* ParameterDescription */
				status = ParameterDescription(frontend, backend);
				break;

			case 'I':			/* EmptyQueryResponse */
				status = CommandComplete(frontend, backend, false);

				/*
				 * Empty query response message should be treated same as
				 * Command complete message. When we receive the Command
				 * complete message, we unset the query in progress flag if
				 * operated in streaming replication mode. So we unset the
				 * flag as well. See bug 190 for more details.
				 */
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'T':			/* RowDescription */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'n':			/* NoData */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 's':			/* PortalSuspended */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			default:
				/* Any other V3 message is forwarded verbatim */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				break;
		}
	}
	else
	{
		/* Protocol version 2 dispatch */
		switch (kind)
		{
			case 'A':			/* NotificationResponse */
				status = NotificationResponse(frontend, backend);
				break;

			case 'B':			/* BinaryRow */
				status = BinaryRow(frontend, backend, *num_fields);
				break;

			case 'C':			/* CompletedResponse */
				status = CompletedResponse(frontend, backend);
				break;

			case 'D':			/* AsciiRow */
				status = AsciiRow(frontend, backend, *num_fields);
				break;

			case 'E':			/* ErrorResponse */
				status = ErrorResponse(frontend, backend);
				if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID :
						   REAL_MAIN_NODE_ID) != 'I')
					pool_set_failed_transaction();
				break;

			case 'G':			/* CopyInResponse */
				status = CopyInResponse(frontend, backend);
				break;

			case 'H':			/* CopyOutResponse */
				status = CopyOutResponse(frontend, backend);
				break;

			case 'I':			/* EmptyQueryResponse */
				EmptyQueryResponse(frontend, backend);
				break;

			case 'N':			/* NoticeResponse */
				NoticeResponse(frontend, backend);
				break;

			case 'P':			/* CursorResponse */
				status = CursorResponse(frontend, backend);
				break;

			case 'T':			/* RowDescription */
				status = RowDescription(frontend, backend, num_fields);
				break;

			case 'V':			/* FunctionResultResponse and
								 * FunctionVoidResponse */
				status = FunctionResultResponse(frontend, backend);
				break;

			case 'Z':			/* ReadyForQuery */
				status = ReadyForQuery(frontend, backend, true, true);
				break;

			default:
				ereport(FATAL,
						(return_code(1),
						 errmsg("Unknown message type %c(%02x)", kind, kind)));
		}
	}

	/*
	 * Do we receive ready for query while processing reset request?
	 */
	if (kind == 'Z' && frontend->no_forward && *state == 1)
	{
		*state = 0;
	}

	if (status != POOL_CONTINUE)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process backend response for message kind '%c'", kind)));

	return status;
}
3158
3159 POOL_STATUS
CopyInResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3160 CopyInResponse(POOL_CONNECTION * frontend,
3161 POOL_CONNECTION_POOL * backend)
3162 {
3163 POOL_STATUS status;
3164
3165 /* forward to the frontend */
3166 if (MAJOR(backend) == PROTO_MAJOR_V3)
3167 {
3168 SimpleForwardToFrontend('G', frontend, backend);
3169 pool_flush(frontend);
3170 }
3171 else
3172 pool_write_and_flush(frontend, "G", 1);
3173
3174 status = CopyDataRows(frontend, backend, 1);
3175 return status;
3176 }
3177
3178 POOL_STATUS
CopyOutResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3179 CopyOutResponse(POOL_CONNECTION * frontend,
3180 POOL_CONNECTION_POOL * backend)
3181 {
3182 POOL_STATUS status;
3183
3184 /* forward to the frontend */
3185 if (MAJOR(backend) == PROTO_MAJOR_V3)
3186 {
3187 SimpleForwardToFrontend('H', frontend, backend);
3188 pool_flush(frontend);
3189 }
3190 else
3191 pool_write_and_flush(frontend, "H", 1);
3192
3193 status = CopyDataRows(frontend, backend, 0);
3194 return status;
3195 }
3196
/*
 * Relay COPY data rows between frontend and backends until the COPY
 * terminates.  "copyin" selects the direction: non-zero means COPY FROM
 * STDIN (frontend -> all valid backends), zero means COPY TO STDOUT
 * (backend -> frontend).  Handles both protocol V3 and V2.
 * Always returns POOL_CONTINUE; fatal conditions raise FATAL via ereport.
 */
POOL_STATUS
CopyDataRows(POOL_CONNECTION * frontend,
			 POOL_CONNECTION_POOL * backend, int copyin)
{
	char	   *string = NULL;	/* V2 only: last line read */
	int			len;
	int			i;
	int			copy_count;		/* number of CopyData ('d') messages relayed */

#ifdef DEBUG
	int			j = 0;
	char		buf[1024];
#endif

	copy_count = 0;
	for (;;)
	{
		if (copyin)
		{
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				char		kind;
				char	   *contents = NULL;

				pool_read(frontend, &kind, 1);

				ereport(DEBUG5,
						(errmsg("copy data rows"),
						 errdetail("read kind from frontend %c(%02x)", kind, kind)));

				/* message length excludes the 4-byte length field itself */
				pool_read(frontend, &len, sizeof(len));
				len = ntohl(len) - 4;
				if (len > 0)
					contents = pool_read2(frontend, len);

				/* relay the message unchanged to the target backends */
				SimpleForwardToBackend(kind, frontend, backend, len, contents);

				/* CopyData? */
				if (kind == 'd')
				{
					copy_count++;
					continue;
				}
				else
				{
					/*
					 * Any other kind ('c' CopyDone, 'f' CopyFail, ...)
					 * ends the copy-in loop.  Emit client-message logs if
					 * configured, then leave the loop.
					 */
					if (pool_config->log_client_messages && copy_count != 0)
						ereport(LOG,
								(errmsg("CopyData message from frontend."),
								 errdetail("count: %d", copy_count)));
					if (kind == 'c' && pool_config->log_client_messages)
						ereport(LOG,
								(errmsg("CopyDone message from frontend.")));
					if (kind == 'f' && pool_config->log_client_messages)
						ereport(LOG,
								(errmsg("CopyFail message from frontend.")));
					ereport(DEBUG1,
							(errmsg("copy data rows"),
							 errdetail("invalid copyin kind. expected 'd' got '%c'", kind)));
					break;
				}
			}
			else
				string = pool_read_string(frontend, &len, 1);
		}
		else
		{
			/* CopyOut */
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				signed char kind;

				kind = pool_read_kind(backend);

				SimpleForwardToFrontend(kind, frontend, backend);

				/* CopyData? */
				if (kind == 'd')
					continue;
				else
					break;
			}
			else
			{
				/* V2: read one line from every valid backend */
				for (i = 0; i < NUM_BACKENDS; i++)
				{
					if (VALID_BACKEND(i))
					{
						string = pool_read_string(CONNECTION(backend, i), &len, 1);
					}
				}
			}
		}

		/*
		 * Only the V2 paths reach this point: the V3 branches above either
		 * "continue" or "break".  So "string" must have been set.
		 */
		if (string == NULL)
			ereport(FATAL,
					(errmsg("unable to copy data rows"),
					 errdetail("cannot read string message from backend")));

#ifdef DEBUG
		strlcpy(buf, string, sizeof(buf));
		ereport(DEBUG1,
				(errmsg("copy data rows"),
				 errdetail("copy line %d %d bytes :%s:", j++, len, buf)));
#endif

		/* V2: relay the line in the appropriate direction */
		if (copyin)
		{
			for (i = 0; i < NUM_BACKENDS; i++)
			{
				if (VALID_BACKEND(i))
				{
					pool_write(CONNECTION(backend, i), string, len);
				}
			}
		}
		else
			pool_write(frontend, string, len);

		/*
		 * NOTE(review): PROTO_MAJOR_V3 (value 3) appears to be used here
		 * as a magic stand-in for the 3-byte length of the V2 terminator
		 * line "\\.\n" -- it has nothing to do with the protocol version.
		 * TODO: confirm and replace with a named constant.
		 */
		if (len == PROTO_MAJOR_V3)
		{
			/* end of copy? */
			if (string[0] == '\\' &&
				string[1] == '.' &&
				string[2] == '\n')
			{
				break;
			}
		}
	}

	/*
	 * Wait till backend responds
	 */
	if (copyin)
	{
		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
			{
				pool_flush(CONNECTION(backend, i));

				/*
				 * Check response from the backend. First check SSL and read
				 * buffer of the backend. It is possible that there's an error
				 * message in the buffer if the COPY command went wrong.
				 * Otherwise wait for data arrival to the backend socket.
				 */
				if (!pool_ssl_pending(CONNECTION(backend, i)) &&
					pool_read_buffer_is_empty(CONNECTION(backend, i)) &&
					synchronize(CONNECTION(backend, i)))
					ereport(FATAL,
							(return_code(2),
							 errmsg("unable to copy data rows"),
							 errdetail("failed to synchronize")));
			}
		}
	}
	else
		pool_flush(frontend);

	return POOL_CONTINUE;
}
3359
3360 /*
3361 * This function raises intentional error to make backends the same
3362 * transaction state.
3363 */
3364 void
raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)3365 raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)
3366 {
3367 int i;
3368 POOL_SESSION_CONTEXT *session_context;
3369 POOL_QUERY_CONTEXT *query_context;
3370
3371 /* Get session context */
3372 session_context = pool_get_session_context(false);
3373
3374 query_context = session_context->query_context;
3375
3376 if (MAIN_REPLICA &&
3377 TSTATE(backend, PRIMARY_NODE_ID) == 'T' &&
3378 PRIMARY_NODE_ID != MAIN_NODE_ID &&
3379 query_context &&
3380 is_select_query(query_context->parse_tree, query_context->original_query))
3381 {
3382 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
3383 if (pool_is_doing_extended_query_message())
3384 {
3385 do_error_execute_command(backend, PRIMARY_NODE_ID, PROTO_MAJOR_V3);
3386 }
3387 else
3388 {
3389 do_error_command(CONNECTION(backend, PRIMARY_NODE_ID), MAJOR(backend));
3390 }
3391 ereport(DEBUG1,
3392 (errmsg("raising intentional error"),
3393 errdetail("generating intentional error to sync backends transaction states")));
3394 }
3395
3396 if (REPLICATION &&
3397 TSTATE(backend, REAL_MAIN_NODE_ID) == 'T' &&
3398 !pool_config->replicate_select &&
3399 query_context &&
3400 is_select_query(query_context->parse_tree, query_context->original_query))
3401 {
3402 for (i = 0; i < NUM_BACKENDS; i++)
3403 {
3404 /*
3405 * Send a syntax error query to all backends except the node which
3406 * the original query was sent.
3407 */
3408 if (pool_is_node_to_be_sent(query_context, i))
3409 continue;
3410 else
3411 pool_set_node_to_be_sent(query_context, i);
3412
3413 if (VALID_BACKEND(i))
3414 {
3415 /*
3416 * We must abort transaction to sync transaction state. If the
3417 * error was caused by an Execute message, we must send
3418 * invalid Execute message to abort transaction.
3419 *
3420 * Because extended query protocol ignores all messages before
3421 * receiving Sync message inside error state.
3422 */
3423 if (pool_is_doing_extended_query_message())
3424 {
3425 do_error_execute_command(backend, i, PROTO_MAJOR_V3);
3426 }
3427 else
3428 {
3429 do_error_command(CONNECTION(backend, i), MAJOR(backend));
3430 }
3431 }
3432 }
3433 }
3434 }
3435
3436 /*---------------------------------------------------
3437 * Check various errors from backend. return values:
3438 * 0: no error
3439 * 1: deadlock detected
3440 * 2: serialization error detected
3441 * 3: query cancel
3442 * detected: 4
3443 *---------------------------------------------------
3444 */
3445 static int
check_errors(POOL_CONNECTION_POOL * backend,int backend_id)3446 check_errors(POOL_CONNECTION_POOL * backend, int backend_id)
3447 {
3448
3449 /*
3450 * Check dead lock error on the main node and abort transactions on all
3451 * nodes if so.
3452 */
3453 if (detect_deadlock_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3454 return 1;
3455
3456 /*-----------------------------------------------------------------------------------
3457 * Check serialization failure error and abort
3458 * transactions on all nodes if so. Otherwise we allow
3459 * data inconsistency among DB nodes. See following
3460 * scenario: (M:main, S:replica)
3461 *
3462 * M:S1:BEGIN;
3463 * M:S2:BEGIN;
3464 * S:S1:BEGIN;
3465 * S:S2:BEGIN;
3466 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3467 * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3468 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3469 * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3470 * M:S1:UPDATE t1 SET i = i + 1;
3471 * S:S1:UPDATE t1 SET i = i + 1;
3472 * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
3473 * S:S1:COMMIT;
3474 * M:S1:COMMIT;
3475 * M:S2:ERROR: could not serialize access due to concurrent update
3476 * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
3477 *-----------------------------------------------------------------------------------
3478 */
3479 if (detect_serialization_error(CONNECTION(backend, backend_id), MAJOR(backend), true) == SPECIFIED_ERROR)
3480 return 2;
3481
3482 /*-------------------------------------------------------------------------------
3483 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
3484 * This happens in following scenario:
3485 *
3486 * M:S1:BEGIN;
3487 * S:S1:BEGIN;
3488 * M:S1:SELECT 1; <-- only sent to MAIN
3489 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3490 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3491 * M: <-- error
3492 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
3493 *-------------------------------------------------------------------------------
3494 */
3495
3496 if (detect_active_sql_transaction_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3497 return 3;
3498
3499 /* check query cancel error */
3500 if (detect_query_cancel_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3501 return 4;
3502
3503 return 0;
3504 }
3505
3506 static void
generate_error_message(char * prefix,int specific_error,char * query)3507 generate_error_message(char *prefix, int specific_error, char *query)
3508 {
3509 POOL_SESSION_CONTEXT *session_context;
3510
3511 session_context = pool_get_session_context(true);
3512 if (!session_context)
3513 return;
3514
3515 static char *error_messages[] = {
3516 "received deadlock error message from main node. query: %s",
3517 "received serialization failure error message from main node. query: %s",
3518 "received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s",
3519 "received query cancel error message from main node. query: %s"
3520 };
3521
3522 String *msg;
3523
3524 if (specific_error < 1 || specific_error > sizeof(error_messages) / sizeof(char *))
3525 {
3526 ereport(LOG,
3527 (errmsg("generate_error_message: invalid specific_error: %d", specific_error)));
3528 return;
3529 }
3530
3531 specific_error--;
3532
3533 msg = init_string(prefix);
3534 string_append_char(msg, error_messages[specific_error]);
3535 ereport(LOG,
3536 (errmsg(msg->data, query)));
3537 free_string(msg);
3538 }
3539
3540 /*
3541 * Make per DB node statement log
3542 */
3543 void
per_node_statement_log(POOL_CONNECTION_POOL * backend,int node_id,char * query)3544 per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query)
3545 {
3546 POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3547
3548 if (pool_config->log_per_node_statement)
3549 ereport(LOG,
3550 (errmsg("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query)));
3551 }
3552
3553 /*
3554 * Check kind and produce error message
3555 * All data read in this function is returned to stream.
3556 */
3557 void
per_node_error_log(POOL_CONNECTION_POOL * backend,int node_id,char * query,char * prefix,bool unread)3558 per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, char *prefix, bool unread)
3559 {
3560 POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3561 char *message;
3562 char kind;
3563
3564 pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind));
3565 pool_unread(CONNECTION(backend, node_id), &kind, sizeof(kind));
3566
3567 if (kind != 'E' && kind != 'N')
3568 {
3569 return;
3570 }
3571
3572 if (pool_extract_error_message(true, CONNECTION(backend, node_id), MAJOR(backend), unread, &message) == 1)
3573 {
3574 ereport(LOG,
3575 (errmsg("%s: DB node id: %d backend pid: %d statement: \"%s\" message: \"%s\"",
3576 prefix, node_id, ntohl(slot->pid), query, message)));
3577 pfree(message);
3578 }
3579 }
3580
3581 /*
3582 * Send parse message to primary/main node and wait for reply if particular
3583 * message is not yet parsed on the primary/main node but parsed on other
3584 * node. Caller must provide the parse message data as "message".
3585 */
parse_before_bind(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,POOL_SENT_MESSAGE * message,POOL_SENT_MESSAGE * bind_message)3586 static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
3587 POOL_CONNECTION_POOL * backend,
3588 POOL_SENT_MESSAGE * message,
3589 POOL_SENT_MESSAGE * bind_message)
3590 {
3591 int i;
3592 int len = message->len;
3593 char kind = '\0';
3594 char *contents = message->contents;
3595 bool parse_was_sent = false;
3596 bool backup[MAX_NUM_BACKENDS];
3597 POOL_QUERY_CONTEXT *qc = message->query_context;
3598
3599 memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
3600
3601 if (SL_MODE)
3602 {
3603 if (message->kind == 'P' && qc->where_to_send[PRIMARY_NODE_ID] == 0)
3604 {
3605 POOL_PENDING_MESSAGE *pmsg;
3606 POOL_QUERY_CONTEXT *new_qc;
3607 char message_body[1024];
3608 int offset;
3609 int message_len;
3610
3611 /*
3612 * we are in streaming replication mode and the parse message has
3613 * not been sent to primary yet
3614 */
3615
3616 /* Prepare modified query context */
3617 new_qc = pool_query_context_shallow_copy(qc);
3618 memset(new_qc->where_to_send, 0, sizeof(new_qc->where_to_send));
3619 new_qc->where_to_send[PRIMARY_NODE_ID] = 1;
3620 new_qc->virtual_main_node_id = PRIMARY_NODE_ID;
3621 new_qc->load_balance_node_id = PRIMARY_NODE_ID;
3622
3623 /*
3624 * Before sending the parse message to the primary, we need to
3625 * close the named statement. Otherwise we will get an error from
3626 * backend if the named statement already exists. This could
3627 * happened if parse_before_bind is called with a bind message
3628 * using same named statement. If the named statement does not
3629 * exist, it's fine. PostgreSQL just ignores a request trying to
3630 * close a non-existing statement. If the statement is unnamed
3631 * one, we do not need it because unnamed statement can be
3632 * overwritten anytime.
3633 */
3634 message_body[0] = 'S';
3635 offset = strlen(bind_message->contents) + 1;
3636
3637 ereport(DEBUG1,
3638 (errmsg("parse before bind"),
3639 errdetail("close statement: %s", bind_message->contents + offset)));
3640
3641 if (bind_message->contents[offset] != '\0')
3642 {
3643 message_len = 1 + strlen(bind_message->contents + offset) + 1;
3644 StrNCpy(message_body + 1, bind_message->contents + offset, sizeof(message_body) - 1);
3645 pool_extended_send_and_wait(qc, "C", message_len, message_body, 1, PRIMARY_NODE_ID, false);
3646 /* Add pending message */
3647 pmsg = pool_pending_message_create('C', message_len, message_body);
3648 pmsg->not_forward_to_frontend = true;
3649 pool_pending_message_dest_set(pmsg, new_qc);
3650 pool_pending_message_add(pmsg);
3651 pool_pending_message_free_pending_message(pmsg);
3652 }
3653
3654 /* Send parse message to primary node */
3655 ereport(DEBUG1,
3656 (errmsg("parse before bind"),
3657 errdetail("waiting for primary completing parse")));
3658
3659 pool_extended_send_and_wait(qc, "P", len, contents, 1, PRIMARY_NODE_ID, false);
3660
3661 /* Add pending message */
3662 pmsg = pool_pending_message_create('P', len, contents);
3663 pmsg->not_forward_to_frontend = true;
3664 pool_pending_message_dest_set(pmsg, new_qc);
3665 pool_pending_message_add(pmsg);
3666 pool_pending_message_free_pending_message(pmsg);
3667
3668 /* Replace the query context of bind message */
3669 bind_message->query_context = new_qc;
3670
3671 #ifdef NOT_USED
3672 /*
3673 * XXX pool_remove_sent_message() will pfree memory allocated by "contents".
3674 */
3675
3676 /* Remove old sent message */
3677 pool_remove_sent_message('P', contents);
3678 /* Create and add sent message of this parse message */
3679 msg = pool_create_sent_message('P', len, contents, 0, contents, new_qc);
3680 pool_add_sent_message(msg);
3681 #endif
3682 /* Replace the query context of parse message */
3683 message->query_context = new_qc;
3684
3685 return POOL_CONTINUE;
3686 }
3687 else
3688 {
3689 ereport(DEBUG1,
3690 (errmsg("parse before bind"),
3691 errdetail("no need to re-send parse")));
3692 return POOL_CONTINUE;
3693 }
3694 }
3695 else
3696 {
3697 /* expect to send to main node only */
3698 for (i = 0; i < NUM_BACKENDS; i++)
3699 {
3700 if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
3701 {
3702 ereport(DEBUG1,
3703 (errmsg("parse before bind"),
3704 errdetail("waiting for backend %d completing parse", i)));
3705
3706 pool_extended_send_and_wait(qc, "P", len, contents, 1, i, false);
3707 }
3708 else
3709 {
3710 qc->where_to_send[i] = 0;
3711 }
3712 }
3713 }
3714
3715 for (i = 0; i < NUM_BACKENDS; i++)
3716 {
3717 if (qc->where_to_send[i])
3718 {
3719 parse_was_sent = true;
3720 break;
3721 }
3722 }
3723
3724 if (!SL_MODE && parse_was_sent)
3725 {
3726 pool_set_query_in_progress();
3727
3728 while (kind != '1')
3729 {
3730 PG_TRY();
3731 {
3732 read_kind_from_backend(frontend, backend, &kind);
3733 pool_discard_packet_contents(backend);
3734 }
3735 PG_CATCH();
3736 {
3737 memcpy(qc->where_to_send, backup, sizeof(backup));
3738 PG_RE_THROW();
3739 }
3740 PG_END_TRY();
3741 }
3742 }
3743
3744 memcpy(qc->where_to_send, backup, sizeof(backup));
3745 return POOL_CONTINUE;
3746 }
3747
3748 /*
3749 * Find victim nodes by "decide by majority" rule and returns array
3750 * of victim node ids. If no victim is found, return NULL.
3751 *
3752 * Arguments:
3753 * ntuples: Array of number of affected tuples. -1 represents down node.
3754 * nmembers: Number of elements in ntuples.
3755 * main_node: The main node id. Less than 0 means ignore this parameter.
3756 * number_of_nodes: Number of elements in victim nodes array.
3757 *
3758 * Note: If no one wins and main_node >= 0, winner would be the
3759 * main and other nodes who has same number of tuples as the main.
3760 *
3761 * Caution: Returned victim node array is allocated in static memory
3762 * of this function. Subsequent calls to this function will overwrite
3763 * the memory.
3764 */
3765 static int *
find_victim_nodes(int * ntuples,int nmembers,int main_node,int * number_of_nodes)3766 find_victim_nodes(int *ntuples, int nmembers, int main_node, int *number_of_nodes)
3767 {
3768 static int victim_nodes[MAX_NUM_BACKENDS];
3769 static int votes[MAX_NUM_BACKENDS];
3770 int maxvotes;
3771 int majority_ntuples;
3772 int me;
3773 int cnt;
3774 int healthy_nodes;
3775 int i,
3776 j;
3777
3778 healthy_nodes = 0;
3779 *number_of_nodes = 0;
3780 maxvotes = 0;
3781 majority_ntuples = 0;
3782
3783 for (i = 0; i < nmembers; i++)
3784 {
3785 me = ntuples[i];
3786
3787 /* Health node? */
3788 if (me < 0)
3789 {
3790 votes[i] = -1;
3791 continue;
3792 }
3793
3794 healthy_nodes++;
3795 votes[i] = 1;
3796
3797 for (j = 0; j < nmembers; j++)
3798 {
3799 if (i != j && me == ntuples[j])
3800 {
3801 votes[i]++;
3802
3803 if (votes[i] > maxvotes)
3804 {
3805 maxvotes = votes[i];
3806 majority_ntuples = me;
3807 }
3808 }
3809 }
3810 }
3811
3812 /* Everyone is different */
3813 if (maxvotes == 1)
3814 {
3815 /* Main node is specified? */
3816 if (main_node < 0)
3817 return NULL;
3818
3819 /*
3820 * If main node is specified, let it and others who has same ntuples
3821 * win.
3822 */
3823 majority_ntuples = ntuples[main_node];
3824 }
3825 else
3826 {
3827 /* Find number of majority */
3828 cnt = 0;
3829 for (i = 0; i < nmembers; i++)
3830 {
3831 if (votes[i] == maxvotes)
3832 {
3833 cnt++;
3834 }
3835 }
3836
3837 if (cnt <= healthy_nodes / 2.0)
3838 {
3839 /* No one wins */
3840
3841 /* Main node is specified? */
3842 if (main_node < 0)
3843 return NULL;
3844
3845 /*
3846 * If main node is specified, let it and others who has same
3847 * ntuples win.
3848 */
3849 majority_ntuples = ntuples[main_node];
3850 }
3851 }
3852
3853 /* Make victim nodes list */
3854 for (i = 0; i < nmembers; i++)
3855 {
3856 if (ntuples[i] >= 0 && ntuples[i] != majority_ntuples)
3857 {
3858 victim_nodes[(*number_of_nodes)++] = i;
3859 }
3860 }
3861
3862 return victim_nodes;
3863 }
3864
3865
3866 /*
3867 * flatten_set_variable_args
3868 * Given a parsenode List as emitted by the grammar for SET,
3869 * convert to the flat string representation used by GUC.
3870 *
3871 * We need to be told the name of the variable the args are for, because
3872 * the flattening rules vary (ugh).
3873 *
3874 * The result is NULL if args is NIL (ie, SET ... TO DEFAULT), otherwise
3875 * a palloc'd string.
3876 */
3877 static char *
flatten_set_variable_args(const char * name,List * args)3878 flatten_set_variable_args(const char *name, List *args)
3879 {
3880 StringInfoData buf;
3881 ListCell *l;
3882
3883 /* Fast path if just DEFAULT */
3884 if (args == NIL)
3885 return NULL;
3886
3887 initStringInfo(&buf);
3888
3889 /*
3890 * Each list member may be a plain A_Const node, or an A_Const within a
3891 * TypeCast; the latter case is supported only for ConstInterval arguments
3892 * (for SET TIME ZONE).
3893 */
3894 foreach(l, args)
3895 {
3896 Node *arg = (Node *) lfirst(l);
3897 char *val;
3898 A_Const *con;
3899
3900 if (l != list_head(args))
3901 appendStringInfoString(&buf, ", ");
3902
3903 if (IsA(arg, TypeCast))
3904 {
3905 TypeCast *tc = (TypeCast *) arg;
3906
3907 arg = tc->arg;
3908 }
3909
3910 if (!IsA(arg, A_Const))
3911 elog(ERROR, "unrecognized node type: %d", (int) nodeTag(arg));
3912 con = (A_Const *) arg;
3913
3914 switch (nodeTag(&con->val))
3915 {
3916 case T_Integer:
3917 appendStringInfo(&buf, "%d", intVal(&con->val));
3918 break;
3919 case T_Float:
3920 /* represented as a string, so just copy it */
3921 appendStringInfoString(&buf, strVal(&con->val));
3922 break;
3923 case T_String:
3924 val = strVal(&con->val);
3925 appendStringInfoString(&buf, val);
3926 break;
3927 default:
3928 ereport(ERROR, (errmsg("unrecognized node type: %d",
3929 (int) nodeTag(&con->val))));
3930 break;
3931 }
3932 }
3933
3934 return buf.data;
3935 }
3936
3937 #ifdef NOT_USED
3938 /* Called when sync message is received.
3939 * Wait till ready for query received.
3940 */
static void
pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)
{
	char		kind;
	int			len;
	int			poplen;
	char	   *buf;
	int			i;

	/*
	 * NOTE: this function is compiled out (enclosing #ifdef NOT_USED).
	 * For each valid backend it reads messages, pushing each one onto the
	 * connection's push/pop buffer so nothing is lost, until a
	 * ReadyForQuery ('Z') arrives; then it pops the buffered data back so
	 * the messages can be re-read by the caller.
	 */
	for (i = 0; i < NUM_BACKENDS; i++)
	{
		if (VALID_BACKEND(i))
		{
			for (;;)
			{
				/* Read kind byte and save it to the push buffer */
				pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
				ereport(DEBUG5,
						(errmsg("pool_wait_till_ready_for_query: kind: %c", kind)));
				pool_push(CONNECTION(backend, i), &kind, sizeof(kind));
				/* Read and save the length field (network byte order) */
				pool_read(CONNECTION(backend, i), &len, sizeof(len));
				pool_push(CONNECTION(backend, i), &len, sizeof(len));
				/* Read and save the message payload, if any */
				if ((ntohl(len) - sizeof(len)) > 0)
				{
					buf = pool_read2(CONNECTION(backend, i), ntohl(len) - sizeof(len));
					pool_push(CONNECTION(backend, i), buf, ntohl(len) - sizeof(len));
				}

				if (kind == 'Z')	/* Ready for query? */
				{
					/* Return everything pushed so far to the read buffer */
					pool_pop(CONNECTION(backend, i), &poplen);
					ereport(DEBUG5,
							(errmsg("pool_wait_till_ready_for_query: backend:%d ready for query found. buffer length:%d",
									i, CONNECTION(backend, i)->len)));
					break;
				}
			}
		}
	}
}
3980 #endif
3981
3982 /*
3983 * Called when error response received in streaming replication mode and doing
3984 * extended query. Remove all pending messages and backend message buffer data
3985 * except POOL_SYNC pending message and ready for query. If sync message is
3986 * not received yet, continue to read data from frontend until a sync message
3987 * is read.
3988 */
static void
pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
											 POOL_CONNECTION_POOL * backend)
{
	POOL_PENDING_MESSAGE *pmsg;
	int			i;

	/* Only meaningful for extended query protocol in SL mode */
	if (!pool_is_doing_extended_query_message() || !SL_MODE)
		return;

	/*
	 * Check to see if we already received a sync message. If not, call
	 * ProcessFrontendResponse() to get the sync message from client.
	 */
	pmsg = pool_pending_message_get(POOL_SYNC);
	if (pmsg == NULL)
	{
		char		kind;
		int			len;
		POOL_PENDING_MESSAGE *msg;
		char	   *contents = NULL;

		/*
		 * Read frontend messages until a Sync ('S') arrives.  Messages
		 * before the Sync are read but not forwarded; only the Sync itself
		 * is recorded as pending and forwarded to the backend.
		 */
		for (;;)
		{
			pool_read(frontend, &kind, sizeof(kind));
			pool_read(frontend, &len, sizeof(len));
			len = ntohl(len) - sizeof(len);		/* payload size excluding length field */
			if (len > 0)
				contents = pool_read2(frontend, len);
			if (kind == 'S')
			{
				msg = pool_pending_message_create('S', 0, NULL);
				pool_pending_message_add(msg);
				pool_pending_message_free_pending_message(msg);
				SimpleForwardToBackend(kind, frontend, backend, len, contents);
				break;
			}
		}
	}
	else
		pool_pending_message_free_pending_message(pmsg);

	/* Remove all pending messages except sync message */
	do
	{
		pmsg = pool_pending_message_head_message();
		if (pmsg && pmsg->type == POOL_SYNC)
		{
			/* Keep the sync message at the head of the queue and stop. */
			ereport(DEBUG1,
					(errmsg("Process backend response: sync pending message found after receiving error response")));
			pool_unset_ignore_till_sync();
			pool_pending_message_free_pending_message(pmsg);
			break;
		}
		pool_pending_message_free_pending_message(pmsg);
		/* Dequeue and discard the non-sync head message */
		pmsg = pool_pending_message_pull_out();
		pool_pending_message_free_pending_message(pmsg);
	}
	while (pmsg);

	pool_pending_message_reset_previous_message();

	/* Discard read buffer except "Ready for query" */
	for (i = 0; i < NUM_BACKENDS; i++)
	{
		if (VALID_BACKEND(i))
		{
			char		kind;
			int			len;
			int			sts;

			/* Drain buffered backend messages up to ReadyForQuery ('Z') */
			while (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
			{
				sts = pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
				if (sts < 0 || kind == '\0')
				{
					/* EOF or empty kind: push the byte back and stop draining */
					ereport(DEBUG1,
							(errmsg("pool_discard_except_sync_and_ready_for_query: EOF detected while reading from backend: %d buffer length: %d sts: %d",
									i, CONNECTION(backend, i)->len, sts)));
					pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
					break;
				}

				if (kind == 'Z')	/* Ready for query? */
				{
					/* Leave ReadyForQuery in the buffer for the caller */
					pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
					ereport(DEBUG1,
							(errmsg("pool_discard_except_sync_and_ready_for_query: Ready for query found. backend:%d",
									i)));
					break;
				}
				else
				{
					/* Read and discard packet */
					pool_read(CONNECTION(backend, i), &len, sizeof(len));
					if ((ntohl(len) - sizeof(len)) > 0)
					{
						pool_read2(CONNECTION(backend, i), ntohl(len) - sizeof(len));
					}
					ereport(DEBUG1,
							(errmsg("pool_discard_except_sync_and_ready_for_query: discarding packet %c (len:%lu) of backend:%d", kind, ntohl(len) - sizeof(len), i)));
				}
			}
		}
	}
}
4095
4096 /*
4097 * Handle misc treatment when a command successfully completed.
4098 * Preconditions: query is in progress. The command is succeeded.
4099 */
void
pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
{
	Node	   *node;		/* parse tree of the completed query */
	char	   *query;		/* its original query string */

	/* Sanity checks */
	if (!pool_is_query_in_progress())
	{
		ereport(ERROR,
				(errmsg("pool_at_command_success: query is not in progress")));
	}

	if (!pool_is_command_success())
	{
		ereport(ERROR,
				(errmsg("pool_at_command_success: command did not succeed")));
	}

	node = pool_get_parse_tree();

	if (!node)
	{
		ereport(ERROR,
				(errmsg("pool_at_command_success: no parse tree found")));
	}

	query = pool_get_query_string();

	if (query == NULL)
	{
		ereport(ERROR,
				(errmsg("pool_at_command_success: no query found")));
	}

	/*
	 * If the query was BEGIN/START TRANSACTION, clear the history that we had
	 * a writing command in the transaction and forget the transaction
	 * isolation level.
	 *
	 * XXX If BEGIN is received while we are already in an explicit
	 * transaction, the command *successes* (just with a NOTICE message). In
	 * this case we lose "writing_transaction" etc. info.
	 */
	if (is_start_transaction_query(node))
	{
		if (pool_config->disable_load_balance_on_write != DLBOW_TRANS_TRANSACTION)
			pool_unset_writing_transaction();

		pool_unset_failed_transaction();
		pool_unset_transaction_isolation();
	}

	/*
	 * If the query was COMMIT/ABORT, clear the history that we had a writing
	 * command in the transaction and forget the transaction isolation level.
	 * This is necessary if succeeding transaction is not an explicit one.
	 */
	else if (is_commit_or_rollback_query(node))
	{
		if (pool_config->disable_load_balance_on_write != DLBOW_TRANS_TRANSACTION)
			pool_unset_writing_transaction();

		pool_unset_failed_transaction();
		pool_unset_transaction_isolation();
	}

	/*
	 * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or SET SESSION
	 * CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE, remember
	 * it.
	 */
	else if (is_set_transaction_serializable(node))
	{
		pool_set_transaction_isolation(POOL_SERIALIZABLE);
	}

	/*
	 * If 2PC commands has been executed, automatically close transactions on
	 * standbys if there is any open transaction since 2PC commands close
	 * transaction on primary.
	 */
	else if (is_2pc_transaction_query(node))
	{
		close_standby_transactions(frontend, backend);
	}

	/* Otherwise: the query was a write (or a SELECT calling a function). */
	else if (!is_select_query(node, query) || pool_has_function_call(node))
	{
		/*
		 * If the query was not READ SELECT, and we are in an explicit
		 * transaction or disable_load_balance_on_write is 'ALWAYS', remember
		 * that we had a write query in this transaction.
		 */
		if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T' ||
			pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
		{
			/*
			 * However, if the query is "SET TRANSACTION READ ONLY" or its
			 * variant, don't set it.
			 */
			if (!pool_is_transaction_read_only(node))
			{
				ereport(DEBUG1,
						(errmsg("not SET TRANSACTION READ ONLY")));

				pool_set_writing_transaction();
			}
		}

		/*
		 * If the query was CREATE TEMP TABLE, discard temp table relcache
		 * because we might have had persistent table relation cache which has
		 * table name as the temp table.
		 */
		if (IsA(node, CreateStmt))
		{
			CreateStmt *create_table_stmt = (CreateStmt *) node;

			/* relpersistence 't' marks a temporary relation */
			if (create_table_stmt->relation->relpersistence == 't')
				discard_temp_table_relcache();
		}
	}
}
4224
4225 /*
4226 * read message length (V3 only)
4227 */
4228 int
pool_read_message_length(POOL_CONNECTION_POOL * cp)4229 pool_read_message_length(POOL_CONNECTION_POOL * cp)
4230 {
4231 int length,
4232 length0;
4233 int i;
4234
4235 /* read message from main node */
4236 pool_read(CONNECTION(cp, MAIN_NODE_ID), &length0, sizeof(length0));
4237 length0 = ntohl(length0);
4238
4239 ereport(DEBUG5,
4240 (errmsg("reading message length"),
4241 errdetail("slot: %d length: %d", MAIN_NODE_ID, length0)));
4242
4243 for (i = 0; i < NUM_BACKENDS; i++)
4244 {
4245 if (!VALID_BACKEND(i) || IS_MAIN_NODE_ID(i))
4246 {
4247 continue;
4248 }
4249
4250 pool_read(CONNECTION(cp, i), &length, sizeof(length));
4251
4252 length = ntohl(length);
4253 ereport(DEBUG5,
4254 (errmsg("reading message length"),
4255 errdetail("slot: %d length: %d", i, length)));
4256
4257 if (length != length0)
4258 {
4259 ereport(LOG,
4260 (errmsg("unable to read message length"),
4261 errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0)));
4262 return -1;
4263 }
4264
4265 }
4266
4267 if (length0 < 0)
4268 ereport(ERROR,
4269 (errmsg("unable to read message length"),
4270 errdetail("invalid message length (%d)", length)));
4271
4272 return length0;
4273 }
4274
4275 /*
4276 * read message length2 (V3 only)
4277 * unlike pool_read_message_length, this returns an array of message length.
4278 * The array is in the static storage, thus it will be destroyed by subsequent calls.
4279 */
4280 int *
pool_read_message_length2(POOL_CONNECTION_POOL * cp)4281 pool_read_message_length2(POOL_CONNECTION_POOL * cp)
4282 {
4283 int length,
4284 length0;
4285 int i;
4286 static int length_array[MAX_CONNECTION_SLOTS];
4287
4288 /* read message from main node */
4289 pool_read(CONNECTION(cp, MAIN_NODE_ID), &length0, sizeof(length0));
4290
4291 length0 = ntohl(length0);
4292 length_array[MAIN_NODE_ID] = length0;
4293 ereport(DEBUG5,
4294 (errmsg("reading message length"),
4295 errdetail("main slot: %d length: %d", MAIN_NODE_ID, length0)));
4296
4297 for (i = 0; i < NUM_BACKENDS; i++)
4298 {
4299 if (VALID_BACKEND(i) && !IS_MAIN_NODE_ID(i))
4300 {
4301 pool_read(CONNECTION(cp, i), &length, sizeof(length));
4302
4303 length = ntohl(length);
4304 ereport(DEBUG5,
4305 (errmsg("reading message length"),
4306 errdetail("main slot: %d length: %d", i, length)));
4307
4308 if (length != length0)
4309 {
4310 ereport(LOG,
4311 (errmsg("reading message length"),
4312 errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0)));
4313 }
4314
4315 if (length < 0)
4316 {
4317 ereport(ERROR,
4318 (errmsg("unable to read message length"),
4319 errdetail("invalid message length (%d)", length)));
4320 }
4321
4322 length_array[i] = length;
4323 }
4324
4325 }
4326 return &length_array[0];
4327 }
4328
4329 signed char
pool_read_kind(POOL_CONNECTION_POOL * cp)4330 pool_read_kind(POOL_CONNECTION_POOL * cp)
4331 {
4332 char kind0,
4333 kind;
4334 int i;
4335
4336 kind = -1;
4337 kind0 = 0;
4338
4339 for (i = 0; i < NUM_BACKENDS; i++)
4340 {
4341 if (!VALID_BACKEND(i))
4342 {
4343 continue;
4344 }
4345
4346 pool_read(CONNECTION(cp, i), &kind, sizeof(kind));
4347
4348 if (IS_MAIN_NODE_ID(i))
4349 {
4350 kind0 = kind;
4351 }
4352 else
4353 {
4354 if (kind != kind0)
4355 {
4356 char *message;
4357
4358 if (kind0 == 'E')
4359 {
4360 if (pool_extract_error_message(false, MAIN(cp), MAJOR(cp), true, &message) == 1)
4361 {
4362 ereport(LOG,
4363 (errmsg("pool_read_kind: error message from main backend:%s", message)));
4364 pfree(message);
4365 }
4366 }
4367 else if (kind == 'E')
4368 {
4369 if (pool_extract_error_message(false, CONNECTION(cp, i), MAJOR(cp), true, &message) == 1)
4370 {
4371 ereport(LOG,
4372 (errmsg("pool_read_kind: error message from %d th backend:%s", i, message)));
4373 pfree(message);
4374 }
4375 }
4376 ereport(ERROR,
4377 (errmsg("unable to read message kind"),
4378 errdetail("kind does not match between main(%x) slot[%d] (%x)", kind0, i, kind)));
4379 }
4380 }
4381 }
4382
4383 return kind;
4384 }
4385
4386 int
pool_read_int(POOL_CONNECTION_POOL * cp)4387 pool_read_int(POOL_CONNECTION_POOL * cp)
4388 {
4389 int data0,
4390 data;
4391 int i;
4392
4393 data = -1;
4394 data0 = 0;
4395
4396 for (i = 0; i < NUM_BACKENDS; i++)
4397 {
4398 if (!VALID_BACKEND(i))
4399 {
4400 continue;
4401 }
4402 pool_read(CONNECTION(cp, i), &data, sizeof(data));
4403 if (IS_MAIN_NODE_ID(i))
4404 {
4405 data0 = data;
4406 }
4407 else
4408 {
4409 if (data != data0)
4410 {
4411 ereport(ERROR,
4412 (errmsg("unable to read int value"),
4413 errdetail("data does not match between between main(%x) slot[%d] (%x)", data0, i, data)));
4414
4415 }
4416 }
4417 }
4418 return data;
4419 }
4420
4421 /*
4422 * Acquire snapshot in snapshot isolation mode.
4423 * If tstate_check is true, check the transaction state is 'T' (idle in transaction).
4424 * In case of starting an internal transaction, this should be false.
4425 */
4426 static void
si_get_snapshot(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,Node * node,bool tstate_check)4427 si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node * node, bool tstate_check)
4428 {
4429 POOL_SESSION_CONTEXT *session_context;
4430
4431 session_context = pool_get_session_context(true);
4432 if (!session_context)
4433 return;
4434
4435 /*
4436 * From now on it is possible that query is actually sent to backend.
4437 * So we need to acquire snapshot while there's no committing backend
4438 * in snapshot isolation mode except while processing reset queries.
4439 * For this purpose, we send a query to know whether the transaction
4440 * is READ ONLY or not. Sending actual user's query is not possible
4441 * because it might cause rw-conflict, which in turn causes a
4442 * deadlock.
4443 */
4444 if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
4445 (!tstate_check || (tstate_check && TSTATE(backend, MAIN_NODE_ID) == 'T')) &&
4446 si_snapshot_acquire_command(node) &&
4447 !si_snapshot_prepared() &&
4448 frontend && frontend->no_forward == 0)
4449 {
4450 int i;
4451
4452 si_acquire_snapshot();
4453
4454 for (i = 0; i < NUM_BACKENDS; i++)
4455 {
4456 static char *si_query = "SELECT current_setting('transaction_read_only')";
4457 POOL_SELECT_RESULT *res;
4458
4459 /* We cannot use VALID_BACKEND macro here because load balance
4460 * node has not been decided yet.
4461 */
4462 if (!VALID_BACKEND_RAW(i))
4463 continue;
4464
4465 do_query(CONNECTION(backend, i), si_query, &res, MAJOR(backend));
4466 if (res)
4467 {
4468 if (res->data[0] && !strcmp(res->data[0], "on"))
4469 session_context->transaction_read_only = true;
4470 else
4471 session_context->transaction_read_only = false;
4472 free_select_result(res);
4473 }
4474 per_node_statement_log(backend, i, si_query);
4475 }
4476
4477 si_snapshot_acquired();
4478 }
4479 }
4480