1 /* -*-pgsql-c-*- */
2 /*
3 *
4 * $Header$
5 *
6 * pgpool: a language independent connection pool server for PostgreSQL
7 * written by Tatsuo Ishii
8 *
9 * Copyright (c) 2003-2017 PgPool Global Development Group
10 *
11 * Permission to use, copy, modify, and distribute this software and
12 * its documentation for any purpose and without fee is hereby
13 * granted, provided that the above copyright notice appear in all
14 * copies and that both that copyright notice and this permission
15 * notice appear in supporting documentation, and that the name of the
16 * author not be used in advertising or publicity pertaining to
17 * distribution of the software without specific, written prior
18 * permission. The author makes no representations about the
19 * suitability of this software for any purpose. It is provided "as
20 * is" without express or implied warranty.
21 *
22 */
23 #include "pool.h"
24 #include "pool_config.h"
25 #include "protocol/pool_proto_modules.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 #include "utils/elog.h"
29 #include "utils/pool_select_walker.h"
30 #include "utils/pool_stream.h"
31 #include "context/pool_session_context.h"
32 #include "context/pool_query_context.h"
33 #include "parser/nodes.h"
34
35 #include <string.h>
36 #include <netinet/in.h>
37 #include <stdlib.h>
38
/*
 * Where to send query: routing class of a statement, as returned by
 * send_to_where().  pool_where_to_send() sends POOL_PRIMARY to the primary
 * only and POOL_BOTH to every eligible node; any other value is treated as
 * a load balancing candidate.
 */
typedef enum
{
	POOL_PRIMARY = 0,			/* primary node only */
	POOL_STANDBY = 1,			/* standby node(s) */
	POOL_EITHER = 2,			/* primary or standby */
	POOL_BOTH = 3				/* primary and standby node(s) */
} POOL_DEST;
48
/*
 * Guard used by the destination-map helpers below: report an ERROR when no
 * query context was supplied.  The expansion refers to a variable named
 * query_context, which must be in scope wherever the macro is used.
 */
#define CHECK_QUERY_CONTEXT_IS_VALID \
	do { \
		if (query_context == NULL) \
			ereport(ERROR, \
					(errmsg("setting db node for query to be sent, no query context"))); \
	} while (0)
55
56 static POOL_DEST send_to_where(Node *node, char *query);
57 static void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node);
58 static char* remove_read_write(int len, const char *contents, int *rewritten_len);
59
60 /*
61 * Create and initialize per query session context
62 */
pool_init_query_context(void)63 POOL_QUERY_CONTEXT *pool_init_query_context(void)
64 {
65 MemoryContext memory_context = AllocSetContextCreate(QueryContext,
66 "QueryContextMemoryContext",
67 ALLOCSET_SMALL_MINSIZE,
68 ALLOCSET_SMALL_INITSIZE,
69 ALLOCSET_SMALL_MAXSIZE);
70
71 MemoryContext oldcontext = MemoryContextSwitchTo(memory_context);
72 POOL_QUERY_CONTEXT *qc;
73 qc = palloc0(sizeof(*qc));
74 qc->memory_context = memory_context;
75 MemoryContextSwitchTo(oldcontext);
76 return qc;
77 }
78
79 /*
80 * Destroy query context
81 */
pool_query_context_destroy(POOL_QUERY_CONTEXT * query_context)82 void pool_query_context_destroy(POOL_QUERY_CONTEXT *query_context)
83 {
84 POOL_SESSION_CONTEXT *session_context;
85
86 if (query_context)
87 {
88 MemoryContext memory_context = query_context->memory_context;
89
90 ereport(DEBUG1,
91 (errmsg("pool_query_context_destroy: query context:%p", query_context)));
92
93 session_context = pool_get_session_context(false);
94 pool_unset_query_in_progress();
95 if (!pool_is_command_success() && query_context->pg_terminate_backend_conn)
96 {
97 ereport(DEBUG1,
98 (errmsg("clearing the connection flag for pg_terminate_backend")));
99 pool_unset_connection_will_be_terminated(query_context->pg_terminate_backend_conn);
100 }
101 query_context->pg_terminate_backend_conn = NULL;
102 query_context->original_query = NULL;
103 session_context->query_context = NULL;
104 pfree(query_context);
105 MemoryContextDelete(memory_context);
106 }
107 }
108
109 /*
110 * Perform shallow copy of given query context. Used in parse_before_bind.
111 */
pool_query_context_shallow_copy(POOL_QUERY_CONTEXT * query_context)112 POOL_QUERY_CONTEXT *pool_query_context_shallow_copy(POOL_QUERY_CONTEXT *query_context)
113 {
114 POOL_QUERY_CONTEXT *qc;
115 MemoryContext memory_context;
116
117 qc = pool_init_query_context();
118 memory_context = qc->memory_context;
119 memcpy(qc, query_context, sizeof(POOL_QUERY_CONTEXT));
120 qc->memory_context = memory_context;
121 return qc;
122 }
123
124 /*
125 * Start query
126 */
pool_start_query(POOL_QUERY_CONTEXT * query_context,char * query,int len,Node * node)127 void pool_start_query(POOL_QUERY_CONTEXT *query_context, char *query, int len, Node *node)
128 {
129 POOL_SESSION_CONTEXT *session_context;
130
131 if (query_context)
132 {
133 MemoryContext old_context;
134 session_context = pool_get_session_context(false);
135 old_context = MemoryContextSwitchTo(query_context->memory_context);
136 query_context->original_length = len;
137 query_context->rewritten_length = -1;
138 query_context->original_query = pstrdup(query);
139 query_context->rewritten_query = NULL;
140 query_context->parse_tree = node;
141 query_context->virtual_master_node_id = my_master_node_id;
142 query_context->is_cache_safe = false;
143 query_context->num_original_params = -1;
144 if (pool_config->memory_cache_enabled)
145 query_context->temp_cache = pool_create_temp_query_cache(query);
146 pool_set_query_in_progress();
147 query_context->skip_cache_commit = false;
148 session_context->query_context = query_context;
149 MemoryContextSwitchTo(old_context);
150 }
151 }
152
153 /*
154 * Specify DB node to send query
155 */
pool_set_node_to_be_sent(POOL_QUERY_CONTEXT * query_context,int node_id)156 void pool_set_node_to_be_sent(POOL_QUERY_CONTEXT *query_context, int node_id)
157 {
158 CHECK_QUERY_CONTEXT_IS_VALID;
159
160 if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
161 ereport(ERROR,
162 (errmsg("setting db node for query to be sent, invalid node id:%d",node_id),
163 errdetail("backend node id: %d out of range, node id can be between 0 and %d",node_id,MAX_NUM_BACKENDS)));
164
165 query_context->where_to_send[node_id] = true;
166
167 return;
168 }
169
170 /*
171 * Unspecify DB node to send query
172 */
pool_unset_node_to_be_sent(POOL_QUERY_CONTEXT * query_context,int node_id)173 void pool_unset_node_to_be_sent(POOL_QUERY_CONTEXT *query_context, int node_id)
174 {
175 CHECK_QUERY_CONTEXT_IS_VALID;
176
177 if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
178 ereport(ERROR,
179 (errmsg("un setting db node for query to be sent, invalid node id:%d",node_id),
180 errdetail("backend node id: %d out of range, node id can be between 0 and %d",node_id,MAX_NUM_BACKENDS)));
181
182 query_context->where_to_send[node_id] = false;
183
184 return;
185 }
186
187 /*
188 * Clear DB node map
189 */
pool_clear_node_to_be_sent(POOL_QUERY_CONTEXT * query_context)190 void pool_clear_node_to_be_sent(POOL_QUERY_CONTEXT *query_context)
191 {
192 CHECK_QUERY_CONTEXT_IS_VALID;
193
194 memset(query_context->where_to_send, false, sizeof(query_context->where_to_send));
195 return;
196 }
197
198 /*
199 * Set all DB node map entry
200 */
pool_setall_node_to_be_sent(POOL_QUERY_CONTEXT * query_context)201 void pool_setall_node_to_be_sent(POOL_QUERY_CONTEXT *query_context)
202 {
203 int i;
204 POOL_SESSION_CONTEXT *sc;
205
206 sc = pool_get_session_context(false);
207
208 CHECK_QUERY_CONTEXT_IS_VALID;
209
210 for (i=0;i<NUM_BACKENDS;i++)
211 {
212 if (private_backend_status[i] == CON_UP ||
213 (private_backend_status[i] == CON_CONNECT_WAIT))
214 {
215 /*
216 * In streaming replication mode, if the node is not
217 * primary node nor load balance node, there's no point to
218 * send query.
219 */
220 if (pool_config->master_slave_mode &&
221 pool_config->master_slave_sub_mode == STREAM_MODE &&
222 i != PRIMARY_NODE_ID && i != sc->load_balance_node_id)
223 {
224 continue;
225 }
226 query_context->where_to_send[i] = true;
227 }
228 }
229 return;
230 }
231
232 /*
233 * Return true if multiple nodes are targets
234 */
pool_multi_node_to_be_sent(POOL_QUERY_CONTEXT * query_context)235 bool pool_multi_node_to_be_sent(POOL_QUERY_CONTEXT *query_context)
236 {
237 int i;
238 int cnt = 0;
239
240 CHECK_QUERY_CONTEXT_IS_VALID;
241
242 for (i=0;i<NUM_BACKENDS;i++)
243 {
244 if (((BACKEND_INFO(i)).backend_status == CON_UP ||
245 BACKEND_INFO((i)).backend_status == CON_CONNECT_WAIT) &&
246 query_context->where_to_send[i])
247 {
248 cnt++;
249 if (cnt > 1)
250 {
251 return true;
252 }
253 }
254 }
255 return false;
256 }
257
258 /*
259 * Return if the DB node is needed to send query
260 */
pool_is_node_to_be_sent(POOL_QUERY_CONTEXT * query_context,int node_id)261 bool pool_is_node_to_be_sent(POOL_QUERY_CONTEXT *query_context, int node_id)
262 {
263 CHECK_QUERY_CONTEXT_IS_VALID;
264
265 if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
266 ereport(ERROR,
267 (errmsg("checking if db node is needed to be sent, invalid node id:%d",node_id),
268 errdetail("backend node id: %d out of range, node id can be between 0 and %d",node_id,MAX_NUM_BACKENDS)));
269
270 return query_context->where_to_send[node_id];
271 }
272
273 /*
274 * Returns true if the DB node is needed to send query.
275 * Intended to be called from VALID_BACKEND
276 */
pool_is_node_to_be_sent_in_current_query(int node_id)277 bool pool_is_node_to_be_sent_in_current_query(int node_id)
278 {
279 POOL_SESSION_CONTEXT *sc;
280
281 if (RAW_MODE)
282 return node_id == REAL_MASTER_NODE_ID;
283
284 sc = pool_get_session_context(true);
285 if (!sc)
286 return true;
287
288 if (pool_is_query_in_progress() && sc->query_context)
289 {
290 return pool_is_node_to_be_sent(sc->query_context, node_id);
291 }
292 return true;
293 }
294
295 /*
296 * Returns virtual master DB node id,
297 */
pool_virtual_master_db_node_id(void)298 int pool_virtual_master_db_node_id(void)
299 {
300 POOL_SESSION_CONTEXT *sc;
301
302 /*
303 * Check whether failover is in progress. If so, just abort this session.
304 */
305 if (Req_info->switching)
306 {
307 #ifdef NOT_USED
308 POOL_SETMASK(&BlockSig);
309 ereport(WARNING,
310 (errmsg("failover/failback is in progress"),
311 errdetail("executing failover or failback on backend"),
312 errhint("In a moment you should be able to reconnect to the database")));
313 POOL_SETMASK(&UnBlockSig);
314 #endif
315 child_exit(POOL_EXIT_AND_RESTART);
316 }
317
318 sc = pool_get_session_context(true);
319 if (!sc)
320 {
321 return REAL_MASTER_NODE_ID;
322 }
323
324 if (sc->in_progress && sc->query_context)
325 {
326 int node_id = sc->query_context->virtual_master_node_id;
327
328 if (STREAM)
329 {
330 /*
331 * Make sure that virtual_master_node_id is either primary node
332 * id or load balance node id. If not, it is likely that
333 * virtual_master_node_id is not set up yet. Let's use the
334 * primary node id. except for the special case where we need
335 * to send the query to the node which is not primary nor the
336 * load balance node. Currently there is only one special such
337 * case that is handling of pg_terminate_backend() function, which
338 * may refer to the backend connection that is neither hosted by
339 * the primary or load balance node for current child process, but
340 * the query must be forwarded to that node. Since only that backend
341 * node can handle that pg_terminate_backend query
342 *
343 */
344
345 ereport(DEBUG1,
346 (errmsg("pool_virtual_master_db_node_id: virtual_master_node_id:%d load_balance_node_id:%d PRIMARY_NODE_ID:%d",
347 node_id, sc->load_balance_node_id, PRIMARY_NODE_ID)));
348
349 if (node_id != sc->load_balance_node_id && node_id != PRIMARY_NODE_ID)
350 {
351 /*
352 * Only return the primary node id if we are not processing the
353 * pg_terminate_backend query
354 */
355 if (sc->query_context->pg_terminate_backend_conn == NULL)
356 node_id = PRIMARY_NODE_ID;
357 }
358 }
359
360 return node_id;
361 }
362
363 /*
364 * No query context exists. If in master/slave mode, returns
365 * primary node if exists. Otherwise returns my_master_node_id,
366 * which represents the last REAL_MASTER_NODE_ID.
367 */
368 if (MASTER_SLAVE)
369 {
370 int node_id;
371
372 node_id = pool_get_preferred_master_node_id();
373 if (node_id >= 0)
374 return node_id;
375
376 return PRIMARY_NODE_ID;
377 }
378 return my_master_node_id;
379 }
380
381 /*
382 * The function sets the destination for the current query to the specific backend node
383 */
pool_force_query_node_to_backend(POOL_QUERY_CONTEXT * query_context,int backend_id)384 void pool_force_query_node_to_backend(POOL_QUERY_CONTEXT *query_context, int backend_id)
385 {
386 int i;
387 CHECK_QUERY_CONTEXT_IS_VALID;
388
389 ereport(DEBUG1,
390 (errmsg("forcing query destination node to backend node:%d",backend_id)));
391
392 pool_set_node_to_be_sent(query_context,backend_id);
393 for (i=0;i<NUM_BACKENDS;i++)
394 {
395 if (query_context->where_to_send[i])
396 {
397 query_context->virtual_master_node_id = i;
398 break;
399 }
400 }
401 }
402
403 /*
404 * Decide where to send queries(thus expecting response)
405 */
pool_where_to_send(POOL_QUERY_CONTEXT * query_context,char * query,Node * node)406 void pool_where_to_send(POOL_QUERY_CONTEXT *query_context, char *query, Node *node)
407 {
408 POOL_SESSION_CONTEXT *session_context;
409 POOL_CONNECTION_POOL *backend;
410 int i;
411
412 CHECK_QUERY_CONTEXT_IS_VALID;
413
414 session_context = pool_get_session_context(false);
415 backend = session_context->backend;
416
417 /*
418 * Zap out DB node map
419 */
420 pool_clear_node_to_be_sent(query_context);
421
422 /*
423 * If there is "NO LOAD BALANCE" comment, we send only to master node.
424 */
425 if (!strncasecmp(query, NO_LOAD_BALANCE, NO_LOAD_BALANCE_COMMENT_SZ))
426 {
427 pool_set_node_to_be_sent(query_context,
428 MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID);
429 for (i=0;i<NUM_BACKENDS;i++)
430 {
431 if (query_context->where_to_send[i])
432 {
433 query_context->virtual_master_node_id = i;
434 break;
435 }
436 }
437 return;
438 }
439
440 /*
441 * In raw mode, we send only to master node. Simple enough.
442 */
443 if (RAW_MODE)
444 {
445 pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
446 }
447 else if (MASTER_SLAVE && query_context->is_multi_statement)
448 {
449 /*
450 * If we are in master/slave mode and we have multi statement
451 * query, we should send it to primary server only. Otherwise
452 * it is possible to send a write query to standby servers
453 * because we only use the first element of the multi
454 * statement query and don't care about the rest. Typical
455 * situation where we are bugged by this is, "BEGIN;DELETE
456 * FROM table;END". Note that from pgpool-II 3.1.0
457 * transactional statements such as "BEGIN" is unconditionally
458 * sent to all nodes(see send_to_where() for more details).
459 * Someday we might be able to understand all part of multi
460 * statement queries, but until that day we need this band
461 * aid.
462 */
463 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
464 }
465 else if (MASTER_SLAVE)
466 {
467 POOL_DEST dest;
468
469 dest = send_to_where(node, query);
470
471 ereport(DEBUG1,
472 (errmsg("decide where to send the query"),
473 errdetail("destination = %d for query= \"%s\"", dest, query)));
474
475 /* Should be sent to primary only? */
476 if (dest == POOL_PRIMARY)
477 {
478 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
479 }
480 /* Should be sent to both primary and standby? */
481 else if (dest == POOL_BOTH)
482 {
483 pool_setall_node_to_be_sent(query_context);
484 }
485
486 /*
487 * Ok, we might be able to load balance the SELECT query.
488 */
489 else
490 {
491 if (pool_config->load_balance_mode &&
492 is_select_query(node, query) &&
493 MAJOR(backend) == PROTO_MAJOR_V3)
494 {
495 /*
496 * If (we are outside of an explicit transaction) OR
497 * (the transaction has not issued a write query yet, AND
498 * transaction isolation level is not SERIALIZABLE)
499 * we might be able to load balance.
500 */
501
502 ereport(DEBUG1,
503 (errmsg("checking load balance precondtions. TSTATE:%c wrting_trancation:%d failed_transaction:%d isolation:%d",
504 TSTATE(backend, PRIMARY_NODE_ID),
505 pool_is_writing_transaction(),
506 pool_is_failed_transaction(),
507 pool_get_transaction_isolation()),
508 errdetail("destination = %d for query= \"%s\"", dest, query)));
509
510 if (TSTATE(backend, PRIMARY_NODE_ID) == 'I' ||
511 (!pool_is_writing_transaction() &&
512 !pool_is_failed_transaction() &&
513 pool_get_transaction_isolation() != POOL_SERIALIZABLE))
514 {
515 BackendInfo *bkinfo = pool_get_node_info(session_context->load_balance_node_id);
516
517 /*
518 * Load balance if possible
519 */
520
521 /*
522 * If replication delay is too much, we prefer to send to the primary.
523 */
524 if (pool_config->master_slave_sub_mode == STREAM_MODE &&
525 pool_config->delay_threshold &&
526 bkinfo->standby_delay > pool_config->delay_threshold)
527 {
528 ereport(DEBUG1,
529 (errmsg("could not load balance because of too much replication delay"),
530 errdetail("destination = %d for query= \"%s\"", dest, query)));
531
532 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
533 }
534
535 /*
536 * If a writing function call is used,
537 * we prefer to send to the primary.
538 */
539 else if (pool_has_function_call(node))
540 {
541 ereport(DEBUG1,
542 (errmsg("could not load balance because writing functions are used"),
543 errdetail("destination = %d for query= \"%s\"", dest, query)));
544
545 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
546 }
547
548 /*
549 * If system catalog is used in the SELECT, we
550 * prefer to send to the primary. Example: SELECT
551 * * FROM pg_class WHERE relname = 't1'; Because
552 * 't1' is a constant, it's hard to recognize as
553 * table name. Most use case such query is
554 * against system catalog, and the table name can
555 * be a temporary table, it's best to query
556 * against primary system catalog.
557 * Please note that this test must be done *before*
558 * test using pool_has_temp_table.
559 */
560 else if (pool_has_system_catalog(node))
561 {
562 ereport(DEBUG1,
563 (errmsg("could not load balance because systems catalogs are used"),
564 errdetail("destination = %d for query= \"%s\"", dest, query)));
565
566 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
567 }
568
569 /*
570 * If temporary table is used in the SELECT,
571 * we prefer to send to the primary.
572 */
573 else if (pool_config->check_temp_table && pool_has_temp_table(node))
574 {
575 ereport(DEBUG1,
576 (errmsg("could not load balance because temporary tables are used"),
577 errdetail("destination = %d for query= \"%s\"", dest, query)));
578
579 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
580 }
581
582 /*
583 * If unlogged table is used in the SELECT,
584 * we prefer to send to the primary.
585 */
586 else if (pool_config->check_unlogged_table && pool_has_unlogged_table(node))
587 {
588 ereport(DEBUG1,
589 (errmsg("could not load balance because unlogged tables are used"),
590 errdetail("destination = %d for query= \"%s\"", dest, query)));
591
592 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
593 }
594
595 else
596 {
597 pool_set_node_to_be_sent(query_context,
598 session_context->load_balance_node_id);
599 }
600 }
601 else
602 {
603 /* Send to the primary only */
604 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
605 }
606 }
607 else
608 {
609 /* Send to the primary only */
610 pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
611 }
612 }
613 }
614 else if (REPLICATION)
615 {
616 if (pool_config->load_balance_mode &&
617 is_select_query(node, query) &&
618 MAJOR(backend) == PROTO_MAJOR_V3)
619 {
620 /*
621 * If a writing function call is used or replicate_select is true,
622 * we prefer to send to all nodes.
623 */
624 if (pool_has_function_call(node) || pool_config->replicate_select)
625 {
626 pool_setall_node_to_be_sent(query_context);
627 }
628 /*
629 * If (we are outside of an explicit transaction) OR
630 * (the transaction has not issued a write query yet, AND
631 * transaction isolation level is not SERIALIZABLE)
632 * we might be able to load balance.
633 */
634 else if (TSTATE(backend, MASTER_NODE_ID) == 'I' ||
635 (!pool_is_writing_transaction() &&
636 !pool_is_failed_transaction() &&
637 pool_get_transaction_isolation() != POOL_SERIALIZABLE))
638 {
639 /* load balance */
640 pool_set_node_to_be_sent(query_context,
641 session_context->load_balance_node_id);
642 }
643 else
644 {
645 /* only send to master node */
646 pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
647 }
648 }
649 else
650 {
651 if (is_select_query(node, query) && !pool_config->replicate_select &&
652 !pool_has_function_call(node))
653 {
654 /* only send to master node */
655 pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
656 }
657 else
658 {
659 /* send to all nodes */
660 pool_setall_node_to_be_sent(query_context);
661 }
662 }
663 }
664 else
665 {
666 ereport(WARNING,
667 (errmsg("unknown pgpool-II mode while deciding for where to send query")));
668 return;
669 }
670
671 /*
672 * EXECUTE?
673 */
674 if (IsA(node, ExecuteStmt))
675 {
676 POOL_SENT_MESSAGE *msg;
677
678 msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
679 if (!msg)
680 msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
681 if (msg)
682 pool_copy_prep_where(msg->query_context->where_to_send,
683 query_context->where_to_send);
684 }
685
686 /*
687 * DEALLOCATE?
688 */
689 else if (IsA(node, DeallocateStmt))
690 {
691 where_to_send_deallocate(query_context, node);
692 }
693
694 for (i=0;i<NUM_BACKENDS;i++)
695 {
696 if (query_context->where_to_send[i])
697 {
698 query_context->virtual_master_node_id = i;
699 break;
700 }
701 }
702
703 return;
704 }
705
706 /*
707 * Send simple query and wait for response
708 * send_type:
709 * -1: do not send this node_id
710 * 0: send to all nodes
711 * >0: send to this node_id
712 */
pool_send_and_wait(POOL_QUERY_CONTEXT * query_context,int send_type,int node_id)713 POOL_STATUS pool_send_and_wait(POOL_QUERY_CONTEXT *query_context,
714 int send_type, int node_id)
715 {
716 POOL_SESSION_CONTEXT *session_context;
717 POOL_CONNECTION *frontend;
718 POOL_CONNECTION_POOL *backend;
719 bool is_commit;
720 bool is_begin_read_write;
721 int i;
722 int len;
723 char *string;
724
725 session_context = pool_get_session_context(false);
726 frontend = session_context->frontend;
727 backend = session_context->backend;
728 is_commit = is_commit_or_rollback_query(query_context->parse_tree);
729 is_begin_read_write = false;
730 len = 0;
731 string = NULL;
732
733 /*
734 * If the query is BEGIN READ WRITE or
735 * BEGIN ... SERIALIZABLE in master/slave mode,
736 * we send BEGIN to slaves/standbys instead.
737 * original_query which is BEGIN READ WRITE is sent to primary.
738 * rewritten_query which is BEGIN is sent to standbys.
739 */
740 if (pool_need_to_treat_as_if_default_transaction(query_context))
741 {
742 is_begin_read_write = true;
743 }
744 else
745 {
746 if (query_context->rewritten_query)
747 {
748 len = query_context->rewritten_length;
749 string = query_context->rewritten_query;
750 }
751 else
752 {
753 len = query_context->original_length;
754 string = query_context->original_query;
755 }
756 }
757
758 /* Send query */
759 for (i=0;i<NUM_BACKENDS;i++)
760 {
761 if (!VALID_BACKEND(i))
762 continue;
763 else if (send_type < 0 && i == node_id)
764 continue;
765 else if (send_type > 0 && i != node_id)
766 continue;
767
768 /*
769 * If in master/slave mode, we do not send COMMIT/ABORT to
770 * slaves/standbys if it's in I(idle) state.
771 */
772 if (is_commit && MASTER_SLAVE && !IS_MASTER_NODE_ID(i) && TSTATE(backend, i) == 'I')
773 {
774 pool_unset_node_to_be_sent(query_context, i);
775 continue;
776 }
777
778 /*
779 * If in reset context, we send COMMIT/ABORT to nodes those
780 * are not in I(idle) state. This will ensure that
781 * transactions are closed.
782 */
783 if (is_commit && session_context->reset_context && TSTATE(backend, i) == 'I')
784 {
785 pool_unset_node_to_be_sent(query_context, i);
786 continue;
787 }
788
789 if (is_begin_read_write)
790 {
791 if (REAL_PRIMARY_NODE_ID == i)
792 {
793 len = query_context->original_length;
794 string = query_context->original_query;
795 }
796 else
797 {
798 len = query_context->rewritten_length;
799 string = query_context->rewritten_query;
800 }
801 }
802
803 per_node_statement_log(backend, i, string);
804 stat_count_up(i, query_context->parse_tree);
805 send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend));
806 }
807
808 /* Wait for response */
809 for (i=0;i<NUM_BACKENDS;i++)
810 {
811 if (!VALID_BACKEND(i))
812 continue;
813 else if (send_type < 0 && i == node_id)
814 continue;
815 else if (send_type > 0 && i != node_id)
816 continue;
817
818 #ifdef NOT_USED
819 /*
820 * If in master/slave mode, we do not send COMMIT/ABORT to
821 * slaves/standbys if it's in I(idle) state.
822 */
823 if (is_commit && MASTER_SLAVE && !IS_MASTER_NODE_ID(i) && TSTATE(backend, i) == 'I')
824 {
825 continue;
826 }
827 #endif
828
829 if (is_begin_read_write)
830 {
831 if(REAL_PRIMARY_NODE_ID == i)
832 string = query_context->original_query;
833 else
834 string = query_context->rewritten_query;
835 }
836
837 wait_for_query_response_with_trans_cleanup(frontend,
838 CONNECTION(backend, i),
839 MAJOR(backend),
840 MASTER_CONNECTION(backend)->pid,
841 MASTER_CONNECTION(backend)->key);
842
843 /*
844 * Check if some error detected. If so, emit
845 * log. This is useful when invalid encoding error
846 * occurs. In this case, PostgreSQL does not report
847 * what statement caused that error and make users
848 * confused.
849 */
850 per_node_error_log(backend, i, string, "pool_send_and_wait: Error or notice message from backend: ", true);
851 }
852
853 return POOL_CONTINUE;
854 }
855
856 /*
857 * Send extended query and wait for response
858 * send_type:
859 * -1: do not send this node_id
860 * 0: send to all nodes
861 * >0: send to this node_id
862 */
pool_extended_send_and_wait(POOL_QUERY_CONTEXT * query_context,char * kind,int len,char * contents,int send_type,int node_id,bool nowait)863 POOL_STATUS pool_extended_send_and_wait(POOL_QUERY_CONTEXT *query_context,
864 char *kind, int len, char *contents,
865 int send_type, int node_id, bool nowait)
866 {
867 POOL_SESSION_CONTEXT *session_context;
868 POOL_CONNECTION *frontend;
869 POOL_CONNECTION_POOL *backend;
870 bool is_commit;
871 bool is_begin_read_write;
872 int i;
873 int str_len;
874 int rewritten_len;
875 char *str;
876 char *rewritten_begin;
877
878 session_context = pool_get_session_context(false);
879 frontend = session_context->frontend;
880 backend = session_context->backend;
881 is_commit = is_commit_or_rollback_query(query_context->parse_tree);
882 is_begin_read_write = false;
883 str_len = 0;
884 rewritten_len = 0;
885 str = NULL;
886 rewritten_begin = NULL;
887
888 /*
889 * If the query is BEGIN READ WRITE or
890 * BEGIN ... SERIALIZABLE in master/slave mode,
891 * we send BEGIN to slaves/standbys instead.
892 * original_query which is BEGIN READ WRITE is sent to primary.
893 * rewritten_query which is BEGIN is sent to standbys.
894 */
895 if (pool_need_to_treat_as_if_default_transaction(query_context))
896 {
897 is_begin_read_write = true;
898
899 if (*kind == 'P')
900 rewritten_begin = remove_read_write(len, contents, &rewritten_len);
901 }
902
903 if (!rewritten_begin)
904 {
905 str_len = len;
906 str = contents;
907 }
908
909 /* Send query */
910 for (i=0;i<NUM_BACKENDS;i++)
911 {
912 if (!VALID_BACKEND(i))
913 continue;
914 else if (send_type < 0 && i == node_id)
915 continue;
916 else if (send_type > 0 && i != node_id)
917 continue;
918
919 /*
920 * If in reset context, we send COMMIT/ABORT to nodes those
921 * are not in I(idle) state. This will ensure that
922 * transactions are closed.
923 */
924 if (is_commit && session_context->reset_context && TSTATE(backend, i) == 'I')
925 {
926 pool_unset_node_to_be_sent(query_context, i);
927 continue;
928 }
929
930 if (rewritten_begin)
931 {
932 if (REAL_PRIMARY_NODE_ID == i)
933 {
934 str = contents;
935 str_len = len;
936 }
937 else
938 {
939 str = rewritten_begin;
940 str_len = rewritten_len;
941 }
942 }
943
944 if (pool_config->log_per_node_statement)
945 {
946 char msgbuf[QUERY_STRING_BUFFER_LEN];
947 char *stmt;
948
949 if (*kind == 'P' || *kind == 'E')
950 {
951 if (query_context->rewritten_query)
952 {
953 if (is_begin_read_write)
954 {
955 if (REAL_PRIMARY_NODE_ID == i)
956 stmt = query_context->original_query;
957 else
958 stmt = query_context->rewritten_query;
959 }
960 else
961 {
962 stmt = query_context->rewritten_query;
963 }
964 }
965 else
966 {
967 stmt = query_context->original_query;
968 }
969
970 if (*kind == 'P')
971 snprintf(msgbuf, sizeof(msgbuf), "Parse: %s", stmt);
972 else
973 snprintf(msgbuf, sizeof(msgbuf), "Execute: %s", stmt);
974 }
975 else
976 {
977 snprintf(msgbuf, sizeof(msgbuf), "%c message", *kind);
978 }
979
980 per_node_statement_log(backend, i, msgbuf);
981 }
982
983 /* if Execute message, count up stats count */
984 if (*kind == 'E')
985 {
986 stat_count_up(i, query_context->parse_tree);
987 }
988
989 send_extended_protocol_message(backend, i, kind, str_len, str);
990
991 if ((*kind == 'P' || *kind == 'E' || *kind == 'C') && STREAM)
992 {
993 /*
994 * Send flush message to backend to make sure that we get any response
995 * from backend in Streaming replication mode.
996 */
997
998 POOL_CONNECTION *cp = CONNECTION(backend, i);
999 int len;
1000
1001 pool_write(cp, "H", 1);
1002 len = htonl(sizeof(len));
1003 pool_write_and_flush(cp, &len, sizeof(len));
1004
1005 ereport(DEBUG1,
1006 (errmsg("pool_send_and_wait: send flush message to %d", i)));
1007 }
1008 }
1009
1010 if (!is_begin_read_write)
1011 {
1012 if (query_context->rewritten_query)
1013 str = query_context->rewritten_query;
1014 else
1015 str = query_context->original_query;
1016 }
1017
1018 if (!nowait)
1019 {
1020 /* Wait for response */
1021 for (i=0;i<NUM_BACKENDS;i++)
1022 {
1023 if (!VALID_BACKEND(i))
1024 continue;
1025 else if (send_type < 0 && i == node_id)
1026 continue;
1027 else if (send_type > 0 && i != node_id)
1028 continue;
1029
1030 /*
1031 * If in master/slave mode, we do not send COMMIT/ABORT to
1032 * slaves/standbys if it's in I(idle) state.
1033 */
1034 if (is_commit && MASTER_SLAVE && !IS_MASTER_NODE_ID(i) && TSTATE(backend, i) == 'I')
1035 {
1036 continue;
1037 }
1038
1039 if (is_begin_read_write)
1040 {
1041 if (REAL_PRIMARY_NODE_ID == i)
1042 str = query_context->original_query;
1043 else
1044 str = query_context->rewritten_query;
1045 }
1046
1047 wait_for_query_response_with_trans_cleanup(frontend,
1048 CONNECTION(backend, i),
1049 MAJOR(backend),
1050 MASTER_CONNECTION(backend)->pid,
1051 MASTER_CONNECTION(backend)->key);
1052
1053 /*
1054 * Check if some error detected. If so, emit
1055 * log. This is useful when invalid encoding error
1056 * occurs. In this case, PostgreSQL does not report
1057 * what statement caused that error and make users
1058 * confused.
1059 */
1060 per_node_error_log(backend, i, str, "pool_send_and_wait: Error or notice message from backend: ", true);
1061 }
1062 }
1063
1064 if(rewritten_begin)
1065 pfree(rewritten_begin);
1066 return POOL_CONTINUE;
1067 }
1068
1069 /*
1070 * From syntactically analysis decide the statement to be sent to the
1071 * primary, the standby or either or both in master/slave+HR/SR mode.
1072 */
send_to_where(Node * node,char * query)1073 static POOL_DEST send_to_where(Node *node, char *query)
1074
1075 {
1076 /* From storage/lock.h */
1077 #define NoLock 0
1078 #define AccessShareLock 1 /* SELECT */
1079 #define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE */
1080 #define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE */
1081 #define ShareUpdateExclusiveLock 4 /* VACUUM (non-FULL),ANALYZE, CREATE
1082 * INDEX CONCURRENTLY */
1083 #define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY) */
1084 #define ShareRowExclusiveLock 6 /* like EXCLUSIVE MODE, but allows ROW
1085 * SHARE */
1086 #define ExclusiveLock 7 /* blocks ROW SHARE/SELECT...FOR
1087 * UPDATE */
1088 #define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM
1089 * FULL, and unqualified LOCK TABLE */
1090
1091 /* From 9.5 include/nodes/node.h ("TAGS FOR STATEMENT NODES" part) */
1092 static NodeTag nodemap[] = {
1093 T_PlannedStmt,
1094 T_InsertStmt,
1095 T_DeleteStmt,
1096 T_UpdateStmt,
1097 T_SelectStmt,
1098 T_AlterTableStmt,
1099 T_AlterTableCmd,
1100 T_AlterDomainStmt,
1101 T_SetOperationStmt,
1102 T_GrantStmt,
1103 T_GrantRoleStmt,
1104 T_AlterDefaultPrivilegesStmt,
1105 T_ClosePortalStmt,
1106 T_ClusterStmt,
1107 T_CopyStmt,
1108 T_CreateStmt, /* CREATE TABLE */
1109 T_DefineStmt, /* CREATE AGGREGATE, OPERATOR, TYPE */
1110 T_DropStmt, /* DROP TABLE etc. */
1111 T_TruncateStmt,
1112 T_CommentStmt,
1113 T_FetchStmt,
1114 T_IndexStmt, /* CREATE INDEX */
1115 T_CreateFunctionStmt,
1116 T_AlterFunctionStmt,
1117 T_DoStmt,
1118 T_RenameStmt, /* ALTER AGGREGATE etc. */
1119 T_RuleStmt, /* CREATE RULE */
1120 T_NotifyStmt,
1121 T_ListenStmt,
1122 T_UnlistenStmt,
1123 T_TransactionStmt,
1124 T_ViewStmt, /* CREATE VIEW */
1125 T_LoadStmt,
1126 T_CreateDomainStmt,
1127 T_CreatedbStmt,
1128 T_DropdbStmt,
1129 T_VacuumStmt,
1130 T_ExplainStmt,
1131 T_CreateTableAsStmt,
1132 T_CreateSeqStmt,
1133 T_AlterSeqStmt,
1134 T_VariableSetStmt, /* SET */
1135 T_VariableShowStmt,
1136 T_DiscardStmt,
1137 T_CreateTrigStmt,
1138 T_CreatePLangStmt,
1139 T_CreateRoleStmt,
1140 T_AlterRoleStmt,
1141 T_DropRoleStmt,
1142 T_LockStmt,
1143 T_ConstraintsSetStmt,
1144 T_ReindexStmt,
1145 T_CheckPointStmt,
1146 T_CreateSchemaStmt,
1147 T_AlterDatabaseStmt,
1148 T_AlterDatabaseSetStmt,
1149 T_AlterRoleSetStmt,
1150 T_CreateConversionStmt,
1151 T_CreateCastStmt,
1152 T_CreateOpClassStmt,
1153 T_CreateOpFamilyStmt,
1154 T_AlterOpFamilyStmt,
1155 T_PrepareStmt,
1156 T_ExecuteStmt,
1157 T_DeallocateStmt, /* DEALLOCATE */
1158 T_DeclareCursorStmt, /* DECLARE */
1159 T_CreateTableSpaceStmt,
1160 T_DropTableSpaceStmt,
1161 T_AlterObjectSchemaStmt,
1162 T_AlterOwnerStmt,
1163 T_DropOwnedStmt,
1164 T_ReassignOwnedStmt,
1165 T_CompositeTypeStmt, /* CREATE TYPE */
1166 T_CreateEnumStmt,
1167 T_CreateRangeStmt,
1168 T_AlterEnumStmt,
1169 T_AlterTSDictionaryStmt,
1170 T_AlterTSConfigurationStmt,
1171 T_CreateFdwStmt,
1172 T_AlterFdwStmt,
1173 T_CreateForeignServerStmt,
1174 T_AlterForeignServerStmt,
1175 T_CreateUserMappingStmt,
1176 T_AlterUserMappingStmt,
1177 T_DropUserMappingStmt,
1178 T_AlterTableSpaceOptionsStmt,
1179 T_AlterTableMoveAllStmt,
1180 T_SecLabelStmt,
1181 T_CreateForeignTableStmt,
1182 T_ImportForeignSchemaStmt,
1183 T_CreateExtensionStmt,
1184 T_AlterExtensionStmt,
1185 T_AlterExtensionContentsStmt,
1186 T_CreateEventTrigStmt,
1187 T_AlterEventTrigStmt,
1188 T_RefreshMatViewStmt,
1189 T_ReplicaIdentityStmt,
1190 T_AlterSystemStmt,
1191 T_CreatePolicyStmt,
1192 T_AlterPolicyStmt,
1193 T_CreateTransformStmt,
1194 };
1195
1196 if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]),
1197 sizeof(NodeTag), compare) != NULL)
1198 {
1199 /*
1200 * SELECT INTO
1201 * SELECT FOR SHARE or UPDATE
1202 */
1203 if (IsA(node, SelectStmt))
1204 {
1205 /* SELECT INTO or SELECT FOR SHARE or UPDATE ? */
1206 if (pool_has_insertinto_or_locking_clause(node))
1207 return POOL_PRIMARY;
1208
1209 /* non-SELECT query in WITH clause ? */
1210 if (((SelectStmt *)node)->withClause)
1211 {
1212 List *ctes = ((SelectStmt *)node)->withClause->ctes;
1213 ListCell *cte_item;
1214 foreach(cte_item, ctes)
1215 {
1216 CommonTableExpr *cte = (CommonTableExpr *)lfirst(cte_item);
1217 if (!IsA(cte->ctequery, SelectStmt))
1218 return POOL_PRIMARY;
1219 }
1220 }
1221
1222 return POOL_EITHER;
1223 }
1224
1225 /*
1226 * COPY
1227 */
1228 else if (IsA(node, CopyStmt))
1229 {
1230 if (((CopyStmt *)node)->is_from)
1231 return POOL_PRIMARY;
1232 else
1233 {
1234 if (((CopyStmt *)node)->query == NULL)
1235 return POOL_EITHER;
1236 else
1237 return (IsA(((CopyStmt *)node)->query, SelectStmt))?POOL_EITHER:POOL_PRIMARY;
1238 }
1239 }
1240
1241 /*
1242 * LOCK
1243 */
1244 else if (IsA(node, LockStmt))
1245 {
1246 return (((LockStmt *)node)->mode >= RowExclusiveLock)?POOL_PRIMARY:POOL_BOTH;
1247 }
1248
1249 /*
1250 * Transaction commands
1251 */
1252 else if (IsA(node, TransactionStmt))
1253 {
1254 /*
1255 * Check "BEGIN READ WRITE" "START TRANSACTION READ WRITE"
1256 */
1257 if (is_start_transaction_query(node))
1258 {
1259 /* But actually, we send BEGIN to standby if it's
1260 BEGIN READ WRITE or START TRANSACTION READ WRITE */
1261 if (is_read_write((TransactionStmt *)node))
1262 return POOL_BOTH;
1263 /* Other TRANSACTION start commands are sent to both primary
1264 and standby */
1265 else
1266 return POOL_BOTH;
1267 }
1268 /* SAVEPOINT related commands are sent to both primary and standby */
1269 else if (is_savepoint_query(node))
1270 return POOL_BOTH;
1271 /*
1272 * 2PC commands
1273 */
1274 else if (is_2pc_transaction_query(node))
1275 return POOL_PRIMARY;
1276 else
1277 /* COMMIT etc. */
1278 return POOL_BOTH;
1279 }
1280
1281 /*
1282 * SET
1283 */
1284 else if (IsA(node, VariableSetStmt))
1285 {
1286 ListCell *list_item;
1287 bool ret = POOL_BOTH;
1288
1289 /*
1290 * SET transaction_read_only TO off
1291 */
1292 if (((VariableSetStmt *)node)->kind == VAR_SET_VALUE &&
1293 !strcmp(((VariableSetStmt *)node)->name, "transaction_read_only"))
1294 {
1295 List *options = ((VariableSetStmt *)node)->args;
1296 foreach(list_item, options)
1297 {
1298 A_Const *v = (A_Const *)lfirst(list_item);
1299
1300 switch (v->val.type)
1301 {
1302 case T_String:
1303 if (!strcasecmp(v->val.val.str, "off") ||
1304 !strcasecmp(v->val.val.str, "f") ||
1305 !strcasecmp(v->val.val.str, "false"))
1306 ret = POOL_PRIMARY;
1307 break;
1308 case T_Integer:
1309 if (v->val.val.ival)
1310 ret = POOL_PRIMARY;
1311 default:
1312 break;
1313 }
1314 }
1315 return ret;
1316 }
1317
1318 /* SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1319 * SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1320 * SET transaction_isolation TO 'serializable'
1321 * SET default_transaction_isolation TO 'serializable'
1322 */
1323 else if (is_set_transaction_serializable(node))
1324 {
1325 return POOL_PRIMARY;
1326 }
1327
1328 /*
1329 * Check "SET TRANSACTION READ WRITE" "SET SESSION
1330 * CHARACTERISTICS AS TRANSACTION READ WRITE"
1331 */
1332 else if (((VariableSetStmt *)node)->kind == VAR_SET_MULTI &&
1333 (!strcmp(((VariableSetStmt *)node)->name, "TRANSACTION") ||
1334 !strcmp(((VariableSetStmt *)node)->name, "SESSION CHARACTERISTICS")))
1335 {
1336 List *options = ((VariableSetStmt *)node)->args;
1337 foreach(list_item, options)
1338 {
1339 DefElem *opt = (DefElem *) lfirst(list_item);
1340
1341 if (!strcmp("transaction_read_only", opt->defname))
1342 {
1343 bool read_only;
1344
1345 read_only = ((A_Const *)opt->arg)->val.val.ival;
1346 if (!read_only)
1347 return POOL_PRIMARY;
1348 }
1349 }
1350 return POOL_BOTH;
1351 }
1352 else
1353 {
1354 /*
1355 * All other SET command sent to both primary and
1356 * standby
1357 */
1358 return POOL_BOTH;
1359 }
1360 }
1361
1362 /*
1363 * DISCARD
1364 */
1365 else if (IsA(node, DiscardStmt))
1366 {
1367 return POOL_BOTH;
1368 }
1369
1370 /*
1371 * PREPARE
1372 */
1373 else if (IsA(node, PrepareStmt))
1374 {
1375 PrepareStmt *prepare_statement = (PrepareStmt *)node;
1376
1377 char *string = nodeToString(prepare_statement->query);
1378
1379 /* Note that this is a recursive call */
1380 return send_to_where((Node *)(prepare_statement->query), string);
1381 }
1382
1383 /*
1384 * EXECUTE
1385 */
1386 else if (IsA(node, ExecuteStmt))
1387 {
1388 /* This is temporary decision. where_to_send will inherit
1389 * same destination AS PREPARE.
1390 */
1391 return POOL_PRIMARY;
1392 }
1393
1394 /*
1395 * DEALLOCATE
1396 */
1397 else if (IsA(node, DeallocateStmt))
1398 {
1399 /* This is temporary decision. where_to_send will inherit
1400 * same destination AS PREPARE.
1401 */
1402 return POOL_PRIMARY;
1403 }
1404 /*
1405 * SHOW
1406 */
1407 else if (IsA(node, VariableShowStmt))
1408 {
1409 return POOL_EITHER;
1410 }
1411
1412 /*
1413 * Other statements are sent to primary
1414 */
1415 return POOL_PRIMARY;
1416 }
1417
1418 /*
1419 * All unknown statements are sent to primary
1420 */
1421 return POOL_PRIMARY;
1422 }
1423
1424 static
where_to_send_deallocate(POOL_QUERY_CONTEXT * query_context,Node * node)1425 void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node)
1426 {
1427 DeallocateStmt *d = (DeallocateStmt *)node;
1428 POOL_SENT_MESSAGE *msg;
1429
1430 /* DEALLOCATE ALL? */
1431 if (d->name == NULL)
1432 {
1433 pool_setall_node_to_be_sent(query_context);
1434 }
1435 else
1436 {
1437 msg = pool_get_sent_message('Q', d->name, POOL_SENT_MESSAGE_CREATED);
1438 if (!msg)
1439 msg = pool_get_sent_message('P', d->name, POOL_SENT_MESSAGE_CREATED);
1440 if (msg)
1441 {
1442 /* Inherit same map from PREPARE or PARSE */
1443 pool_copy_prep_where(msg->query_context->where_to_send,
1444 query_context->where_to_send);
1445 return;
1446 }
1447 /* prepared statement was not found */
1448 pool_setall_node_to_be_sent(query_context);
1449 }
1450 }
1451
1452 /*
1453 * Returns parse tree for current query.
1454 * Precondition: the query is in progress state.
1455 */
pool_get_parse_tree(void)1456 Node *pool_get_parse_tree(void)
1457 {
1458 POOL_SESSION_CONTEXT *sc;
1459
1460 sc = pool_get_session_context(true);
1461 if (!sc)
1462 return NULL;
1463
1464 if (pool_is_query_in_progress() && sc->query_context)
1465 {
1466 return sc->query_context->parse_tree;
1467 }
1468 return NULL;
1469 }
1470
1471 /*
1472 * Returns raw query string for current query.
1473 * Precondition: the query is in progress state.
1474 */
pool_get_query_string(void)1475 char *pool_get_query_string(void)
1476 {
1477 POOL_SESSION_CONTEXT *sc;
1478
1479 sc = pool_get_session_context(true);
1480 if (!sc)
1481 return NULL;
1482
1483 if (pool_is_query_in_progress() && sc->query_context)
1484 {
1485 return sc->query_context->original_query;
1486 }
1487 return NULL;
1488 }
1489
1490 /*
1491 * Returns true if the query is one of:
1492 *
1493 * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1494 * SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1495 * SET transaction_isolation TO 'serializable'
1496 * SET default_transaction_isolation TO 'serializable'
1497 */
is_set_transaction_serializable(Node * node)1498 bool is_set_transaction_serializable(Node *node)
1499 {
1500 ListCell *list_item;
1501
1502 if (!IsA(node, VariableSetStmt))
1503 return false;
1504
1505 if (((VariableSetStmt *)node)->kind == VAR_SET_VALUE &&
1506 (!strcmp(((VariableSetStmt *)node)->name, "transaction_isolation") ||
1507 !strcmp(((VariableSetStmt *)node)->name, "default_transaction_isolation")))
1508 {
1509 List *options = ((VariableSetStmt *)node)->args;
1510 foreach(list_item, options)
1511 {
1512 A_Const *v = (A_Const *)lfirst(list_item);
1513
1514 switch (v->val.type)
1515 {
1516 case T_String:
1517 if (!strcasecmp(v->val.val.str, "serializable"))
1518 return true;
1519 break;
1520 default:
1521 break;
1522 }
1523 }
1524 return false;
1525 }
1526
1527 else if (((VariableSetStmt *)node)->kind == VAR_SET_MULTI &&
1528 (!strcmp(((VariableSetStmt *)node)->name, "TRANSACTION") ||
1529 !strcmp(((VariableSetStmt *)node)->name, "SESSION CHARACTERISTICS")))
1530 {
1531 List *options = ((VariableSetStmt *)node)->args;
1532 foreach(list_item, options)
1533 {
1534 DefElem *opt = (DefElem *) lfirst(list_item);
1535 if (!strcmp("transaction_isolation", opt->defname) ||
1536 !strcmp("default_transaction_isolation", opt->defname))
1537 {
1538 A_Const *v = (A_Const *)opt->arg;
1539
1540 if (!strcasecmp(v->val.val.str, "serializable"))
1541 return true;
1542 }
1543 }
1544 }
1545 return false;
1546 }
1547
1548 /*
1549 * Returns true if SQL is transaction starting command (START
1550 * TRANSACTION or BEGIN)
1551 */
is_start_transaction_query(Node * node)1552 bool is_start_transaction_query(Node *node)
1553 {
1554 TransactionStmt *stmt;
1555
1556 if (node == NULL || !IsA(node, TransactionStmt))
1557 return false;
1558
1559 stmt = (TransactionStmt *)node;
1560 return stmt->kind == TRANS_STMT_START || stmt->kind == TRANS_STMT_BEGIN;
1561 }
1562
1563 /*
1564 * Return true if start transaction query with "READ WRITE" option.
1565 */
is_read_write(TransactionStmt * node)1566 bool is_read_write(TransactionStmt *node)
1567 {
1568 ListCell *list_item;
1569
1570 List *options = node->options;
1571 foreach(list_item, options)
1572 {
1573 DefElem *opt = (DefElem *) lfirst(list_item);
1574
1575 if (!strcmp("transaction_read_only", opt->defname))
1576 {
1577 bool read_only;
1578
1579 read_only = ((A_Const *)opt->arg)->val.val.ival;
1580 if (read_only)
1581 return false; /* TRANSACTION READ ONLY */
1582 else
1583 /*
1584 * TRANSACTION READ WRITE specified. This sounds a little bit strange,
1585 * but actually the parse code works in the way.
1586 */
1587 return true;
1588 }
1589 }
1590
1591 /*
1592 * No TRANSACTION READ ONLY/READ WRITE clause specified.
1593 */
1594 return false;
1595 }
1596
1597 /*
1598 * Return true if start transaction query with "SERIALIZABLE" option.
1599 */
is_serializable(TransactionStmt * node)1600 bool is_serializable(TransactionStmt *node)
1601 {
1602 ListCell *list_item;
1603
1604 List *options = node->options;
1605 foreach(list_item, options)
1606 {
1607 DefElem *opt = (DefElem *) lfirst(list_item);
1608
1609 if (!strcmp("transaction_isolation", opt->defname) &&
1610 IsA(opt->arg, A_Const) &&
1611 ((A_Const *)opt->arg)->val.type == T_String &&
1612 !strcmp("serializable", ((A_Const *)opt->arg)->val.val.str))
1613 return true;
1614 }
1615 return false;
1616 }
1617
1618 /*
1619 * If the query is BEGIN READ WRITE or
1620 * BEGIN ... SERIALIZABLE in master/slave mode,
1621 * we send BEGIN to slaves/standbys instead.
1622 * original_query which is BEGIN READ WRITE is sent to primary.
1623 * rewritten_query which is BEGIN is sent to standbys.
1624 */
pool_need_to_treat_as_if_default_transaction(POOL_QUERY_CONTEXT * query_context)1625 bool pool_need_to_treat_as_if_default_transaction(POOL_QUERY_CONTEXT *query_context)
1626 {
1627 return (MASTER_SLAVE &&
1628 is_start_transaction_query(query_context->parse_tree) &&
1629 (is_read_write((TransactionStmt *)query_context->parse_tree) ||
1630 is_serializable((TransactionStmt *)query_context->parse_tree)));
1631 }
1632
1633 /*
1634 * Return true if the query is SAVEPOINT related query.
1635 */
is_savepoint_query(Node * node)1636 bool is_savepoint_query(Node *node)
1637 {
1638 if (((TransactionStmt *)node)->kind == TRANS_STMT_SAVEPOINT ||
1639 ((TransactionStmt *)node)->kind == TRANS_STMT_ROLLBACK_TO ||
1640 ((TransactionStmt *)node)->kind == TRANS_STMT_RELEASE)
1641 return true;
1642
1643 return false;
1644 }
1645
1646 /*
1647 * Return true if the query is 2PC transaction query.
1648 */
is_2pc_transaction_query(Node * node)1649 bool is_2pc_transaction_query(Node *node)
1650 {
1651 if (((TransactionStmt *)node)->kind == TRANS_STMT_PREPARE ||
1652 ((TransactionStmt *)node)->kind == TRANS_STMT_COMMIT_PREPARED ||
1653 ((TransactionStmt *)node)->kind == TRANS_STMT_ROLLBACK_PREPARED)
1654 return true;
1655
1656 return false;
1657 }
1658
1659 /*
1660 * Set query state, if a current state is before it than the specified state.
1661 */
pool_set_query_state(POOL_QUERY_CONTEXT * query_context,POOL_QUERY_STATE state)1662 void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, POOL_QUERY_STATE state)
1663 {
1664 int i;
1665
1666 CHECK_QUERY_CONTEXT_IS_VALID;
1667
1668 for (i = 0; i < NUM_BACKENDS; i++)
1669 {
1670 if (query_context->where_to_send[i] &&
1671 statecmp(query_context->query_state[i], state) < 0)
1672 query_context->query_state[i] = state;
1673 }
1674 }
1675
1676 /*
1677 * Return -1, 0 or 1 according to s1 is "before, equal or after" s2 in terms of state
1678 * transition order.
1679 * The State transition order is defined as: UNPARSED < PARSE_COMPLETE < BIND_COMPLETE < EXECUTE_COMPLETE
1680 */
statecmp(POOL_QUERY_STATE s1,POOL_QUERY_STATE s2)1681 int statecmp(POOL_QUERY_STATE s1, POOL_QUERY_STATE s2)
1682 {
1683 int ret;
1684
1685 switch (s2) {
1686 case POOL_UNPARSED:
1687 ret = (s1 == s2) ? 0 : 1;
1688 break;
1689 case POOL_PARSE_COMPLETE:
1690 if (s1 == POOL_UNPARSED)
1691 ret = -1;
1692 else
1693 ret = (s1 == s2) ? 0 : 1;
1694 break;
1695 case POOL_BIND_COMPLETE:
1696 if (s1 == POOL_UNPARSED || s1 == POOL_PARSE_COMPLETE)
1697 ret = -1;
1698 else
1699 ret = (s1 == s2) ? 0 : 1;
1700 break;
1701 case POOL_EXECUTE_COMPLETE:
1702 ret = (s1 == s2) ? 0 : -1;
1703 break;
1704 default:
1705 ret = -2;
1706 break;
1707 }
1708
1709 return ret;
1710 }
1711
1712 /*
1713 * Remove READ WRITE option from the packet of START TRANSACTION command.
1714 * To free the return value is required.
1715 */
1716 static
remove_read_write(int len,const char * contents,int * rewritten_len)1717 char* remove_read_write(int len, const char* contents, int *rewritten_len)
1718 {
1719 char *rewritten_query;
1720 char *rewritten_contents;
1721 const char *name;
1722 const char *stmt;
1723
1724 rewritten_query = "BEGIN";
1725 name = contents;
1726 stmt = contents + strlen(name) + 1;
1727
1728 *rewritten_len = len - strlen(stmt) + strlen(rewritten_query);
1729 if (len < *rewritten_len)
1730 {
1731 ereport(ERROR,
1732 (errmsg("invalid message length of transaction packet")));
1733 }
1734
1735 rewritten_contents = palloc(*rewritten_len);
1736
1737 strcpy(rewritten_contents, name);
1738 strcpy(rewritten_contents + strlen(name) + 1, rewritten_query);
1739 memcpy(rewritten_contents + strlen(name) + strlen(rewritten_query) + 2,
1740 stmt + strlen(stmt) + 1,
1741 len - (strlen(name) + strlen(stmt) + 2));
1742
1743 return rewritten_contents;
1744 }
1745
1746 /*
1747 * Return true if current query is safe to cache.
1748 */
pool_is_cache_safe(void)1749 bool pool_is_cache_safe(void)
1750 {
1751 POOL_SESSION_CONTEXT *sc;
1752
1753 sc = pool_get_session_context(true);
1754 if (!sc)
1755 return false;
1756
1757 if (pool_is_query_in_progress() && sc->query_context)
1758 {
1759 return sc->query_context->is_cache_safe;
1760 }
1761 return false;
1762 }
1763
1764 /*
1765 * Set safe to cache.
1766 */
pool_set_cache_safe(void)1767 void pool_set_cache_safe(void)
1768 {
1769 POOL_SESSION_CONTEXT *sc;
1770
1771 sc = pool_get_session_context(true);
1772 if (!sc)
1773 return;
1774
1775 if (sc->query_context)
1776 {
1777 sc->query_context->is_cache_safe = true;
1778 }
1779 }
1780
1781 /*
1782 * Unset safe to cache.
1783 */
pool_unset_cache_safe(void)1784 void pool_unset_cache_safe(void)
1785 {
1786 POOL_SESSION_CONTEXT *sc;
1787
1788 sc = pool_get_session_context(true);
1789 if (!sc)
1790 return;
1791
1792 if (sc->query_context)
1793 {
1794 sc->query_context->is_cache_safe = false;
1795 }
1796 }
1797
1798 /*
1799 * Return true if current temporary query cache is exceeded
1800 */
pool_is_cache_exceeded(void)1801 bool pool_is_cache_exceeded(void)
1802 {
1803 POOL_SESSION_CONTEXT *sc;
1804
1805 sc = pool_get_session_context(true);
1806 if (!sc)
1807 return false;
1808
1809 if (pool_is_query_in_progress() && sc->query_context)
1810 {
1811 if (sc->query_context->temp_cache)
1812 return sc->query_context->temp_cache->is_exceeded;
1813 return true;
1814 }
1815 return false;
1816 }
1817
1818 /*
1819 * Set current temporary query cache is exceeded
1820 */
pool_set_cache_exceeded(void)1821 void pool_set_cache_exceeded(void)
1822 {
1823 POOL_SESSION_CONTEXT *sc;
1824
1825 sc = pool_get_session_context(true);
1826 if (!sc)
1827 return;
1828
1829 if (sc->query_context && sc->query_context->temp_cache)
1830 {
1831 sc->query_context->temp_cache->is_exceeded = true;
1832 }
1833 }
1834
1835 /*
1836 * Unset current temporary query cache is exceeded
1837 */
pool_unset_cache_exceeded(void)1838 void pool_unset_cache_exceeded(void)
1839 {
1840 POOL_SESSION_CONTEXT *sc;
1841
1842 sc = pool_get_session_context(true);
1843 if (!sc)
1844 return;
1845
1846 if (sc->query_context && sc->query_context->temp_cache)
1847 {
1848 sc->query_context->temp_cache->is_exceeded = false;
1849 }
1850 }
1851
1852 /*
1853 * Return true if one of followings is true
1854 *
1855 * SET transaction_read_only TO on
1856 * SET TRANSACTION READ ONLY
1857 * SET TRANSACTION CHARACTERISTICS AS TRANSACTION READ ONLY
1858 *
1859 * Note that if the node is not a variable statement, returns false.
1860 */
pool_is_transaction_read_only(Node * node)1861 bool pool_is_transaction_read_only(Node *node)
1862 {
1863 ListCell *list_item;
1864 bool ret = false;
1865
1866 if (!IsA(node, VariableSetStmt))
1867 return ret;
1868
1869 /*
1870 * SET transaction_read_only TO on
1871 */
1872 if (((VariableSetStmt *)node)->kind == VAR_SET_VALUE &&
1873 !strcmp(((VariableSetStmt *)node)->name, "transaction_read_only"))
1874 {
1875 List *options = ((VariableSetStmt *)node)->args;
1876 foreach(list_item, options)
1877 {
1878 A_Const *v = (A_Const *)lfirst(list_item);
1879
1880 switch (v->val.type)
1881 {
1882 case T_String:
1883 if (!strcasecmp(v->val.val.str, "on") ||
1884 !strcasecmp(v->val.val.str, "t") ||
1885 !strcasecmp(v->val.val.str, "true"))
1886 ret = true;
1887 break;
1888 case T_Integer:
1889 if (v->val.val.ival)
1890 ret = true;
1891 default:
1892 break;
1893 }
1894 }
1895 }
1896
1897 /*
1898 * SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY
1899 * SET TRANSACTION READ ONLY
1900 */
1901 else if (((VariableSetStmt *)node)->kind == VAR_SET_MULTI &&
1902 (!strcmp(((VariableSetStmt *)node)->name, "TRANSACTION") ||
1903 !strcmp(((VariableSetStmt *)node)->name, "SESSION CHARACTERISTICS")))
1904 {
1905 List *options = ((VariableSetStmt *)node)->args;
1906 foreach(list_item, options)
1907 {
1908 DefElem *opt = (DefElem *) lfirst(list_item);
1909
1910 if (!strcmp("transaction_read_only", opt->defname))
1911 {
1912 bool read_only;
1913
1914 read_only = ((A_Const *)opt->arg)->val.val.ival;
1915 if (read_only)
1916 {
1917 ret = true;
1918 break;
1919 }
1920 }
1921 }
1922 }
1923 return ret;
1924 }
1925