1 /* -*-pgsql-c-*- */
2 /*
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2020 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 */
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 #include "utils/elog.h"
29 #include "pool_config.h"
30 #include "protocol/pool_proto_modules.h"
31 #include "protocol/pool_process_query.h"
32 #include "protocol/pool_connection_pool.h"
33 #include "protocol/pool_pg_utils.h"
34 #include "context/pool_session_context.h"
35
36 static POOL_SESSION_CONTEXT session_context_d;
37 static POOL_SESSION_CONTEXT * session_context = NULL;
38 static void GetTranIsolationErrorCb(void *arg);
39 static void init_sent_message_list(void);
40 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * messag);
41 static void dump_sent_message(char *caller, POOL_SENT_MESSAGE * m);
42 static void dml_adaptive_init(void);
43 static void dml_adaptive_destroy(void);
44
45 #ifdef PENDING_MESSAGE_DEBUG
46 static int Elevel = LOG;
47 #else
48 static int Elevel = DEBUG2;
49 #endif
50
51 /*
52 * Initialize per session context
53 */
54 void
pool_init_session_context(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)55 pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
56 {
57 session_context = &session_context_d;
58 ProcessInfo *process_info;
59 int node_id;
60 int i;
61
62 /* Clear session context memory */
63 memset(&session_context_d, 0, sizeof(session_context_d));
64
65 /* Get Process context */
66 session_context->process_context = pool_get_process_context();
67 if (!session_context->process_context)
68 ereport(ERROR,
69 (errmsg("failed to get process context")));
70
71 /* Set connection info */
72 session_context->frontend = frontend;
73 session_context->backend = backend;
74
75 /* Initialize query context */
76 session_context->query_context = NULL;
77
78 /* Initialize local session id */
79 pool_incremnet_local_session_id();
80
81 /* Create memory context */
82 /* TODO re-think about the parent for this context ?? */
83 session_context->memory_context = AllocSetContextCreate(ProcessLoopContext,
84 "SessionContext",
85 ALLOCSET_SMALL_MINSIZE,
86 ALLOCSET_SMALL_INITSIZE,
87 ALLOCSET_SMALL_MAXSIZE);
88
89 /* Initialize sent message list */
90 init_sent_message_list();
91
92 process_info = pool_get_my_process_info();
93
94 if (!process_info)
95 ereport(ERROR,
96 (errmsg("failed to get process info for current process")));
97
98 /* Choose load balancing node if necessary */
99 if (pool_config->load_balance_mode)
100 {
101 node_id = select_load_balancing_node();
102 }
103 else
104 {
105 node_id = SL_MODE ? PRIMARY_NODE_ID : MAIN_NODE_ID;
106 }
107
108 session_context->load_balance_node_id = node_id;
109
110 for (i = 0; i < NUM_BACKENDS; i++)
111 {
112 pool_coninfo(session_context->process_context->proc_id,
113 pool_pool_index(), i)->load_balancing_node = node_id;
114 }
115
116 ereport(DEBUG5,
117 (errmsg("initializing session context"),
118 errdetail("selected load balancing node: %d", node_id)));
119
120 /* Unset query is in progress */
121 pool_unset_query_in_progress();
122
123 /* The command in progress has not succeeded yet */
124 pool_unset_command_success();
125
126 /* We don't have a write query in this transaction yet */
127 pool_unset_writing_transaction();
128
129 /* Error doesn't occur in this transaction yet */
130 pool_unset_failed_transaction();
131
132 /* Forget transaction isolation mode */
133 pool_unset_transaction_isolation();
134
135 /* We don't skip reading from backends */
136 pool_unset_skip_reading_from_backends();
137
138 /* Backends have not ignored messages yet */
139 pool_unset_ignore_till_sync();
140
141 /* Unset suspend reading from frontend flag */
142 pool_unset_suspend_reading_from_frontend();
143
144 /* Initialize where to send map for PREPARE statements */
145 #ifdef NOT_USED
146 memset(&session_context->prep_where, 0, sizeof(session_context->prep_where));
147 session_context->prep_where.nelem = POOL_MAX_PREPARED_STATEMENTS;
148 #endif /* NOT_USED */
149
150 /*
151 * Reset flag to indicate difference in number of affected tuples in
152 * UPDATE/DELETE.
153 */
154 session_context->mismatch_ntuples = false;
155
156 if (pool_config->memory_cache_enabled)
157 {
158 session_context->query_cache_array = pool_create_query_cache_array();
159 session_context->num_selects = 0;
160 }
161
162 /* Initialize pending message list */
163 pool_pending_messages_init();
164
165 /* Initialize previous pending message */
166 pool_pending_message_reset_previous_message();
167
168 /* Initialize temp tables */
169 pool_temp_tables_init();
170
171 #ifdef NOT_USED
172 /* Initialize preferred main node id */
173 pool_reset_preferred_main_node_id();
174 #endif
175
176 /* Snapshot isolation state */
177 session_context->si_state = SI_NO_SNAPSHOT;
178
179 /* Transaction read only */
180 session_context->transaction_read_only = false;
181
182 dml_adaptive_init();
183 }
184
185 /*
186 * Destroy session context.
187 */
188 void
pool_session_context_destroy(void)189 pool_session_context_destroy(void)
190 {
191 if (session_context)
192 {
193 pool_clear_sent_message_list();
194 pfree(session_context->message_list.sent_messages);
195 if (pool_config->memory_cache_enabled)
196 {
197 pool_discard_query_cache_array(session_context->query_cache_array);
198 session_context->num_selects = 0;
199 }
200
201 if (session_context->query_context)
202 pool_query_context_destroy(session_context->query_context);
203 MemoryContextDelete(session_context->memory_context);
204
205 dml_adaptive_destroy();
206 }
207 /* XXX For now, just zap memory */
208 memset(&session_context_d, 0, sizeof(session_context_d));
209 session_context = NULL;
210 }
211
212 /*
213 * Return session context
214 */
215 POOL_SESSION_CONTEXT *
pool_get_session_context(bool noerror)216 pool_get_session_context(bool noerror)
217 {
218 if (!session_context && !noerror)
219 {
220 ereport(FATAL,
221 (return_code(2),
222 errmsg("unable to get session context")));
223 }
224 return session_context;
225 }
226
227 /*
228 * Return local session id
229 */
230 int
pool_get_local_session_id(void)231 pool_get_local_session_id(void)
232 {
233 return pool_get_session_context(false)->process_context->local_session_id;
234 }
235
236 /*
237 * Return true if query is in progress
238 */
239 bool
pool_is_query_in_progress(void)240 pool_is_query_in_progress(void)
241 {
242 return pool_get_session_context(false)->in_progress;
243 }
244
245 /*
246 * Set query is in progress
247 */
248 void
pool_set_query_in_progress(void)249 pool_set_query_in_progress(void)
250 {
251 ereport(DEBUG5,
252 (errmsg("session context: setting query in progress. DONE")));
253
254 pool_get_session_context(false)->in_progress = true;
255 }
256
257 /*
258 * Unset query is in progress
259 */
260 void
pool_unset_query_in_progress(void)261 pool_unset_query_in_progress(void)
262 {
263 POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
264
265 ereport(DEBUG5,
266 (errmsg("session context: unsetting query in progress. DONE")));
267
268 s->in_progress = false;
269
270 /* Restore where_to_send map if necessary */
271 if (s->need_to_restore_where_to_send)
272 {
273 memcpy(s->query_context->where_to_send, s->where_to_send_save, sizeof(s->where_to_send_save));
274 }
275 s->need_to_restore_where_to_send = false;
276 }
277
278 /*
279 * Return true if we skip reading from backends
280 */
281 bool
pool_is_skip_reading_from_backends(void)282 pool_is_skip_reading_from_backends(void)
283 {
284 return pool_get_session_context(false)->skip_reading_from_backends;
285 }
286
287 /*
288 * Set skip_reading_from_backends
289 */
290 void
pool_set_skip_reading_from_backends(void)291 pool_set_skip_reading_from_backends(void)
292 {
293 ereport(DEBUG5,
294 (errmsg("session context: setting skip reading from backends. DONE")));
295
296
297 pool_get_session_context(false)->skip_reading_from_backends = true;
298 }
299
300 /*
301 * Unset skip_reading_from_backends
302 */
303 void
pool_unset_skip_reading_from_backends(void)304 pool_unset_skip_reading_from_backends(void)
305 {
306 ereport(DEBUG5,
307 (errmsg("session context: clearing skip reading from backends. DONE")));
308
309 pool_get_session_context(false)->skip_reading_from_backends = false;
310 }
311
312 /*
313 * Return true if we are doing extended query message
314 */
315 bool
pool_is_doing_extended_query_message(void)316 pool_is_doing_extended_query_message(void)
317 {
318 return pool_get_session_context(false)->doing_extended_query_message;
319 }
320
321 /*
322 * Set doing_extended_query_message
323 */
324 void
pool_set_doing_extended_query_message(void)325 pool_set_doing_extended_query_message(void)
326 {
327 ereport(DEBUG5,
328 (errmsg("session context: setting doing extended query messaging. DONE")));
329
330 pool_get_session_context(false)->doing_extended_query_message = true;
331 }
332
333 /*
334 * Unset doing_extended_query_message
335 */
336 void
pool_unset_doing_extended_query_message(void)337 pool_unset_doing_extended_query_message(void)
338 {
339 ereport(DEBUG5,
340 (errmsg("session context: clearing doing extended query messaging. DONE")));
341
342 pool_get_session_context(false)->doing_extended_query_message = false;
343 }
344
345 /*
346 * Return true if backends ignore extended query message
347 */
348 bool
pool_is_ignore_till_sync(void)349 pool_is_ignore_till_sync(void)
350 {
351 return pool_get_session_context(false)->ignore_till_sync;
352 }
353
354 /*
355 * Set ignore_till_sync
356 */
357 void
pool_set_ignore_till_sync(void)358 pool_set_ignore_till_sync(void)
359 {
360 ereport(DEBUG5,
361 (errmsg("session context: setting ignore till sync. DONE")));
362
363 pool_get_session_context(false)->ignore_till_sync = true;
364 }
365
366 /*
367 * Unset ignore_till_sync
368 */
369 void
pool_unset_ignore_till_sync(void)370 pool_unset_ignore_till_sync(void)
371 {
372 ereport(DEBUG5,
373 (errmsg("session context: clearing ignore till sync. DONE")));
374
375 pool_get_session_context(false)->ignore_till_sync = false;
376 }
377
378 /*
379 * Remove a sent message
380 */
381 bool
pool_remove_sent_message(char kind,const char * name)382 pool_remove_sent_message(char kind, const char *name)
383 {
384 int i;
385 POOL_SENT_MESSAGE_LIST *msglist;
386
387 if (kind == 0 || name == NULL)
388 return false;
389
390 msglist = &pool_get_session_context(false)->message_list;
391
392 for (i = 0; i < msglist->size; i++)
393 {
394 if (msglist->sent_messages[i]->kind == kind &&
395 !strcmp(msglist->sent_messages[i]->name, name))
396 {
397 pool_sent_message_destroy(msglist->sent_messages[i]);
398 break;
399 }
400 }
401
402 /* sent message not found */
403 if (i == msglist->size)
404 return false;
405
406 if (i != msglist->size - 1)
407 {
408 memmove(&msglist->sent_messages[i], &msglist->sent_messages[i + 1],
409 sizeof(POOL_SENT_MESSAGE *) * (msglist->size - i - 1));
410 }
411
412 msglist->size--;
413
414 return true;
415 }
416
417 /*
418 * Remove same kind of sent messages
419 */
420 void
pool_remove_sent_messages(char kind)421 pool_remove_sent_messages(char kind)
422 {
423 int i;
424 POOL_SENT_MESSAGE_LIST *msglist;
425
426 msglist = &pool_get_session_context(false)->message_list;
427
428 for (i = 0; i < msglist->size; i++)
429 {
430 if (msglist->sent_messages[i]->kind == kind)
431 {
432 if (pool_remove_sent_message(kind, msglist->sent_messages[i]->name))
433 i--; /* for relocation by removing */
434 }
435 }
436 }
437
438 /*
439 * Destroy sent message
440 */
441 void
pool_sent_message_destroy(POOL_SENT_MESSAGE * message)442 pool_sent_message_destroy(POOL_SENT_MESSAGE * message)
443 {
444 bool in_progress;
445 POOL_QUERY_CONTEXT *qc = NULL;
446
447 dump_sent_message("pool_sent_message_destroy", message);
448
449 in_progress = pool_is_query_in_progress();
450
451 if (message)
452 {
453 if (message->contents)
454 pfree(message->contents);
455
456 if (message->name)
457 pfree(message->name);
458
459 if (message->query_context)
460 {
461 if (session_context->query_context != message->query_context)
462 qc = session_context->query_context;
463
464 if (can_query_context_destroy(message->query_context))
465 {
466 pool_query_context_destroy(message->query_context);
467
468 /*
469 * set in_progress flag, because pool_query_context_destroy()
470 * unsets in_progress flag
471 */
472 if (in_progress)
473 pool_set_query_in_progress();
474
475 /*
476 * set query_context of session_context, because
477 * pool_query_context_destroy() sets it to NULL.
478 */
479 if (qc)
480 session_context->query_context = qc;
481 }
482 }
483
484 if (session_context->memory_context)
485 pfree(message);
486 }
487 }
488
489 /*
490 * Clear sent message list
491 */
492 void
pool_clear_sent_message_list(void)493 pool_clear_sent_message_list(void)
494 {
495 POOL_SENT_MESSAGE_LIST *msglist;
496
497 msglist = &pool_get_session_context(false)->message_list;
498
499 while (msglist->size > 0)
500 {
501 pool_remove_sent_messages(msglist->sent_messages[0]->kind);
502 }
503 }
504
505 /*
506 * Zap query context info in sent messages to indicate that the query context
507 * has been already removed.
508 */
509 void
pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT * query_context)510 pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT *query_context)
511 {
512 int i;
513 POOL_SENT_MESSAGE_LIST *msglist;
514
515 msglist = &pool_get_session_context(false)->message_list;
516
517 for (i = 0; i < msglist->size; i++)
518 {
519 elog(DEBUG5, "checking zapping sent message: %p query_context: %p",
520 &msglist->sent_messages[i], msglist->sent_messages[i]->query_context);
521 if (msglist->sent_messages[i]->query_context == query_context)
522 {
523 msglist->sent_messages[i]->query_context = NULL;
524 elog(DEBUG5, "Zap sent message: %p", &msglist->sent_messages[i]);
525 }
526 }
527 }
528
529 static void
dump_sent_message(char * caller,POOL_SENT_MESSAGE * m)530 dump_sent_message(char *caller, POOL_SENT_MESSAGE * m)
531 {
532 ereport(DEBUG5,
533 (errmsg("called by %s: sent message: address: %p kind: %c name: =%s= state:%d",
534 caller, m, m->kind, m->name, m->state)));
535 }
536
537 static void
dml_adaptive_init(void)538 dml_adaptive_init(void)
539 {
540 if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
541 {
542 session_context->is_in_transaction = false;
543 session_context->transaction_temp_write_list = NIL;
544 }
545 }
546
547 static void
dml_adaptive_destroy(void)548 dml_adaptive_destroy(void)
549 {
550 if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context)
551 {
552 if (session_context->transaction_temp_write_list != NIL)
553 list_free_deep(session_context->transaction_temp_write_list);
554 }
555 }
556
557 /*
558 * Create a sent message.
559 * kind: one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE)
560 * len: message length in host order
561 * contents: message contents
562 * num_tsparams: number of timestamp parameters
563 * name: prepared statement name or portal name
564 */
565 POOL_SENT_MESSAGE *
pool_create_sent_message(char kind,int len,char * contents,int num_tsparams,const char * name,POOL_QUERY_CONTEXT * query_context)566 pool_create_sent_message(char kind, int len, char *contents,
567 int num_tsparams, const char *name,
568 POOL_QUERY_CONTEXT * query_context)
569 {
570 POOL_SENT_MESSAGE *msg;
571
572 if (!session_context)
573 ereport(ERROR,
574 (errmsg("unable to create message"),
575 errdetail("cannot get the session context")));
576
577 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
578
579 msg = palloc(sizeof(POOL_SENT_MESSAGE));
580 msg->kind = kind;
581 msg->len = len;
582 msg->contents = palloc(len);
583 memcpy(msg->contents, contents, len);
584 msg->state = POOL_SENT_MESSAGE_CREATED;
585 msg->num_tsparams = num_tsparams;
586 msg->name = pstrdup(name);
587 msg->query_context = query_context;
588 MemoryContextSwitchTo(old_context);
589
590 return msg;
591 }
592
593 /*
594 * Add a sent message to sent message list
595 */
596 void
pool_add_sent_message(POOL_SENT_MESSAGE * message)597 pool_add_sent_message(POOL_SENT_MESSAGE * message)
598 {
599 POOL_SENT_MESSAGE *old_msg;
600 POOL_SENT_MESSAGE_LIST *msglist;
601
602 dump_sent_message("pool_add_sent_message", message);
603
604 if (!message)
605 {
606 ereport(DEBUG5,
607 (errmsg("adding sent message to list"),
608 errdetail("message is null")));
609 return;
610 }
611
612 old_msg = pool_get_sent_message(message->kind, message->name, POOL_SENT_MESSAGE_CREATED);
613
614 if (old_msg == message)
615 {
616 /*
617 * It is likely caller tries to add the exact same message previously
618 * added. We should ignore this because pool_remove_sent_message()
619 * will free memory allocated in the message.
620 */
621 ereport(DEBUG5,
622 (errmsg("adding sent message to list"),
623 errdetail("adding exactly same message is prohibited")));
624 return;
625 }
626
627 msglist = &session_context->message_list;
628
629 if (old_msg)
630 {
631 if (message->kind == 'B')
632 ereport(DEBUG5,
633 (errmsg("adding sent message to list"),
634 errdetail("portal \"%s\" already exists", message->name)));
635 else
636 ereport(DEBUG5,
637 (errmsg("adding sent message to list"),
638 errdetail("prepared statement \"%s\" already exists", message->name)));
639
640 if (*message->name == '\0')
641 pool_remove_sent_message(old_msg->kind, old_msg->name);
642 else
643 return;
644 }
645
646 if (msglist->size == msglist->capacity)
647 {
648 msglist->capacity *= 2;
649
650 MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
651
652 msglist->sent_messages = repalloc(msglist->sent_messages,
653 sizeof(POOL_SENT_MESSAGE *) * msglist->capacity);
654
655 MemoryContextSwitchTo(oldContext);
656 }
657
658 msglist->sent_messages[msglist->size++] = message;
659 }
660
661 /*
662 * Find a sent message by kind and name.
663 */
664 POOL_SENT_MESSAGE *
pool_get_sent_message(char kind,const char * name,POOL_SENT_MESSAGE_STATE state)665 pool_get_sent_message(char kind, const char *name, POOL_SENT_MESSAGE_STATE state)
666 {
667 int i;
668 POOL_SENT_MESSAGE_LIST *msglist;
669
670 msglist = &pool_get_session_context(false)->message_list;
671
672 if (kind == 0 || name == NULL)
673 return NULL;
674
675 for (i = 0; i < msglist->size; i++)
676 {
677 if (msglist->sent_messages[i]->kind == kind &&
678 !strcmp(msglist->sent_messages[i]->name, name) &&
679 msglist->sent_messages[i]->state == state)
680 return msglist->sent_messages[i];
681 }
682
683 return NULL;
684 }
685
686 /*
687 * Find a sent message by query context.
688 */
689 POOL_SENT_MESSAGE *
pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)690 pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)
691 {
692 int i;
693 POOL_SENT_MESSAGE_LIST *msglist;
694
695 msglist = &pool_get_session_context(false)->message_list;
696
697 if (query_context == NULL)
698 return NULL;
699
700 for (i = 0; i < msglist->size; i++)
701 {
702 if (msglist->sent_messages[i]->query_context == query_context)
703 return msglist->sent_messages[i];
704 }
705
706 return NULL;
707 }
708
709 /*
710 * Set message state to POOL_SENT_MESSAGE_STATE to POOL_SENT_MESSAGE_CLOSED.
711 */
712 void
pool_set_sent_message_state(POOL_SENT_MESSAGE * message)713 pool_set_sent_message_state(POOL_SENT_MESSAGE * message)
714 {
715 ereport(DEBUG5,
716 (errmsg("pool_set_sent_message_state: name:%s kind:%c previous state: %d",
717 message->name, message->kind, message->state)));
718 message->state = POOL_SENT_MESSAGE_CLOSED;
719 }
720
721 /*
722 * We don't have a write query in this transaction yet.
723 */
724 void
pool_unset_writing_transaction(void)725 pool_unset_writing_transaction(void)
726 {
727 /*
728 * If disable_transaction_on_write is 'always', then never turn off
729 * writing transaction flag.
730 */
731 if (pool_config->disable_load_balance_on_write != DLBOW_ALWAYS)
732 {
733 pool_get_session_context(false)->writing_transaction = false;
734 ereport(DEBUG5,
735 (errmsg("session context: clearing writing transaction. DONE")));
736 }
737 }
738
739 /*
740 * We have a write query in this transaction.
741 */
742 void
pool_set_writing_transaction(void)743 pool_set_writing_transaction(void)
744 {
745 /*
746 * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never turn on writing
747 * transaction flag.
748 */
749 if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
750 {
751 pool_get_session_context(false)->writing_transaction = true;
752 ereport(DEBUG5,
753 (errmsg("session context: setting writing transaction. DONE")));
754 }
755 }
756
757 /*
758 * Do we have a write query in this transaction?
759 */
760 bool
pool_is_writing_transaction(void)761 pool_is_writing_transaction(void)
762 {
763 return pool_get_session_context(false)->writing_transaction;
764 }
765
766 /*
767 * Error doesn't occur in this transaction yet.
768 */
769 void
pool_unset_failed_transaction(void)770 pool_unset_failed_transaction(void)
771 {
772 ereport(DEBUG5,
773 (errmsg("session context: clearing failed transaction. DONE")));
774
775 pool_get_session_context(false)->failed_transaction = false;
776 }
777
778 /*
779 * Error occurred in this transaction.
780 */
781 void
pool_set_failed_transaction(void)782 pool_set_failed_transaction(void)
783 {
784 ereport(DEBUG5,
785 (errmsg("session context: setting failed transaction. DONE")));
786
787 pool_get_session_context(false)->failed_transaction = true;
788 }
789
790 /*
791 * Did error occur in this transaction?
792 */
793 bool
pool_is_failed_transaction(void)794 pool_is_failed_transaction(void)
795 {
796 return pool_get_session_context(false)->failed_transaction;
797 }
798
799 /*
800 * Forget transaction isolation mode
801 */
802 void
pool_unset_transaction_isolation(void)803 pool_unset_transaction_isolation(void)
804 {
805 ereport(DEBUG5,
806 (errmsg("session context: clearing failed transaction. DONE")));
807 pool_get_session_context(false)->transaction_isolation = POOL_UNKNOWN;
808 }
809
810 /*
811 * Set transaction isolation mode
812 */
813 void
pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)814 pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)
815 {
816 ereport(DEBUG5,
817 (errmsg("session context: setting transaction isolation. DONE")));
818 pool_get_session_context(false)->transaction_isolation = isolation_level;
819 }
820
821 /*
822 * Get or return cached transaction isolation mode
823 */
824 POOL_TRANSACTION_ISOLATION
pool_get_transaction_isolation(void)825 pool_get_transaction_isolation(void)
826 {
827 POOL_SELECT_RESULT *res;
828 POOL_TRANSACTION_ISOLATION ret;
829 ErrorContextCallback callback;
830
831 if (!session_context)
832 {
833 ereport(WARNING,
834 (errmsg("error while getting transaction isolation, session context is not initialized")));
835 return POOL_UNKNOWN;
836 }
837
838 /* It seems cached result is usable. Return it. */
839 if (session_context->transaction_isolation != POOL_UNKNOWN)
840 return session_context->transaction_isolation;
841
842 /*
843 * Register a error context callback to throw proper context message
844 */
845 callback.callback = GetTranIsolationErrorCb;
846 callback.arg = NULL;
847 callback.previous = error_context_stack;
848 error_context_stack = &callback;
849
850 /* No cached data is available. Ask backend. */
851
852 do_query(MAIN(session_context->backend),
853 "SELECT current_setting('transaction_isolation')", &res, MAJOR(session_context->backend));
854
855 error_context_stack = callback.previous;
856
857 if (res->numrows <= 0)
858 {
859 ereport(WARNING,
860 (errmsg("error while getting transaction isolation, do_query returns no rows")));
861 free_select_result(res);
862 return POOL_UNKNOWN;
863 }
864 if (res->data[0] == NULL)
865 {
866 ereport(WARNING,
867 (errmsg("error while getting transaction isolation, do_query returns no data")));
868
869 free_select_result(res);
870 return POOL_UNKNOWN;
871 }
872 if (res->nullflags[0] == -1)
873 {
874 ereport(WARNING,
875 (errmsg("error while getting transaction isolation, do_query returns NULL")));
876 free_select_result(res);
877 return POOL_UNKNOWN;
878 }
879
880 if (!strcmp(res->data[0], "read uncommitted"))
881 ret = POOL_READ_UNCOMMITTED;
882 else if (!strcmp(res->data[0], "read committed"))
883 ret = POOL_READ_COMMITTED;
884 else if (!strcmp(res->data[0], "repeatable read"))
885 ret = POOL_REPEATABLE_READ;
886 else if (!strcmp(res->data[0], "serializable"))
887 ret = POOL_SERIALIZABLE;
888 else
889 {
890 ereport(WARNING,
891 (errmsg("error while getting transaction isolation, unknown transaction isolation level:%s", res->data[0])));
892
893 ret = POOL_UNKNOWN;
894 }
895
896 free_select_result(res);
897
898 if (ret != POOL_UNKNOWN)
899 session_context->transaction_isolation = ret;
900
901 return ret;
902 }
903
/*
 * Error context callback installed by pool_get_transaction_isolation()
 * around its backend query; adds a context line to reported errors.
 */
static void
GetTranIsolationErrorCb(void *arg)
{
	(void) arg;					/* not used */

	errcontext("while getting transaction isolation");
}
909
910
911 /*
912 * The command in progress has not succeeded yet.
913 */
914 void
pool_unset_command_success(void)915 pool_unset_command_success(void)
916 {
917 ereport(DEBUG5,
918 (errmsg("session context: clearing transaction isolation. DONE")));
919 pool_get_session_context(false)->command_success = false;
920 }
921
922 /*
923 * The command in progress has succeeded.
924 */
925 void
pool_set_command_success(void)926 pool_set_command_success(void)
927 {
928 ereport(DEBUG5,
929 (errmsg("session context: setting command success. DONE")));
930
931 pool_get_session_context(false)->command_success = true;
932 }
933
934 /*
935 * Has the command in progress succeeded?
936 */
937 bool
pool_is_command_success(void)938 pool_is_command_success(void)
939 {
940 return pool_get_session_context(false)->command_success;
941 }
942
943 /*
944 * Copy send map
945 */
946 void
pool_copy_prep_where(bool * src,bool * dest)947 pool_copy_prep_where(bool *src, bool *dest)
948 {
949 memcpy(dest, src, sizeof(bool) * MAX_NUM_BACKENDS);
950 }
951 #ifdef NOT_USED
952 /*
953 * Add to send map a PREPARED statement
954 */
955 void
pool_add_prep_where(char * name,bool * map)956 pool_add_prep_where(char *name, bool *map)
957 {
958 int i;
959
960 if (!session_context)
961 {
962 ereport(ERROR,
963 (errmsg("pool_add_prep_where: session context is not initialized")));
964 return;
965 }
966
967 for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
968 {
969 if (*session_context->prep_where.name[i] == '\0')
970 {
971 strncpy(session_context->prep_where.name[i], name, POOL_MAX_PREPARED_NAME);
972 pool_copy_prep_where(map, session_context->prep_where.where_to_send[i]);
973 return;
974 }
975 }
976 ereport(ERROR,
977 (errmsg("pool_add_prep_where: no empty slot found")));
978 }
979
980 /*
981 * Search send map by PREPARED statement name
982 */
983 bool *
pool_get_prep_where(char * name)984 pool_get_prep_where(char *name)
985 {
986 int i;
987
988 if (!session_context)
989 ereport(ERROR,
990 (errmsg("pool_get_prep_where: session context is not initialized")));
991
992
993 for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
994 {
995 if (!strcmp(session_context->prep_where.name[i], name))
996 {
997 return session_context->prep_where.where_to_send[i];
998 }
999 }
1000 return NULL;
1001 }
1002
1003 /*
1004 * Remove PREPARED statement by name
1005 */
1006 void
pool_delete_prep_where(char * name)1007 pool_delete_prep_where(char *name)
1008 {
1009 int i;
1010
1011 if (!session_context)
1012 ereport(ERROR,
1013 (errmsg("pool_delete_prep_where: session context is not initialized")));
1014
1015 for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
1016 {
1017 if (!strcmp(session_context->prep_where.name[i], name))
1018 {
1019 memset(&session_context->prep_where.where_to_send[i], 0, sizeof(bool) * MAX_NUM_BACKENDS);
1020 *session_context->prep_where.name[i] = '\0';
1021 return;
1022 }
1023 }
1024 }
1025 #endif /* NOT_USED */
1026 /*
1027 * Initialize sent message list
1028 */
1029 static void
init_sent_message_list(void)1030 init_sent_message_list(void)
1031 {
1032 POOL_SENT_MESSAGE_LIST *msglist;
1033
1034 msglist = &session_context->message_list;
1035 msglist->size = 0;
1036 msglist->capacity = INIT_LIST_SIZE;
1037
1038 MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
1039
1040 msglist->sent_messages = palloc(sizeof(POOL_SENT_MESSAGE *) * INIT_LIST_SIZE);
1041
1042 MemoryContextSwitchTo(oldContext);
1043 }
1044
1045 /*
1046 * Look for extended message list to check if given query context qc
1047 * is used. Returns true if it is not used.
1048 */
1049 bool
can_query_context_destroy(POOL_QUERY_CONTEXT * qc)1050 can_query_context_destroy(POOL_QUERY_CONTEXT * qc)
1051 {
1052 int i;
1053 int count = 0;
1054 POOL_SENT_MESSAGE_LIST *msglist;
1055 ListCell *cell;
1056 ListCell *next;
1057
1058 msglist = &session_context->message_list;
1059
1060 for (i = 0; i < msglist->size; i++)
1061 {
1062 if (msglist->sent_messages[i]->query_context == qc)
1063 count++;
1064 }
1065 if (count > 1)
1066 {
1067 ereport(DEBUG5,
1068 (errmsg("checking if query context can be safely destroyed"),
1069 errdetail("query context %p is still used %d times in sent message list. query:\"%s\"",
1070 qc, count, qc->original_query)));
1071 return false;
1072 }
1073
1074 count = 0;
1075
1076 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1077 {
1078 POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1079
1080 if (message->query_context == qc)
1081 {
1082 count++;
1083 }
1084 next = lnext(session_context->pending_messages, cell);
1085 }
1086
1087 if (count >= 1)
1088 {
1089 ereport(DEBUG5,
1090 (errmsg("checking if query context can be safely destroyed"),
1091 errdetail("query context %p is still used %d times in pending message list. query:%s", qc, count, qc->original_query)));
1092 return false;
1093 }
1094
1095 return true;
1096 }
1097
1098 /*
1099 * Initialize pending message list
1100 */
1101 void
pool_pending_messages_init(void)1102 pool_pending_messages_init(void)
1103 {
1104 if (!session_context)
1105 ereport(ERROR,
1106 (errmsg("pool_pending_message_init: session context is not initialized")));
1107
1108 session_context->pending_messages = NIL;
1109 }
1110
1111 /*
1112 * Destroy pending message list
1113 */
1114 void
pool_pending_messages_destroy(void)1115 pool_pending_messages_destroy(void)
1116 {
1117 ListCell *cell;
1118 POOL_PENDING_MESSAGE *msg;
1119
1120 if (!session_context)
1121 ereport(ERROR,
1122 (errmsg("pool_pending_message_destory: session context is not initialized")));
1123
1124 foreach(cell, session_context->pending_messages)
1125 {
1126 msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1127 pfree(msg->contents);
1128 }
1129 list_free(session_context->pending_messages);
1130 }
1131
1132 /*
1133 * Create one message.
1134 */
1135 POOL_PENDING_MESSAGE *
pool_pending_message_create(char kind,int len,char * contents)1136 pool_pending_message_create(char kind, int len, char *contents)
1137 {
1138 POOL_PENDING_MESSAGE *msg;
1139 MemoryContext old_context;
1140
1141 if (!session_context)
1142 ereport(ERROR,
1143 (errmsg("pool_pending_message_create: session context is not initialized")));
1144
1145 old_context = MemoryContextSwitchTo(session_context->memory_context);
1146 msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1147
1148 switch (kind)
1149 {
1150 case 'P':
1151 msg->type = POOL_PARSE;
1152 break;
1153
1154 case 'B':
1155 msg->type = POOL_BIND;
1156 break;
1157
1158 case 'E':
1159 msg->type = POOL_EXECUTE;
1160 break;
1161
1162 case 'D':
1163 msg->type = POOL_DESCRIBE;
1164 break;
1165
1166 case 'C':
1167 msg->type = POOL_CLOSE;
1168 break;
1169
1170 case 'S':
1171 msg->type = POOL_SYNC;
1172 break;
1173
1174 default:
1175 ereport(ERROR,
1176 (errmsg("pool_pending_message_create: unknow kind: %c", kind)));
1177 break;
1178 }
1179
1180 if (len > 0)
1181 {
1182 msg->contents = palloc(len);
1183 memcpy(msg->contents, contents, len);
1184 }
1185 else
1186 msg->contents = NULL;
1187
1188 msg->contents_len = len;
1189 msg->query[0] = '\0';
1190 msg->statement[0] = '\0';
1191 msg->portal[0] = '\0';
1192 msg->is_rows_returned = false;
1193 msg->not_forward_to_frontend = false;
1194 memset(msg->node_ids, false, sizeof(msg->node_ids));
1195
1196 MemoryContextSwitchTo(old_context);
1197
1198 return msg;
1199 }
1200
1201 /*
1202 * Set node_ids field of message which indicates which backend nodes the
1203 * message was sent.
1204 */
1205 void
pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1206 pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1207 {
1208 memcpy(message->node_ids, query_context->where_to_send, sizeof(message->node_ids));
1209
1210 message->query_context = query_context;
1211
1212 if (is_select_query(query_context->parse_tree, query_context->original_query))
1213 {
1214 message->is_rows_returned = true;
1215 }
1216 }
1217
1218 /*
1219 * Set where_to_send field in query_context from node_ids field of message
1220 * which indicates which backend nodes the message was sent.
1221 */
1222 void
pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1223 pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1224 {
1225 int i;
1226
1227 POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
1228
1229 /* Save where_to_send map */
1230 memcpy(s->where_to_send_save, query_context->where_to_send, sizeof(s->where_to_send_save));
1231 s->need_to_restore_where_to_send = true;
1232
1233 /* Rewrite where_to_send map */
1234 memset(query_context->where_to_send, 0, sizeof(query_context->where_to_send));
1235
1236 for (i = 0; i < MAX_NUM_BACKENDS; i++)
1237 {
1238 if (message->node_ids[i])
1239 {
1240 query_context->where_to_send[i] = 1;
1241 }
1242 }
1243 }
1244
1245 /*
1246 * Set query field of message.
1247 */
1248 void
pool_pending_message_query_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1249 pool_pending_message_query_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1250 {
1251 StrNCpy(message->query, query_context->original_query, sizeof(message->query));
1252 }
1253
1254 /*
1255 * Add one message to the tail of the list
1256 */
1257 void
pool_pending_message_add(POOL_PENDING_MESSAGE * message)1258 pool_pending_message_add(POOL_PENDING_MESSAGE * message)
1259 {
1260 POOL_PENDING_MESSAGE *msg;
1261 MemoryContext old_context;
1262
1263 if (!session_context)
1264 ereport(ERROR,
1265 (errmsg("pool_pending_message_add: session context is not initialized")));
1266
1267 switch (message->type)
1268 {
1269 case POOL_PARSE:
1270 StrNCpy(message->statement, message->contents, sizeof(message->statement));
1271 StrNCpy(message->query, message->contents + strlen(message->contents) + 1, sizeof(message->query));
1272 break;
1273
1274 case POOL_BIND:
1275 StrNCpy(message->portal, message->contents, sizeof(message->portal));
1276 StrNCpy(message->statement, message->contents + strlen(message->contents) + 1, sizeof(message->statement));
1277 break;
1278
1279 case POOL_EXECUTE:
1280 StrNCpy(message->portal, message->contents, sizeof(message->portal));
1281 break;
1282
1283 case POOL_CLOSE:
1284 case POOL_DESCRIBE:
1285 if (*message->contents == 'S')
1286 StrNCpy(message->statement, message->contents + 1, sizeof(message->statement));
1287 else
1288 StrNCpy(message->portal, message->contents + 1, sizeof(message->portal));
1289 break;
1290
1291 case POOL_SYNC:
1292 break;
1293
1294 default:
1295 ereport(ERROR,
1296 (errmsg("pool_pending_message_add: unknown message type:%d", message->type)));
1297 return;
1298 break;
1299 }
1300
1301 if (message->type != POOL_SYNC)
1302 ereport(Elevel,
1303 (errmsg("pool_pending_message_add: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1304 pool_pending_message_type_to_string(message->type),
1305 message->contents_len, message->query, message->statement, message->portal,
1306 message->node_ids[0], message->node_ids[1])));
1307 else
1308 ereport(Elevel,
1309 (errmsg("pool_pending_message_add: message type: sync")));
1310
1311 old_context = MemoryContextSwitchTo(session_context->memory_context);
1312 msg = copy_pending_message(message);
1313 session_context->pending_messages = lappend(session_context->pending_messages, msg);
1314 MemoryContextSwitchTo(old_context);
1315 }
1316
1317 /*
1318 * Return the message from the head of the list. If the list is not empty, a
1319 * copy of the message is returned. If the list is empty, returns NULL.
1320 */
1321 POOL_PENDING_MESSAGE *
pool_pending_message_head_message(void)1322 pool_pending_message_head_message(void)
1323 {
1324 ListCell *cell;
1325 POOL_PENDING_MESSAGE *message;
1326 POOL_PENDING_MESSAGE *m;
1327 MemoryContext old_context;
1328
1329 if (!session_context)
1330 ereport(ERROR,
1331 (errmsg("pool_pending_message_head_message: session context is not initialized")));
1332
1333 if (list_length(session_context->pending_messages) == 0)
1334 {
1335 return NULL;
1336 }
1337
1338 old_context = MemoryContextSwitchTo(session_context->memory_context);
1339
1340 cell = list_head(session_context->pending_messages);
1341 m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1342 message = copy_pending_message(m);
1343 ereport(Elevel,
1344 (errmsg("pool_pending_message_head_message: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1345 pool_pending_message_type_to_string(message->type),
1346 message->contents_len, message->query, message->statement, message->portal,
1347 message->node_ids[0], message->node_ids[1])));
1348
1349 MemoryContextSwitchTo(old_context);
1350 return message;
1351 }
1352
1353
1354 /*
1355 * Remove one message from the head of the list. If the list is not empty, a
1356 * copy of the message is returned and the message is removed the message
1357 * list. If the list is empty, returns NULL.
1358 */
1359 POOL_PENDING_MESSAGE *
pool_pending_message_pull_out(void)1360 pool_pending_message_pull_out(void)
1361 {
1362 ListCell *cell;
1363 POOL_PENDING_MESSAGE *message;
1364 POOL_PENDING_MESSAGE *m;
1365 MemoryContext old_context;
1366
1367 if (!session_context)
1368 ereport(ERROR,
1369 (errmsg("pool_pending_message_pull_out: session context is not initialized")));
1370
1371 if (list_length(session_context->pending_messages) == 0)
1372 {
1373 return NULL;
1374 }
1375
1376 old_context = MemoryContextSwitchTo(session_context->memory_context);
1377
1378 cell = list_head(session_context->pending_messages);
1379 m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1380 message = copy_pending_message(m);
1381 ereport(Elevel,
1382 (errmsg("pool_pending_message_pull_out: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1383 pool_pending_message_type_to_string(message->type),
1384 message->contents_len, message->query, message->statement, message->portal,
1385 message->node_ids[0], message->node_ids[1])));
1386
1387 pool_pending_message_free_pending_message(m);
1388 session_context->pending_messages =
1389 list_delete_cell(session_context->pending_messages, cell);
1390
1391 MemoryContextSwitchTo(old_context);
1392 return message;
1393 }
1394
1395 /*
1396 * Try to find the first message specified by the message type in the message
1397 * list. If found, a copy of the message is returned. If not, returns NULL.
1398 */
1399 POOL_PENDING_MESSAGE *
pool_pending_message_get(POOL_MESSAGE_TYPE type)1400 pool_pending_message_get(POOL_MESSAGE_TYPE type)
1401 {
1402 ListCell *cell;
1403 ListCell *next;
1404 POOL_PENDING_MESSAGE *msg;
1405 MemoryContext old_context;
1406
1407 if (!session_context)
1408 ereport(ERROR,
1409 (errmsg("pool_pending_message_remove: session context is not initialized")));
1410
1411 old_context = MemoryContextSwitchTo(session_context->memory_context);
1412
1413 msg = NULL;
1414
1415 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1416 {
1417 POOL_PENDING_MESSAGE *m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1418
1419 next = lnext(session_context->pending_messages, cell);
1420
1421 if (m->type == type)
1422 {
1423 msg = copy_pending_message(m);
1424 break;
1425 }
1426 }
1427
1428 MemoryContextSwitchTo(old_context);
1429 return msg;
1430 }
1431
1432 /*
1433 * Get message specification (either statement ('S') or portal ('P')) from a
1434 * close message.
1435 */
1436 char
pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)1437 pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)
1438 {
1439 return *msg->contents;
1440 }
1441
1442 /*
1443 * Get statement or portal name from close message.
1444 * The returned pointer is within "msg".
1445 */
1446 char *
pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)1447 pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)
1448 {
1449 return (msg->contents) + 1;
1450 }
1451
1452 /*
1453 * Perform deep copy of POOL_PENDING_MESSAGE object in the current memory
1454 * context except the query context.
1455 */
copy_pending_message(POOL_PENDING_MESSAGE * message)1456 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * message)
1457 {
1458 POOL_PENDING_MESSAGE *msg;
1459
1460 msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1461 memcpy(msg, message, sizeof(POOL_PENDING_MESSAGE));
1462 msg->contents = palloc(msg->contents_len);
1463 memcpy(msg->contents, message->contents, msg->contents_len);
1464
1465 return msg;
1466 }
1467
1468 /*
1469 * Free POOL_PENDING_MESSAGE object in the current memory
1470 * context except the query context.
1471 */
1472 void
pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)1473 pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)
1474 {
1475 if (message == NULL)
1476 return;
1477
1478 if (message->contents)
1479 pfree(message->contents);
1480
1481 pfree(message);
1482 }
1483
1484 /*
1485 * Reset previous message.
1486 */
1487 void
pool_pending_message_reset_previous_message(void)1488 pool_pending_message_reset_previous_message(void)
1489 {
1490 if (!session_context)
1491 {
1492 ereport(ERROR,
1493 (errmsg("pool_pending_message_reset_previous_message: session context is not initialized")));
1494 return;
1495 }
1496 session_context->previous_message_exists = false;
1497 }
1498
1499 /*
1500 * Set previous message.
1501 */
1502 void
pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)1503 pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)
1504 {
1505 if (!session_context)
1506 {
1507 ereport(ERROR,
1508 (errmsg("pool_pending_message_set_previous_message: session context is not initialized")));
1509 return;
1510 }
1511 session_context->previous_message_exists = true;
1512 memcpy(&session_context->previous_message, message, sizeof(POOL_PENDING_MESSAGE));
1513 }
1514
1515 /*
1516 * Get previous message. This actually returns the address of memory. Do not
1517 * try to free using pool_pending_message_free_pending_message().
1518 */
1519 POOL_PENDING_MESSAGE *
pool_pending_message_get_previous_message(void)1520 pool_pending_message_get_previous_message(void)
1521 {
1522 if (!session_context)
1523 {
1524 ereport(ERROR,
1525 (errmsg("pool_pending_message_get_previous_message: session context is not initialized")));
1526 return NULL;
1527 }
1528 if (session_context->previous_message_exists == false)
1529 return NULL;
1530
1531 return &session_context->previous_message;
1532 }
1533
1534 /*
1535 * Return true if there's any pending message.
1536 */
1537 bool
pool_pending_message_exists(void)1538 pool_pending_message_exists(void)
1539 {
1540 return list_length(session_context->pending_messages) > 0;
1541 }
1542
1543 /*
1544 * Convert enum pending message type to string. The returned string must not
1545 * be modified or freed.
1546 */
1547 const char *
pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)1548 pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)
1549 {
1550 static const char *pending_msg_string[] = {"Parse", "Bind", "Execute",
1551 "Descripbe", "Close", "Sync"};
1552
1553 if (type < 0 || type > POOL_SYNC)
1554 return "unknown type";
1555 return pending_msg_string[type];
1556 }
1557
1558 /*
1559 * Check consistency the message type and backend message kind.
1560 * This function is intended to be used for debugging.
1561 */
1562 void
pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type,char kind)1563 pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type, char kind)
1564 {
1565 /*
1566 * Backend response message sorted by POOL_MESSAGE_TYPE
1567 */
1568 static char backend_response_kind[] = {
1569 '1', /* POOL_PARSE, parse complete */
1570 '2', /* POOL_BIND, bind complete */
1571 '*', /* POOL_EXECUTE, skip checking */
1572 '*', /* POOL_DESCRIBE, skip checking */
1573 '3', /* POOL_CLOSE, close complete */
1574 'Z' /* POOL_SYNC, ready for query */
1575 };
1576
1577 if (type < POOL_PARSE || type > POOL_SYNC)
1578 {
1579 ereport(DEBUG5,
1580 (errmsg("pool_check_pending_message_and_reply: type out of range: %d", type)));
1581 return;
1582 }
1583
1584 if (backend_response_kind[type] == '*')
1585 {
1586 return;
1587 }
1588
1589 if (backend_response_kind[type] != kind)
1590 {
1591 ereport(DEBUG5,
1592 (errmsg("pool_check_pending_message_and_reply: type: %s but is kind: %c",
1593 pool_pending_message_type_to_string(type), kind)));
1594 }
1595 return;
1596 }
1597
1598 /*
1599 * Find the latest pending message having specified query context. The
1600 * returned message is a pointer to the message list. So do not free it using
1601 * pool_pending_message_free_pending_message.
1602 */
1603 POOL_PENDING_MESSAGE *
pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)1604 pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)
1605 {
1606 List *msgs;
1607 POOL_PENDING_MESSAGE *msg;
1608 int len;
1609 ListCell *cell;
1610
1611 if (!session_context)
1612 {
1613 ereport(ERROR,
1614 (errmsg("pool_pending_message_find_lastest_by_query_context: session context is not initialized")));
1615 }
1616
1617 if (!session_context->pending_messages)
1618 return NULL;
1619
1620 msgs = session_context->pending_messages;
1621
1622 len = list_length(msgs);
1623 if (len <= 0)
1624 return NULL;
1625
1626 ereport(DEBUG5,
1627 (errmsg("pool_pending_message_find_lastest_by_query_context: num messages: %d",
1628 len)));
1629
1630 while (len--)
1631 {
1632 cell = list_nth_cell(msgs, len);
1633 if (cell)
1634 {
1635 msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1636 if (msg->query_context == qc)
1637 {
1638 ereport(DEBUG5,
1639 (errmsg("pool_pending_message_find_lastest_by_query_context: msg found. type: %s",
1640 pool_pending_message_type_to_string(msg->type))));
1641 return msg;
1642 }
1643 ereport(DEBUG5,
1644 (errmsg("pool_pending_message_find_lastest_by_query_context: type: %s",
1645 pool_pending_message_type_to_string(msg->type))));
1646 }
1647 }
1648 return NULL;
1649 }
1650
1651 /*
1652 * Get target backend id from pending message assuming that the destination for
1653 * the pending message is one of primary or standby node.
1654 */
1655 int
pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)1656 pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)
1657 {
1658 int backend_id = -1;
1659 int i;
1660
1661 for (i = 0; i < MAX_NUM_BACKENDS; i++)
1662 {
1663 if (msg->node_ids[i])
1664 {
1665 backend_id = i;
1666 break;
1667 }
1668 }
1669
1670 if (backend_id == -1)
1671 ereport(ERROR,
1672 (errmsg("pool_pending_message_get_target_backend_id: no target backend id found")));
1673
1674 return backend_id;
1675 }
1676
1677 /*
1678 * Get number of pending message list entries of which target backend is same as specified one.
1679 */
1680 int
pool_pending_message_get_message_num_by_backend_id(int backend_id)1681 pool_pending_message_get_message_num_by_backend_id(int backend_id)
1682 {
1683 ListCell *cell;
1684 ListCell *next;
1685 int cnt = 0;
1686 int i;
1687
1688 if (!session_context)
1689 {
1690 ereport(ERROR,
1691 (errmsg("pool_pending_message_get_message_num_by_backend_id: session context is not initialized")));
1692 return 0;
1693 }
1694
1695 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1696 {
1697 POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1698
1699 for (i = 0; i < MAX_NUM_BACKENDS; i++)
1700 {
1701 if (msg->node_ids[i] && i == backend_id)
1702 {
1703 cnt++;
1704 break;
1705 }
1706 }
1707
1708 next = lnext(session_context->pending_messages, cell);
1709 }
1710 return cnt;
1711 }
1712
1713 /*
1714 * Dump whole pending message list
1715 */
1716 void
dump_pending_message(void)1717 dump_pending_message(void)
1718 {
1719 ListCell *cell;
1720 ListCell *next;
1721
1722 if (!session_context)
1723 {
1724 ereport(ERROR,
1725 (errmsg("dump_pending_message: session context is not initialized")));
1726 return;
1727 }
1728
1729 ereport(DEBUG5,
1730 (errmsg("start dumping pending message list")));
1731
1732 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1733 {
1734 POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1735
1736 ereport(DEBUG5,
1737 (errmsg("pool_pending_message_dump: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1738 message->type, message->contents_len, message->query, message->statement, message->portal,
1739 message->node_ids[0], message->node_ids[1])));
1740
1741 next = lnext(session_context->pending_messages, cell);
1742 }
1743
1744 ereport(DEBUG5,
1745 (errmsg("end dumping pending message list")));
1746 }
1747
1748 /*
1749 * Set protocol major version number
1750 */
1751 void
pool_set_major_version(int major)1752 pool_set_major_version(int major)
1753 {
1754 if (session_context)
1755 {
1756 session_context->major = major;
1757 }
1758 }
1759
1760 /*
1761 * Get protocol major version number
1762 */
1763 int
pool_get_major_version(void)1764 pool_get_major_version(void)
1765 {
1766 if (session_context)
1767 {
1768 return session_context->major;
1769 }
1770 return PROTO_MAJOR_V3;
1771 }
1772
1773 /*
1774 * Set protocol minor version number
1775 */
1776 void
pool_set_minor_version(int minor)1777 pool_set_minor_version(int minor)
1778 {
1779 if (session_context)
1780 {
1781 session_context->minor = minor;
1782 }
1783 }
1784
1785 /*
1786 * Get protocol minor version number
1787 */
1788 int
pool_get_minor_version(void)1789 pool_get_minor_version(void)
1790 {
1791 if (session_context)
1792 {
1793 return session_context->minor;
1794 }
1795 return 0;
1796 }
1797
1798 /*
1799 * Is suspend_reading_from_frontend flag set?
1800 */
1801 bool
pool_is_suspend_reading_from_frontend(void)1802 pool_is_suspend_reading_from_frontend(void)
1803 {
1804 return session_context->suspend_reading_from_frontend;
1805 }
1806
1807 /*
1808 * Set suspend_reading_from_frontend flag.
1809 */
1810 void
pool_set_suspend_reading_from_frontend(void)1811 pool_set_suspend_reading_from_frontend(void)
1812 {
1813 session_context->suspend_reading_from_frontend = true;
1814 }
1815
1816 /*
1817 * Unset suspend_reading_from_frontend flag.
1818 */
1819 void
pool_unset_suspend_reading_from_frontend(void)1820 pool_unset_suspend_reading_from_frontend(void)
1821 {
1822 session_context->suspend_reading_from_frontend = false;
1823 }
1824
1825 #ifdef NOT_USED
1826 /*
1827 * Set preferred "main" node id.
1828 * Only used for SimpleForwardToFrontend.
1829 */
1830 void
pool_set_preferred_main_node_id(int node_id)1831 pool_set_preferred_main_node_id(int node_id)
1832 {
1833 session_context->preferred_main_node_id = node_id;
1834 }
1835
1836 /*
1837 * Return preferred "main" node id.
1838 * Only used for SimpleForwardToFrontend.
1839 */
1840 int
pool_get_preferred_main_node_id(void)1841 pool_get_preferred_main_node_id(void)
1842 {
1843 return session_context->preferred_main_node_id;
1844 }
1845
1846 /*
1847 * Reset preferred "main" node id.
1848 * Only used for SimpleForwardToFrontend.
1849 */
1850 void
pool_reset_preferred_main_node_id(void)1851 pool_reset_preferred_main_node_id(void)
1852 {
1853 session_context->preferred_main_node_id = -1;
1854 }
1855 #endif
1856
1857 /*-----------------------------------------------------------------------
1858 * Temporary table list management modules.
1859 *-----------------------------------------------------------------------
1860 */
1861
1862 /*
1863 * Initialize temp table list
1864 */
1865 void
pool_temp_tables_init(void)1866 pool_temp_tables_init(void)
1867 {
1868 if (!session_context)
1869 ereport(ERROR,
1870 (errmsg("pool_temp_tables_init: session context is not initialized")));
1871
1872 session_context->temp_tables = NIL;
1873 }
1874
1875 /*
1876 * Destroy temp table list
1877 */
1878 void
pool_temp_tables_destroy(void)1879 pool_temp_tables_destroy(void)
1880 {
1881 if (!session_context)
1882 ereport(ERROR,
1883 (errmsg("pool_temp_tables_destory: session context is not initialized")));
1884
1885 list_free(session_context->temp_tables);
1886 }
1887
1888 /*
1889 * Add a temp table to the tail of the list.
1890 * If the table already exists, just replace state.
1891 */
1892 void
pool_temp_tables_add(char * tablename,POOL_TEMP_TABLE_STATE state)1893 pool_temp_tables_add(char * tablename, POOL_TEMP_TABLE_STATE state)
1894 {
1895 MemoryContext old_context;
1896 POOL_TEMP_TABLE * table;
1897
1898 if (!session_context)
1899 ereport(ERROR,
1900 (errmsg("pool_temp_tables_add: session context is not initialized")));
1901
1902 old_context = MemoryContextSwitchTo(session_context->memory_context);
1903
1904 table = pool_temp_tables_find(tablename);
1905 if (table)
1906 {
1907 /* Table already exists. Just replace state. */
1908 table->state = state;
1909 }
1910 else
1911 {
1912 table = palloc(sizeof(POOL_TEMP_TABLE));
1913 StrNCpy(table->tablename, tablename, sizeof(table->tablename));
1914 table->state = state;
1915 session_context->temp_tables = lappend(session_context->temp_tables, table);
1916 }
1917
1918 MemoryContextSwitchTo(old_context);
1919 }
1920
1921 /*
1922 * Returns pointer to the table cell if specified tablename is in the temp table list.
1923 */
1924
1925 POOL_TEMP_TABLE *
pool_temp_tables_find(char * tablename)1926 pool_temp_tables_find(char * tablename)
1927 {
1928 ListCell *cell;
1929
1930 if (!session_context)
1931 ereport(ERROR,
1932 (errmsg("pool_temp_tables_find: session context is not initialized")));
1933
1934 foreach(cell, session_context->temp_tables)
1935 {
1936 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
1937 if (strcmp(tablename, table->tablename) == 0)
1938 return table;
1939 }
1940 return NULL;
1941 }
1942
1943 /*
1944 * If requested state or table state is TEMP_TABLE_DROP_COMMITTED, removes the
1945 * temp table entry from the list. Otherwise just set the requested state to
1946 * the table state.
1947 */
1948 void
pool_temp_tables_delete(char * tablename,POOL_TEMP_TABLE_STATE state)1949 pool_temp_tables_delete(char * tablename, POOL_TEMP_TABLE_STATE state)
1950 {
1951 POOL_TEMP_TABLE * table;
1952 MemoryContext old_context;
1953
1954 if (!session_context)
1955 ereport(ERROR,
1956 (errmsg("pool_temp_tables_delete: session context is not initialized")));
1957
1958 ereport(LOG,
1959 (errmsg("pool_temp_tables_delete: table: %s state: %d", tablename, state)));
1960
1961 old_context = MemoryContextSwitchTo(session_context->memory_context);
1962
1963 table = pool_temp_tables_find(tablename);
1964
1965 if (table)
1966 {
1967 if (table->state == TEMP_TABLE_DROP_COMMITTED)
1968 {
1969 ereport(DEBUG1,
1970 (errmsg("pool_temp_tables_delete: remove %s. previous state: %d requested state: %d",
1971 table->tablename, table->state, state)));
1972
1973 session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
1974 }
1975 else
1976 {
1977 ereport(DEBUG1,
1978 (errmsg("pool_temp_tables_delete: set state %s. previous state: %d requested state: %d",
1979 table->tablename, table->state, state)));
1980 table->state = state;
1981 }
1982 }
1983
1984 MemoryContextSwitchTo(old_context);
1985 }
1986
1987 /*
1988 * Commits creating entries. Also remove dropping entries. This is supposed to
1989 * be called when an explicit transaction commits.
1990 */
1991 void
pool_temp_tables_commit_pending(void)1992 pool_temp_tables_commit_pending(void)
1993 {
1994 ListCell *cell;
1995 MemoryContext old_context;
1996
1997 if (!session_context)
1998 ereport(ERROR,
1999 (errmsg("pool_temp_tables_commit_pending: session context is not initialized")));
2000
2001 old_context = MemoryContextSwitchTo(session_context->memory_context);
2002
2003 pool_temp_tables_dump();
2004
2005 Retry:
2006 foreach(cell, session_context->temp_tables)
2007 {
2008 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2009
2010 if (table->state == TEMP_TABLE_CREATING)
2011 {
2012 ereport(DEBUG1,
2013 (errmsg("pool_temp_tables_commit_pending: commit: %s", table->tablename)));
2014
2015 table->state = TEMP_TABLE_CREATE_COMMITTED;
2016 }
2017 else if (table->state == TEMP_TABLE_DROPPING)
2018 {
2019 ereport(DEBUG1,
2020 (errmsg("pool_temp_tables_commit_pending: remove: %s", table->tablename)));
2021 session_context->temp_tables = list_delete_cell(session_context->temp_tables, cell);
2022 pool_temp_tables_dump();
2023 goto Retry;
2024 }
2025 }
2026
2027 MemoryContextSwitchTo(old_context);
2028 }
2029
2030 /*
2031 * Removes all ongoing creating or dropping entries. This is supposed to be
2032 * called when an explicit transaction aborts.
2033 */
2034 void
pool_temp_tables_remove_pending(void)2035 pool_temp_tables_remove_pending(void)
2036 {
2037 ListCell *cell;
2038 MemoryContext old_context;
2039
2040 if (!session_context)
2041 ereport(ERROR,
2042 (errmsg("pool_temp_tables_remove_pending: session context is not initialized")));
2043
2044 old_context = MemoryContextSwitchTo(session_context->memory_context);
2045
2046 pool_temp_tables_dump();
2047
2048 Retry:
2049 foreach(cell, session_context->temp_tables)
2050 {
2051 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2052
2053 if (table->state == TEMP_TABLE_CREATING || table->state == TEMP_TABLE_DROPPING)
2054 {
2055 ereport(DEBUG1,
2056 (errmsg("pool_temp_tables_remove_pending: remove: %s", table->tablename)));
2057
2058 session_context->temp_tables = list_delete_cell(session_context->temp_tables, cell);
2059 pool_temp_tables_dump();
2060 goto Retry;
2061 }
2062 }
2063
2064 MemoryContextSwitchTo(old_context);
2065 }
2066
/*
 * Log the name and state of every temp table list entry at DEBUG1.
 *
 * The body is compiled only when TEMP_TABLES_DEBUG is defined; otherwise
 * this function does nothing.
 */
void
pool_temp_tables_dump(void)
{
#ifdef TEMP_TABLES_DEBUG
	ListCell   *lc;

	if (session_context == NULL)
	{
		ereport(ERROR,
				(errmsg("pool_temp_tables_dump: session context is not initialized")));
	}

	foreach(lc, session_context->temp_tables)
	{
		POOL_TEMP_TABLE *entry = (POOL_TEMP_TABLE *) lfirst(lc);

		ereport(DEBUG1,
				(errmsg("pool_temp_tables_dump: table %s state: %d",
						entry->tablename, entry->state)));
	}
#endif
}
2087