1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2021 PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
20 *
21 *---------------------------------------------------------------------
22 * pool_proto_modules.c: modules corresponding to message protocols.
23 * used by pool_process_query()
24 *---------------------------------------------------------------------
25 */
26 #include "config.h"
27 #include <errno.h>
28
29 #ifdef HAVE_SYS_TYPES_H
30 #include <sys/types.h>
31 #endif
32 #ifdef HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #ifdef HAVE_SYS_SELECT_H
36 #include <sys/select.h>
37 #endif
38
39
40 #include <stdlib.h>
41 #include <unistd.h>
42 #include <string.h>
43 #include <netinet/in.h>
44 #include <ctype.h>
45
46 #include "pool.h"
47 #include "rewrite/pool_timestamp.h"
48 #include "rewrite/pool_lobj.h"
49 #include "protocol/pool_proto_modules.h"
50 #include "pool_config.h"
51 #include "parser/pool_string.h"
52 #include "context/pool_session_context.h"
53 #include "context/pool_query_context.h"
54 #include "utils/elog.h"
55 #include "utils/pool_select_walker.h"
56 #include "utils/pool_relcache.h"
57 #include "utils/pool_stream.h"
58 #include "query_cache/pool_memqcache.h"
59 #include "utils/pool_signal.h"
60 #include "utils/palloc.h"
61 #include "utils/memutils.h"
62 #include "pool_config_variables.h"
63
/*
 * State describing the current COPY ... FROM STDIN statement.  These are
 * file-global so that they can be consulted outside of the function that
 * detected the COPY.
 */
char	   *copy_table = NULL;	/* copy table name */
char	   *copy_schema = NULL;	/* copy schema name */
char		copy_delimiter;		/* copy delimiter char */
char	   *copy_null = NULL;	/* copy null string */
68
69 /*
70 * Non 0 if allow to close internal transaction. This variable was
71 * introduced on 2008/4/3 not to close an internal transaction when
72 * Sync message is received after receiving Parse message. This hack
73 * is for PHP-PDO.
74 */
75 static int allow_close_transaction = 1;
76
77 int is_select_pgcatalog = 0;
78 int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR UPDATE */
79
80 /*
81 * last query string sent to simpleQuery()
82 */
83 char query_string_buffer[QUERY_STRING_BUFFER_LEN];
84
85 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
86 static void generate_error_message(char *prefix, int specific_error, char *query);
87 static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
88 POOL_CONNECTION_POOL *backend,
89 POOL_SENT_MESSAGE *message,
90 POOL_SENT_MESSAGE *bind_message);
91 static int* find_victim_nodes(int *ntuples, int nmembers, int master_node, int *number_of_nodes);
92 static POOL_STATUS close_standby_transactions(POOL_CONNECTION *frontend,
93 POOL_CONNECTION_POOL *backend);
94
95 static char *
96 flatten_set_variable_args(const char *name, List *args);
97 static bool
98 process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context);
99 static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend);
100 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
101 POOL_CONNECTION_POOL *backend);
102
103 /*
104 * This is the workhorse of processing the pg_terminate_backend function to
105 * make sure that the use of function should not trigger the backend node failover.
106 *
107 * The function searches for the pg_terminate_backend() function call in the
108 * query parse tree and if the search comes out to be successful,
109 * the next step is to locate the pgpool-II child process and a backend node
110 * of that connection whose PID is specified in pg_terminate_backend argument.
111 *
112 * Once the connection is identified, we set the swallow_termination flag of
113 * that connection (in shared memory) and also sets the query destination to
114 * the same backend that hosts the connection.
115 *
116 * The function returns true on success, i.e. when the query contains the
117 * pg_terminate_backend call and that call refers to the backend
118 * connection that belongs to pgpool-II.
119 *
120 * Note: Since upon successful return this function has already
121 * set the destination backend node for the current query,
122 * so for that case pool_where_to_send() should not be called.
123 *
124 */
process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)125 static bool process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context)
126 {
127 /*
128 * locate pg_terminate_backend and get the pid argument, if pg_terminate_backend
129 * is present in the query
130 */
131 int backend_pid = pool_get_terminate_backend_pid(query_context->parse_tree);
132 if (backend_pid > 0)
133 {
134 int backend_node = 0;
135 ConnectionInfo* conn = pool_coninfo_backend_pid(backend_pid, &backend_node);
136 if(conn == NULL)
137 {
138 ereport(LOG,
139 (errmsg("found the pg_terminate_backend request for backend pid:%d, but the backend connection does not belong to pgpool-II",backend_pid)));
140 /* we are not able to find the backend connection with the pid
141 * so there is not much left for us to do here
142 */
143 return false;
144 }
145 ereport(LOG,
146 (errmsg("found the pg_terminate_backend request for backend pid:%d on backend node:%d",backend_pid,backend_node),
147 errdetail("setting the connection flag")));
148
149 pool_set_connection_will_be_terminated(conn);
150 /* It was the pg_terminate_backend call so send the query to appropriate backend */
151 query_context->pg_terminate_backend_conn = conn;
152 pool_force_query_node_to_backend(query_context, backend_node);
153 return true;
154 }
155 return false;
156 }
157
158 /*
159 * Process Query('Q') message
160 * Query messages include an SQL string.
161 */
SimpleQuery(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)162 POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
163 POOL_CONNECTION_POOL *backend, int len, char *contents)
164 {
165 static char *sq_config = "pool_status";
166 static char *sq_pools = "pool_pools";
167 static char *sq_processes = "pool_processes";
168 static char *sq_nodes = "pool_nodes";
169 static char *sq_version = "pool_version";
170 static char *sq_cache = "pool_cache";
171 int commit;
172 List *parse_tree_list;
173 Node *node = NULL;
174 POOL_STATUS status;
175 int lock_kind;
176 bool is_likely_select = false;
177 int specific_error = 0;
178
179 POOL_SESSION_CONTEXT *session_context;
180 POOL_QUERY_CONTEXT *query_context;
181
182 bool error;
183
184 /* Get session context */
185 session_context = pool_get_session_context(false);
186
187 /* save last query string for logging purpose */
188 strlcpy(query_string_buffer, contents, sizeof(query_string_buffer));
189
190 /* show ps status */
191 query_ps_status(contents, backend);
192
193 /* log query to log file if necessary */
194 if (pool_config->log_statement)
195 ereport(pool_config->log_statement? LOG: DEBUG1,(errmsg("statement: %s", contents)));
196
197 /*
198 * Fetch memory cache if possible
199 */
200 is_likely_select = pool_is_likely_select(contents);
201
202 /*
203 * If memory query cache enabled and the query seems to be a
204 * SELECT use query cache if possible. However if we are in an
205 * explicit transaction and we had writing query before, we should
206 * not use query cache. This means that even the writing query is
207 * not anything related to the table which is used the SELECT, we
208 * do not use cache. Of course we could analyze the SELECT to see
209 * if it uses the table modified in the transaction, but it will
210 * need parsing query and accessing to system catalog, which will
211 * add significant overhead. Moreover if we are in aborted
212 * transaction, commands should be ignored, so we should not use
213 * query cache.
214 */
215 if (pool_config->memory_cache_enabled && is_likely_select &&
216 !pool_is_writing_transaction() &&
217 TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) != 'E')
218 {
219 bool foundp;
220
221 /* If the query is SELECT from table to cache, try to fetch cached result. */
222 status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp);
223
224 if (status != POOL_CONTINUE)
225 return status;
226
227 if (foundp)
228 {
229 pool_ps_idle_display(backend);
230 pool_set_skip_reading_from_backends();
231 pool_stats_count_up_num_cache_hits();
232 return POOL_CONTINUE;
233 }
234 }
235
236 /* Create query context */
237 query_context = pool_init_query_context();
238 MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);
239
240 /* parse SQL string */
241 parse_tree_list = raw_parser(contents, &error);
242
243 if (parse_tree_list == NIL)
244 {
245 /* is the query empty? */
246 if (*contents == '\0' || *contents == ';' || error == false)
247 {
248 /*
249 * JBoss sends empty queries for checking connections.
250 * We replace the empty query with SELECT command not
251 * to affect load balance.
252 * [Pgpool-general] Confused about JDBC and load balancing
253 */
254 parse_tree_list = raw_parser(POOL_DUMMY_READ_QUERY, &error);
255 }
256 else
257 {
258 /*
259 * Unable to parse the query. Probably syntax error or the
260 * query is too new and our parser cannot understand. Treat as
261 * if it were an DELETE command. Note that the DELETE command
262 * does not execute, instead the original query will be sent
263 * to backends, which may or may not cause an actual syntax errors.
264 * The command will be sent to all backends in replication mode
265 * or master/primary in master/slave mode.
266 */
267 if (!strcmp(remote_host, "[local]"))
268 {
269 ereport(LOG,
270 (errmsg("Unable to parse the query: \"%s\" from local client", contents)));
271 }
272 else
273 {
274 ereport(LOG,
275 (errmsg("Unable to parse the query: \"%s\" from client %s(%s)", contents, remote_host, remote_port)));
276 }
277 parse_tree_list = raw_parser(POOL_DUMMY_WRITE_QUERY, &error);
278 query_context->is_parse_error = true;
279 }
280 }
281 MemoryContextSwitchTo(old_context);
282
283 if (parse_tree_list != NIL)
284 {
285 /*
286 * XXX: Currently we only process the first element of the parse tree.
287 * rest of multiple statements are silently discarded.
288 */
289 node = (Node *) lfirst(list_head(parse_tree_list));
290 /*
291 * Start query context
292 */
293 pool_start_query(query_context, contents, len, node);
294
295 /*
296 * If the query is DROP DATABASE, after executing it, cache files directory must be discarded.
297 * So we have to get the DB's oid before it will be DROPped.
298 */
299 if (pool_config->memory_cache_enabled && is_drop_database(node))
300 {
301 DropdbStmt *stmt = (DropdbStmt *)node;
302 query_context->dboid = pool_get_database_oid_from_dbname(stmt->dbname);
303 if (query_context->dboid != 0)
304 {
305 ereport(DEBUG1,
306 (errmsg("DB's oid to discard its cache directory: dboid = %d", query_context->dboid)));
307 }
308 }
309
310 /*
311 * Check if multi statement query
312 */
313 if (parse_tree_list && list_length(parse_tree_list) > 1)
314 {
315 query_context->is_multi_statement = true;
316 }
317 else
318 {
319 query_context->is_multi_statement = false;
320 }
321
322 /* check COPY FROM STDIN
323 * if true, set copy_* variable
324 */
325 check_copy_from_stdin(node);
326
327 if (IsA(node, PgpoolVariableShowStmt))
328 {
329 VariableShowStmt *vnode = (VariableShowStmt *)node;
330
331 report_config_variable(frontend, backend, vnode->name);
332
333 pool_ps_idle_display(backend);
334 pool_query_context_destroy(query_context);
335 pool_set_skip_reading_from_backends();
336 return POOL_CONTINUE;
337 }
338
339 if (IsA(node, PgpoolVariableSetStmt))
340 {
341 VariableSetStmt *vnode = (VariableSetStmt *)node;
342 const char *value = NULL;
343 if (vnode->kind == VAR_SET_VALUE)
344 {
345 value = flatten_set_variable_args("name", vnode->args);
346 }
347 if (vnode->kind == VAR_RESET_ALL)
348 {
349 reset_all_variables(frontend, backend);
350 }
351 else
352 {
353 set_config_option_for_session(frontend, backend, vnode->name, value);
354 }
355
356 pool_ps_idle_display(backend);
357 pool_query_context_destroy(query_context);
358 pool_set_skip_reading_from_backends();
359 return POOL_CONTINUE;
360 }
361
362 /* status reporting? */
363 if (IsA(node, VariableShowStmt))
364 {
365 bool is_valid_show_command = false;
366 VariableShowStmt *vnode = (VariableShowStmt *)node;
367
368 if (!strcmp(sq_config, vnode->name))
369 {
370 is_valid_show_command = true;
371 ereport(DEBUG1,
372 (errmsg("SimpleQuery"),
373 errdetail("config reporting")));
374 config_reporting(frontend, backend);
375 }
376 else if (!strcmp(sq_pools, vnode->name))
377 {
378 is_valid_show_command = true;
379 ereport(DEBUG1,
380 (errmsg("SimpleQuery"),
381 errdetail("pools reporting")));
382
383 pools_reporting(frontend, backend);
384 }
385 else if (!strcmp(sq_processes, vnode->name))
386 {
387 is_valid_show_command = true;
388 ereport(DEBUG1,
389 (errmsg("SimpleQuery"),
390 errdetail("processes reporting")));
391 processes_reporting(frontend, backend);
392 }
393 else if (!strcmp(sq_nodes, vnode->name))
394 {
395 is_valid_show_command = true;
396 ereport(DEBUG1,
397 (errmsg("SimpleQuery"),
398 errdetail("nodes reporting")));
399 nodes_reporting(frontend, backend);
400 }
401 else if (!strcmp(sq_version, vnode->name))
402 {
403 is_valid_show_command = true;
404 ereport(DEBUG1,
405 (errmsg("SimpleQuery"),
406 errdetail("version reporting")));
407 version_reporting(frontend, backend);
408 }
409 else if (!strcmp(sq_cache, vnode->name))
410 {
411 is_valid_show_command = true;
412 ereport(DEBUG1,
413 (errmsg("SimpleQuery"),
414 errdetail("cache reporting")));
415 cache_reporting(frontend, backend);
416 }
417
418 if (is_valid_show_command)
419 {
420 pool_ps_idle_display(backend);
421 pool_query_context_destroy(query_context);
422 pool_set_skip_reading_from_backends();
423 return POOL_CONTINUE;
424 }
425 }
426
427 /*
428 * If the table is to be cached, set is_cache_safe TRUE and register table oids.
429 */
430 if (pool_config->memory_cache_enabled && query_context->is_multi_statement == false)
431 {
432 bool is_select_query = false;
433 int num_oids;
434 int *oids;
435 int i;
436
437 /* Check if the query is actually SELECT */
438 if (is_likely_select && IsA(node, SelectStmt))
439 {
440 is_select_query = true;
441 }
442
443 /* Switch the flag of is_cache_safe in session_context */
444 if (is_select_query && !query_context->is_parse_error &&
445 pool_is_allow_to_cache(query_context->parse_tree,
446 query_context->original_query))
447 {
448 pool_set_cache_safe();
449 }
450 else
451 {
452 pool_unset_cache_safe();
453 }
454
455 /*
456 * If table is to be cached and the query is DML, save the table
457 * oid
458 */
459 if (!query_context->is_parse_error)
460 {
461 num_oids = pool_extract_table_oids(node, &oids);
462
463 if (num_oids > 0)
464 {
465 /* Save to oid buffer */
466 for (i=0;i<num_oids;i++)
467 {
468 pool_add_dml_table_oid(oids[i]);
469 }
470 }
471 }
472 }
473
474 /*
475 * pg_terminate function needs special handling, process it if the query
476 * contains one, otherwise use pool_where_to_send() to decide destination
477 * backend node for the query
478 */
479 if (process_pg_terminate_backend_func(query_context) == false)
480 {
481 /*
482 * Decide where to send query
483 */
484 pool_where_to_send(query_context, query_context->original_query,
485 query_context->parse_tree);
486 }
487 /*
488 * if this is DROP DATABASE command, send USR1 signal to parent and
489 * ask it to close all idle connections.
490 * XXX This is overkill. It would be better to close the idle
491 * connection for the database which DROP DATABASE command tries
492 * to drop. This is impossible at this point, since we have no way
493 * to pass such info to other processes.
494 */
495 if (is_drop_database(node))
496 {
497 struct timeval stime;
498
499 stime.tv_usec = 0;
500 stime.tv_sec = 5; /* XXX give arbitrary time to allow
501 * closing idle connections */
502 ereport(DEBUG1,
503 (errmsg("Query: sending SIGUSR1 signal to parent")));
504
505 ignore_sigusr1 = 1; /* disable SIGUSR1 handler */
506 register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, false, 0);
507
508 /*
509 * We need to loop over here since we might get some signals while
510 * sleeping
511 */
512 for (;;)
513 {
514 int sts;
515
516 errno = 0;
517 sts = select(0, NULL, NULL, NULL, &stime);
518 if (stime.tv_usec == 0 && stime.tv_sec == 0)
519 break;
520 if (sts != 0 && errno != EINTR)
521 {
522 elog(DEBUG1, "select(2) returns error: %s", strerror(errno));
523 break;
524 }
525 }
526
527 ignore_sigusr1 = 0;
528 }
529
530 /*
531 * determine if we need to lock the table
532 * to keep SERIAL data consistency among servers
533 * conditions:
534 * - replication is enabled
535 * - protocol is V3
536 * - statement is INSERT
537 * - either "INSERT LOCK" comment exists or insert_lock directive specified
538 */
539 if (!RAW_MODE && !STREAM)
540 {
541 /*
542 * If there's only one node to send the command, there's no
543 * point to start a transaction.
544 */
545 if (pool_multi_node_to_be_sent(query_context))
546 {
547 /* start a transaction if needed */
548 start_internal_transaction(frontend, backend, (Node *)node);
549
550 /* check if need lock */
551 lock_kind = need_insert_lock(backend, contents, node);
552 if (lock_kind)
553 {
554 /* if so, issue lock command */
555 status = insert_lock(frontend, backend, contents, (InsertStmt *)node, lock_kind);
556 if (status != POOL_CONTINUE)
557 {
558 pool_query_context_destroy(query_context);
559 return status;
560 }
561 }
562 }
563 }
564 else if (REPLICATION && contents == NULL && start_internal_transaction(frontend, backend, node))
565 {
566 pool_query_context_destroy(query_context);
567 return POOL_ERROR;
568 }
569 }
570
571 if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
572 {
573 int i;
574
575 for (i=0;i<NUM_BACKENDS;i++)
576 {
577 if(VALID_BACKEND(i))
578 TSTATE(backend, i) = 'T';
579 }
580 }
581
582 if (node)
583 {
584 POOL_SENT_MESSAGE *msg = NULL;
585
586 if (IsA(node, PrepareStmt))
587 {
588 msg = pool_create_sent_message('Q', len, contents, 0,
589 ((PrepareStmt *)node)->name,
590 query_context);
591 session_context->uncompleted_message = msg;
592 }
593 else
594 session_context->uncompleted_message = NULL;
595 }
596
597
598 if (!RAW_MODE)
599 {
600 /* check if query is "COMMIT" or "ROLLBACK" */
601 commit = is_commit_or_rollback_query(node);
602
603 /*
604 * Query is not commit/rollback
605 */
606 if (!commit)
607 {
608 char *rewrite_query = NULL;
609
610 if (node)
611 {
612 POOL_SENT_MESSAGE *msg = NULL;
613
614 if (IsA(node, PrepareStmt))
615 {
616 msg = session_context->uncompleted_message;
617 }
618 else if (IsA(node, ExecuteStmt))
619 {
620 msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
621 if (!msg)
622 msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
623 }
624
625 /* rewrite `now()' to timestamp literal */
626 if (!is_select_query(node, query_context->original_query) ||
627 pool_has_function_call(node) || pool_config->replicate_select)
628 rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, msg);
629
630 /*
631 * If the query is BEGIN READ WRITE or
632 * BEGIN ... SERIALIZABLE in master/slave mode,
633 * we send BEGIN to slaves/standbys instead.
634 * original_query which is BEGIN READ WRITE is sent to primary.
635 * rewritten_query which is BEGIN is sent to standbys.
636 */
637 if (pool_need_to_treat_as_if_default_transaction(query_context))
638 {
639 rewrite_query = pstrdup("BEGIN");
640 }
641
642 if (rewrite_query != NULL)
643 {
644 query_context->rewritten_query = rewrite_query;
645 query_context->rewritten_length = strlen(rewrite_query) + 1;
646 }
647 }
648
649 /*
650 * Optimization effort: If there's only one session, we do
651 * not need to wait for the master node's response, and
652 * could execute the query concurrently.
653 */
654 if (pool_config->num_init_children == 1)
655 {
656 /* Send query to all DB nodes at once */
657 status = pool_send_and_wait(query_context, 0, 0);
658 /* free_parser(); */
659 return status;
660 }
661
662 /* Send the query to master node */
663 pool_send_and_wait(query_context, 1, MASTER_NODE_ID);
664
665 /* Check specific errors */
666 specific_error = check_errors(backend, MASTER_NODE_ID);
667 if (specific_error)
668 {
669 /* log error message */
670 generate_error_message("SimpleQuery: ", specific_error, contents);
671 }
672 }
673
674 if (specific_error)
675 {
676 char msg[1024] = POOL_ERROR_QUERY; /* large enough */
677 int len = strlen(msg) + 1;
678
679 memset(msg + len, 0, sizeof(int));
680
681 /* send query to other nodes */
682 query_context->rewritten_query = msg;
683 query_context->rewritten_length = len;
684 pool_send_and_wait(query_context, -1, MASTER_NODE_ID);
685 }
686 else
687 {
688 /*
689 * Send the query to other than master node.
690 */
691 pool_send_and_wait(query_context, -1, MASTER_NODE_ID);
692 }
693
694 /* Send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
695 if (commit)
696 pool_send_and_wait(query_context, 1, MASTER_NODE_ID);
697 }
698 else
699 {
700 pool_send_and_wait(query_context, 1, MASTER_NODE_ID);
701 }
702
703 return POOL_CONTINUE;
704 }
705
706 /*
707 * process EXECUTE (V3 only)
708 */
Execute(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)709 POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
710 int len, char *contents)
711 {
712 int commit = 0;
713 char *query = NULL;
714 Node *node;
715 int specific_error = 0;
716 POOL_SESSION_CONTEXT *session_context;
717 POOL_QUERY_CONTEXT *query_context;
718 POOL_SENT_MESSAGE *bind_msg;
719 bool foundp = false;
720
721 /* Get session context */
722 session_context = pool_get_session_context(false);
723
724 ereport(DEBUG2,
725 (errmsg("Execute: portal name <%s>", contents)));
726
727 bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
728 if (!bind_msg)
729 ereport(FATAL,
730 (return_code(2),
731 errmsg("unable to Execute"),
732 errdetail("unable to get bind message")));
733
734 if(!bind_msg->query_context)
735 ereport(FATAL,
736 (return_code(2),
737 errmsg("unable to Execute"),
738 errdetail("unable to get query context")));
739
740 if (!bind_msg->query_context->original_query)
741 ereport(FATAL,
742 (return_code(2),
743 errmsg("unable to Execute"),
744 errdetail("unable to get original query")));
745
746 if (!bind_msg->query_context->parse_tree)
747 ereport(FATAL,
748 (return_code(2),
749 errmsg("unable to Execute"),
750 errdetail("unable to get parse tree")));
751
752 query_context = bind_msg->query_context;
753 node = bind_msg->query_context->parse_tree;
754 query = bind_msg->query_context->original_query;
755
756 strlcpy(query_string_buffer, query, sizeof(query_string_buffer));
757
758 ereport(DEBUG2,(errmsg("Execute: query string = <%s>", query)));
759
760 ereport(DEBUG1,(errmsg("Execute: pool_is_likely_select: %d pool_is_writing_transaction: %d TSTATE: %c",
761 pool_is_likely_select(query), pool_is_writing_transaction(),
762 TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID))));
763
764 /*
765 * Fetch memory cache if possible
766 */
767 if (pool_config->memory_cache_enabled && pool_is_likely_select(query) &&
768 !pool_is_writing_transaction() &&
769 (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) != 'E'))
770 {
771 POOL_STATUS status;
772 char *search_query = NULL;
773 int len;
774
775 #define STR_ALLOC_SIZE 1024
776
777 len = strlen(query)+1;
778 search_query = MemoryContextStrdup(query_context->memory_context,query);
779
780 ereport(DEBUG1,(errmsg("Execute: checkig cache fetch condition")));
781
782 /*
783 * Add bind message's info to query to search.
784 */
785 if (query_context->is_cache_safe && bind_msg->param_offset && bind_msg->contents)
786 {
787 /* Extract binary contents from bind message */
788 char *query_in_bind_msg = bind_msg->contents + bind_msg->param_offset;
789 char hex_str[4]; /* 02X chars + white space + null end */
790 int i;
791 int alloc_len;
792
793 alloc_len = (len/STR_ALLOC_SIZE+1)*STR_ALLOC_SIZE;
794 search_query = repalloc(search_query, alloc_len);
795
796 for (i = 0; i < bind_msg->len - bind_msg->param_offset; i++)
797 {
798 int hexlen;
799
800 snprintf(hex_str, sizeof(hex_str), (i == 0) ? " %02X" : "%02X", 0xff & query_in_bind_msg[i]);
801 hexlen = strlen(hex_str);
802
803 if ((len+hexlen) >= alloc_len)
804 {
805 alloc_len += STR_ALLOC_SIZE;
806 search_query = repalloc(search_query, alloc_len);
807 }
808 strcat(search_query, hex_str);
809 len += hexlen;
810 }
811
812 /*
813 * If bind message is sent again to an existing prepared statement,
814 * it is possible that query_w_hex remains. Before setting newly
815 * allocated query_w_hex's pointer to the query context, free the
816 * previously allocated memory.
817 */
818 if (query_context->query_w_hex)
819 {
820 pfree(query_context->query_w_hex);
821 }
822 query_context->query_w_hex = search_query;
823
824 /*
825 * When a transaction is committed, query_context->temp_cache->query is used
826 * to create md5 hash to search for query cache.
827 * So overwrite the query text in temp cache to the one with the hex of bind message.
828 * If not, md5 hash will be created by the query text without bind message, and
829 * it will happen to find cache never or to get a wrong result.
830 *
831 * However, It is possible that temp_cache does not exist.
832 * Consider following scenario:
833 * - In the previous execute cache is overflowed, and
834 * temp_cache discarded.
835 * - In the subsequent bind/execute uses the same portal
836 */
837 if (query_context->temp_cache)
838 {
839 pfree(query_context->temp_cache->query);
840 query_context->temp_cache->query = MemoryContextStrdup(session_context->memory_context,search_query);
841 }
842 }
843
844 /* If the query is SELECT from table to cache, try to fetch cached result. */
845 status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp);
846
847 if (status != POOL_CONTINUE)
848 return status;
849
850 if (foundp)
851 {
852 #ifdef DEBUG
853 extern bool stop_now;
854 #endif
855 pool_ps_idle_display(backend);
856 pool_stats_count_up_num_cache_hits();
857 query_context->skip_cache_commit = true;
858 #ifdef DEBUG
859 stop_now = true;
860 #endif
861
862 if (!STREAM || !pool_is_doing_extended_query_message())
863 {
864 pool_set_skip_reading_from_backends();
865 pool_stats_count_up_num_cache_hits();
866 pool_unset_query_in_progress();
867 return POOL_CONTINUE;
868 }
869 }
870 else
871 query_context->skip_cache_commit = false;
872 }
873
874 session_context->query_context = query_context;
875 /*
876 * Calling pool_where_to_send here is dangerous because the node
877 * parse/bind has been sent could be change by
878 * pool_where_to_send() and it leads to "portal not found"
879 * etc. errors.
880 */
881
882 /* check if query is "COMMIT" or "ROLLBACK" */
883 commit = is_commit_or_rollback_query(node);
884
885 if (!STREAM)
886 {
887 /*
888 * Query is not commit/rollback
889 */
890 if (!commit)
891 {
892 /* Send the query to master node */
893 pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, false);
894
895 /* Check specific errors */
896 specific_error = check_errors(backend, MASTER_NODE_ID);
897 if (specific_error)
898 {
899 /* log error message */
900 generate_error_message("Execute: ", specific_error, contents);
901 }
902 }
903
904 if (specific_error)
905 {
906 char msg[1024] = "pgpool_error_portal"; /* large enough */
907 int len = strlen(msg);
908
909 memset(msg + len, 0, sizeof(int));
910
911 /* send query to other nodes */
912 pool_extended_send_and_wait(query_context, "E", len, msg, -1, MASTER_NODE_ID, false);
913 }
914 else
915 {
916 pool_extended_send_and_wait(query_context, "E", len, contents, -1, MASTER_NODE_ID, false);
917 }
918
919 /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
920 if (commit)
921 {
922 pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, false);
923 }
924 }
925 else /* streaming replication mode */
926 {
927 POOL_PENDING_MESSAGE *pmsg;
928
929 if (!foundp)
930 {
931 pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, true);
932 pool_extended_send_and_wait(query_context, "E", len, contents, -1, MASTER_NODE_ID, true);
933 }
934
935 /* Add pending message */
936 pmsg = pool_pending_message_create('E', len, contents);
937 pool_pending_message_dest_set(pmsg, query_context);
938 pool_pending_message_query_set(pmsg, query_context);
939 pool_pending_message_add(pmsg);
940 pool_pending_message_free_pending_message(pmsg);
941
942 /* Various take care at the transaction start */
943 handle_query_context(backend);
944
945 /*
946 * Take care of "writing transaction" flag.
947 */
948 if (!is_select_query(node, query) && !is_start_transaction_query(node) &&
949 !is_commit_or_rollback_query(node))
950 {
951 ereport(DEBUG1,
952 (errmsg("Execute: TSTATE:%c",
953 TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID))));
954 /*
955 * If the query was not READ SELECT, and we are in an
956 * explicit transaction, remember that we had a write
957 * query in this transaction.
958 */
959 if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) == 'T')
960 {
961 /* However, if the query is "SET TRANSACTION READ ONLY" or its variant,
962 * don't set it.
963 */
964 if (!pool_is_transaction_read_only(node))
965 {
966 pool_set_writing_transaction();
967 }
968 }
969 }
970 pool_unset_query_in_progress();
971 }
972
973 return POOL_CONTINUE;
974 }
975
/*
 * Process a Parse ('P') message received from the frontend (protocol V3 only).
 *
 * `contents` holds the Parse message body, `len` bytes in total: the
 * NUL-terminated prepared statement name, then the NUL-terminated query
 * string, then the rest of the message (per the PostgreSQL protocol, the
 * parameter data type count and OIDs), which is copied through untouched.
 *
 * Steps performed here:
 *  - parse the query with pgpool's own parser; an empty query is replaced by
 *    a dummy read query, an unparsable one by a dummy write query;
 *  - start a query context and remember the message as the session's
 *    uncompleted message;
 *  - with the query cache enabled, decide cache safety (SELECT) or record
 *    the table oids touched (non-SELECT);
 *  - decide the destination nodes via pool_where_to_send();
 *  - in replication mode, rewrite now() (to a constant for the unnamed
 *    statement, to parameters otherwise), synchronize transaction state with
 *    a Sync when not inside a transaction, and start an internal transaction
 *    / take an insert lock when required;
 *  - forward the message according to the clustering mode.
 *
 * Returns POOL_CONTINUE on success and POOL_END when processing the
 * ReadyForQuery produced by the synchronizing Sync fails. Other failures are
 * raised with ereport(ERROR).
 */
POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
				  int len, char *contents)
{
	int			deadlock_detected = 0;
	int			insert_stmt_with_lock = 0;
	char	   *name;
	char	   *stmt;
	List	   *parse_tree_list;
	Node	   *node = NULL;
	POOL_SENT_MESSAGE *msg;
	POOL_STATUS status;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	bool		error;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* Create query context */
	query_context = pool_init_query_context();

	ereport(DEBUG1,
			(errmsg("Parse: statement name <%s>", contents)));

	/* The statement name comes first; the query string follows its NUL. */
	name = contents;
	stmt = contents + strlen(contents) + 1;

	/*
	 * Parse the SQL string. The parse tree is allocated in the query
	 * context's memory context so it lives as long as the query context.
	 */
	MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);
	parse_tree_list = raw_parser(stmt, &error);

	if (parse_tree_list == NIL)
	{
		/* is the query empty? */
		if (*stmt == '\0' || *stmt == ';' || error == false)
		{
			/*
			 * JBoss sends empty queries for checking connections.
			 * We replace the empty query with SELECT command not
			 * to affect load balance.
			 * [Pgpool-general] Confused about JDBC and load balancing
			 */
			parse_tree_list = raw_parser(POOL_DUMMY_READ_QUERY, &error);
		}
		else
		{
			/*
			 * Unable to parse the query. Probably syntax error or the
			 * query is too new and our parser cannot understand. Treat as
			 * if it were an DELETE command. Note that the DELETE command
			 * does not execute, instead the original query will be sent
			 * to backends, which may or may not cause an actual syntax errors.
			 * The command will be sent to all backends in replication mode
			 * or master/primary in master/slave mode.
			 */
			if (!strcmp(remote_host, "[local]"))
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from local client", stmt)));
			}
			else
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from client %s(%s)", stmt, remote_host, remote_port)));
			}
			parse_tree_list = raw_parser(POOL_DUMMY_WRITE_QUERY, &error);
			query_context->is_parse_error = true;
		}
	}
	MemoryContextSwitchTo(old_context);

	/*
	 * NOTE(review): if parse_tree_list is still NIL here (the dummy query
	 * also failed to parse), the query context is never started and `msg`
	 * is never assigned. The forwarding code below then runs with an
	 * unstarted query context. Presumably unreachable; confirm.
	 */
	if (parse_tree_list != NIL)
	{
		/* Save last query string for logging purpose */
		snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);

		/* Only the first statement of a multi-statement string is examined. */
		node = (Node *) lfirst(list_head(parse_tree_list));

		/* If replication mode, check to see what kind of insert lock is
		 * necessary.
		 */
		if (REPLICATION)
			insert_stmt_with_lock = need_insert_lock(backend, stmt, node);

		/*
		 * Start query context
		 */
		pool_start_query(query_context, stmt, strlen(stmt) + 1, node);

		/*
		 * Remember this Parse as the session's uncompleted message. Outside
		 * streaming mode it is registered as a sent message once the backend
		 * answers ParseComplete; in streaming mode it is added right after
		 * forwarding (below).
		 */
		msg = pool_create_sent_message('P', len, contents, 0, name, query_context);

		session_context->uncompleted_message = msg;

		/*
		 * If the table is to be cached, set is_cache_safe TRUE and register table oids.
		 */
		if (pool_config->memory_cache_enabled)
		{
			bool		is_likely_select = false;
			bool		is_select_query = false;
			int			num_oids;
			int		   *oids;
			int			i;

			/* Check if the query is actually SELECT */
			is_likely_select = pool_is_likely_select(query_context->original_query);
			if (is_likely_select && IsA(node, SelectStmt))
			{
				is_select_query = true;
			}

			/* Switch the flag of is_cache_safe in session_context */
			if (is_select_query && !query_context->is_parse_error &&
				pool_is_allow_to_cache(query_context->parse_tree,
									   query_context->original_query))
			{
				pool_set_cache_safe();
			}
			else
			{
				pool_unset_cache_safe();
			}

			/* If table is to be cached and the query is DML, save the table oid */
			if (!is_select_query && !query_context->is_parse_error)
			{
				num_oids = pool_extract_table_oids(node, &oids);

				if (num_oids > 0)
				{
					/* Save to oid buffer */
					for (i=0;i<num_oids;i++)
					{
						pool_add_dml_table_oid(oids[i]);
					}
				}
			}
		}

		/*
		 * Decide where to send query
		 */
		pool_where_to_send(query_context, query_context->original_query,
						   query_context->parse_tree);

		if (REPLICATION)
		{
			char	   *rewrite_query;
			bool		rewrite_to_params = true;

			/*
			 * rewrite `now()'.
			 * if stmt is unnamed, we rewrite `now()' to timestamp constant.
			 * else we rewrite `now()' to params and expand that at Bind
			 * message.
			 */
			if (*name == '\0')
				rewrite_to_params = false;
			msg->num_tsparams = 0;
			rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, msg);
			if (rewrite_query != NULL)
			{
				/*
				 * Rebuild the message body as: name, rewritten query, and
				 * the unchanged tail that followed the original query.
				 * `name`/`stmt` still point into the caller's buffer here.
				 */
				int			alloc_len = len - strlen(stmt) + strlen(rewrite_query);
				/* switch memory context */
				MemoryContext oldcxt = MemoryContextSwitchTo(session_context->memory_context);
				contents = repalloc(msg->contents,alloc_len);
				MemoryContextSwitchTo(oldcxt);

				strcpy(contents, name);
				strcpy(contents + strlen(name) + 1, rewrite_query);
				memcpy(contents + strlen(name) + strlen(rewrite_query) + 2,
					   stmt + strlen(stmt) + 1,
					   len - (strlen(name) + strlen(stmt) + 2));

				/* From here on, operate on the rewritten message. */
				len = alloc_len;
				name = contents;
				stmt = contents + strlen(name) + 1;
				ereport(DEBUG1,
						(errmsg("Parse: rewrite query name:\"%s\" statement:\"%s\" len=%d", name, stmt, len)));

				msg->len = len;
				msg->contents = contents;

				query_context->rewritten_query = rewrite_query;
			}
		}

		/*
		 * If the query is BEGIN READ WRITE in master/slave mode,
		 * we send BEGIN instead of it to slaves/standbys.
		 * original_query which is BEGIN READ WRITE is sent to primary.
		 * rewritten_query which is BEGIN is sent to standbys.
		 */
		if (is_start_transaction_query(query_context->parse_tree) &&
			is_read_write((TransactionStmt *)query_context->parse_tree) &&
			MASTER_SLAVE)
		{
			query_context->rewritten_query = pstrdup("BEGIN");
		}
	}

	/*
	 * If in replication mode, send "SYNC" message if not in a transaction.
	 */
	if (REPLICATION)
	{
		char		kind;

		if (TSTATE(backend, MASTER_NODE_ID) != 'T')
		{
			int			i;

			/* synchronize transaction state */
			for (i = 0; i < NUM_BACKENDS; i++)
			{
				if (!VALID_BACKEND(i))
					continue;
				/* send sync message */
				send_extended_protocol_message(backend, i, "S", 0, "");
			}

			/* The Sync must be answered with ReadyForQuery ('Z'). */
			kind = pool_read_kind(backend);
			if (kind != 'Z')
				ereport(ERROR,
						(errmsg("unable to parse the query"),
						 errdetail("invalid read kind")));

			/*
			 * SYNC message returns "Ready for Query" message.
			 * It is consumed here without being forwarded to the
			 * frontend (send_ready is 0).
			 */
			if (ReadyForQuery(frontend, backend, 0, false) != POOL_CONTINUE)
			{
				/*
				 * NOTE(review): session_context->uncompleted_message (msg)
				 * still references query_context after it is destroyed
				 * here. Presumably harmless because the session ends on
				 * POOL_END; confirm.
				 */
				pool_query_context_destroy(query_context);
				return POOL_END;
			}

			/*
			 * set in_progress flag, because ReadyForQuery unset it.
			 * in_progress flag influences VALID_BACKEND.
			 */
			if (!pool_is_query_in_progress())
				pool_set_query_in_progress();
		}

		if (is_strict_query(query_context->parse_tree))
		{
			start_internal_transaction(frontend, backend, query_context->parse_tree);
			allow_close_transaction = 1;
		}

		if (insert_stmt_with_lock)
		{
			/* start a transaction if needed and lock the table */
			status = insert_lock(frontend, backend, stmt, (InsertStmt *)query_context->parse_tree, insert_stmt_with_lock);
			if (status != POOL_CONTINUE)
				ereport(ERROR,
						(errmsg("unable to parse the query"),
						 errdetail("unable to get insert lock")));

		}
	}

	if (REPLICATION || SLONY)
	{
		/*
		 * We must synchronize because Parse message acquires table
		 * locks.
		 * The message goes to the master first (node-selector 1); the
		 * remaining nodes (-1) get either the same Parse or, after a
		 * deadlock on the master, an error query.
		 */
		ereport(DEBUG1,
				(errmsg("Parse: waiting for master completing the query")));
		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, false);


		/*
		 * We must check deadlock error because a aborted transaction
		 * by detecting deadlock isn't same on all nodes.
		 * If a transaction is aborted on master node, pgpool send a
		 * error query to another nodes.
		 */
		deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
		/*
		 * Check if other than deadlock error detected. If so, emit
		 * log. This is useful when invalid encoding error occurs. In
		 * this case, PostgreSQL does not report what statement caused
		 * that error and make users confused.
		 */
		per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse: Error or notice message from backend: ", true);

		if (deadlock_detected)
		{
			POOL_QUERY_CONTEXT *error_qc;

			error_qc = pool_init_query_context();


			/* Send the error query to the same nodes this Parse targets. */
			pool_start_query(error_qc, POOL_ERROR_QUERY, strlen(POOL_ERROR_QUERY) + 1, node);
			pool_copy_prep_where(query_context->where_to_send, error_qc->where_to_send);

			ereport(LOG,
					(errmsg("Parse: received deadlock error message from master node")));

			pool_send_and_wait(error_qc, -1, MASTER_NODE_ID);

			pool_query_context_destroy(error_qc);


			/* Restore the Parse's own query context as the current one. */
			pool_set_query_in_progress();
			session_context->query_context = query_context;
		}
		else
		{
			pool_extended_send_and_wait(query_context, "P", len, contents, -1, MASTER_NODE_ID, false);
		}
	}
	else if (STREAM)
	{
		POOL_PENDING_MESSAGE *pmsg;

		/* XXX fix me:even with streaming replication mode, couldn't we have a deadlock */
		pool_set_query_in_progress();
#ifdef NOT_USED
		pool_clear_sync_map();
#endif
		/* Non-blocking sends (nowait = true) to the master, then the rest. */
		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, true);
		pool_extended_send_and_wait(query_context, "P", len, contents, -1, MASTER_NODE_ID, true);
		pool_add_sent_message(session_context->uncompleted_message);

		/* Add pending message */
		pmsg = pool_pending_message_create('P', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);

		pool_unset_query_in_progress();
	}
	else
	{
		/* Any other mode: send to the master node only. */
		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, false);
	}

	return POOL_CONTINUE;

}
1323
/*
 * Process a Bind ('B') message from the frontend.
 *
 * `contents` starts with the NUL-terminated portal name followed by the
 * NUL-terminated prepared statement name; `len` is the message body length.
 * The prepared statement is looked up among sent messages registered under
 * kind 'Q' (presumably a SQL-level PREPARE sent as a simple query — confirm)
 * or 'P' (a Parse message). The new bind message shares that statement's
 * query context.
 *
 * Returns POOL_CONTINUE on success and POOL_END when parse_before_bind() or
 * insert_lock() fails. A missing parse message or query context raises FATAL.
 */
POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
				 int len, char *contents)
{
	char	   *pstmt_name;
	char	   *portal_name;
	char	   *rewrite_msg = NULL;
	POOL_SENT_MESSAGE *parse_msg;
	POOL_SENT_MESSAGE *bind_msg;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;
	int			insert_stmt_with_lock = 0;
	bool		nowait;

	/* Get session context */
	session_context = pool_get_session_context(false);
	/*
	 * Rewrite message
	 */
	portal_name = contents;
	pstmt_name = contents + strlen(portal_name) + 1;

	parse_msg = pool_get_sent_message('Q', pstmt_name, POOL_SENT_MESSAGE_CREATED);
	if (!parse_msg)
		parse_msg = pool_get_sent_message('P', pstmt_name, POOL_SENT_MESSAGE_CREATED);
	if (!parse_msg)
	{
		ereport(FATAL,
				(errmsg("unable to bind"),
				 errdetail("cannot get parse message \"%s\"", pstmt_name)));
	}

	/* Number of timestamp parameters added to the Parse, if any, carries over. */
	bind_msg = pool_create_sent_message('B', len, contents,
										parse_msg->num_tsparams, portal_name,
										parse_msg->query_context);

	query_context = parse_msg->query_context;
	if (!query_context)
	{
		ereport(FATAL,
				(errmsg("unable to bind"),
				 errdetail("cannot get the query context")));
	}

	/*
	 * If the query can be cached, save its offset of query text in bind message's content.
	 * The offset skips both names and their NUL terminators.
	 */
	if (query_context->is_cache_safe)
	{
		bind_msg->param_offset = sizeof(char) * (strlen(portal_name) + strlen(pstmt_name) + 2);
	}

	session_context->uncompleted_message = bind_msg;

	/* rewrite bind message */
	if (REPLICATION && bind_msg->num_tsparams > 0)
	{
		/* May also update `len` to the rewritten message length. */
		rewrite_msg = bind_rewrite_timestamp(backend, bind_msg, contents, &len);
		if (rewrite_msg != NULL)
			contents = rewrite_msg;
	}

	session_context->query_context = query_context;

	/*
	 * Take care the case when the previous parse message has been sent to
	 * other than primary node. In this case, we send a parse message to the
	 * primary node.
	 * NOTE(review): rewrite_msg is not freed on the POOL_END returns in this
	 * function.
	 */
	if (pool_config->load_balance_mode && pool_is_writing_transaction() &&
		TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) == 'T')
	{
		if (!STREAM)
		{
			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);
		}

		if (parse_before_bind(frontend, backend, parse_msg, bind_msg) != POOL_CONTINUE)
			return POOL_END;
	}

	/*
	 * Start a transaction if necessary in replication mode
	 */
	if (REPLICATION)
	{
		ereport(DEBUG1,
				(errmsg("Bind: checking strict query")));
		if (is_strict_query(query_context->parse_tree))
		{
			ereport(DEBUG1,
					(errmsg("Bind: strict query")));
			start_internal_transaction(frontend, backend, query_context->parse_tree);
			allow_close_transaction = 1;
		}

		ereport(DEBUG1,
				(errmsg("Bind: checking insert lock")));
		insert_stmt_with_lock = need_insert_lock(backend, query_context->original_query, query_context->parse_tree);
		if (insert_stmt_with_lock)
		{
			ereport(DEBUG1,
					(errmsg("Bind: issuing insert lock")));
			/* issue a LOCK command to keep consistency */
			if (insert_lock(frontend, backend, query_context->original_query, (InsertStmt *)query_context->parse_tree, insert_stmt_with_lock) != POOL_CONTINUE)
			{
				/*
				 * NOTE(review): this query context is still referenced by
				 * parse_msg and bind_msg; presumably harmless because the
				 * session ends on POOL_END — confirm.
				 */
				pool_query_context_destroy(query_context);
				return POOL_END;
			}
		}
	}

	ereport(DEBUG1,
			(errmsg("Bind: waiting for master completing the query")));

	pool_set_query_in_progress();

	if (STREAM)
	{
		nowait = true;
		/*
		 * Re-read the query context from bind_msg. Presumably
		 * parse_before_bind() may have replaced it (TODO confirm).
		 */
		session_context->query_context = query_context = bind_msg->query_context;
	}
	else
		nowait = false;

	/* Send to the master node first, then to the remaining nodes. */
	pool_extended_send_and_wait(query_context, "B", len, contents, 1, MASTER_NODE_ID, nowait);
	pool_extended_send_and_wait(query_context, "B", len, contents, -1, MASTER_NODE_ID, nowait);

	if (STREAM)
	{
		POOL_PENDING_MESSAGE *pmsg;

		pool_unset_query_in_progress();
		pool_add_sent_message(session_context->uncompleted_message);

		/* Add pending message */
		pmsg = pool_pending_message_create('B', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);
	}

	if(rewrite_msg)
		pfree(rewrite_msg);
	return POOL_CONTINUE;
}
1471
Describe(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1472 POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1473 int len, char *contents)
1474 {
1475 POOL_SENT_MESSAGE *msg;
1476 POOL_SESSION_CONTEXT *session_context;
1477 POOL_QUERY_CONTEXT *query_context;
1478
1479 bool nowait;
1480
1481 /* Get session context */
1482 session_context = pool_get_session_context(false);
1483
1484 /* Prepared Statement */
1485 if (*contents == 'S')
1486 {
1487 msg = pool_get_sent_message('Q', contents+1, POOL_SENT_MESSAGE_CREATED);
1488 if (!msg)
1489 msg = pool_get_sent_message('P', contents+1, POOL_SENT_MESSAGE_CREATED);
1490 if (!msg)
1491 ereport(FATAL,
1492 (return_code(2),
1493 errmsg("unable to execute Describe"),
1494 errdetail("unable to get the parse message")));
1495 }
1496 /* Portal */
1497 else
1498 {
1499 msg = pool_get_sent_message('B', contents+1, POOL_SENT_MESSAGE_CREATED);
1500 if (!msg)
1501 ereport(FATAL,
1502 (return_code(2),
1503 errmsg("unable to execute Describe"),
1504 errdetail("unable to get the bind message")));
1505 }
1506
1507 query_context = msg->query_context;
1508
1509 if (query_context == NULL)
1510 ereport(FATAL,
1511 (return_code(2),
1512 errmsg("unable to execute Describe"),
1513 errdetail("unable to get the query context")));
1514
1515 session_context->query_context = query_context;
1516
1517 /*
1518 * Calling pool_where_to_send here is dangerous because the node
1519 * parse/bind has been sent could be change by
1520 * pool_where_to_send() and it leads to "portal not found"
1521 * etc. errors.
1522 */
1523 ereport(DEBUG1,
1524 (errmsg("Describe: waiting for master completing the query")));
1525
1526 nowait = (STREAM? true: false);
1527
1528 pool_set_query_in_progress();
1529 pool_extended_send_and_wait(query_context, "D", len, contents, 1, MASTER_NODE_ID, nowait);
1530 pool_extended_send_and_wait(query_context, "D", len, contents, -1, MASTER_NODE_ID, nowait);
1531
1532 if (STREAM)
1533 {
1534 POOL_PENDING_MESSAGE *pmsg;
1535
1536 /* Add pending message */
1537 pmsg = pool_pending_message_create('D', len, contents);
1538 pool_pending_message_dest_set(pmsg, query_context);
1539 pool_pending_message_query_set(pmsg, query_context);
1540 pool_pending_message_add(pmsg);
1541 pool_pending_message_free_pending_message(pmsg);
1542
1543 pool_unset_query_in_progress();
1544 }
1545
1546 return POOL_CONTINUE;
1547 }
1548
1549
Close(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1550 POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1551 int len, char *contents)
1552 {
1553 POOL_SENT_MESSAGE *msg;
1554 POOL_SESSION_CONTEXT *session_context;
1555 POOL_QUERY_CONTEXT *query_context;
1556
1557 /* Get session context */
1558 session_context = pool_get_session_context(false);
1559
1560 /* Prepared Statement */
1561 if (*contents == 'S')
1562 {
1563 msg = pool_get_sent_message('Q', contents+1, POOL_SENT_MESSAGE_CREATED);
1564 if (!msg)
1565 msg = pool_get_sent_message('P', contents+1, POOL_SENT_MESSAGE_CREATED);
1566 }
1567 /* Portal */
1568 else if (*contents == 'P')
1569 {
1570 msg = pool_get_sent_message('B', contents+1, POOL_SENT_MESSAGE_CREATED);
1571 }
1572 else
1573 ereport(FATAL,
1574 (return_code(2),
1575 errmsg("unable to execute close, invalid message")));
1576 /*
1577 * As per the postgresql, calling close on non existing portals or
1578 * statements is not an error. So on the same footings we will ignore all
1579 * such calls and return the close complete message to clients with out
1580 * going to backend
1581 */
1582 if (!msg)
1583 {
1584 int len = htonl(4);
1585 pool_set_command_success();
1586 pool_unset_query_in_progress();
1587
1588 pool_write(frontend, "3", 1);
1589 pool_write_and_flush(frontend, &len, sizeof(len));
1590
1591 return POOL_CONTINUE;
1592 }
1593
1594 session_context->uncompleted_message = msg;
1595 query_context = msg->query_context;
1596
1597 if (!query_context)
1598 ereport(FATAL,
1599 (return_code(2),
1600 errmsg("unable to execute close"),
1601 errdetail("unable to get the query context")));
1602
1603 session_context->query_context = query_context;
1604 /* pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); */
1605
1606 ereport(DEBUG1,
1607 (errmsg("Close: waiting for master completing the query")));
1608
1609 pool_set_query_in_progress();
1610
1611 if (!STREAM)
1612 {
1613 pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, false);
1614 pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, false);
1615 }
1616 else
1617 {
1618 POOL_PENDING_MESSAGE *pmsg;
1619 bool where_to_send_save[MAX_NUM_BACKENDS];
1620
1621 /* Parse_before_bind() may have sent a bind message to the primary
1622 * node id. So send the close message to the primary node as well.
1623 * Even if not, sending a close message for non existing
1624 * statement/portal is harmless. No error will happen.
1625 */
1626 if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
1627 {
1628 /* save where_to_send map */
1629 memcpy(where_to_send_save, query_context->where_to_send, sizeof(where_to_send_save));
1630
1631 query_context->where_to_send[PRIMARY_NODE_ID] = true;
1632 query_context->where_to_send[session_context->load_balance_node_id] = true;
1633 }
1634
1635 pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, true);
1636 pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, true);
1637
1638 /* Add pending message */
1639 pmsg = pool_pending_message_create('C', len, contents);
1640 pool_pending_message_dest_set(pmsg, query_context);
1641 pool_pending_message_query_set(pmsg, query_context);
1642 pool_pending_message_add(pmsg);
1643 pool_pending_message_free_pending_message(pmsg);
1644
1645 if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
1646 {
1647 /* Restore where_to_send map */
1648 memcpy(query_context->where_to_send, where_to_send_save, sizeof(where_to_send_save));
1649 }
1650
1651 #ifdef NOT_USED
1652 dump_pending_message();
1653 #endif
1654 pool_unset_query_in_progress();
1655
1656 /*
1657 * Remove sent message
1658 */
1659 ereport(DEBUG1,
1660 (errmsg("Close: removing sent message %c %s", *contents, contents+1)));
1661 pool_set_sent_message_state(msg);
1662 }
1663
1664 return POOL_CONTINUE;
1665 }
1666
1667
FunctionCall3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1668 POOL_STATUS FunctionCall3(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1669 int len, char *contents)
1670 {
1671 /*
1672 * If Function call message for lo_creat, rewrite it
1673 */
1674 char *rewrite_lo;
1675 int rewrite_len;
1676
1677 rewrite_lo = pool_rewrite_lo_creat('F', contents, len, frontend,
1678 backend, &rewrite_len);
1679
1680 if (rewrite_lo != NULL)
1681 {
1682 contents = rewrite_lo;
1683 len = rewrite_len;
1684 }
1685 return SimpleForwardToBackend('F', frontend, backend, len, contents);
1686 }
1687
/*
 * Process ReadyForQuery('Z') message.
 * If send_ready is true, send 'Z' message to frontend.
 * If cache_commit is true, commit or discard query cache according to
 * transaction state.
 *
 * - if the error status "mismatch_ntuples" is set, send an error query
 *   to all DB nodes to abort transaction or do failover.
 * - internal transaction is closed
 *
 * Called after the 'Z' kind byte has been read; this function reads the
 * rest of the message (length and transaction-state byte) from each valid
 * backend. The state reported to the frontend is the primary's
 * (master/slave mode) or the real master's, forced to 'E' if any node
 * reported 'E'.
 *
 * Returns POOL_CONTINUE, or POOL_END when reading from a backend fails or
 * closing an internal transaction fails.
 */
POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
						  POOL_CONNECTION_POOL *backend, bool send_ready, bool cache_commit)
{
	int			i;
	int			len;
	signed char kind;
	signed char state = 0;
	POOL_SESSION_CONTEXT *session_context;
	Node	   *node = NULL;
	char	   *query = NULL;
	bool		got_estate = false;

	/*
	 * It is possible that the "ignore until sync is received" flag was set if
	 * we send sync to backend and the backend returns error. Let's reset the
	 * flag unconditionally because we apparently have received a "ready for
	 * query" message from backend.
	 */
	pool_unset_ignore_till_sync();

	/* Reset previous message */
	pool_pending_message_reset_previous_message();

	/* Get session context */
	session_context = pool_get_session_context(false);

	/*
	 * If the numbers of update tuples are differ and
	 * failover_if_affected_tuples_mismatch is false, we abort
	 * transactions by using do_error_command. If
	 * failover_if_affected_tuples_mismatch is true, trigger failover.
	 * This only works with PROTO_MAJOR_V3.
	 */
	if (session_context->mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
	{
		/* NOTE(review): these shadow the function-level `i` and `kind`. */
		int			i;
		char		kind;

		/*
		 * If failover_if_affected_tuples_mismatch, is true, then
		 * decide victim nodes by using find_victim_nodes and
		 * degenerate them.
		 */
		if (pool_config->failover_if_affected_tuples_mismatch)
		{
			int		   *victim_nodes;
			int			number_of_nodes;
			char		msgbuf[128];

			victim_nodes = find_victim_nodes(session_context->ntuples, NUM_BACKENDS,
											 MASTER_NODE_ID, &number_of_nodes);
			if (victim_nodes)
			{
				int			i;
				String	   *msg;

				msg = init_string("ReadyForQuery: Degenerate backends:");

				for (i=0;i<number_of_nodes;i++)
				{
					snprintf(msgbuf, sizeof(msgbuf), " %d", victim_nodes[i]);
					string_append_char(msg, msgbuf);
				}
				ereport(LOG,
						(errmsg("processing ready for query message"),
						 errdetail("%s", msg->data)));

				free_string(msg);

				msg = init_string("ReadyForQuery: Number of affected tuples are:");

				for (i=0;i<NUM_BACKENDS;i++)
				{
					snprintf(msgbuf, sizeof(msgbuf), " %d", session_context->ntuples[i]);
					string_append_char(msg, msgbuf);
				}
				ereport(LOG,
						(errmsg("processing ready for query message"),
						 errdetail("%s", msg->data)));

				free_string(msg);

				/* Degenerate the victims, then exit and restart this child. */
				degenerate_backend_set(victim_nodes, number_of_nodes, true, 0);
				child_exit(POOL_EXIT_AND_RESTART);
			}
			else
			{
				ereport(LOG,
						(errmsg("processing ready for query message"),
						 errdetail("find_victim_nodes returned no victim node")));
			}
		}

		/*
		 * XXX: discard rest of ReadyForQuery packet
		 * (its length word and transaction-state byte).
		 */

		if (pool_read_message_length(backend) < 0)
			return POOL_END;

		state = pool_read_kind(backend);
		if (state < 0)
			return POOL_END;

		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
			{
				/* abort transaction on all nodes. */
				do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
			}
		}

		/* loop through until we get ReadyForQuery */
		for(;;)
		{
			kind = pool_read_kind(backend);
			if (kind < 0)
				return POOL_END;

			if (kind == 'Z')
				break;


			/*
			 * put the message back to read buffer, presumably so that
			 * pool_discard_packet() can re-read the kind byte itself.
			 */
			for (i=0;i<NUM_BACKENDS;i++)
			{
				if (VALID_BACKEND(i))
				{
					pool_unread(CONNECTION(backend,i), &kind, 1);
				}
			}

			/* discard rest of the packet */
			if (pool_discard_packet(backend) != POOL_CONTINUE)
				return POOL_END;
		}
		/* The new 'Z' kind is consumed; its body is read below. */
		session_context->mismatch_ntuples = false;
	}

	/*
	 * if a transaction is started for insert lock, we need to close
	 * the transaction.
	 */
	/* if (pool_is_query_in_progress() && allow_close_transaction) */
	if (allow_close_transaction)
	{
		if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
			return POOL_END;
	}

	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if ((len = pool_read_message_length(backend)) < 0)
			return POOL_END;
		/*
		 * Set transaction state for each node
		 */
		state = TSTATE(backend,
					   MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID);

		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (!VALID_BACKEND(i))
				continue;

			if (pool_read(CONNECTION(backend, i), &kind, sizeof(kind)))
				return POOL_END;

			TSTATE(backend, i) = kind;
			/*
			 * NOTE(review): this logs `state` (the primary/master state
			 * captured before the loop, or updated once the primary/master
			 * has been read), not `kind` for node i. It was probably
			 * intended to log `kind`; the message also omits the node id.
			 */
			ereport(DEBUG1,
					(errmsg("processing ReadyForQuery"),
					 errdetail("transaction state '%c'(%02x)", state,state)));
			/*
			 * The transaction state to be returned to frontend is
			 * master's.
			 */
			if (i == (MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID))
			{
				state = kind;
			}
			/*
			 * However, if the state is 'E', then frontend should had been
			 * already reported an ERROR. So, to match with that, let be the
			 * state to be returned to frontend.
			 */
			if (kind == 'E')
				got_estate = true;
		}
	}

	/*
	 * Make sure that no message remains in the backend buffer. If something
	 * remains, it could be an "out of band" ERROR or FATAL error, or a NOTICE
	 * message, which was generated by backend itself for some reasons like
	 * recovery conflict or SIGTERM received. If so, let's consume it and emit
	 * a log message so that next read_kind_from_backend() will not hang in
	 * trying to read from backend which may have not produced such a message.
	 */
	if (pool_is_query_in_progress())
	{
		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (!VALID_BACKEND(i))
				continue;
			if (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
				per_node_error_log(backend, i,
								   "(out of band message)",
								   "ReadyForQuery: Error or notice message from backend: ", false);
		}
	}

	if (send_ready)
	{
		pool_write(frontend, "Z", 1);

		if (MAJOR(backend) == PROTO_MAJOR_V3)
		{
			/* `len` was read from the backend in the V3 block above. */
			len = htonl(len);
			pool_write(frontend, &len, sizeof(len));
			if (got_estate)
				state = 'E';
			pool_write(frontend, &state, 1);
		}
		pool_flush(frontend);
	}

	if (pool_is_query_in_progress())
	{
		node = pool_get_parse_tree();
		query = pool_get_query_string();

		if (pool_is_command_success())
		{
			if (node)
				pool_at_command_success(frontend, backend);

			/* Memory cache enabled? */
			if (cache_commit && pool_config->memory_cache_enabled)
			{

				/* If we are doing extended query and the state is after EXECUTE,
				 * then we can commit cache.
				 * We check latter condition by looking at query_context->query_w_hex.
				 * This check is necessary for certain frame work such as PHP PDO.
				 * It sends Sync message right after PARSE and it produces
				 * "Ready for query" message from backend.
				 */
				if (pool_is_doing_extended_query_message())
				{
					if (session_context->query_context &&
						session_context->query_context->query_state[MASTER_NODE_ID] == POOL_EXECUTE_COMPLETE)
					{
						pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state);
						if(session_context->query_context->query_w_hex)
							pfree(session_context->query_context->query_w_hex);
						session_context->query_context->query_w_hex = NULL;
					}
				}
				else
				{
					if (MAJOR(backend) != PROTO_MAJOR_V3)
					{
						state = 'I';	/* XXX I don't think query cache works with PROTO2 protocol */
					}
					pool_handle_query_cache(backend, query, node, state);
				}
			}
		}
		/*
		 * If PREPARE or extended query protocol commands caused error,
		 * remove the temporary saved message.
		 * (except when ReadyForQuery() is called during Parse() of extended queries)
		 * The message is first added to the sent-message list and then
		 * removed by kind and name.
		 */
		else
		{
			if ((pool_is_doing_extended_query_message() &&
				 session_context->query_context &&
				 session_context->query_context->query_state[MASTER_NODE_ID] != POOL_UNPARSED &&
				 session_context->uncompleted_message) ||
				(!pool_is_doing_extended_query_message() && session_context->uncompleted_message &&
				 session_context->uncompleted_message->kind != 0))
			{
				pool_add_sent_message(session_context->uncompleted_message);
				pool_remove_sent_message(session_context->uncompleted_message->kind,
										 session_context->uncompleted_message->name);
				session_context->uncompleted_message = NULL;
			}
		}

		pool_unset_query_in_progress();
	}
	/*
	 * For simple queries, the query context is no longer needed, except for
	 * a PREPARE, whose query context is kept.
	 */
	if (!pool_is_doing_extended_query_message())
	{
		if (!(node && IsA(node, PrepareStmt)))
		{
			pool_query_context_destroy(pool_get_session_context(false)->query_context);
		}
	}

	/*
	 * Show ps idle status
	 */
	pool_ps_idle_display(backend);

	return POOL_CONTINUE;
}
2005
2006 /*
2007 * Close running transactions on standbys.
2008 */
close_standby_transactions(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2009 static POOL_STATUS close_standby_transactions(POOL_CONNECTION *frontend,
2010 POOL_CONNECTION_POOL *backend)
2011 {
2012 int i;
2013
2014 for (i=0;i<NUM_BACKENDS;i++)
2015 {
2016 if (CONNECTION_SLOT(backend, i) &&
2017 TSTATE(backend, i) == 'T' &&
2018 BACKEND_INFO(i).backend_status == CON_UP &&
2019 (MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) != i)
2020 {
2021 per_node_statement_log(backend, i, "COMMIT");
2022 if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend),
2023 MASTER_CONNECTION(backend)->pid,
2024 MASTER_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
2025 ereport(ERROR,
2026 (errmsg("unable to close standby transactions"),
2027 errdetail("do_command returned DEADLOCK status")));
2028 }
2029 }
2030 return POOL_CONTINUE;
2031 }
2032
ParseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2033 POOL_STATUS ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2034 {
2035 POOL_SESSION_CONTEXT *session_context;
2036
2037 /* Get session context */
2038 session_context = pool_get_session_context(false);
2039
2040 if (!STREAM && session_context->uncompleted_message)
2041 {
2042 POOL_QUERY_CONTEXT *qc;
2043
2044 pool_add_sent_message(session_context->uncompleted_message);
2045
2046 qc = session_context->uncompleted_message->query_context;
2047 if (qc)
2048 pool_set_query_state(qc, POOL_PARSE_COMPLETE);
2049
2050 session_context->uncompleted_message = NULL;
2051 }
2052
2053 return SimpleForwardToFrontend('1', frontend, backend);
2054 }
2055
BindComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2056 POOL_STATUS BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2057 {
2058 POOL_SESSION_CONTEXT *session_context;
2059
2060 /* Get session context */
2061 session_context = pool_get_session_context(false);
2062
2063 if (!STREAM && session_context->uncompleted_message)
2064 {
2065 POOL_QUERY_CONTEXT *qc;
2066
2067 pool_add_sent_message(session_context->uncompleted_message);
2068
2069 qc = session_context->uncompleted_message->query_context;
2070 if (qc)
2071 pool_set_query_state(qc, POOL_BIND_COMPLETE);
2072
2073 session_context->uncompleted_message = NULL;
2074 }
2075
2076 return SimpleForwardToFrontend('2', frontend, backend);
2077 }
2078
CloseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2079 POOL_STATUS CloseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2080 {
2081 POOL_SESSION_CONTEXT *session_context;
2082 POOL_STATUS status;
2083 char kind = ' ';
2084 char *name = "";
2085
2086 /* Get session context */
2087 session_context = pool_get_session_context(false);
2088
2089 /* Send CloseComplete(3) to frontend before removing the target message */
2090 status = SimpleForwardToFrontend('3', frontend, backend);
2091
2092 /* Remove the target message */
2093 if (STREAM)
2094 {
2095 POOL_PENDING_MESSAGE *pmsg;
2096
2097 pmsg = pool_pending_message_pull_out();
2098
2099 if (pmsg)
2100 {
2101 /* Sanity check */
2102 if (pmsg->type != POOL_CLOSE)
2103 {
2104 ereport(LOG,
2105 (errmsg("CloseComplete: pending messge was not Close request: %s", pool_pending_message_type_to_string(pmsg->type))));
2106 }
2107 else
2108 {
2109 kind = pool_get_close_message_spec(pmsg);
2110 name = pool_get_close_message_name(pmsg);
2111 kind = kind=='S'?'P':'B';
2112 }
2113 pool_pending_message_free_pending_message(pmsg);
2114 }
2115 }
2116 else
2117 {
2118 if (session_context->uncompleted_message)
2119 {
2120 kind = session_context->uncompleted_message->kind;
2121 name = session_context->uncompleted_message->name;
2122 session_context->uncompleted_message = NULL;
2123 }
2124 else
2125 {
2126 ereport(ERROR,
2127 (errmsg("processing CloseComplete, uncompleted message not found")));
2128 }
2129 }
2130
2131 if (kind != ' ')
2132 {
2133 pool_remove_sent_message(kind, name);
2134 ereport(DEBUG1,
2135 (errmsg("CloseComplete: remove sent message. kind:%c, name:%s",
2136 kind, name)));
2137 if (pool_config->memory_cache_enabled)
2138 {
2139 POOL_QUERY_CONTEXT *query_context;
2140 POOL_TEMP_QUERY_CACHE *temp_cache;
2141
2142 query_context = session_context->query_context;
2143 if (query_context)
2144 {
2145 temp_cache = query_context->temp_cache;
2146 if (temp_cache)
2147 {
2148 pool_discard_temp_query_cache(temp_cache);
2149 query_context->temp_cache = NULL;
2150 }
2151 }
2152 }
2153 }
2154
2155 return status;
2156 }
2157
ParameterDescription(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2158 POOL_STATUS ParameterDescription(POOL_CONNECTION *frontend,
2159 POOL_CONNECTION_POOL *backend)
2160 {
2161 int len, len1 = 0;
2162 char *p = NULL;
2163 char *p1 = NULL;
2164 int sendlen;
2165 int i;
2166
2167 POOL_SESSION_CONTEXT *session_context;
2168 int num_params, send_num_params, num_dmy;
2169 char kind = 't';
2170
2171 session_context = pool_get_session_context(false);
2172
2173 /* only in replication mode and rewritten query */
2174 if (!REPLICATION || !session_context->query_context->rewritten_query)
2175 return SimpleForwardToFrontend('t', frontend, backend);
2176
2177 /* get number of parameters in original query */
2178 num_params = session_context->query_context->num_original_params;
2179
2180 pool_read(MASTER(backend), &len, sizeof(len));
2181
2182 len = ntohl(len);
2183 len -= sizeof(int32);
2184 len1 = len;
2185
2186 /* number of parameters in rewritten query is just discarded */
2187 pool_read(MASTER(backend), &num_dmy, sizeof(int16));
2188 len -= sizeof(int16);
2189
2190 p = pool_read2(MASTER(backend), len);
2191 if (p == NULL)
2192 ereport(ERROR,
2193 (errmsg("ParameterDescription. connection error"),
2194 errdetail("read from backend failed")));
2195
2196
2197 p1 = palloc(len);
2198 memcpy(p1, p, len);
2199
2200 for (i=0;i<NUM_BACKENDS;i++)
2201 {
2202 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
2203 {
2204 pool_read(CONNECTION(backend, i), &len, sizeof(len));
2205
2206 len = ntohl(len);
2207 len -= sizeof(int32);
2208
2209 p = pool_read2(CONNECTION(backend, i), len);
2210 if (p == NULL)
2211 ereport(ERROR,
2212 (errmsg("ParameterDescription. connection error"),
2213 errdetail("read from backend no %d failed",i)));
2214
2215 if (len != len1)
2216 ereport(DEBUG1,
2217 (errmsg("ParameterDescription. backends does not match"),
2218 errdetail("length does not match between backends master(%d) %d th backend(%d) kind:(%c)",len, i, len1, kind)));
2219 }
2220 }
2221
2222 pool_write(frontend, &kind, 1);
2223
2224 /* send back OIDs of parameters in original query and left are discarded */
2225 len = sizeof(int16) + num_params * sizeof(int32);
2226 sendlen = htonl(len + sizeof(int32));
2227 pool_write(frontend, &sendlen, sizeof(int32));
2228
2229 send_num_params = htons(num_params);
2230 pool_write(frontend, &send_num_params, sizeof(int16));
2231
2232 pool_write_and_flush(frontend, p1, num_params * sizeof(int32));
2233
2234 pfree(p1);
2235 return POOL_CONTINUE;
2236 }
2237
ErrorResponse3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2238 POOL_STATUS ErrorResponse3(POOL_CONNECTION *frontend,
2239 POOL_CONNECTION_POOL *backend)
2240 {
2241 POOL_STATUS ret;
2242
2243 ret = SimpleForwardToFrontend('E', frontend, backend);
2244 if (ret != POOL_CONTINUE)
2245 return ret;
2246
2247 if (!STREAM)
2248 raise_intentional_error_if_need(backend);
2249
2250 return POOL_CONTINUE;
2251 }
2252
FunctionCall(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2253 POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
2254 POOL_CONNECTION_POOL *backend)
2255 {
2256 char dummy[2];
2257 int oid;
2258 int argn;
2259 int i;
2260
2261 for (i=0;i<NUM_BACKENDS;i++)
2262 {
2263 if (VALID_BACKEND(i))
2264 {
2265 pool_write(CONNECTION(backend, i), "F", 1);
2266 }
2267 }
2268
2269 /* dummy */
2270 pool_read(frontend, dummy, sizeof(dummy));
2271
2272 for (i=0;i<NUM_BACKENDS;i++)
2273 {
2274 if (VALID_BACKEND(i))
2275 {
2276 pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
2277 }
2278 }
2279
2280 /* function object id */
2281 pool_read(frontend, &oid, sizeof(oid));
2282
2283 for (i=0;i<NUM_BACKENDS;i++)
2284 {
2285 if (VALID_BACKEND(i))
2286 {
2287 pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
2288 }
2289 }
2290
2291 /* number of arguments */
2292 pool_read(frontend, &argn, sizeof(argn));
2293
2294 for (i=0;i<NUM_BACKENDS;i++)
2295 {
2296 if (VALID_BACKEND(i))
2297 {
2298 pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
2299 }
2300 }
2301
2302 argn = ntohl(argn);
2303
2304 for (i=0;i<argn;i++)
2305 {
2306 int len;
2307 char *arg;
2308
2309 /* length of each argument in bytes */
2310 pool_read(frontend, &len, sizeof(len));
2311
2312 for (i=0;i<NUM_BACKENDS;i++)
2313 {
2314 if (VALID_BACKEND(i))
2315 {
2316 pool_write(CONNECTION(backend, i), &len, sizeof(len));
2317 }
2318 }
2319
2320 len = ntohl(len);
2321
2322 /* argument value itself */
2323 if ((arg = pool_read2(frontend, len)) == NULL)
2324 ereport(FATAL,
2325 (return_code(2),
2326 errmsg("failed to process function call"),
2327 errdetail("read from frontend failed")));
2328
2329 for (i=0;i<NUM_BACKENDS;i++)
2330 {
2331 if (VALID_BACKEND(i))
2332 {
2333 pool_write(CONNECTION(backend, i), arg, len);
2334 }
2335 }
2336 }
2337
2338 for (i=0;i<NUM_BACKENDS;i++)
2339 {
2340 if (VALID_BACKEND(i))
2341 {
2342 pool_flush(CONNECTION(backend, i));
2343 }
2344 }
2345 return POOL_CONTINUE;
2346 }
2347
/*
 * Read one message from the frontend and dispatch it by message kind.
 *
 * - Returns POOL_CONTINUE without reading anything when the frontend read
 *   buffer is empty and frontend->no_forward is non-zero.
 * - Otherwise reads the kind byte and the message body. Protocol v3 uses an
 *   Int32 length followed by the body. Older protocols read a string body
 *   for every kind except 'F'.
 * - While ignore-till-sync is in effect, every message except Sync ('S') is
 *   consumed without being processed.
 * - The body is copied into a palloc'd buffer ("contents") before the
 *   per-kind handler runs, and freed afterwards.
 *
 * Returns POOL_END for Terminate ('X'), otherwise the handler's status.
 * Raises ERROR on a failed/short read, and FATAL for an unknown kind or
 * when a handler returns anything other than POOL_CONTINUE.
 */
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
									POOL_CONNECTION_POOL *backend)
{
	char		fkind;
	char	   *bufp = NULL;
	char	   *contents;
	POOL_STATUS status;
	int			len = 0;

	/* Get session context */
	pool_get_session_context(false);

	if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
		return POOL_CONTINUE;

	pool_read(frontend, &fkind, 1);

	ereport(DEBUG1,
			(errmsg("processing frontend response"),
			 errdetail("received kind '%c'(%02x) from frontend",fkind,fkind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if (pool_read(frontend, &len, sizeof(len)) < 0)
			ereport(ERROR,
					(errmsg("unable to process frontend response"),
					 errdetail("failed to read message length from frontend. frontend abnormally exited")));

		/* The length field counts itself; len is now the body size. */
		len = ntohl(len) - 4;
		if (len > 0)
			bufp = pool_read2(frontend, len);
		else if (len < 0)
			ereport(ERROR,
					(errmsg("frontend message length is less than 4 (kind: %c)", fkind)));
	}
	else
	{
		if (fkind != 'F')
			bufp = pool_read_string(frontend, &len, 0);
	}

	if (len > 0 && bufp == NULL)
		ereport(ERROR,
				(errmsg("unable to process frontend response"),
				 errdetail("failed to read message from frontend. frontend abnormally exited")));

	if (fkind != 'S' && pool_is_ignore_till_sync())
	{
		/*
		 * Flag setting for calling ProcessBackendResponse()
		 * in pool_process_query().
		 *
		 * The message has already been consumed above and is dropped here.
		 */
		if (!pool_is_query_in_progress())
			pool_set_query_in_progress();

		return POOL_CONTINUE;
	}

	pool_unset_doing_extended_query_message();

	/*
	 * Allocate buffer and copy the packet contents. Because inside
	 * these protocol modules, pool_read2 maybe called and modify its
	 * buffer contents.
	 */
	if (len > 0)
	{
		contents = palloc(len);
		memcpy(contents, bufp, len);
	}
	else
	{
		/*
		 * Set dummy content if len <= 0.
		 * This happens for protocol v3 messages with an empty body (length
		 * field of 4), and for protocol v2 messages without a string body
		 * (e.g. 'F').
		 */
		contents = palloc(1);
		memcpy(contents, "", 1);
	}

	switch (fkind)
	{
		/* Scratch variables used only by the 'F' case below. */
		POOL_QUERY_CONTEXT *query_context;
		char	   *query;
		Node	   *node;
		List	   *parse_tree_list;
		bool		error;

		case 'X':				/* Terminate */
			if(contents)
				pfree(contents);
			ereport(DEBUG1,
					(errmsg("Frontend terminated"),
					 errdetail("received message kind 'X' from frontend")));
			return POOL_END;

		case 'Q':				/* Query */
			allow_close_transaction = 1;
			status = SimpleQuery(frontend, backend, len, contents);
			break;

		case 'E':				/* Execute */
			allow_close_transaction = 1;
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Execute(frontend, backend, len, contents);
			break;

		case 'P':				/* Parse */
			allow_close_transaction = 0;
			pool_set_doing_extended_query_message();
			status = Parse(frontend, backend, len, contents);
			break;

		case 'B':				/* Bind */
			pool_set_doing_extended_query_message();
			status = Bind(frontend, backend, len, contents);
			break;

		case 'C':				/* Close */
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Close(frontend, backend, len, contents);
			break;

		case 'D':				/* Describe */
			pool_set_doing_extended_query_message();
			status = Describe(frontend, backend, len, contents);
			break;

		case 'S':				/* Sync */
			pool_set_doing_extended_query_message();
			if (pool_is_ignore_till_sync())
				pool_unset_ignore_till_sync();

			if (STREAM)
			{
				POOL_PENDING_MESSAGE *msg;

				/* Record the Sync in the pending-message queue. */
				pool_unset_query_in_progress();
				msg = pool_pending_message_create('S', 0, NULL);
				pool_pending_message_add(msg);
				pool_pending_message_free_pending_message(msg);
			}
			else if (!pool_is_query_in_progress())
				pool_set_query_in_progress();
			status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

			if (STREAM)
			{
				/* Wait till Ready for query received */
				pool_wait_till_ready_for_query(backend);
			}
			break;

		case 'F':				/* FunctionCall */
			/*
			 * Create dummy query context as if it were an INSERT.
			 * NOTE(review): presumably so that pool_where_to_send() routes
			 * the function call like a write query — confirm.
			 */
			query_context = pool_init_query_context();
			query = "INSERT INTO foo VALUES(1)";
			MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

			parse_tree_list = raw_parser(query, &error);
			node = (Node *) lfirst(list_head(parse_tree_list));
			pool_start_query(query_context, query, strlen(query) + 1, node);

			MemoryContextSwitchTo(old_context);

			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);

			if (MAJOR(backend) == PROTO_MAJOR_V3)
				status = FunctionCall3(frontend, backend, len, contents);
			else
				status = FunctionCall(frontend, backend);

			break;

		case 'c':				/* CopyDone */
		case 'd':				/* CopyData */
		case 'f':				/* CopyFail */
		case 'H':				/* Flush */
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				if (fkind == 'H')
				{
					pool_set_doing_extended_query_message();
				}
				status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

				/*
				 * After flush message received, extended query mode should be
				 * continued.
				 */
				if (fkind != 'H' && pool_is_doing_extended_query_message())
				{
					pool_unset_doing_extended_query_message();
				}

				break;
			}
			/* Not handled for non-v3 protocol: fall through to the error. */

		default:
			ereport(FATAL,
					(return_code(2),
					 errmsg("unable to process frontend response"),
					 errdetail("unknown message type %c(%02x)", fkind, fkind)));

	}
	if(contents)
		pfree(contents);

	if (status != POOL_CONTINUE)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process frontend response")));

	return status;
}
2571
/*
 * Read one message kind from the backends and dispatch it to its handler.
 *
 * frontend:   client connection that replies are forwarded to.
 * backend:    pooled backend connections.
 * state:      set to 0 when ReadyForQuery ('Z') arrives while
 *             frontend->no_forward is set and *state == 1 (the code's own
 *             comment below describes this as processing a reset request).
 * num_fields: column count used by the non-v3 row handlers (BinaryRow,
 *             AsciiRow). It is passed by pointer to RowDescription(),
 *             presumably so that it can be updated.
 *
 * Early exits returning POOL_CONTINUE without reading a message:
 *  - ignore-till-sync is active and the backend cache is empty;
 *  - the skip-reading-from-backends flag is set (the flag is cleared).
 *
 * In protocol v3 mode, the handlers also update the session flags
 * (command success, query in progress, ignore-till-sync, failed
 * transaction). In streaming mode with an extended-query message in
 * progress, the preferred master node id is reset afterwards.
 *
 * Raises FATAL on a zero message kind, on an unknown non-v3 kind, and when
 * a handler returns anything other than POOL_CONTINUE.
 */
POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
								   POOL_CONNECTION_POOL *backend,
								   int *state, short *num_fields)
{
	int			status = POOL_CONTINUE;
	char		kind;

	/* Get session context */
	pool_get_session_context(false);

	if (pool_is_ignore_till_sync())
	{
		if (pool_is_query_in_progress())
			pool_unset_query_in_progress();

		/*
		 * Check if we have pending data in backend connection cache. If we
		 * do, it is likely that a sync message has been sent to backend and
		 * the backend replied back to us. So we need to process it.
		 */
		if (is_backend_cache_empty(backend))
		{
			return POOL_CONTINUE;
		}
	}

	if (pool_is_skip_reading_from_backends())
	{
		pool_unset_skip_reading_from_backends();
		return POOL_CONTINUE;
	}

	read_kind_from_backend(frontend, backend, &kind);

	/*
	 * Sanity check
	 */
	if (kind == 0)
	{
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process backend response"),
				 errdetail("invalid message kind sent by backend connection")));
	}
	ereport(DEBUG1,
			(errmsg("processing backend response"),
			 errdetail("received kind '%c'(%02x) from backend",kind,kind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		switch (kind)
		{
			case 'G':			/* CopyInResponse */
				status = CopyInResponse(frontend, backend);
				break;

			case 'S':			/* ParameterStatus */
				status = ParameterStatus(frontend, backend);
				break;

			case 'Z':			/* ReadyForQuery */
				ereport(DEBUG1,
						(errmsg("processing backend response"),
						 errdetail("Ready For Query received")));
				status = ReadyForQuery(frontend, backend, true, true);
#ifdef DEBUG
				/* Debug builds only: exit right after ReadyForQuery if requested. */
				extern bool stop_now;
				if (stop_now)
					exit(0);
#endif
				break;

			case '1':			/* ParseComplete */
				if (STREAM)
				{
					POOL_PENDING_MESSAGE *pmsg;
					pmsg = pool_pending_message_get_previous_message();
					if (pmsg && pmsg->not_forward_to_frontend)
					{
						/* parse_before_bind() was called. Do not foward the
						 * parse complete message to frontend. */
						ereport(DEBUG1,
								(errmsg("processing backend response"),
								 errdetail("do not forward parse complete message to frontend")));
						pool_discard_packet_contents(backend);
						pool_unset_query_in_progress();
						pool_set_command_success();
						status = POOL_CONTINUE;
						break;
					}
				}
				status = ParseComplete(frontend, backend);
				pool_set_command_success();
				pool_unset_query_in_progress();
				break;

			case '2':			/* BindComplete */
				status = BindComplete(frontend, backend);
				pool_set_command_success();
				pool_unset_query_in_progress();
				break;

			case '3':			/* CloseComplete */
				if (STREAM)
				{
					POOL_PENDING_MESSAGE *pmsg;
					pmsg = pool_pending_message_get_previous_message();
					if (pmsg && pmsg->not_forward_to_frontend)
					{
						/* parse_before_bind() was called. Do not foward the
						 * close complete message to frontend. */
						ereport(DEBUG1,
								(errmsg("processing backend response"),
								 errdetail("do not forward close complete message to frontend")));
						pool_discard_packet_contents(backend);

						/* Remove pending message here because
						 * read_kind_from_backend() did not do it.
						 */
						pmsg = pool_pending_message_pull_out();
						pool_pending_message_free_pending_message(pmsg);

						if (pool_is_doing_extended_query_message())
							pool_unset_query_in_progress();
						pool_set_command_success();
						status = POOL_CONTINUE;
						break;
					}
				}

				status = CloseComplete(frontend, backend);
				pool_set_command_success();
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'E':			/* ErrorResponse */
				status = ErrorResponse3(frontend, backend);
				pool_unset_command_success();
				/* An error inside a transaction marks it as failed. */
				if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID :
						   REAL_MASTER_NODE_ID) != 'I')
					pool_set_failed_transaction();
				if (pool_is_doing_extended_query_message())
				{
					/* Subsequent extended-query messages are skipped until Sync. */
					pool_set_ignore_till_sync();
					pool_unset_query_in_progress();
					if (STREAM)
						pool_discard_except_sync_and_ready_for_query(frontend, backend);
				}
				break;

			case 'C':			/* CommandComplete */
				status = CommandComplete(frontend, backend, true);
				pool_set_command_success();
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 't':			/* ParameterDescription */
				status = ParameterDescription(frontend, backend);
				break;

			case 'I':			/* EmptyQueryResponse */
				status = CommandComplete(frontend, backend, false);
				/* Empty query response message should be treated same as
				 * Command complete message. When we receive the Command
				 * complete message, we unset the query in progress flag if
				 * operated in streaming replication mode. So we unset the
				 * flag as well. See bug 190 for more details.
				 */
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'T':			/* RowDescription */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'n':			/* NoData */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 's':			/* PortalSuspended */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			default:
				/* Any other v3 message is relayed unchanged. */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				break;
		}

		if (STREAM && pool_is_doing_extended_query_message())
			pool_reset_preferred_master_node_id();
	}
	else
	{
		/* Non-v3 protocol message kinds. */
		switch (kind)
		{
			case 'A':			/* NotificationResponse */
				status = NotificationResponse(frontend, backend);
				break;

			case 'B':			/* BinaryRow */
				status = BinaryRow(frontend, backend, *num_fields);
				break;

			case 'C':			/* CompletedResponse */
				status = CompletedResponse(frontend, backend);
				break;

			case 'D':			/* AsciiRow */
				status = AsciiRow(frontend, backend, *num_fields);
				break;

			case 'E':			/* ErrorResponse */
				status = ErrorResponse(frontend, backend);
				if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID :
						   REAL_MASTER_NODE_ID) != 'I')
					pool_set_failed_transaction();
				break;

			case 'G':			/* CopyInResponse */
				status = CopyInResponse(frontend, backend);
				break;

			case 'H':			/* CopyOutResponse */
				status = CopyOutResponse(frontend, backend);
				break;

			case 'I':			/* EmptyQueryResponse */
				EmptyQueryResponse(frontend, backend);
				break;

			case 'N':			/* NoticeResponse */
				NoticeResponse(frontend, backend);
				break;

			case 'P':			/* CursorResponse */
				status = CursorResponse(frontend, backend);
				break;

			case 'T':			/* RowDescription */
				status = RowDescription(frontend, backend, num_fields);
				break;

			case 'V':			/* FunctionResultResponse and FunctionVoidResponse */
				status = FunctionResultResponse(frontend, backend);
				break;

			case 'Z':			/* ReadyForQuery */
				status = ReadyForQuery(frontend, backend, true, true);
				break;

			default:
				ereport(FATAL,
						(return_code(1),
						 errmsg("Unknown message type %c(%02x)", kind, kind)));
		}
	}

	/* Do we receive ready for query while processing reset
	 * request?
	 */
	if (kind == 'Z' && frontend->no_forward && *state == 1)
	{
		*state = 0;
	}

	if (status != POOL_CONTINUE)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process backend response for message kind '%c'",kind)));

	return status;
}
2854
CopyInResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2855 POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
2856 POOL_CONNECTION_POOL *backend)
2857 {
2858 POOL_STATUS status;
2859
2860 /* forward to the frontend */
2861 if (MAJOR(backend) == PROTO_MAJOR_V3)
2862 {
2863 SimpleForwardToFrontend('G', frontend, backend);
2864 pool_flush(frontend);
2865 }
2866 else
2867 pool_write_and_flush(frontend, "G", 1);
2868
2869 status = CopyDataRows(frontend, backend, 1);
2870 return status;
2871 }
2872
CopyOutResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2873 POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
2874 POOL_CONNECTION_POOL *backend)
2875 {
2876 POOL_STATUS status;
2877
2878 /* forward to the frontend */
2879 if (MAJOR(backend) == PROTO_MAJOR_V3)
2880 {
2881 SimpleForwardToFrontend('H', frontend, backend);
2882 pool_flush(frontend);
2883 }
2884 else
2885 pool_write_and_flush(frontend, "H", 1);
2886
2887 status = CopyDataRows(frontend, backend, 0);
2888 return status;
2889 }
2890
CopyDataRows(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int copyin)2891 POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
2892 POOL_CONNECTION_POOL *backend, int copyin)
2893 {
2894 char *string = NULL;
2895 int len;
2896 int i;
2897
2898 #ifdef DEBUG
2899 int j = 0;
2900 char buf[1024];
2901 #endif
2902
2903 for (;;)
2904 {
2905 if (copyin)
2906 {
2907 if (MAJOR(backend) == PROTO_MAJOR_V3)
2908 {
2909 char kind;
2910 char *contents = NULL;
2911
2912 pool_read(frontend, &kind, 1);
2913
2914 ereport(DEBUG1,
2915 (errmsg("copy data rows"),
2916 errdetail("read kind from frontend %c(%02x)", kind, kind)));
2917
2918 pool_read(frontend, &len, sizeof(len));
2919 len = ntohl(len) - 4;
2920 if (len > 0)
2921 contents = pool_read2(frontend, len);
2922
2923 SimpleForwardToBackend(kind, frontend, backend, len, contents);
2924
2925 /* CopyData? */
2926 if (kind == 'd')
2927 continue;
2928 else
2929 {
2930 ereport(DEBUG1,
2931 (errmsg("copy data rows"),
2932 errdetail("invalid copyin kind. expected 'd' got '%c'", kind)));
2933 break;
2934 }
2935 }
2936 else
2937 string = pool_read_string(frontend, &len, 1);
2938 }
2939 else
2940 {
2941 /* CopyOut */
2942 if (MAJOR(backend) == PROTO_MAJOR_V3)
2943 {
2944 signed char kind;
2945
2946 kind = pool_read_kind(backend);
2947
2948 SimpleForwardToFrontend(kind, frontend, backend);
2949
2950 /* CopyData? */
2951 if (kind == 'd')
2952 continue;
2953 else
2954 break;
2955 }
2956 else
2957 {
2958 for (i=0;i<NUM_BACKENDS;i++)
2959 {
2960 if (VALID_BACKEND(i))
2961 {
2962 string = pool_read_string(CONNECTION(backend, i), &len, 1);
2963 }
2964 }
2965 }
2966 }
2967
2968 if (string == NULL)
2969 ereport(FATAL,
2970 (errmsg("unable to copy data rows"),
2971 errdetail("cannot read string message from backend")));
2972
2973 #ifdef DEBUG
2974 strlcpy(buf, string, sizeof(buf));
2975 ereport(DEBUG1,
2976 (errmsg("copy data rows"),
2977 errdetail("copy line %d %d bytes :%s:", j++, len, buf)));
2978 #endif
2979
2980 if (copyin)
2981 {
2982 for (i=0;i<NUM_BACKENDS;i++)
2983 {
2984 if (VALID_BACKEND(i))
2985 {
2986 pool_write(CONNECTION(backend, i), string, len);
2987 }
2988 }
2989 }
2990 else
2991 pool_write(frontend, string, len);
2992
2993 if (len == PROTO_MAJOR_V3)
2994 {
2995 /* end of copy? */
2996 if (string[0] == '\\' &&
2997 string[1] == '.' &&
2998 string[2] == '\n')
2999 {
3000 break;
3001 }
3002 }
3003 }
3004
3005 /*
3006 * Wait till backend responds
3007 */
3008 if (copyin)
3009 {
3010 for (i=0;i<NUM_BACKENDS;i++)
3011 {
3012 if (VALID_BACKEND(i))
3013 {
3014 pool_flush(CONNECTION(backend, i));
3015
3016 /*
3017 * Check response from the backend. First check SSL and read
3018 * buffer of the backend. It is possible that there's an error
3019 * message in the buffer if the COPY command went wrong.
3020 * Otherwise wait for data arrival to the backend socket.
3021 */
3022 if (!pool_ssl_pending(CONNECTION(backend, i)) &&
3023 pool_read_buffer_is_empty(CONNECTION(backend, i)) &&
3024 synchronize(CONNECTION(backend, i)))
3025 ereport(FATAL,
3026 (return_code(2),
3027 errmsg("unable to copy data rows"),
3028 errdetail("failed to synchronize")));
3029 }
3030 }
3031 }
3032 else
3033 pool_flush(frontend);
3034
3035 return POOL_CONTINUE;
3036 }
3037
/*
 * This function raises intentional error to make backends the same
 * transaction state.
 *
 * ErrorResponse3() calls this outside streaming mode after relaying an
 * ErrorResponse to the frontend. Two cases are handled, each only when the
 * current query context exists and holds a SELECT:
 *
 *  - MASTER_SLAVE mode, when the primary is inside a transaction ('T') and
 *    is not MASTER_NODE_ID: the primary is added as a send target and an
 *    error-producing command is sent to it. An Execute-style command is
 *    used if an extended-query message is in progress.
 *  - REPLICATION mode, when the real master is inside a transaction and
 *    replicate_select is off: every node not already a send target is added
 *    as one, and each such node that is VALID_BACKEND gets the
 *    error-producing command.
 *
 * NOTE(review): VALID_BACKEND(i) is evaluated after pool_set_node_to_be_sent();
 * presumably VALID_BACKEND consults the query context's send targets, so
 * that order may matter — confirm before reordering.
 */
void raise_intentional_error_if_need(POOL_CONNECTION_POOL *backend)
{
	int			i;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	/* Get session context */
	session_context = pool_get_session_context(false);

	query_context = session_context->query_context;

	if (MASTER_SLAVE &&
		TSTATE(backend, PRIMARY_NODE_ID) == 'T' &&
		PRIMARY_NODE_ID != MASTER_NODE_ID &&
		query_context &&
		is_select_query(query_context->parse_tree, query_context->original_query))
	{
		pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
		if (pool_is_doing_extended_query_message())
		{
			do_error_execute_command(backend, PRIMARY_NODE_ID, PROTO_MAJOR_V3);
		}
		else
		{
			do_error_command(CONNECTION(backend, PRIMARY_NODE_ID), MAJOR(backend));
		}
		ereport(DEBUG1,
				(errmsg("raising intentional error"),
				 errdetail("generating intentional error to sync backends transaction states")));
	}

	if (REPLICATION &&
		TSTATE(backend, REAL_MASTER_NODE_ID) == 'T' &&
		!pool_config->replicate_select &&
		query_context &&
		is_select_query(query_context->parse_tree, query_context->original_query))
	{
		for (i = 0; i < NUM_BACKENDS; i++)
		{
			/*
			 * Send a syntax error query to all backends except the node
			 * which the original query was sent.
			 */
			if (pool_is_node_to_be_sent(query_context, i))
				continue;
			else
				pool_set_node_to_be_sent(query_context, i);

			if (VALID_BACKEND(i))
			{
				/*
				 * We must abort transaction to sync transaction state.
				 * If the error was caused by an Execute message,
				 * we must send invalid Execute message to abort
				 * transaction.
				 *
				 * Because extended query protocol ignores all
				 * messages before receiving Sync message inside error state.
				 */
				if (pool_is_doing_extended_query_message())
				{
					do_error_execute_command(backend, i, PROTO_MAJOR_V3);
				}
				else
				{
					do_error_command(CONNECTION(backend, i), MAJOR(backend));
				}
			}
		}
	}
}
3113
3114 /*
3115 * Check various errors from backend. return values:
3116 * 0: no error
3117 * 1: deadlock detected
3118 * 2: serialization error detected
3119 * 3: query cancel
3120 * detected: 4
3121 */
check_errors(POOL_CONNECTION_POOL * backend,int backend_id)3122 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id)
3123 {
3124
3125 /*
3126 * Check dead lock error on the master node and abort
3127 * transactions on all nodes if so.
3128 */
3129 if (detect_deadlock_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3130 return 1;
3131
3132 /*
3133 * Check serialization failure error and abort
3134 * transactions on all nodes if so. Otherwise we allow
3135 * data inconsistency among DB nodes. See following
3136 * scenario: (M:master, S:slave)
3137 *
3138 * M:S1:BEGIN;
3139 * M:S2:BEGIN;
3140 * S:S1:BEGIN;
3141 * S:S2:BEGIN;
3142 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3143 * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3144 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3145 * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3146 * M:S1:UPDATE t1 SET i = i + 1;
3147 * S:S1:UPDATE t1 SET i = i + 1;
3148 * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
3149 * S:S1:COMMIT;
3150 * M:S1:COMMIT;
3151 * M:S2:ERROR: could not serialize access due to concurrent update
3152 * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
3153 */
3154 if (detect_serialization_error(CONNECTION(backend, backend_id), MAJOR(backend), true) == SPECIFIED_ERROR)
3155 return 2;
3156
3157 /*
3158 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
3159 * This happens in following scenario:
3160 *
3161 * M:S1:BEGIN;
3162 * S:S1:BEGIN;
3163 * M:S1:SELECT 1; <-- only sent to MASTER
3164 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3165 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3166 * M: <-- error
3167 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
3168 */
3169 if (detect_active_sql_transaction_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3170 return 3;
3171
3172 /* check query cancel error */
3173 if (detect_query_cancel_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3174 return 4;
3175
3176 return 0;
3177 }
3178
generate_error_message(char * prefix,int specific_error,char * query)3179 static void generate_error_message(char *prefix, int specific_error, char *query)
3180 {
3181 POOL_SESSION_CONTEXT *session_context;
3182
3183 session_context = pool_get_session_context(true);
3184 if (!session_context)
3185 return;
3186
3187 static char *error_messages[] = {
3188 "received deadlock error message from master node. query: %s",
3189 "received serialization failure error message from master node. query: %s",
3190 "received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s",
3191 "received query cancel error message from master node. query: %s"
3192 };
3193
3194 String *msg;
3195
3196 if (specific_error < 1 || specific_error > sizeof(error_messages)/sizeof(char *))
3197 {
3198 ereport(LOG,
3199 (errmsg("generate_error_message: invalid specific_error: %d", specific_error)));
3200 return;
3201 }
3202
3203 specific_error--;
3204
3205 msg = init_string(prefix);
3206 string_append_char(msg, error_messages[specific_error]);
3207 ereport(LOG,
3208 (errmsg(msg->data, query)));
3209 free_string(msg);
3210 }
3211
3212 /*
3213 * Make per DB node statement log
3214 */
per_node_statement_log(POOL_CONNECTION_POOL * backend,int node_id,char * query)3215 void per_node_statement_log(POOL_CONNECTION_POOL *backend, int node_id, char *query)
3216 {
3217 POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3218
3219 if (pool_config->log_per_node_statement)
3220 ereport(LOG,
3221 (errmsg("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query)));
3222 }
3223
3224 /*
3225 * Check kind and produce error message
3226 * All data read in this function is returned to stream.
3227 */
per_node_error_log(POOL_CONNECTION_POOL * backend,int node_id,char * query,char * prefix,bool unread)3228 void per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query, char *prefix, bool unread)
3229 {
3230 POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3231 char *message;
3232
3233 if (pool_extract_error_message(true, CONNECTION(backend, node_id), MAJOR(backend), unread, &message) == 1)
3234 {
3235 ereport(LOG,
3236 (errmsg("%s: DB node id: %d backend pid: %d statement: \"%s\" message: \"%s\"",
3237 prefix, node_id, ntohl(slot->pid), query, message)));
3238 pfree(message);
3239 }
3240 }
3241
3242 /*
3243 * Send parse message to primary/master node and wait for reply if particular
3244 * message is not yet parsed on the primary/master node but parsed on other
3245 * node. Caller must provide the parse message data as "message".
3246 */
parse_before_bind(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,POOL_SENT_MESSAGE * message,POOL_SENT_MESSAGE * bind_message)3247 static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
3248 POOL_CONNECTION_POOL *backend,
3249 POOL_SENT_MESSAGE *message,
3250 POOL_SENT_MESSAGE *bind_message)
3251 {
3252 int i;
3253 int len = message->len;
3254 char kind = '\0';
3255 char *contents = message->contents;
3256 bool parse_was_sent = false;
3257 bool backup[MAX_NUM_BACKENDS];
3258 POOL_QUERY_CONTEXT *qc = message->query_context;
3259
3260 memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
3261
3262 if (STREAM)
3263 {
3264 if (message->kind == 'P' && qc->where_to_send[PRIMARY_NODE_ID] == 0)
3265 {
3266 POOL_PENDING_MESSAGE *pmsg;
3267 POOL_QUERY_CONTEXT *new_qc;
3268 char message_body[1024];
3269 int offset;
3270 int message_len;
3271
3272 /* we are in streaming replication mode and the parse message has not
3273 * been sent to primary yet */
3274
3275 /* Prepare modified query context */
3276 new_qc = pool_query_context_shallow_copy(qc);
3277 memset(new_qc->where_to_send, 0, sizeof(new_qc->where_to_send));
3278 new_qc->where_to_send[PRIMARY_NODE_ID] = 1;
3279 new_qc->virtual_master_node_id = PRIMARY_NODE_ID;
3280
3281 /* Before sending the parse message to the primary, we need to
3282 * close the named statement. Otherwise we will get an error from
3283 * backend if the named statement already exists. This could
3284 * happend if parse_before_bind is called with a bind message
3285 * using same named statement. If the named statement does not
3286 * exist, it's fine. PostgreSQL just ignores a request trying to
3287 * close a non-existing statement. If the statement is unnamed
3288 * one, we do not need it because unnamed statement can be
3289 * overwritten anytime.
3290 */
3291 message_body[0] = 'S';
3292 offset = strlen(bind_message->contents)+1;
3293
3294 ereport(DEBUG1,
3295 (errmsg("parse before bind"),
3296 errdetail("close statement: %s", bind_message->contents+offset)));
3297
3298 if (bind_message->contents[offset] != '\0')
3299 {
3300 message_len = 1 + strlen(bind_message->contents+offset) + 1;
3301 StrNCpy(message_body+1, bind_message->contents+offset, sizeof(message_body)-1);
3302 pool_extended_send_and_wait(qc, "C", message_len, message_body, 1, PRIMARY_NODE_ID, false);
3303 /* Add pending message */
3304 pmsg = pool_pending_message_create('C', message_len, message_body);
3305 pmsg->not_forward_to_frontend = true;
3306 pool_pending_message_dest_set(pmsg, new_qc);
3307 pool_pending_message_add(pmsg);
3308 pool_pending_message_free_pending_message(pmsg);
3309 }
3310
3311 /* Send parse message to primary node */
3312 ereport(DEBUG1,
3313 (errmsg("parse before bind"),
3314 errdetail("waiting for primary completing parse")));
3315
3316 pool_extended_send_and_wait(qc, "P", len, contents, 1, PRIMARY_NODE_ID, false);
3317
3318 /* Add pending message */
3319 pmsg = pool_pending_message_create('P', len, contents);
3320 pmsg->not_forward_to_frontend = true;
3321 pool_pending_message_dest_set(pmsg, new_qc);
3322 pool_pending_message_add(pmsg);
3323 pool_pending_message_free_pending_message(pmsg);
3324
3325 /* Replace the query context of bind message */
3326 bind_message->query_context = new_qc;
3327
3328 #ifdef NOT_USED
3329 /*
3330 * XXX pool_remove_sent_message() will pfree memory allocated by "contents".
3331 */
3332
3333 /* Remove old sent message */
3334 pool_remove_sent_message('P', contents);
3335 /* Create and add sent message of this parse message */
3336 msg = pool_create_sent_message('P', len, contents, 0, contents, new_qc);
3337 pool_add_sent_message(msg);
3338 #endif
3339 /* Replace the query context of parse message */
3340 message->query_context = new_qc;
3341
3342 return POOL_CONTINUE;
3343 }
3344 else
3345 {
3346 ereport(DEBUG1,
3347 (errmsg("parse before bind"),
3348 errdetail("no need to re-send parse")));
3349 return POOL_CONTINUE;
3350 }
3351 }
3352 else
3353 {
3354 /* expect to send to master node only */
3355 for (i = 0; i < NUM_BACKENDS; i++)
3356 {
3357 if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
3358 {
3359 ereport(DEBUG1,
3360 (errmsg("parse before bind"),
3361 errdetail("waiting for backend %d completing parse", i)));
3362
3363 pool_extended_send_and_wait(qc, "P", len, contents, 1, i, false);
3364 }
3365 else
3366 {
3367 qc->where_to_send[i] = 0;
3368 }
3369 }
3370 }
3371
3372 for (i = 0; i < NUM_BACKENDS; i++)
3373 {
3374 if (qc->where_to_send[i])
3375 {
3376 parse_was_sent = true;
3377 break;
3378 }
3379 }
3380
3381 if (!STREAM && parse_was_sent)
3382 {
3383 pool_set_query_in_progress();
3384
3385 while (kind != '1')
3386 {
3387 PG_TRY();
3388 {
3389 read_kind_from_backend(frontend, backend, &kind);
3390 pool_discard_packet_contents(backend);
3391 }
3392 PG_CATCH();
3393 {
3394 memcpy(qc->where_to_send, backup, sizeof(backup));
3395 PG_RE_THROW();
3396 }
3397 PG_END_TRY();
3398 }
3399 }
3400
3401 memcpy(qc->where_to_send, backup, sizeof(backup));
3402 return POOL_CONTINUE;
3403 }
3404
3405 /*
3406 * Find victim nodes by "decide by majority" rule and returns array
3407 * of victim node ids. If no victim is found, return NULL.
3408 *
3409 * Arguments:
3410 * ntuples: Array of number of affected tuples. -1 represents down node.
3411 * nmembers: Number of elements in ntuples.
3412 * master_node: The master node id. Less than 0 means ignore this parameter.
3413 * number_of_nodes: Number of elements in victim nodes array.
3414 *
3415 * Note: If no one wins and master_node >= 0, winner would be the
3416 * master and other nodes who has same number of tuples as the master.
3417 *
3418 * Caution: Returned victim node array is allocated in static memory
3419 * of this function. Subsequent calls to this function will overwrite
3420 * the memory.
3421 */
find_victim_nodes(int * ntuples,int nmembers,int master_node,int * number_of_nodes)3422 static int* find_victim_nodes(int *ntuples, int nmembers, int master_node, int *number_of_nodes)
3423 {
3424 static int victim_nodes[MAX_NUM_BACKENDS];
3425 static int votes[MAX_NUM_BACKENDS];
3426 int maxvotes;
3427 int majority_ntuples;
3428 int me;
3429 int cnt;
3430 int healthy_nodes;
3431 int i, j;
3432
3433 healthy_nodes = 0;
3434 *number_of_nodes = 0;
3435 maxvotes = 0;
3436 majority_ntuples = 0;
3437
3438 for (i=0;i<nmembers;i++)
3439 {
3440 me = ntuples[i];
3441
3442 /* Health node? */
3443 if (me < 0)
3444 {
3445 votes[i] = -1;
3446 continue;
3447 }
3448
3449 healthy_nodes++;
3450 votes[i] = 1;
3451
3452 for (j=0;j<nmembers;j++)
3453 {
3454 if (i != j && me == ntuples[j])
3455 {
3456 votes[i]++;
3457
3458 if (votes[i] > maxvotes)
3459 {
3460 maxvotes = votes[i];
3461 majority_ntuples = me;
3462 }
3463 }
3464 }
3465 }
3466
3467 /* Everyone is different */
3468 if (maxvotes == 1)
3469 {
3470 /* Master node is specified? */
3471 if (master_node < 0)
3472 return NULL;
3473
3474 /*
3475 * If master node is specified, let it and others who has same
3476 * ntuples win.
3477 */
3478 majority_ntuples = ntuples[master_node];
3479 }
3480 else
3481 {
3482 /* Find number of majority */
3483 cnt = 0;
3484 for (i=0;i<nmembers;i++)
3485 {
3486 if (votes[i] == maxvotes)
3487 {
3488 cnt++;
3489 }
3490 }
3491
3492 if (cnt <= healthy_nodes / 2.0)
3493 {
3494 /* No one wins */
3495
3496 /* Master node is specified? */
3497 if (master_node < 0)
3498 return NULL;
3499
3500 /*
3501 * If master node is specified, let it and others who has same
3502 * ntuples win.
3503 */
3504 majority_ntuples = ntuples[master_node];
3505 }
3506 }
3507
3508 /* Make victim nodes list */
3509 for (i=0;i<nmembers;i++)
3510 {
3511 if (ntuples[i] >= 0 && ntuples[i] != majority_ntuples)
3512 {
3513 victim_nodes[(*number_of_nodes)++] = i;
3514 }
3515 }
3516
3517 return victim_nodes;
3518 }
3519
3520
3521 /*
3522 * flatten_set_variable_args
3523 * Given a parsenode List as emitted by the grammar for SET,
3524 * convert to the flat string representation used by GUC.
3525 *
3526 * We need to be told the name of the variable the args are for, because
3527 * the flattening rules vary (ugh).
3528 *
3529 * The result is NULL if args is NIL (ie, SET ... TO DEFAULT), otherwise
3530 * a palloc'd string.
3531 */
3532 static char *
flatten_set_variable_args(const char * name,List * args)3533 flatten_set_variable_args(const char *name, List *args)
3534 {
3535 StringInfoData buf;
3536 ListCell *l;
3537
3538 /* Fast path if just DEFAULT */
3539 if (args == NIL)
3540 return NULL;
3541
3542 initStringInfo(&buf);
3543
3544 /*
3545 * Each list member may be a plain A_Const node, or an A_Const within a
3546 * TypeCast; the latter case is supported only for ConstInterval arguments
3547 * (for SET TIME ZONE).
3548 */
3549 foreach(l, args)
3550 {
3551 Node *arg = (Node *) lfirst(l);
3552 char *val;
3553 A_Const *con;
3554
3555 if (l != list_head(args))
3556 appendStringInfoString(&buf, ", ");
3557
3558 if (IsA(arg, TypeCast))
3559 {
3560 TypeCast *tc = (TypeCast *) arg;
3561 arg = tc->arg;
3562 }
3563
3564 if (!IsA(arg, A_Const))
3565 elog(ERROR, "unrecognized node type: %d", (int) nodeTag(arg));
3566 con = (A_Const *) arg;
3567
3568 switch (nodeTag(&con->val))
3569 {
3570 case T_Integer:
3571 appendStringInfo(&buf, "%ld", intVal(&con->val));
3572 break;
3573 case T_Float:
3574 /* represented as a string, so just copy it */
3575 appendStringInfoString(&buf, strVal(&con->val));
3576 break;
3577 case T_String:
3578 val = strVal(&con->val);
3579 appendStringInfoString(&buf, val);
3580 break;
3581 default:
3582 ereport(ERROR, (errmsg("unrecognized node type: %d",
3583 (int) nodeTag(&con->val))));
3584 break;
3585 }
3586 }
3587
3588 return buf.data;
3589 }
3590
3591 /* Called when sync message is received.
3592 * Wait till ready for query received.
3593 */
pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)3594 static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend)
3595 {
3596 char kind;
3597 int len;
3598 int poplen;
3599 char *buf;
3600 int i;
3601
3602 for (i=0;i<NUM_BACKENDS;i++)
3603 {
3604 if (VALID_BACKEND(i))
3605 {
3606 for (;;)
3607 {
3608 pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
3609 ereport(DEBUG1,
3610 (errmsg("pool_wait_till_ready_for_query: kind: %c", kind)));
3611 pool_push(CONNECTION(backend, i), &kind, sizeof(kind));
3612 pool_read(CONNECTION(backend, i), &len, sizeof(len));
3613 pool_push(CONNECTION(backend, i), &len, sizeof(len));
3614 if ((ntohl(len)-sizeof(len)) > 0)
3615 {
3616 buf = pool_read2(CONNECTION(backend, i), ntohl(len)-sizeof(len));
3617 pool_push(CONNECTION(backend, i), buf, ntohl(len)-sizeof(len));
3618 }
3619
3620 if (kind == 'Z') /* Ready for query? */
3621 {
3622 pool_pop(CONNECTION(backend, i), &poplen);
3623 ereport(DEBUG1,
3624 (errmsg("pool_wait_till_ready_for_query: backend:%d ready for query found. buffer length:%d",
3625 i, CONNECTION(backend, i)->len)));
3626 break;
3627 }
3628 }
3629 }
3630 }
3631 }
3632
3633 /*
3634 * Called when error response received in streaming replication mode and doing
3635 * extended query. Remove all pending messages and backend message buffer data
3636 * except POOL_SYNC pending message and ready for query. If sync message is
3637 * not received yet, continue to read data from frontend until a sync message
3638 * is read.
3639 */
pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3640 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
3641 POOL_CONNECTION_POOL *backend)
3642 {
3643 POOL_PENDING_MESSAGE *pmsg;
3644 int i;
3645
3646 if (!pool_is_doing_extended_query_message() || !STREAM)
3647 return;
3648
3649 /*
3650 * Check to see if we aready received a sync
3651 * message. If not, call ProcessFrontendResponse() to
3652 * get the sync message from client.
3653 */
3654 pmsg = pool_pending_message_get(POOL_SYNC);
3655 if (pmsg == NULL)
3656 {
3657 char kind;
3658 int len;
3659 POOL_PENDING_MESSAGE *msg;
3660 char *contents;
3661
3662 for(;;)
3663 {
3664 pool_read(frontend, &kind, sizeof(kind));
3665 pool_read(frontend, &len, sizeof(len));
3666 len = ntohl(len) - sizeof(len);
3667 if (len > 0)
3668 contents = pool_read2(frontend, len);
3669 if (kind == 'S')
3670 {
3671 msg = pool_pending_message_create('S', 0, NULL);
3672 pool_pending_message_add(msg);
3673 pool_pending_message_free_pending_message(msg);
3674 SimpleForwardToBackend(kind, frontend, backend, len, contents);
3675 break;
3676 }
3677 }
3678 }
3679 else
3680 pool_pending_message_free_pending_message(pmsg);
3681
3682 /* Remove all pending messages except sync message */
3683 do
3684 {
3685 pmsg = pool_pending_message_head_message();
3686 if (pmsg && pmsg->type == POOL_SYNC)
3687 {
3688 ereport(DEBUG1,
3689 (errmsg("Process backend response: sync pending message found after receiving error response")));
3690 pool_unset_ignore_till_sync();
3691 pool_pending_message_free_pending_message(pmsg);
3692 break;
3693 }
3694 pool_pending_message_free_pending_message(pmsg);
3695 pmsg = pool_pending_message_pull_out();
3696 pool_pending_message_free_pending_message(pmsg);
3697 }
3698 while (pmsg);
3699
3700 pool_pending_message_reset_previous_message();
3701
3702 /* Discard read buffer execpt "Ready for query" */
3703 for (i=0;i<NUM_BACKENDS;i++)
3704 {
3705 if (VALID_BACKEND(i))
3706 {
3707 char kind;
3708 int len;
3709 int sts;
3710
3711 while (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
3712 {
3713 sts = pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
3714 if (sts < 0 || kind == '\0')
3715 {
3716 ereport(DEBUG1,
3717 (errmsg("pool_discard_except_sync_and_ready_for_query: EOF detected while reading from backend: %d buffer length: %d sts: %d",
3718 i, CONNECTION(backend, i)->len, sts)));
3719 pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
3720 break;
3721 }
3722
3723 if (kind == 'Z') /* Ready for query? */
3724 {
3725 pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
3726 ereport(DEBUG1,
3727 (errmsg("pool_discard_except_sync_and_ready_for_query: Ready for query found. backend:%d",
3728 i)));
3729 break;
3730 }
3731 else
3732 {
3733 /* Read and discard packet */
3734 pool_read(CONNECTION(backend, i), &len, sizeof(len));
3735 if ((ntohl(len)-sizeof(len)) > 0)
3736 {
3737 pool_read2(CONNECTION(backend, i), ntohl(len)-sizeof(len));
3738 }
3739 ereport(DEBUG1,
3740 (errmsg("pool_discard_except_sync_and_ready_for_query: discarding packet %c (len:%lu) of backend:%d", kind, ntohl(len)-sizeof(len), i)));
3741 }
3742 }
3743 }
3744 }
3745 }
3746
3747 /*
3748 * Handle misc treatment when a command successfully completed.
3749 * Preconditions: query is in progress. The command is succeeded.
3750 */
pool_at_command_success(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3751 void pool_at_command_success(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
3752 {
3753 Node *node;
3754 char *query;
3755
3756 /* Sanity checks */
3757 if (!pool_is_query_in_progress())
3758 {
3759 ereport(ERROR,
3760 (errmsg("pool_at_command_success: query is not in progress")));
3761 }
3762
3763 if (!pool_is_command_success())
3764 {
3765 ereport(ERROR,
3766 (errmsg("pool_at_command_success: command did not succeed")));
3767 }
3768
3769 node = pool_get_parse_tree();
3770
3771 if (!node)
3772 {
3773 ereport(ERROR,
3774 (errmsg("pool_at_command_success: no parse tree found")));
3775 }
3776
3777 query = pool_get_query_string();
3778
3779 if (query == NULL)
3780 {
3781 ereport(ERROR,
3782 (errmsg("pool_at_command_success: no query found")));
3783 }
3784
3785 /*
3786 * If the query was BEGIN/START TRANSACTION, clear the
3787 * history that we had a writing command in the transaction
3788 * and forget the transaction isolation level.
3789 *
3790 * XXX If BEGIN is received while we are already in an
3791 * explicit transaction, the command *successes*
3792 * (just with a NOTICE message). In this case we lose
3793 * "writing_transaction" etc. info.
3794 */
3795 if (is_start_transaction_query(node))
3796 {
3797 pool_unset_writing_transaction();
3798 pool_unset_failed_transaction();
3799 pool_unset_transaction_isolation();
3800 }
3801
3802 /*
3803 * If the query was COMMIT/ABORT, clear the history
3804 * that we had a writing command in the transaction
3805 * and forget the transaction isolation level. This
3806 * is necessary if succeeding transaction is not an
3807 * explicit one.
3808 */
3809 else if (is_commit_or_rollback_query(node))
3810 {
3811 pool_unset_writing_transaction();
3812 pool_unset_failed_transaction();
3813 pool_unset_transaction_isolation();
3814 }
3815
3816 /*
3817 * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or SET
3818 * SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL
3819 * SERIALIZABLE, remember it.
3820 */
3821 else if (is_set_transaction_serializable(node))
3822 {
3823 pool_set_transaction_isolation(POOL_SERIALIZABLE);
3824 }
3825
3826 /*
3827 * If 2PC commands has been executed, automatically close
3828 * transactions on standbys if there is any open
3829 * transaction since 2PC commands close transaction on
3830 * primary.
3831 */
3832 else if (is_2pc_transaction_query(node))
3833 {
3834 close_standby_transactions(frontend, backend);
3835 }
3836
3837 else if (!is_select_query(node, query))
3838 {
3839 /*
3840 * If the query was not READ SELECT, and we are in an
3841 * explicit transaction, remember that we had a write
3842 * query in this transaction.
3843 */
3844 if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) == 'T')
3845 {
3846 /* However, if the query is "SET TRANSACTION READ ONLY" or its variant,
3847 * don't set it.
3848 */
3849 if (!pool_is_transaction_read_only(node))
3850 {
3851 ereport(DEBUG1,
3852 (errmsg("not SET TRANSACTION READ ONLY")));
3853
3854 pool_set_writing_transaction();
3855 }
3856 }
3857
3858 /*
3859 * If the query was CREATE TEMP TABLE, discard
3860 * temp table relcache because we might have had
3861 * persistent table relation cache which has table
3862 * name as the temp table.
3863 */
3864 if (IsA(node, CreateStmt))
3865 {
3866 CreateStmt *create_table_stmt = (CreateStmt *)node;
3867 if (create_table_stmt->relation->relpersistence == 't')
3868 discard_temp_table_relcache();
3869 }
3870 }
3871 }
3872