1 /* -*-pgsql-c-*- */
2 /*
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2019 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 */
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 #include "utils/elog.h"
29 #include "pool_config.h"
30 #include "context/pool_session_context.h"
31 #include "protocol/pool_proto_modules.h"
32
33 static POOL_SESSION_CONTEXT session_context_d;
34 static POOL_SESSION_CONTEXT * session_context = NULL;
35 static void GetTranIsolationErrorCb(void *arg);
36 static void init_sent_message_list(void);
37 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * messag);
38 static void dump_sent_message(char *caller, POOL_SENT_MESSAGE * m);
39
40 #ifdef PENDING_MESSAGE_DEBUG
41 static int Elevel = LOG;
42 #else
43 static int Elevel = DEBUG2;
44 #endif
45
46 /*
47 * Initialize per session context
48 */
49 void
pool_init_session_context(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)50 pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
51 {
52 session_context = &session_context_d;
53 ProcessInfo *process_info;
54 int node_id;
55 int i;
56
57 /* Clear session context memory */
58 memset(&session_context_d, 0, sizeof(session_context_d));
59
60 /* Get Process context */
61 session_context->process_context = pool_get_process_context();
62 if (!session_context->process_context)
63 ereport(ERROR,
64 (errmsg("failed to get process context")));
65
66 /* Set connection info */
67 session_context->frontend = frontend;
68 session_context->backend = backend;
69
70 /* Initialize query context */
71 session_context->query_context = NULL;
72
73 /* Initialize local session id */
74 pool_incremnet_local_session_id();
75
76 /* Create memory context */
77 /* TODO re-think about the parent for this context ?? */
78 session_context->memory_context = AllocSetContextCreate(ProcessLoopContext,
79 "SessionContext",
80 ALLOCSET_SMALL_MINSIZE,
81 ALLOCSET_SMALL_INITSIZE,
82 ALLOCSET_SMALL_MAXSIZE);
83
84 /* Initialize sent message list */
85 init_sent_message_list();
86
87 process_info = pool_get_my_process_info();
88
89 if (!process_info)
90 ereport(ERROR,
91 (errmsg("failed to get process info for current process")));
92
93 /* Choose load balancing node if necessary */
94 if (!RAW_MODE && pool_config->load_balance_mode)
95 {
96 node_id = select_load_balancing_node();
97 }
98 else
99 {
100 node_id = SL_MODE ? PRIMARY_NODE_ID : MASTER_NODE_ID;
101 }
102
103 session_context->load_balance_node_id = node_id;
104
105 for (i = 0; i < NUM_BACKENDS; i++)
106 {
107 pool_coninfo(session_context->process_context->proc_id,
108 pool_pool_index(), i)->load_balancing_node = node_id;
109 }
110
111 ereport(DEBUG5,
112 (errmsg("initializing session context"),
113 errdetail("selected load balancing node: %d", node_id)));
114
115 /* Unset query is in progress */
116 pool_unset_query_in_progress();
117
118 /* The command in progress has not succeeded yet */
119 pool_unset_command_success();
120
121 /* We don't have a write query in this transaction yet */
122 pool_unset_writing_transaction();
123
124 /* Error doesn't occur in this transaction yet */
125 pool_unset_failed_transaction();
126
127 /* Forget transaction isolation mode */
128 pool_unset_transaction_isolation();
129
130 /* We don't skip reading from backends */
131 pool_unset_skip_reading_from_backends();
132
133 /* Backends have not ignored messages yet */
134 pool_unset_ignore_till_sync();
135
136 /* Unset suspend reading from frontend flag */
137 pool_unset_suspend_reading_from_frontend();
138
139 /* Initialize where to send map for PREPARE statements */
140 #ifdef NOT_USED
141 memset(&session_context->prep_where, 0, sizeof(session_context->prep_where));
142 session_context->prep_where.nelem = POOL_MAX_PREPARED_STATEMENTS;
143 #endif /* NOT_USED */
144
145 /*
146 * Reset flag to indicate difference in number of affected tuples in
147 * UPDATE/DELETE.
148 */
149 session_context->mismatch_ntuples = false;
150
151 if (pool_config->memory_cache_enabled)
152 {
153 session_context->query_cache_array = pool_create_query_cache_array();
154 session_context->num_selects = 0;
155 }
156
157 /* Initialize pending message list */
158 pool_pending_messages_init();
159
160 /* Initialize previous pending message */
161 pool_pending_message_reset_previous_message();
162
163 /* Initialize temp tables */
164 pool_temp_tables_init();
165
166 #ifdef NOT_USED
167 /* Initialize preferred master node id */
168 pool_reset_preferred_master_node_id();
169 #endif
170 }
171
172 /*
173 * Destroy session context.
174 */
175 void
pool_session_context_destroy(void)176 pool_session_context_destroy(void)
177 {
178 if (session_context)
179 {
180 pool_clear_sent_message_list();
181 pfree(session_context->message_list.sent_messages);
182 if (pool_config->memory_cache_enabled)
183 {
184 pool_discard_query_cache_array(session_context->query_cache_array);
185 session_context->num_selects = 0;
186 }
187
188 if (session_context->query_context)
189 pool_query_context_destroy(session_context->query_context);
190 MemoryContextDelete(session_context->memory_context);
191 }
192 /* XXX For now, just zap memory */
193 memset(&session_context_d, 0, sizeof(session_context_d));
194 session_context = NULL;
195 }
196
197 /*
198 * Return session context
199 */
200 POOL_SESSION_CONTEXT *
pool_get_session_context(bool noerror)201 pool_get_session_context(bool noerror)
202 {
203 if (!session_context && !noerror)
204 {
205 ereport(FATAL,
206 (return_code(2),
207 errmsg("unable to get session context")));
208 }
209 return session_context;
210 }
211
212 /*
213 * Return local session id
214 */
215 int
pool_get_local_session_id(void)216 pool_get_local_session_id(void)
217 {
218 return pool_get_session_context(false)->process_context->local_session_id;
219 }
220
221 /*
222 * Return true if query is in progress
223 */
224 bool
pool_is_query_in_progress(void)225 pool_is_query_in_progress(void)
226 {
227 return pool_get_session_context(false)->in_progress;
228 }
229
230 /*
231 * Set query is in progress
232 */
233 void
pool_set_query_in_progress(void)234 pool_set_query_in_progress(void)
235 {
236 ereport(DEBUG5,
237 (errmsg("session context: setting query in progress. DONE")));
238
239 pool_get_session_context(false)->in_progress = true;
240 }
241
242 /*
243 * Unset query is in progress
244 */
245 void
pool_unset_query_in_progress(void)246 pool_unset_query_in_progress(void)
247 {
248 POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
249
250 ereport(DEBUG5,
251 (errmsg("session context: unsetting query in progress. DONE")));
252
253 s->in_progress = false;
254
255 /* Restore where_to_send map if necessary */
256 if (s->need_to_restore_where_to_send)
257 {
258 memcpy(s->query_context->where_to_send, s->where_to_send_save, sizeof(s->where_to_send_save));
259 }
260 s->need_to_restore_where_to_send = false;
261 }
262
263 /*
264 * Return true if we skip reading from backends
265 */
266 bool
pool_is_skip_reading_from_backends(void)267 pool_is_skip_reading_from_backends(void)
268 {
269 return pool_get_session_context(false)->skip_reading_from_backends;
270 }
271
272 /*
273 * Set skip_reading_from_backends
274 */
275 void
pool_set_skip_reading_from_backends(void)276 pool_set_skip_reading_from_backends(void)
277 {
278 ereport(DEBUG5,
279 (errmsg("session context: setting skip reading from backends. DONE")));
280
281
282 pool_get_session_context(false)->skip_reading_from_backends = true;
283 }
284
285 /*
286 * Unset skip_reading_from_backends
287 */
288 void
pool_unset_skip_reading_from_backends(void)289 pool_unset_skip_reading_from_backends(void)
290 {
291 ereport(DEBUG5,
292 (errmsg("session context: clearing skip reading from backends. DONE")));
293
294 pool_get_session_context(false)->skip_reading_from_backends = false;
295 }
296
297 /*
298 * Return true if we are doing extended query message
299 */
300 bool
pool_is_doing_extended_query_message(void)301 pool_is_doing_extended_query_message(void)
302 {
303 return pool_get_session_context(false)->doing_extended_query_message;
304 }
305
306 /*
307 * Set doing_extended_query_message
308 */
309 void
pool_set_doing_extended_query_message(void)310 pool_set_doing_extended_query_message(void)
311 {
312 ereport(DEBUG5,
313 (errmsg("session context: setting doing extended query messaging. DONE")));
314
315 pool_get_session_context(false)->doing_extended_query_message = true;
316 }
317
318 /*
319 * Unset doing_extended_query_message
320 */
321 void
pool_unset_doing_extended_query_message(void)322 pool_unset_doing_extended_query_message(void)
323 {
324 ereport(DEBUG5,
325 (errmsg("session context: clearing doing extended query messaging. DONE")));
326
327 pool_get_session_context(false)->doing_extended_query_message = false;
328 }
329
330 /*
331 * Return true if backends ignore extended query message
332 */
333 bool
pool_is_ignore_till_sync(void)334 pool_is_ignore_till_sync(void)
335 {
336 return pool_get_session_context(false)->ignore_till_sync;
337 }
338
339 /*
340 * Set ignore_till_sync
341 */
342 void
pool_set_ignore_till_sync(void)343 pool_set_ignore_till_sync(void)
344 {
345 ereport(DEBUG5,
346 (errmsg("session context: setting ignore till sync. DONE")));
347
348 pool_get_session_context(false)->ignore_till_sync = true;
349 }
350
351 /*
352 * Unset ignore_till_sync
353 */
354 void
pool_unset_ignore_till_sync(void)355 pool_unset_ignore_till_sync(void)
356 {
357 ereport(DEBUG5,
358 (errmsg("session context: clearing ignore till sync. DONE")));
359
360 pool_get_session_context(false)->ignore_till_sync = false;
361 }
362
363 /*
364 * Remove a sent message
365 */
366 bool
pool_remove_sent_message(char kind,const char * name)367 pool_remove_sent_message(char kind, const char *name)
368 {
369 int i;
370 POOL_SENT_MESSAGE_LIST *msglist;
371
372 if (kind == 0 || name == NULL)
373 return false;
374
375 msglist = &pool_get_session_context(false)->message_list;
376
377 for (i = 0; i < msglist->size; i++)
378 {
379 if (msglist->sent_messages[i]->kind == kind &&
380 !strcmp(msglist->sent_messages[i]->name, name))
381 {
382 pool_sent_message_destroy(msglist->sent_messages[i]);
383 break;
384 }
385 }
386
387 /* sent message not found */
388 if (i == msglist->size)
389 return false;
390
391 if (i != msglist->size - 1)
392 {
393 memmove(&msglist->sent_messages[i], &msglist->sent_messages[i + 1],
394 sizeof(POOL_SENT_MESSAGE *) * (msglist->size - i - 1));
395 }
396
397 msglist->size--;
398
399 return true;
400 }
401
402 /*
403 * Remove same kind of sent messages
404 */
405 void
pool_remove_sent_messages(char kind)406 pool_remove_sent_messages(char kind)
407 {
408 int i;
409 POOL_SENT_MESSAGE_LIST *msglist;
410
411 msglist = &pool_get_session_context(false)->message_list;
412
413 for (i = 0; i < msglist->size; i++)
414 {
415 if (msglist->sent_messages[i]->kind == kind)
416 {
417 if (pool_remove_sent_message(kind, msglist->sent_messages[i]->name))
418 i--; /* for relocation by removing */
419 }
420 }
421 }
422
423 /*
424 * Destroy sent message
425 */
426 void
pool_sent_message_destroy(POOL_SENT_MESSAGE * message)427 pool_sent_message_destroy(POOL_SENT_MESSAGE * message)
428 {
429 bool in_progress;
430 POOL_QUERY_CONTEXT *qc = NULL;
431
432 dump_sent_message("pool_sent_message_destroy", message);
433
434 in_progress = pool_is_query_in_progress();
435
436 if (message)
437 {
438 if (message->contents)
439 pfree(message->contents);
440
441 if (message->name)
442 pfree(message->name);
443
444 if (message->query_context)
445 {
446 if (session_context->query_context != message->query_context)
447 qc = session_context->query_context;
448
449 if (can_query_context_destroy(message->query_context))
450 {
451 pool_query_context_destroy(message->query_context);
452
453 /*
454 * set in_progress flag, because pool_query_context_destroy()
455 * unsets in_progress flag
456 */
457 if (in_progress)
458 pool_set_query_in_progress();
459
460 /*
461 * set query_context of session_context, because
462 * pool_query_context_destroy() sets it to NULL.
463 */
464 if (qc)
465 session_context->query_context = qc;
466 }
467 }
468
469 if (session_context->memory_context)
470 pfree(message);
471 }
472 }
473
474 /*
475 * Clear sent message list
476 */
477 void
pool_clear_sent_message_list(void)478 pool_clear_sent_message_list(void)
479 {
480 POOL_SENT_MESSAGE_LIST *msglist;
481
482 msglist = &pool_get_session_context(false)->message_list;
483
484 while (msglist->size > 0)
485 {
486 pool_remove_sent_messages(msglist->sent_messages[0]->kind);
487 }
488 }
489
490 /*
491 * Zap query context info in sent messages to indicate that the query context
492 * has been already removed.
493 */
494 void
pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT * query_context)495 pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT *query_context)
496 {
497 int i;
498 POOL_SENT_MESSAGE_LIST *msglist;
499
500 msglist = &pool_get_session_context(false)->message_list;
501
502 for (i = 0; i < msglist->size; i++)
503 {
504 elog(DEBUG5, "checking zapping sent message: %p query_context: %p",
505 &msglist->sent_messages[i], msglist->sent_messages[i]->query_context);
506 if (msglist->sent_messages[i]->query_context == query_context)
507 {
508 msglist->sent_messages[i]->query_context = NULL;
509 elog(DEBUG5, "Zap sent message: %p", &msglist->sent_messages[i]);
510 }
511 }
512 }
513
514 static void
dump_sent_message(char * caller,POOL_SENT_MESSAGE * m)515 dump_sent_message(char *caller, POOL_SENT_MESSAGE * m)
516 {
517 ereport(DEBUG5,
518 (errmsg("called by %s: sent message: address: %p kind: %c name: =%s= state:%d",
519 caller, m, m->kind, m->name, m->state)));
520 }
521
522 /*
523 * Create a sent message.
524 * kind: one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE)
525 * len: message length in host order
526 * contents: message contents
527 * num_tsparams: number of timestamp parameters
528 * name: prepared statement name or portal name
529 */
530 POOL_SENT_MESSAGE *
pool_create_sent_message(char kind,int len,char * contents,int num_tsparams,const char * name,POOL_QUERY_CONTEXT * query_context)531 pool_create_sent_message(char kind, int len, char *contents,
532 int num_tsparams, const char *name,
533 POOL_QUERY_CONTEXT * query_context)
534 {
535 POOL_SENT_MESSAGE *msg;
536
537 if (!session_context)
538 ereport(ERROR,
539 (errmsg("unable to create message"),
540 errdetail("cannot get the session context")));
541
542 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
543
544 msg = palloc(sizeof(POOL_SENT_MESSAGE));
545 msg->kind = kind;
546 msg->len = len;
547 msg->contents = palloc(len);
548 memcpy(msg->contents, contents, len);
549 msg->state = POOL_SENT_MESSAGE_CREATED;
550 msg->num_tsparams = num_tsparams;
551 msg->name = pstrdup(name);
552 msg->query_context = query_context;
553 MemoryContextSwitchTo(old_context);
554
555 return msg;
556 }
557
558 /*
559 * Add a sent message to sent message list
560 */
561 void
pool_add_sent_message(POOL_SENT_MESSAGE * message)562 pool_add_sent_message(POOL_SENT_MESSAGE * message)
563 {
564 POOL_SENT_MESSAGE *old_msg;
565 POOL_SENT_MESSAGE_LIST *msglist;
566
567 dump_sent_message("pool_add_sent_message", message);
568
569 if (!message)
570 {
571 ereport(DEBUG5,
572 (errmsg("adding sent message to list"),
573 errdetail("message is null")));
574 return;
575 }
576
577 old_msg = pool_get_sent_message(message->kind, message->name, POOL_SENT_MESSAGE_CREATED);
578
579 if (old_msg == message)
580 {
581 /*
582 * It is likely caller tries to add the exact same message previously
583 * added. We should ignore this because pool_remove_sent_message()
584 * will free memory allocated in the message.
585 */
586 ereport(DEBUG5,
587 (errmsg("adding sent message to list"),
588 errdetail("adding exactly same message is prohibited")));
589 return;
590 }
591
592 msglist = &session_context->message_list;
593
594 if (old_msg)
595 {
596 if (message->kind == 'B')
597 ereport(DEBUG5,
598 (errmsg("adding sent message to list"),
599 errdetail("portal \"%s\" already exists", message->name)));
600 else
601 ereport(DEBUG5,
602 (errmsg("adding sent message to list"),
603 errdetail("prepared statement \"%s\" already exists", message->name)));
604
605 if (*message->name == '\0')
606 pool_remove_sent_message(old_msg->kind, old_msg->name);
607 else
608 return;
609 }
610
611 if (msglist->size == msglist->capacity)
612 {
613 msglist->capacity *= 2;
614
615 MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
616
617 msglist->sent_messages = repalloc(msglist->sent_messages,
618 sizeof(POOL_SENT_MESSAGE *) * msglist->capacity);
619
620 MemoryContextSwitchTo(oldContext);
621 }
622
623 msglist->sent_messages[msglist->size++] = message;
624 }
625
626 /*
627 * Find a sent message by kind and name.
628 */
629 POOL_SENT_MESSAGE *
pool_get_sent_message(char kind,const char * name,POOL_SENT_MESSAGE_STATE state)630 pool_get_sent_message(char kind, const char *name, POOL_SENT_MESSAGE_STATE state)
631 {
632 int i;
633 POOL_SENT_MESSAGE_LIST *msglist;
634
635 msglist = &pool_get_session_context(false)->message_list;
636
637 if (kind == 0 || name == NULL)
638 return NULL;
639
640 for (i = 0; i < msglist->size; i++)
641 {
642 if (msglist->sent_messages[i]->kind == kind &&
643 !strcmp(msglist->sent_messages[i]->name, name) &&
644 msglist->sent_messages[i]->state == state)
645 return msglist->sent_messages[i];
646 }
647
648 return NULL;
649 }
650
651 /*
652 * Find a sent message by query context.
653 */
654 POOL_SENT_MESSAGE *
pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)655 pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)
656 {
657 int i;
658 POOL_SENT_MESSAGE_LIST *msglist;
659
660 msglist = &pool_get_session_context(false)->message_list;
661
662 if (query_context == NULL)
663 return NULL;
664
665 for (i = 0; i < msglist->size; i++)
666 {
667 if (msglist->sent_messages[i]->query_context == query_context)
668 return msglist->sent_messages[i];
669 }
670
671 return NULL;
672 }
673
674 /*
675 * Set message state to POOL_SENT_MESSAGE_STATE to POOL_SENT_MESSAGE_CLOSED.
676 */
677 void
pool_set_sent_message_state(POOL_SENT_MESSAGE * message)678 pool_set_sent_message_state(POOL_SENT_MESSAGE * message)
679 {
680 ereport(DEBUG5,
681 (errmsg("pool_set_sent_message_state: name:%s kind:%c previous state: %d",
682 message->name, message->kind, message->state)));
683 message->state = POOL_SENT_MESSAGE_CLOSED;
684 }
685
686 /*
687 * We don't have a write query in this transaction yet.
688 */
689 void
pool_unset_writing_transaction(void)690 pool_unset_writing_transaction(void)
691 {
692 /*
693 * If disable_transaction_on_write is 'always', then never turn off
694 * writing transaction flag.
695 */
696 if (pool_config->disable_load_balance_on_write != DLBOW_ALWAYS)
697 {
698 pool_get_session_context(false)->writing_transaction = false;
699 ereport(DEBUG5,
700 (errmsg("session context: clearing writing transaction. DONE")));
701 }
702 }
703
704 /*
705 * We have a write query in this transaction.
706 */
707 void
pool_set_writing_transaction(void)708 pool_set_writing_transaction(void)
709 {
710 /*
711 * If disable_transaction_on_write is 'off', then never turn on writing
712 * transaction flag.
713 */
714 if (pool_config->disable_load_balance_on_write != DLBOW_OFF)
715 {
716 pool_get_session_context(false)->writing_transaction = true;
717 ereport(DEBUG5,
718 (errmsg("session context: setting writing transaction. DONE")));
719 }
720 }
721
722 /*
723 * Do we have a write query in this transaction?
724 */
725 bool
pool_is_writing_transaction(void)726 pool_is_writing_transaction(void)
727 {
728 return pool_get_session_context(false)->writing_transaction;
729 }
730
731 /*
732 * Error doesn't occur in this transaction yet.
733 */
734 void
pool_unset_failed_transaction(void)735 pool_unset_failed_transaction(void)
736 {
737 ereport(DEBUG5,
738 (errmsg("session context: clearing failed transaction. DONE")));
739
740 pool_get_session_context(false)->failed_transaction = false;
741 }
742
743 /*
744 * Error occurred in this transaction.
745 */
746 void
pool_set_failed_transaction(void)747 pool_set_failed_transaction(void)
748 {
749 ereport(DEBUG5,
750 (errmsg("session context: setting failed transaction. DONE")));
751
752 pool_get_session_context(false)->failed_transaction = true;
753 }
754
755 /*
756 * Did error occur in this transaction?
757 */
758 bool
pool_is_failed_transaction(void)759 pool_is_failed_transaction(void)
760 {
761 return pool_get_session_context(false)->failed_transaction;
762 }
763
764 /*
765 * Forget transaction isolation mode
766 */
767 void
pool_unset_transaction_isolation(void)768 pool_unset_transaction_isolation(void)
769 {
770 ereport(DEBUG5,
771 (errmsg("session context: clearing failed transaction. DONE")));
772 pool_get_session_context(false)->transaction_isolation = POOL_UNKNOWN;
773 }
774
775 /*
776 * Set transaction isolation mode
777 */
778 void
pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)779 pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)
780 {
781 ereport(DEBUG5,
782 (errmsg("session context: setting transaction isolation. DONE")));
783 pool_get_session_context(false)->transaction_isolation = isolation_level;
784 }
785
786 /*
787 * Get or return cached transaction isolation mode
788 */
789 POOL_TRANSACTION_ISOLATION
pool_get_transaction_isolation(void)790 pool_get_transaction_isolation(void)
791 {
792 POOL_SELECT_RESULT *res;
793 POOL_TRANSACTION_ISOLATION ret;
794 ErrorContextCallback callback;
795
796 if (!session_context)
797 {
798 ereport(WARNING,
799 (errmsg("error while getting transaction isolation, session context is not initialized")));
800 return POOL_UNKNOWN;
801 }
802
803 /* It seems cached result is usable. Return it. */
804 if (session_context->transaction_isolation != POOL_UNKNOWN)
805 return session_context->transaction_isolation;
806
807 /*
808 * Register a error context callback to throw proper context message
809 */
810 callback.callback = GetTranIsolationErrorCb;
811 callback.arg = NULL;
812 callback.previous = error_context_stack;
813 error_context_stack = &callback;
814
815 /* No cached data is available. Ask backend. */
816
817 do_query(MASTER(session_context->backend),
818 "SELECT current_setting('transaction_isolation')", &res, MAJOR(session_context->backend));
819
820 error_context_stack = callback.previous;
821
822 if (res->numrows <= 0)
823 {
824 ereport(WARNING,
825 (errmsg("error while getting transaction isolation, do_query returns no rows")));
826 free_select_result(res);
827 return POOL_UNKNOWN;
828 }
829 if (res->data[0] == NULL)
830 {
831 ereport(WARNING,
832 (errmsg("error while getting transaction isolation, do_query returns no data")));
833
834 free_select_result(res);
835 return POOL_UNKNOWN;
836 }
837 if (res->nullflags[0] == -1)
838 {
839 ereport(WARNING,
840 (errmsg("error while getting transaction isolation, do_query returns NULL")));
841 free_select_result(res);
842 return POOL_UNKNOWN;
843 }
844
845 if (!strcmp(res->data[0], "read uncommitted"))
846 ret = POOL_READ_UNCOMMITTED;
847 else if (!strcmp(res->data[0], "read committed"))
848 ret = POOL_READ_COMMITTED;
849 else if (!strcmp(res->data[0], "repeatable read"))
850 ret = POOL_REPEATABLE_READ;
851 else if (!strcmp(res->data[0], "serializable"))
852 ret = POOL_SERIALIZABLE;
853 else
854 {
855 ereport(WARNING,
856 (errmsg("error while getting transaction isolation, unknown transaction isolation level:%s", res->data[0])));
857
858 ret = POOL_UNKNOWN;
859 }
860
861 free_select_result(res);
862
863 if (ret != POOL_UNKNOWN)
864 session_context->transaction_isolation = ret;
865
866 return ret;
867 }
868
869 static void
GetTranIsolationErrorCb(void * arg)870 GetTranIsolationErrorCb(void *arg)
871 {
872 errcontext("while getting transaction isolation");
873 }
874
875
876 /*
877 * The command in progress has not succeeded yet.
878 */
879 void
pool_unset_command_success(void)880 pool_unset_command_success(void)
881 {
882 ereport(DEBUG5,
883 (errmsg("session context: clearing transaction isolation. DONE")));
884 pool_get_session_context(false)->command_success = false;
885 }
886
887 /*
888 * The command in progress has succeeded.
889 */
890 void
pool_set_command_success(void)891 pool_set_command_success(void)
892 {
893 ereport(DEBUG5,
894 (errmsg("session context: setting command success. DONE")));
895
896 pool_get_session_context(false)->command_success = true;
897 }
898
899 /*
900 * Has the command in progress succeeded?
901 */
902 bool
pool_is_command_success(void)903 pool_is_command_success(void)
904 {
905 return pool_get_session_context(false)->command_success;
906 }
907
908 /*
909 * Copy send map
910 */
911 void
pool_copy_prep_where(bool * src,bool * dest)912 pool_copy_prep_where(bool *src, bool *dest)
913 {
914 memcpy(dest, src, sizeof(bool) * MAX_NUM_BACKENDS);
915 }
916 #ifdef NOT_USED
917 /*
918 * Add to send map a PREPARED statement
919 */
920 void
pool_add_prep_where(char * name,bool * map)921 pool_add_prep_where(char *name, bool *map)
922 {
923 int i;
924
925 if (!session_context)
926 {
927 ereport(ERROR,
928 (errmsg("pool_add_prep_where: session context is not initialized")));
929 return;
930 }
931
932 for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
933 {
934 if (*session_context->prep_where.name[i] == '\0')
935 {
936 strncpy(session_context->prep_where.name[i], name, POOL_MAX_PREPARED_NAME);
937 pool_copy_prep_where(map, session_context->prep_where.where_to_send[i]);
938 return;
939 }
940 }
941 ereport(ERROR,
942 (errmsg("pool_add_prep_where: no empty slot found")));
943 }
944
945 /*
946 * Search send map by PREPARED statement name
947 */
948 bool *
pool_get_prep_where(char * name)949 pool_get_prep_where(char *name)
950 {
951 int i;
952
953 if (!session_context)
954 ereport(ERROR,
955 (errmsg("pool_get_prep_where: session context is not initialized")));
956
957
958 for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
959 {
960 if (!strcmp(session_context->prep_where.name[i], name))
961 {
962 return session_context->prep_where.where_to_send[i];
963 }
964 }
965 return NULL;
966 }
967
968 /*
969 * Remove PREPARED statement by name
970 */
971 void
pool_delete_prep_where(char * name)972 pool_delete_prep_where(char *name)
973 {
974 int i;
975
976 if (!session_context)
977 ereport(ERROR,
978 (errmsg("pool_delete_prep_where: session context is not initialized")));
979
980 for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
981 {
982 if (!strcmp(session_context->prep_where.name[i], name))
983 {
984 memset(&session_context->prep_where.where_to_send[i], 0, sizeof(bool) * MAX_NUM_BACKENDS);
985 *session_context->prep_where.name[i] = '\0';
986 return;
987 }
988 }
989 }
990 #endif /* NOT_USED */
991 /*
992 * Initialize sent message list
993 */
994 static void
init_sent_message_list(void)995 init_sent_message_list(void)
996 {
997 POOL_SENT_MESSAGE_LIST *msglist;
998
999 msglist = &session_context->message_list;
1000 msglist->size = 0;
1001 msglist->capacity = INIT_LIST_SIZE;
1002
1003 MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
1004
1005 msglist->sent_messages = palloc(sizeof(POOL_SENT_MESSAGE *) * INIT_LIST_SIZE);
1006
1007 MemoryContextSwitchTo(oldContext);
1008 }
1009
1010 /*
1011 * Look for extended message list to check if given query context qc
1012 * is used. Returns true if it is not used.
1013 */
1014 bool
can_query_context_destroy(POOL_QUERY_CONTEXT * qc)1015 can_query_context_destroy(POOL_QUERY_CONTEXT * qc)
1016 {
1017 int i;
1018 int count = 0;
1019 POOL_SENT_MESSAGE_LIST *msglist;
1020 ListCell *cell;
1021 ListCell *next;
1022
1023 msglist = &session_context->message_list;
1024
1025 for (i = 0; i < msglist->size; i++)
1026 {
1027 if (msglist->sent_messages[i]->query_context == qc)
1028 count++;
1029 }
1030 if (count > 1)
1031 {
1032 ereport(DEBUG5,
1033 (errmsg("checking if query context can be safely destroyed"),
1034 errdetail("query context %p is still used %d times in sent message list. query:\"%s\"",
1035 qc, count, qc->original_query)));
1036 return false;
1037 }
1038
1039 count = 0;
1040
1041 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1042 {
1043 POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1044
1045 if (message->query_context == qc)
1046 {
1047 count++;
1048 }
1049 next = lnext(cell);
1050 }
1051
1052 if (count >= 1)
1053 {
1054 ereport(DEBUG5,
1055 (errmsg("checking if query context can be safely destroyed"),
1056 errdetail("query context %p is still used %d times in pending message list. query:%s", qc, count, qc->original_query)));
1057 return false;
1058 }
1059
1060 return true;
1061 }
1062
1063 /*
1064 * Initialize pending message list
1065 */
1066 void
pool_pending_messages_init(void)1067 pool_pending_messages_init(void)
1068 {
1069 if (!session_context)
1070 ereport(ERROR,
1071 (errmsg("pool_pending_message_init: session context is not initialized")));
1072
1073 session_context->pending_messages = NIL;
1074 }
1075
1076 /*
1077 * Destroy pending message list
1078 */
1079 void
pool_pending_messages_destroy(void)1080 pool_pending_messages_destroy(void)
1081 {
1082 ListCell *cell;
1083 POOL_PENDING_MESSAGE *msg;
1084
1085 if (!session_context)
1086 ereport(ERROR,
1087 (errmsg("pool_pending_message_destory: session context is not initialized")));
1088
1089 foreach(cell, session_context->pending_messages)
1090 {
1091 msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1092 pfree(msg->contents);
1093 }
1094 list_free(session_context->pending_messages);
1095 }
1096
1097 /*
1098 * Create one message.
1099 */
1100 POOL_PENDING_MESSAGE *
pool_pending_message_create(char kind,int len,char * contents)1101 pool_pending_message_create(char kind, int len, char *contents)
1102 {
1103 POOL_PENDING_MESSAGE *msg;
1104 MemoryContext old_context;
1105
1106 if (!session_context)
1107 ereport(ERROR,
1108 (errmsg("pool_pending_message_create: session context is not initialized")));
1109
1110 old_context = MemoryContextSwitchTo(session_context->memory_context);
1111 msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1112
1113 switch (kind)
1114 {
1115 case 'P':
1116 msg->type = POOL_PARSE;
1117 break;
1118
1119 case 'B':
1120 msg->type = POOL_BIND;
1121 break;
1122
1123 case 'E':
1124 msg->type = POOL_EXECUTE;
1125 break;
1126
1127 case 'D':
1128 msg->type = POOL_DESCRIBE;
1129 break;
1130
1131 case 'C':
1132 msg->type = POOL_CLOSE;
1133 break;
1134
1135 case 'S':
1136 msg->type = POOL_SYNC;
1137 break;
1138
1139 default:
1140 ereport(ERROR,
1141 (errmsg("pool_pending_message_create: unknow kind: %c", kind)));
1142 break;
1143 }
1144
1145 if (len > 0)
1146 {
1147 msg->contents = palloc(len);
1148 memcpy(msg->contents, contents, len);
1149 }
1150 else
1151 msg->contents = NULL;
1152
1153 msg->contents_len = len;
1154 msg->query[0] = '\0';
1155 msg->statement[0] = '\0';
1156 msg->portal[0] = '\0';
1157 msg->is_rows_returned = false;
1158 msg->not_forward_to_frontend = false;
1159 memset(msg->node_ids, false, sizeof(msg->node_ids));
1160
1161 MemoryContextSwitchTo(old_context);
1162
1163 return msg;
1164 }
1165
1166 /*
1167 * Set node_ids field of message which indicates which backend nodes the
1168 * message was sent.
1169 */
1170 void
pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1171 pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1172 {
1173 memcpy(message->node_ids, query_context->where_to_send, sizeof(message->node_ids));
1174
1175 message->query_context = query_context;
1176
1177 if (is_select_query(query_context->parse_tree, query_context->original_query))
1178 {
1179 message->is_rows_returned = true;
1180 }
1181 }
1182
1183 /*
1184 * Set where_to_send field in query_context from node_ids field of message
1185 * which indicates which backend nodes the message was sent.
1186 */
1187 void
pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1188 pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1189 {
1190 int i;
1191
1192 POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
1193
1194 /* Save where_to_send map */
1195 memcpy(s->where_to_send_save, query_context->where_to_send, sizeof(s->where_to_send_save));
1196 s->need_to_restore_where_to_send = true;
1197
1198 /* Rewrite where_to_send map */
1199 memset(query_context->where_to_send, 0, sizeof(query_context->where_to_send));
1200
1201 for (i = 0; i < MAX_NUM_BACKENDS; i++)
1202 {
1203 if (message->node_ids[i])
1204 {
1205 query_context->where_to_send[i] = 1;
1206 }
1207 }
1208 }
1209
1210 /*
1211 * Set query field of message.
1212 */
1213 void
pool_pending_message_query_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1214 pool_pending_message_query_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1215 {
1216 StrNCpy(message->query, query_context->original_query, sizeof(message->query));
1217 }
1218
1219 /*
1220 * Add one message to the tail of the list
1221 */
1222 void
pool_pending_message_add(POOL_PENDING_MESSAGE * message)1223 pool_pending_message_add(POOL_PENDING_MESSAGE * message)
1224 {
1225 POOL_PENDING_MESSAGE *msg;
1226 MemoryContext old_context;
1227
1228 if (!session_context)
1229 ereport(ERROR,
1230 (errmsg("pool_pending_message_add: session context is not initialized")));
1231
1232 switch (message->type)
1233 {
1234 case POOL_PARSE:
1235 StrNCpy(message->statement, message->contents, sizeof(message->statement));
1236 StrNCpy(message->query, message->contents + strlen(message->contents) + 1, sizeof(message->query));
1237 break;
1238
1239 case POOL_BIND:
1240 StrNCpy(message->portal, message->contents, sizeof(message->portal));
1241 StrNCpy(message->statement, message->contents + strlen(message->contents) + 1, sizeof(message->statement));
1242 break;
1243
1244 case POOL_EXECUTE:
1245 StrNCpy(message->portal, message->contents, sizeof(message->portal));
1246 break;
1247
1248 case POOL_CLOSE:
1249 case POOL_DESCRIBE:
1250 if (*message->contents == 'S')
1251 StrNCpy(message->statement, message->contents + 1, sizeof(message->statement));
1252 else
1253 StrNCpy(message->portal, message->contents + 1, sizeof(message->portal));
1254 break;
1255
1256 case POOL_SYNC:
1257 break;
1258
1259 default:
1260 ereport(ERROR,
1261 (errmsg("pool_pending_message_add: unknown message type:%d", message->type)));
1262 return;
1263 break;
1264 }
1265
1266 if (message->type != POOL_SYNC)
1267 ereport(Elevel,
1268 (errmsg("pool_pending_message_add: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1269 pool_pending_message_type_to_string(message->type),
1270 message->contents_len, message->query, message->statement, message->portal,
1271 message->node_ids[0], message->node_ids[1])));
1272 else
1273 ereport(Elevel,
1274 (errmsg("pool_pending_message_add: message type: sync")));
1275
1276 old_context = MemoryContextSwitchTo(session_context->memory_context);
1277 msg = copy_pending_message(message);
1278 session_context->pending_messages = lappend(session_context->pending_messages, msg);
1279 MemoryContextSwitchTo(old_context);
1280 }
1281
1282 /*
1283 * Return the message from the head of the list. If the list is not empty, a
1284 * copy of the message is returned. If the list is empty, returns NULL.
1285 */
1286 POOL_PENDING_MESSAGE *
pool_pending_message_head_message(void)1287 pool_pending_message_head_message(void)
1288 {
1289 ListCell *cell;
1290 POOL_PENDING_MESSAGE *message;
1291 POOL_PENDING_MESSAGE *m;
1292 MemoryContext old_context;
1293
1294 if (!session_context)
1295 ereport(ERROR,
1296 (errmsg("pool_pending_message_head_message: session context is not initialized")));
1297
1298 if (list_length(session_context->pending_messages) == 0)
1299 {
1300 return NULL;
1301 }
1302
1303 old_context = MemoryContextSwitchTo(session_context->memory_context);
1304
1305 cell = list_head(session_context->pending_messages);
1306 m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1307 message = copy_pending_message(m);
1308 ereport(Elevel,
1309 (errmsg("pool_pending_message_head_message: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1310 pool_pending_message_type_to_string(message->type),
1311 message->contents_len, message->query, message->statement, message->portal,
1312 message->node_ids[0], message->node_ids[1])));
1313
1314 MemoryContextSwitchTo(old_context);
1315 return message;
1316 }
1317
1318
1319 /*
1320 * Remove one message from the head of the list. If the list is not empty, a
1321 * copy of the message is returned and the message is removed the message
1322 * list. If the list is empty, returns NULL.
1323 */
1324 POOL_PENDING_MESSAGE *
pool_pending_message_pull_out(void)1325 pool_pending_message_pull_out(void)
1326 {
1327 ListCell *cell;
1328 POOL_PENDING_MESSAGE *message;
1329 POOL_PENDING_MESSAGE *m;
1330 MemoryContext old_context;
1331
1332 if (!session_context)
1333 ereport(ERROR,
1334 (errmsg("pool_pending_message_pull_out: session context is not initialized")));
1335
1336 if (list_length(session_context->pending_messages) == 0)
1337 {
1338 return NULL;
1339 }
1340
1341 old_context = MemoryContextSwitchTo(session_context->memory_context);
1342
1343 cell = list_head(session_context->pending_messages);
1344 m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1345 message = copy_pending_message(m);
1346 ereport(Elevel,
1347 (errmsg("pool_pending_message_pull_out: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1348 pool_pending_message_type_to_string(message->type),
1349 message->contents_len, message->query, message->statement, message->portal,
1350 message->node_ids[0], message->node_ids[1])));
1351
1352 pool_pending_message_free_pending_message(m);
1353 session_context->pending_messages =
1354 list_delete_cell(session_context->pending_messages, cell, NULL);
1355
1356 MemoryContextSwitchTo(old_context);
1357 return message;
1358 }
1359
1360 /*
1361 * Try to find the first message specified by the message type in the message
1362 * list. If found, a copy of the message is returned. If not, returns NULL.
1363 */
1364 POOL_PENDING_MESSAGE *
pool_pending_message_get(POOL_MESSAGE_TYPE type)1365 pool_pending_message_get(POOL_MESSAGE_TYPE type)
1366 {
1367 ListCell *cell;
1368 ListCell *next;
1369 POOL_PENDING_MESSAGE *msg;
1370 MemoryContext old_context;
1371
1372 if (!session_context)
1373 ereport(ERROR,
1374 (errmsg("pool_pending_message_remove: session context is not initialized")));
1375
1376 old_context = MemoryContextSwitchTo(session_context->memory_context);
1377
1378 msg = NULL;
1379
1380 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1381 {
1382 POOL_PENDING_MESSAGE *m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1383
1384 next = lnext(cell);
1385
1386 if (m->type == type)
1387 {
1388 msg = copy_pending_message(m);
1389 break;
1390 }
1391 }
1392
1393 MemoryContextSwitchTo(old_context);
1394 return msg;
1395 }
1396
1397 /*
1398 * Get message specification (either statement ('S') or portal ('P')) from a
1399 * close message.
1400 */
1401 char
pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)1402 pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)
1403 {
1404 return *msg->contents;
1405 }
1406
1407 /*
1408 * Get statement or portal name from close message.
1409 * The returned pointer is within "msg".
1410 */
1411 char *
pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)1412 pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)
1413 {
1414 return (msg->contents) + 1;
1415 }
1416
1417 /*
1418 * Perform deep copy of POOL_PENDING_MESSAGE object in the current memory
1419 * context except the query context.
1420 */
copy_pending_message(POOL_PENDING_MESSAGE * message)1421 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * message)
1422 {
1423 POOL_PENDING_MESSAGE *msg;
1424
1425 msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1426 memcpy(msg, message, sizeof(POOL_PENDING_MESSAGE));
1427 msg->contents = palloc(msg->contents_len);
1428 memcpy(msg->contents, message->contents, msg->contents_len);
1429
1430 return msg;
1431 }
1432
1433 /*
1434 * Free POOL_PENDING_MESSAGE object in the current memory
1435 * context except the query context.
1436 */
1437 void
pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)1438 pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)
1439 {
1440 if (message == NULL)
1441 return;
1442
1443 if (message->contents)
1444 pfree(message->contents);
1445
1446 pfree(message);
1447 }
1448
1449 /*
1450 * Reset previous message.
1451 */
1452 void
pool_pending_message_reset_previous_message(void)1453 pool_pending_message_reset_previous_message(void)
1454 {
1455 if (!session_context)
1456 {
1457 ereport(ERROR,
1458 (errmsg("pool_pending_message_reset_previous_message: session context is not initialized")));
1459 return;
1460 }
1461 session_context->previous_message_exists = false;
1462 }
1463
1464 /*
1465 * Set previous message.
1466 */
1467 void
pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)1468 pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)
1469 {
1470 if (!session_context)
1471 {
1472 ereport(ERROR,
1473 (errmsg("pool_pending_message_set_previous_message: session context is not initialized")));
1474 return;
1475 }
1476 session_context->previous_message_exists = true;
1477 memcpy(&session_context->previous_message, message, sizeof(POOL_PENDING_MESSAGE));
1478 }
1479
1480 /*
1481 * Get previous message. This actually returns the address of memory. Do not
1482 * try to free using pool_pending_message_free_pending_message().
1483 */
1484 POOL_PENDING_MESSAGE *
pool_pending_message_get_previous_message(void)1485 pool_pending_message_get_previous_message(void)
1486 {
1487 if (!session_context)
1488 {
1489 ereport(ERROR,
1490 (errmsg("pool_pending_message_get_previous_message: session context is not initialized")));
1491 return NULL;
1492 }
1493 if (session_context->previous_message_exists == false)
1494 return NULL;
1495
1496 return &session_context->previous_message;
1497 }
1498
1499 /*
1500 * Return true if there's any pending message.
1501 */
1502 bool
pool_pending_message_exists(void)1503 pool_pending_message_exists(void)
1504 {
1505 return list_length(session_context->pending_messages) > 0;
1506 }
1507
1508 /*
1509 * Convert enum pending message type to string. The returned string must not
1510 * be modified or freed.
1511 */
1512 const char *
pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)1513 pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)
1514 {
1515 static const char *pending_msg_string[] = {"Parse", "Bind", "Execute",
1516 "Descripbe", "Close", "Sync"};
1517
1518 if (type < 0 || type > POOL_SYNC)
1519 return "unknown type";
1520 return pending_msg_string[type];
1521 }
1522
1523 /*
1524 * Check consistency the message type and backend message kind.
1525 * This function is intended to be used for debugging.
1526 */
1527 void
pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type,char kind)1528 pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type, char kind)
1529 {
1530 /*
1531 * Backend response message sorted by POOL_MESSAGE_TYPE
1532 */
1533 static char backend_response_kind[] = {
1534 '1', /* POOL_PARSE, parse complete */
1535 '2', /* POOL_BIND, bind complete */
1536 '*', /* POOL_EXECUTE, skip checking */
1537 '*', /* POOL_DESCRIBE, skip checking */
1538 '3', /* POOL_CLOSE, close complete */
1539 'Z' /* POOL_SYNC, ready for query */
1540 };
1541
1542 if (type < POOL_PARSE || type > POOL_SYNC)
1543 {
1544 ereport(DEBUG5,
1545 (errmsg("pool_check_pending_message_and_reply: type out of range: %d", type)));
1546 return;
1547 }
1548
1549 if (backend_response_kind[type] == '*')
1550 {
1551 return;
1552 }
1553
1554 if (backend_response_kind[type] != kind)
1555 {
1556 ereport(DEBUG5,
1557 (errmsg("pool_check_pending_message_and_reply: type: %s but is kind: %c",
1558 pool_pending_message_type_to_string(type), kind)));
1559 }
1560 return;
1561 }
1562
1563 /*
1564 * Find the latest pending message having specified query context. The
1565 * returned message is a pointer to the message list. So do not free it using
1566 * pool_pending_message_free_pending_message.
1567 */
1568 POOL_PENDING_MESSAGE *
pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)1569 pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)
1570 {
1571 List *msgs;
1572 POOL_PENDING_MESSAGE *msg;
1573 int len;
1574 ListCell *cell;
1575
1576 if (!session_context)
1577 {
1578 ereport(ERROR,
1579 (errmsg("pool_pending_message_find_lastest_by_query_context: session context is not initialized")));
1580 }
1581
1582 if (!session_context->pending_messages)
1583 return NULL;
1584
1585 msgs = session_context->pending_messages;
1586
1587 len = list_length(msgs);
1588 if (len <= 0)
1589 return NULL;
1590
1591 ereport(DEBUG5,
1592 (errmsg("pool_pending_message_find_lastest_by_query_context: num messages: %d",
1593 len)));
1594
1595 while (len--)
1596 {
1597 cell = list_nth_cell(msgs, len);
1598 if (cell)
1599 {
1600 msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1601 if (msg->query_context == qc)
1602 {
1603 ereport(DEBUG5,
1604 (errmsg("pool_pending_message_find_lastest_by_query_context: msg found. type: %s",
1605 pool_pending_message_type_to_string(msg->type))));
1606 return msg;
1607 }
1608 ereport(DEBUG5,
1609 (errmsg("pool_pending_message_find_lastest_by_query_context: type: %s",
1610 pool_pending_message_type_to_string(msg->type))));
1611 }
1612 }
1613 return NULL;
1614 }
1615
1616 /*
1617 * Get target backend id from pending message assuming that the destination for
1618 * the pending message is one of primary or standby node.
1619 */
1620 int
pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)1621 pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)
1622 {
1623 int backend_id = -1;
1624 int i;
1625
1626 for (i = 0; i < MAX_NUM_BACKENDS; i++)
1627 {
1628 if (msg->node_ids[i])
1629 {
1630 backend_id = i;
1631 break;
1632 }
1633 }
1634
1635 if (backend_id == -1)
1636 ereport(ERROR,
1637 (errmsg("pool_pending_message_get_target_backend_id: no target backend id found")));
1638
1639 return backend_id;
1640 }
1641
1642 /*
1643 * Get number of pending message list entries of which target backend is same as specified one.
1644 */
1645 int
pool_pending_message_get_message_num_by_backend_id(int backend_id)1646 pool_pending_message_get_message_num_by_backend_id(int backend_id)
1647 {
1648 ListCell *cell;
1649 ListCell *next;
1650 int cnt = 0;
1651 int i;
1652
1653 if (!session_context)
1654 {
1655 ereport(ERROR,
1656 (errmsg("pool_pending_message_get_message_num_by_backend_id: session context is not initialized")));
1657 return 0;
1658 }
1659
1660 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1661 {
1662 POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1663
1664 for (i = 0; i < MAX_NUM_BACKENDS; i++)
1665 {
1666 if (msg->node_ids[i] && i == backend_id)
1667 {
1668 cnt++;
1669 break;
1670 }
1671 }
1672
1673 next = lnext(cell);
1674 }
1675 return cnt;
1676 }
1677
1678 /*
1679 * Dump whole pending message list
1680 */
1681 void
dump_pending_message(void)1682 dump_pending_message(void)
1683 {
1684 ListCell *cell;
1685 ListCell *next;
1686
1687 if (!session_context)
1688 {
1689 ereport(ERROR,
1690 (errmsg("dump_pending_message: session context is not initialized")));
1691 return;
1692 }
1693
1694 ereport(DEBUG5,
1695 (errmsg("start dumping pending message list")));
1696
1697 for (cell = list_head(session_context->pending_messages); cell; cell = next)
1698 {
1699 POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1700
1701 ereport(DEBUG5,
1702 (errmsg("pool_pending_message_dump: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1703 message->type, message->contents_len, message->query, message->statement, message->portal,
1704 message->node_ids[0], message->node_ids[1])));
1705
1706 next = lnext(cell);
1707 }
1708
1709 ereport(DEBUG5,
1710 (errmsg("end dumping pending message list")));
1711 }
1712
1713 /*
1714 * Set protocol major version number
1715 */
1716 void
pool_set_major_version(int major)1717 pool_set_major_version(int major)
1718 {
1719 if (session_context)
1720 {
1721 session_context->major = major;
1722 }
1723 }
1724
1725 /*
1726 * Get protocol major version number
1727 */
1728 int
pool_get_major_version(void)1729 pool_get_major_version(void)
1730 {
1731 if (session_context)
1732 {
1733 return session_context->major;
1734 }
1735 return PROTO_MAJOR_V3;
1736 }
1737
1738 /*
1739 * Set protocol minor version number
1740 */
1741 void
pool_set_minor_version(int minor)1742 pool_set_minor_version(int minor)
1743 {
1744 if (session_context)
1745 {
1746 session_context->minor = minor;
1747 }
1748 }
1749
1750 /*
1751 * Get protocol minor version number
1752 */
1753 int
pool_get_minor_version(void)1754 pool_get_minor_version(void)
1755 {
1756 if (session_context)
1757 {
1758 return session_context->minor;
1759 }
1760 return 0;
1761 }
1762
1763 /*
1764 * Is suspend_reading_from_frontend flag set?
1765 */
1766 bool
pool_is_suspend_reading_from_frontend(void)1767 pool_is_suspend_reading_from_frontend(void)
1768 {
1769 return session_context->suspend_reading_from_frontend;
1770 }
1771
1772 /*
1773 * Set suspend_reading_from_frontend flag.
1774 */
1775 void
pool_set_suspend_reading_from_frontend(void)1776 pool_set_suspend_reading_from_frontend(void)
1777 {
1778 session_context->suspend_reading_from_frontend = true;
1779 }
1780
1781 /*
1782 * Unset suspend_reading_from_frontend flag.
1783 */
1784 void
pool_unset_suspend_reading_from_frontend(void)1785 pool_unset_suspend_reading_from_frontend(void)
1786 {
1787 session_context->suspend_reading_from_frontend = false;
1788 }
1789
1790 #ifdef NOT_USED
1791 /*
1792 * Set preferred "master" node id.
1793 * Only used for SimpleForwardToFrontend.
1794 */
1795 void
pool_set_preferred_master_node_id(int node_id)1796 pool_set_preferred_master_node_id(int node_id)
1797 {
1798 session_context->preferred_master_node_id = node_id;
1799 }
1800
1801 /*
1802 * Return preferred "master" node id.
1803 * Only used for SimpleForwardToFrontend.
1804 */
1805 int
pool_get_preferred_master_node_id(void)1806 pool_get_preferred_master_node_id(void)
1807 {
1808 return session_context->preferred_master_node_id;
1809 }
1810
1811 /*
1812 * Reset preferred "master" node id.
1813 * Only used for SimpleForwardToFrontend.
1814 */
1815 void
pool_reset_preferred_master_node_id(void)1816 pool_reset_preferred_master_node_id(void)
1817 {
1818 session_context->preferred_master_node_id = -1;
1819 }
1820 #endif
1821
1822 /*-----------------------------------------------------------------------
1823 * Temporary table list management modules.
1824 *-----------------------------------------------------------------------
1825 */
1826
1827 /*
1828 * Initialize temp table list
1829 */
1830 void
pool_temp_tables_init(void)1831 pool_temp_tables_init(void)
1832 {
1833 if (!session_context)
1834 ereport(ERROR,
1835 (errmsg("pool_temp_tables_init: session context is not initialized")));
1836
1837 session_context->temp_tables = NIL;
1838 }
1839
1840 /*
1841 * Destroy temp table list
1842 */
1843 void
pool_temp_tables_destroy(void)1844 pool_temp_tables_destroy(void)
1845 {
1846 if (!session_context)
1847 ereport(ERROR,
1848 (errmsg("pool_temp_tables_destory: session context is not initialized")));
1849
1850 list_free(session_context->temp_tables);
1851 }
1852
1853 /*
1854 * Add a temp table to the tail of the list.
1855 * If the table already exists, just replace state.
1856 */
1857 void
pool_temp_tables_add(char * tablename,POOL_TEMP_TABLE_STATE state)1858 pool_temp_tables_add(char * tablename, POOL_TEMP_TABLE_STATE state)
1859 {
1860 MemoryContext old_context;
1861 POOL_TEMP_TABLE * table;
1862
1863 if (!session_context)
1864 ereport(ERROR,
1865 (errmsg("pool_temp_tables_add: session context is not initialized")));
1866
1867 old_context = MemoryContextSwitchTo(session_context->memory_context);
1868
1869 table = pool_temp_tables_find(tablename);
1870 if (table)
1871 {
1872 /* Table already exists. Just replace state. */
1873 table->state = state;
1874 }
1875 else
1876 {
1877 table = palloc(sizeof(POOL_TEMP_TABLE));
1878 StrNCpy(table->tablename, tablename, sizeof(table->tablename));
1879 table->state = state;
1880 session_context->temp_tables = lappend(session_context->temp_tables, table);
1881 }
1882
1883 MemoryContextSwitchTo(old_context);
1884 }
1885
1886 /*
1887 * Returns pointer to the table cell if specified tablename is in the temp table list.
1888 */
1889
1890 POOL_TEMP_TABLE *
pool_temp_tables_find(char * tablename)1891 pool_temp_tables_find(char * tablename)
1892 {
1893 ListCell *cell;
1894
1895 if (!session_context)
1896 ereport(ERROR,
1897 (errmsg("pool_temp_tables_find: session context is not initialized")));
1898
1899 foreach(cell, session_context->temp_tables)
1900 {
1901 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
1902 if (strcmp(tablename, table->tablename) == 0)
1903 return table;
1904 }
1905 return NULL;
1906 }
1907
1908 /*
1909 * If requested state or table state is TEMP_TABLE_DROP_COMMITTED, removes the
1910 * temp table entry from the list. Otherwise just set the requested state to
1911 * the table state.
1912 */
1913 void
pool_temp_tables_delete(char * tablename,POOL_TEMP_TABLE_STATE state)1914 pool_temp_tables_delete(char * tablename, POOL_TEMP_TABLE_STATE state)
1915 {
1916 POOL_TEMP_TABLE * table;
1917 MemoryContext old_context;
1918
1919 if (!session_context)
1920 ereport(ERROR,
1921 (errmsg("pool_temp_tables_delete: session context is not initialized")));
1922
1923 ereport(LOG,
1924 (errmsg("pool_temp_tables_delete: table: %s state: %d", tablename, state)));
1925
1926 old_context = MemoryContextSwitchTo(session_context->memory_context);
1927
1928 table = pool_temp_tables_find(tablename);
1929
1930 if (table)
1931 {
1932 if (table->state == TEMP_TABLE_DROP_COMMITTED)
1933 {
1934 ereport(DEBUG1,
1935 (errmsg("pool_temp_tables_delete: remove %s. previous state: %d requested state: %d",
1936 table->tablename, table->state, state)));
1937
1938 session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
1939 }
1940 else
1941 {
1942 ereport(DEBUG1,
1943 (errmsg("pool_temp_tables_delete: set state %s. previous state: %d requested state: %d",
1944 table->tablename, table->state, state)));
1945 table->state = state;
1946 }
1947 }
1948
1949 MemoryContextSwitchTo(old_context);
1950 }
1951
1952 /*
1953 * Commits creating entries. Also remove dropping entries. This is supposed to
1954 * be called when an explicit transaction commits.
1955 */
1956 void
pool_temp_tables_commit_pending(void)1957 pool_temp_tables_commit_pending(void)
1958 {
1959 ListCell *cell;
1960 MemoryContext old_context;
1961
1962 if (!session_context)
1963 ereport(ERROR,
1964 (errmsg("pool_temp_tables_commit_pending: session context is not initialized")));
1965
1966 old_context = MemoryContextSwitchTo(session_context->memory_context);
1967
1968 pool_temp_tables_dump();
1969
1970 Retry:
1971 foreach(cell, session_context->temp_tables)
1972 {
1973 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
1974
1975 if (table->state == TEMP_TABLE_CREATING)
1976 {
1977 ereport(DEBUG1,
1978 (errmsg("pool_temp_tables_commit_pending: commit: %s", table->tablename)));
1979
1980 table->state = TEMP_TABLE_CREATE_COMMITTED;
1981 }
1982 else if (table->state == TEMP_TABLE_DROPPING)
1983 {
1984 ereport(DEBUG1,
1985 (errmsg("pool_temp_tables_commit_pending: remove: %s", table->tablename)));
1986 session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
1987 pool_temp_tables_dump();
1988 goto Retry;
1989 }
1990 }
1991
1992 MemoryContextSwitchTo(old_context);
1993 }
1994
1995 /*
1996 * Removes all ongoing creating or dropping entries. This is supposed to be
1997 * called when an explicit transaction aborts.
1998 */
1999 void
pool_temp_tables_remove_pending(void)2000 pool_temp_tables_remove_pending(void)
2001 {
2002 ListCell *cell;
2003 MemoryContext old_context;
2004
2005 if (!session_context)
2006 ereport(ERROR,
2007 (errmsg("pool_temp_tables_remove_pending: session context is not initialized")));
2008
2009 old_context = MemoryContextSwitchTo(session_context->memory_context);
2010
2011 pool_temp_tables_dump();
2012
2013 Retry:
2014 foreach(cell, session_context->temp_tables)
2015 {
2016 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2017
2018 if (table->state == TEMP_TABLE_CREATING || table->state == TEMP_TABLE_DROPPING)
2019 {
2020 ereport(DEBUG1,
2021 (errmsg("pool_temp_tables_remove_pending: remove: %s", table->tablename)));
2022
2023 session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
2024 pool_temp_tables_dump();
2025 goto Retry;
2026 }
2027 }
2028
2029 MemoryContextSwitchTo(old_context);
2030 }
2031
2032 void
pool_temp_tables_dump(void)2033 pool_temp_tables_dump(void)
2034 {
2035 #ifdef TEMP_TABLES_DEBUG
2036 ListCell *cell;
2037
2038 if (!session_context)
2039 ereport(ERROR,
2040 (errmsg("pool_temp_tables_dump: session context is not initialized")));
2041
2042 foreach(cell, session_context->temp_tables)
2043 {
2044 POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2045 ereport(DEBUG1,
2046 (errmsg("pool_temp_tables_dump: table %s state: %d",
2047 table->tablename, table->state)));
2048 }
2049 #endif
2050 }
2051