1 /* -*-pgsql-c-*- */
2 /*
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2020	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  */
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24 
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 #include "utils/elog.h"
29 #include "pool_config.h"
30 #include "protocol/pool_proto_modules.h"
31 #include "protocol/pool_process_query.h"
32 #include "protocol/pool_connection_pool.h"
33 #include "protocol/pool_pg_utils.h"
34 #include "context/pool_session_context.h"
35 
36 static POOL_SESSION_CONTEXT session_context_d;
37 static POOL_SESSION_CONTEXT * session_context = NULL;
38 static void GetTranIsolationErrorCb(void *arg);
39 static void init_sent_message_list(void);
40 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * messag);
41 static void dump_sent_message(char *caller, POOL_SENT_MESSAGE * m);
42 static void dml_adaptive_init(void);
43 static void dml_adaptive_destroy(void);
44 
45 #ifdef PENDING_MESSAGE_DEBUG
46 static int	Elevel = LOG;
47 #else
48 static int	Elevel = DEBUG2;
49 #endif
50 
51 /*
52  * Initialize per session context
53  */
54 void
pool_init_session_context(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)55 pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
56 {
57 	session_context = &session_context_d;
58 	ProcessInfo *process_info;
59 	int			node_id;
60 	int			i;
61 
62 	/* Clear session context memory */
63 	memset(&session_context_d, 0, sizeof(session_context_d));
64 
65 	/* Get Process context */
66 	session_context->process_context = pool_get_process_context();
67 	if (!session_context->process_context)
68 		ereport(ERROR,
69 				(errmsg("failed to get process context")));
70 
71 	/* Set connection info */
72 	session_context->frontend = frontend;
73 	session_context->backend = backend;
74 
75 	/* Initialize query context */
76 	session_context->query_context = NULL;
77 
78 	/* Initialize local session id */
79 	pool_incremnet_local_session_id();
80 
81 	/* Create memory context */
82 	/* TODO re-think about the parent for this context ?? */
83 	session_context->memory_context = AllocSetContextCreate(ProcessLoopContext,
84 															"SessionContext",
85 															ALLOCSET_SMALL_MINSIZE,
86 															ALLOCSET_SMALL_INITSIZE,
87 															ALLOCSET_SMALL_MAXSIZE);
88 
89 	/* Initialize sent message list */
90 	init_sent_message_list();
91 
92 	process_info = pool_get_my_process_info();
93 
94 	if (!process_info)
95 		ereport(ERROR,
96 				(errmsg("failed to get process info for current process")));
97 
98 	/* Choose load balancing node if necessary */
99 	if (pool_config->load_balance_mode)
100 	{
101 		node_id = select_load_balancing_node();
102 	}
103 	else
104 	{
105 		node_id = SL_MODE ? PRIMARY_NODE_ID : MAIN_NODE_ID;
106 	}
107 
108 	session_context->load_balance_node_id = node_id;
109 
110 	for (i = 0; i < NUM_BACKENDS; i++)
111 	{
112 		pool_coninfo(session_context->process_context->proc_id,
113 					 pool_pool_index(), i)->load_balancing_node = node_id;
114 	}
115 
116 	ereport(DEBUG5,
117 			(errmsg("initializing session context"),
118 			 errdetail("selected load balancing node: %d", node_id)));
119 
120 	/* Unset query is in progress */
121 	pool_unset_query_in_progress();
122 
123 	/* The command in progress has not succeeded yet */
124 	pool_unset_command_success();
125 
126 	/* We don't have a write query in this transaction yet */
127 	pool_unset_writing_transaction();
128 
129 	/* Error doesn't occur in this transaction yet */
130 	pool_unset_failed_transaction();
131 
132 	/* Forget transaction isolation mode */
133 	pool_unset_transaction_isolation();
134 
135 	/* We don't skip reading from backends */
136 	pool_unset_skip_reading_from_backends();
137 
138 	/* Backends have not ignored messages yet */
139 	pool_unset_ignore_till_sync();
140 
141 	/* Unset suspend reading from frontend flag */
142 	pool_unset_suspend_reading_from_frontend();
143 
144 	/* Initialize where to send map for PREPARE statements */
145 #ifdef NOT_USED
146 	memset(&session_context->prep_where, 0, sizeof(session_context->prep_where));
147 	session_context->prep_where.nelem = POOL_MAX_PREPARED_STATEMENTS;
148 #endif							/* NOT_USED */
149 
150 	/*
151 	 * Reset flag to indicate difference in number of affected tuples in
152 	 * UPDATE/DELETE.
153 	 */
154 	session_context->mismatch_ntuples = false;
155 
156 	if (pool_config->memory_cache_enabled)
157 	{
158 		session_context->query_cache_array = pool_create_query_cache_array();
159 		session_context->num_selects = 0;
160 	}
161 
162 	/* Initialize pending message list */
163 	pool_pending_messages_init();
164 
165 	/* Initialize previous pending message */
166 	pool_pending_message_reset_previous_message();
167 
168 	/* Initialize temp tables */
169 	pool_temp_tables_init();
170 
171 #ifdef NOT_USED
172 	/* Initialize preferred main node id */
173 	pool_reset_preferred_main_node_id();
174 #endif
175 
176 	/* Snapshot isolation state */
177 	session_context->si_state = SI_NO_SNAPSHOT;
178 
179 	/* Transaction read only */
180 	session_context->transaction_read_only = false;
181 
182 	dml_adaptive_init();
183 }
184 
185 /*
186  * Destroy session context.
187  */
188 void
pool_session_context_destroy(void)189 pool_session_context_destroy(void)
190 {
191 	if (session_context)
192 	{
193 		pool_clear_sent_message_list();
194 		pfree(session_context->message_list.sent_messages);
195 		if (pool_config->memory_cache_enabled)
196 		{
197 			pool_discard_query_cache_array(session_context->query_cache_array);
198 			session_context->num_selects = 0;
199 		}
200 
201 		if (session_context->query_context)
202 			pool_query_context_destroy(session_context->query_context);
203 		MemoryContextDelete(session_context->memory_context);
204 
205 		dml_adaptive_destroy();
206 	}
207 	/* XXX For now, just zap memory */
208 	memset(&session_context_d, 0, sizeof(session_context_d));
209 	session_context = NULL;
210 }
211 
212 /*
213  * Return session context
214  */
215 POOL_SESSION_CONTEXT *
pool_get_session_context(bool noerror)216 pool_get_session_context(bool noerror)
217 {
218 	if (!session_context && !noerror)
219 	{
220 		ereport(FATAL,
221 				(return_code(2),
222 				 errmsg("unable to get session context")));
223 	}
224 	return session_context;
225 }
226 
227 /*
228  * Return local session id
229  */
230 int
pool_get_local_session_id(void)231 pool_get_local_session_id(void)
232 {
233 	return pool_get_session_context(false)->process_context->local_session_id;
234 }
235 
236 /*
237  * Return true if query is in progress
238  */
239 bool
pool_is_query_in_progress(void)240 pool_is_query_in_progress(void)
241 {
242 	return pool_get_session_context(false)->in_progress;
243 }
244 
245 /*
246  * Set query is in progress
247  */
248 void
pool_set_query_in_progress(void)249 pool_set_query_in_progress(void)
250 {
251 	ereport(DEBUG5,
252 			(errmsg("session context: setting query in progress. DONE")));
253 
254 	pool_get_session_context(false)->in_progress = true;
255 }
256 
257 /*
258  * Unset query is in progress
259  */
260 void
pool_unset_query_in_progress(void)261 pool_unset_query_in_progress(void)
262 {
263 	POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
264 
265 	ereport(DEBUG5,
266 			(errmsg("session context: unsetting query in progress. DONE")));
267 
268 	s->in_progress = false;
269 
270 	/* Restore where_to_send map if necessary */
271 	if (s->need_to_restore_where_to_send)
272 	{
273 		memcpy(s->query_context->where_to_send, s->where_to_send_save, sizeof(s->where_to_send_save));
274 	}
275 	s->need_to_restore_where_to_send = false;
276 }
277 
278 /*
279  * Return true if we skip reading from backends
280  */
281 bool
pool_is_skip_reading_from_backends(void)282 pool_is_skip_reading_from_backends(void)
283 {
284 	return pool_get_session_context(false)->skip_reading_from_backends;
285 }
286 
287 /*
288  * Set skip_reading_from_backends
289  */
290 void
pool_set_skip_reading_from_backends(void)291 pool_set_skip_reading_from_backends(void)
292 {
293 	ereport(DEBUG5,
294 			(errmsg("session context: setting skip reading from backends. DONE")));
295 
296 
297 	pool_get_session_context(false)->skip_reading_from_backends = true;
298 }
299 
300 /*
301  * Unset skip_reading_from_backends
302  */
303 void
pool_unset_skip_reading_from_backends(void)304 pool_unset_skip_reading_from_backends(void)
305 {
306 	ereport(DEBUG5,
307 			(errmsg("session context: clearing skip reading from backends. DONE")));
308 
309 	pool_get_session_context(false)->skip_reading_from_backends = false;
310 }
311 
312 /*
313  * Return true if we are doing extended query message
314  */
315 bool
pool_is_doing_extended_query_message(void)316 pool_is_doing_extended_query_message(void)
317 {
318 	return pool_get_session_context(false)->doing_extended_query_message;
319 }
320 
321 /*
322  * Set doing_extended_query_message
323  */
324 void
pool_set_doing_extended_query_message(void)325 pool_set_doing_extended_query_message(void)
326 {
327 	ereport(DEBUG5,
328 			(errmsg("session context: setting doing extended query messaging. DONE")));
329 
330 	pool_get_session_context(false)->doing_extended_query_message = true;
331 }
332 
333 /*
334  * Unset doing_extended_query_message
335  */
336 void
pool_unset_doing_extended_query_message(void)337 pool_unset_doing_extended_query_message(void)
338 {
339 	ereport(DEBUG5,
340 			(errmsg("session context: clearing doing extended query messaging. DONE")));
341 
342 	pool_get_session_context(false)->doing_extended_query_message = false;
343 }
344 
345 /*
346  * Return true if backends ignore extended query message
347  */
348 bool
pool_is_ignore_till_sync(void)349 pool_is_ignore_till_sync(void)
350 {
351 	return pool_get_session_context(false)->ignore_till_sync;
352 }
353 
354 /*
355  * Set ignore_till_sync
356  */
357 void
pool_set_ignore_till_sync(void)358 pool_set_ignore_till_sync(void)
359 {
360 	ereport(DEBUG5,
361 			(errmsg("session context: setting ignore till sync. DONE")));
362 
363 	pool_get_session_context(false)->ignore_till_sync = true;
364 }
365 
366 /*
367  * Unset ignore_till_sync
368  */
369 void
pool_unset_ignore_till_sync(void)370 pool_unset_ignore_till_sync(void)
371 {
372 	ereport(DEBUG5,
373 			(errmsg("session context: clearing ignore till sync. DONE")));
374 
375 	pool_get_session_context(false)->ignore_till_sync = false;
376 }
377 
378 /*
379  * Remove a sent message
380  */
381 bool
pool_remove_sent_message(char kind,const char * name)382 pool_remove_sent_message(char kind, const char *name)
383 {
384 	int			i;
385 	POOL_SENT_MESSAGE_LIST *msglist;
386 
387 	if (kind == 0 || name == NULL)
388 		return false;
389 
390 	msglist = &pool_get_session_context(false)->message_list;
391 
392 	for (i = 0; i < msglist->size; i++)
393 	{
394 		if (msglist->sent_messages[i]->kind == kind &&
395 			!strcmp(msglist->sent_messages[i]->name, name))
396 		{
397 			pool_sent_message_destroy(msglist->sent_messages[i]);
398 			break;
399 		}
400 	}
401 
402 	/* sent message not found */
403 	if (i == msglist->size)
404 		return false;
405 
406 	if (i != msglist->size - 1)
407 	{
408 		memmove(&msglist->sent_messages[i], &msglist->sent_messages[i + 1],
409 				sizeof(POOL_SENT_MESSAGE *) * (msglist->size - i - 1));
410 	}
411 
412 	msglist->size--;
413 
414 	return true;
415 }
416 
417 /*
418  * Remove same kind of sent messages
419  */
420 void
pool_remove_sent_messages(char kind)421 pool_remove_sent_messages(char kind)
422 {
423 	int			i;
424 	POOL_SENT_MESSAGE_LIST *msglist;
425 
426 	msglist = &pool_get_session_context(false)->message_list;
427 
428 	for (i = 0; i < msglist->size; i++)
429 	{
430 		if (msglist->sent_messages[i]->kind == kind)
431 		{
432 			if (pool_remove_sent_message(kind, msglist->sent_messages[i]->name))
433 				i--;			/* for relocation by removing */
434 		}
435 	}
436 }
437 
438 /*
439  * Destroy sent message
440  */
441 void
pool_sent_message_destroy(POOL_SENT_MESSAGE * message)442 pool_sent_message_destroy(POOL_SENT_MESSAGE * message)
443 {
444 	bool		in_progress;
445 	POOL_QUERY_CONTEXT *qc = NULL;
446 
447 	dump_sent_message("pool_sent_message_destroy", message);
448 
449 	in_progress = pool_is_query_in_progress();
450 
451 	if (message)
452 	{
453 		if (message->contents)
454 			pfree(message->contents);
455 
456 		if (message->name)
457 			pfree(message->name);
458 
459 		if (message->query_context)
460 		{
461 			if (session_context->query_context != message->query_context)
462 				qc = session_context->query_context;
463 
464 			if (can_query_context_destroy(message->query_context))
465 			{
466 				pool_query_context_destroy(message->query_context);
467 
468 				/*
469 				 * set in_progress flag, because pool_query_context_destroy()
470 				 * unsets in_progress flag
471 				 */
472 				if (in_progress)
473 					pool_set_query_in_progress();
474 
475 				/*
476 				 * set query_context of session_context, because
477 				 * pool_query_context_destroy() sets it to NULL.
478 				 */
479 				if (qc)
480 					session_context->query_context = qc;
481 			}
482 		}
483 
484 		if (session_context->memory_context)
485 			pfree(message);
486 	}
487 }
488 
489 /*
490  * Clear sent message list
491  */
492 void
pool_clear_sent_message_list(void)493 pool_clear_sent_message_list(void)
494 {
495 	POOL_SENT_MESSAGE_LIST *msglist;
496 
497 	msglist = &pool_get_session_context(false)->message_list;
498 
499 	while (msglist->size > 0)
500 	{
501 		pool_remove_sent_messages(msglist->sent_messages[0]->kind);
502 	}
503 }
504 
505 /*
506  * Zap query context info in sent messages to indicate that the query context
507  * has been already removed.
508  */
509 void
pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT * query_context)510 pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT *query_context)
511 {
512 	int			i;
513 	POOL_SENT_MESSAGE_LIST *msglist;
514 
515 	msglist = &pool_get_session_context(false)->message_list;
516 
517 	for (i = 0; i < msglist->size; i++)
518 	{
519 		elog(DEBUG5, "checking zapping sent message: %p query_context: %p",
520 			 &msglist->sent_messages[i], msglist->sent_messages[i]->query_context);
521 		if (msglist->sent_messages[i]->query_context == query_context)
522 		{
523 			msglist->sent_messages[i]->query_context = NULL;
524 			elog(DEBUG5, "Zap sent message: %p", &msglist->sent_messages[i]);
525 		}
526 	}
527 }
528 
529 static void
dump_sent_message(char * caller,POOL_SENT_MESSAGE * m)530 dump_sent_message(char *caller, POOL_SENT_MESSAGE * m)
531 {
532 	ereport(DEBUG5,
533 			(errmsg("called by %s: sent message: address: %p kind: %c name: =%s= state:%d",
534 					caller, m, m->kind, m->name, m->state)));
535 }
536 
537 static void
dml_adaptive_init(void)538 dml_adaptive_init(void)
539 {
540 	if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
541 	{
542 		session_context->is_in_transaction = false;
543 		session_context->transaction_temp_write_list = NIL;
544 	}
545 }
546 
547 static void
dml_adaptive_destroy(void)548 dml_adaptive_destroy(void)
549 {
550 	if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context)
551 	{
552 		if (session_context->transaction_temp_write_list != NIL)
553 			list_free_deep(session_context->transaction_temp_write_list);
554 	}
555 }
556 
557 /*
558  * Create a sent message.
559  * kind: one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE)
560  * len: message length in host order
561  * contents: message contents
562  * num_tsparams: number of timestamp parameters
563  * name: prepared statement name or portal name
564  */
565 POOL_SENT_MESSAGE *
pool_create_sent_message(char kind,int len,char * contents,int num_tsparams,const char * name,POOL_QUERY_CONTEXT * query_context)566 pool_create_sent_message(char kind, int len, char *contents,
567 						 int num_tsparams, const char *name,
568 						 POOL_QUERY_CONTEXT * query_context)
569 {
570 	POOL_SENT_MESSAGE *msg;
571 
572 	if (!session_context)
573 		ereport(ERROR,
574 				(errmsg("unable to create message"),
575 				 errdetail("cannot get the session context")));
576 
577 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
578 
579 	msg = palloc(sizeof(POOL_SENT_MESSAGE));
580 	msg->kind = kind;
581 	msg->len = len;
582 	msg->contents = palloc(len);
583 	memcpy(msg->contents, contents, len);
584 	msg->state = POOL_SENT_MESSAGE_CREATED;
585 	msg->num_tsparams = num_tsparams;
586 	msg->name = pstrdup(name);
587 	msg->query_context = query_context;
588 	MemoryContextSwitchTo(old_context);
589 
590 	return msg;
591 }
592 
593 /*
594  * Add a sent message to sent message list
595  */
596 void
pool_add_sent_message(POOL_SENT_MESSAGE * message)597 pool_add_sent_message(POOL_SENT_MESSAGE * message)
598 {
599 	POOL_SENT_MESSAGE *old_msg;
600 	POOL_SENT_MESSAGE_LIST *msglist;
601 
602 	dump_sent_message("pool_add_sent_message", message);
603 
604 	if (!message)
605 	{
606 		ereport(DEBUG5,
607 				(errmsg("adding sent message to list"),
608 				 errdetail("message is null")));
609 		return;
610 	}
611 
612 	old_msg = pool_get_sent_message(message->kind, message->name, POOL_SENT_MESSAGE_CREATED);
613 
614 	if (old_msg == message)
615 	{
616 		/*
617 		 * It is likely caller tries to add the exact same message previously
618 		 * added. We should ignore this because pool_remove_sent_message()
619 		 * will free memory allocated in the message.
620 		 */
621 		ereport(DEBUG5,
622 				(errmsg("adding sent message to list"),
623 				 errdetail("adding exactly same message is prohibited")));
624 		return;
625 	}
626 
627 	msglist = &session_context->message_list;
628 
629 	if (old_msg)
630 	{
631 		if (message->kind == 'B')
632 			ereport(DEBUG5,
633 					(errmsg("adding sent message to list"),
634 					 errdetail("portal \"%s\" already exists", message->name)));
635 		else
636 			ereport(DEBUG5,
637 					(errmsg("adding sent message to list"),
638 					 errdetail("prepared statement \"%s\" already exists", message->name)));
639 
640 		if (*message->name == '\0')
641 			pool_remove_sent_message(old_msg->kind, old_msg->name);
642 		else
643 			return;
644 	}
645 
646 	if (msglist->size == msglist->capacity)
647 	{
648 		msglist->capacity *= 2;
649 
650 		MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
651 
652 		msglist->sent_messages = repalloc(msglist->sent_messages,
653 										  sizeof(POOL_SENT_MESSAGE *) * msglist->capacity);
654 
655 		MemoryContextSwitchTo(oldContext);
656 	}
657 
658 	msglist->sent_messages[msglist->size++] = message;
659 }
660 
661 /*
662  * Find a sent message by kind and name.
663  */
664 POOL_SENT_MESSAGE *
pool_get_sent_message(char kind,const char * name,POOL_SENT_MESSAGE_STATE state)665 pool_get_sent_message(char kind, const char *name, POOL_SENT_MESSAGE_STATE state)
666 {
667 	int			i;
668 	POOL_SENT_MESSAGE_LIST *msglist;
669 
670 	msglist = &pool_get_session_context(false)->message_list;
671 
672 	if (kind == 0 || name == NULL)
673 		return NULL;
674 
675 	for (i = 0; i < msglist->size; i++)
676 	{
677 		if (msglist->sent_messages[i]->kind == kind &&
678 			!strcmp(msglist->sent_messages[i]->name, name) &&
679 			msglist->sent_messages[i]->state == state)
680 			return msglist->sent_messages[i];
681 	}
682 
683 	return NULL;
684 }
685 
686 /*
687  * Find a sent message by query context.
688  */
689 POOL_SENT_MESSAGE *
pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)690 pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)
691 {
692 	int			i;
693 	POOL_SENT_MESSAGE_LIST *msglist;
694 
695 	msglist = &pool_get_session_context(false)->message_list;
696 
697 	if (query_context == NULL)
698 		return NULL;
699 
700 	for (i = 0; i < msglist->size; i++)
701 	{
702 		if (msglist->sent_messages[i]->query_context == query_context)
703 			return msglist->sent_messages[i];
704 	}
705 
706 	return NULL;
707 }
708 
709 /*
710  * Set message state to POOL_SENT_MESSAGE_STATE to POOL_SENT_MESSAGE_CLOSED.
711  */
712 void
pool_set_sent_message_state(POOL_SENT_MESSAGE * message)713 pool_set_sent_message_state(POOL_SENT_MESSAGE * message)
714 {
715 	ereport(DEBUG5,
716 			(errmsg("pool_set_sent_message_state: name:%s kind:%c previous state: %d",
717 					message->name, message->kind, message->state)));
718 	message->state = POOL_SENT_MESSAGE_CLOSED;
719 }
720 
721 /*
722  * We don't have a write query in this transaction yet.
723  */
724 void
pool_unset_writing_transaction(void)725 pool_unset_writing_transaction(void)
726 {
727 	/*
728 	 * If disable_transaction_on_write is 'always', then never turn off
729 	 * writing transaction flag.
730 	 */
731 	if (pool_config->disable_load_balance_on_write != DLBOW_ALWAYS)
732 	{
733 		pool_get_session_context(false)->writing_transaction = false;
734 		ereport(DEBUG5,
735 				(errmsg("session context: clearing writing transaction. DONE")));
736 	}
737 }
738 
739 /*
740  * We have a write query in this transaction.
741  */
742 void
pool_set_writing_transaction(void)743 pool_set_writing_transaction(void)
744 {
745 	/*
746 	 * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never turn on writing
747 	 * transaction flag.
748 	 */
749 	if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
750 	{
751 		pool_get_session_context(false)->writing_transaction = true;
752 		ereport(DEBUG5,
753 				(errmsg("session context: setting writing transaction. DONE")));
754 	}
755 }
756 
757 /*
758  * Do we have a write query in this transaction?
759  */
760 bool
pool_is_writing_transaction(void)761 pool_is_writing_transaction(void)
762 {
763 	return pool_get_session_context(false)->writing_transaction;
764 }
765 
766 /*
767  * Error doesn't occur in this transaction yet.
768  */
769 void
pool_unset_failed_transaction(void)770 pool_unset_failed_transaction(void)
771 {
772 	ereport(DEBUG5,
773 			(errmsg("session context: clearing failed transaction. DONE")));
774 
775 	pool_get_session_context(false)->failed_transaction = false;
776 }
777 
778 /*
779  * Error occurred in this transaction.
780  */
781 void
pool_set_failed_transaction(void)782 pool_set_failed_transaction(void)
783 {
784 	ereport(DEBUG5,
785 			(errmsg("session context: setting failed transaction. DONE")));
786 
787 	pool_get_session_context(false)->failed_transaction = true;
788 }
789 
790 /*
791  * Did error occur in this transaction?
792  */
793 bool
pool_is_failed_transaction(void)794 pool_is_failed_transaction(void)
795 {
796 	return pool_get_session_context(false)->failed_transaction;
797 }
798 
799 /*
800  * Forget transaction isolation mode
801  */
802 void
pool_unset_transaction_isolation(void)803 pool_unset_transaction_isolation(void)
804 {
805 	ereport(DEBUG5,
806 			(errmsg("session context: clearing failed transaction. DONE")));
807 	pool_get_session_context(false)->transaction_isolation = POOL_UNKNOWN;
808 }
809 
810 /*
811  * Set transaction isolation mode
812  */
813 void
pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)814 pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)
815 {
816 	ereport(DEBUG5,
817 			(errmsg("session context: setting transaction isolation. DONE")));
818 	pool_get_session_context(false)->transaction_isolation = isolation_level;
819 }
820 
821 /*
822  * Get or return cached transaction isolation mode
823  */
824 POOL_TRANSACTION_ISOLATION
pool_get_transaction_isolation(void)825 pool_get_transaction_isolation(void)
826 {
827 	POOL_SELECT_RESULT *res;
828 	POOL_TRANSACTION_ISOLATION ret;
829 	ErrorContextCallback callback;
830 
831 	if (!session_context)
832 	{
833 		ereport(WARNING,
834 				(errmsg("error while getting transaction isolation, session context is not initialized")));
835 		return POOL_UNKNOWN;
836 	}
837 
838 	/* It seems cached result is usable. Return it. */
839 	if (session_context->transaction_isolation != POOL_UNKNOWN)
840 		return session_context->transaction_isolation;
841 
842 	/*
843 	 * Register a error context callback to throw proper context message
844 	 */
845 	callback.callback = GetTranIsolationErrorCb;
846 	callback.arg = NULL;
847 	callback.previous = error_context_stack;
848 	error_context_stack = &callback;
849 
850 	/* No cached data is available. Ask backend. */
851 
852 	do_query(MAIN(session_context->backend),
853 			 "SELECT current_setting('transaction_isolation')", &res, MAJOR(session_context->backend));
854 
855 	error_context_stack = callback.previous;
856 
857 	if (res->numrows <= 0)
858 	{
859 		ereport(WARNING,
860 				(errmsg("error while getting transaction isolation, do_query returns no rows")));
861 		free_select_result(res);
862 		return POOL_UNKNOWN;
863 	}
864 	if (res->data[0] == NULL)
865 	{
866 		ereport(WARNING,
867 				(errmsg("error while getting transaction isolation, do_query returns no data")));
868 
869 		free_select_result(res);
870 		return POOL_UNKNOWN;
871 	}
872 	if (res->nullflags[0] == -1)
873 	{
874 		ereport(WARNING,
875 				(errmsg("error while getting transaction isolation, do_query returns NULL")));
876 		free_select_result(res);
877 		return POOL_UNKNOWN;
878 	}
879 
880 	if (!strcmp(res->data[0], "read uncommitted"))
881 		ret = POOL_READ_UNCOMMITTED;
882 	else if (!strcmp(res->data[0], "read committed"))
883 		ret = POOL_READ_COMMITTED;
884 	else if (!strcmp(res->data[0], "repeatable read"))
885 		ret = POOL_REPEATABLE_READ;
886 	else if (!strcmp(res->data[0], "serializable"))
887 		ret = POOL_SERIALIZABLE;
888 	else
889 	{
890 		ereport(WARNING,
891 				(errmsg("error while getting transaction isolation, unknown transaction isolation level:%s", res->data[0])));
892 
893 		ret = POOL_UNKNOWN;
894 	}
895 
896 	free_select_result(res);
897 
898 	if (ret != POOL_UNKNOWN)
899 		session_context->transaction_isolation = ret;
900 
901 	return ret;
902 }
903 
904 static void
GetTranIsolationErrorCb(void * arg)905 GetTranIsolationErrorCb(void *arg)
906 {
907 	errcontext("while getting transaction isolation");
908 }
909 
910 
911 /*
912  * The command in progress has not succeeded yet.
913  */
914 void
pool_unset_command_success(void)915 pool_unset_command_success(void)
916 {
917 	ereport(DEBUG5,
918 			(errmsg("session context: clearing transaction isolation. DONE")));
919 	pool_get_session_context(false)->command_success = false;
920 }
921 
922 /*
923  * The command in progress has succeeded.
924  */
925 void
pool_set_command_success(void)926 pool_set_command_success(void)
927 {
928 	ereport(DEBUG5,
929 			(errmsg("session context: setting command success. DONE")));
930 
931 	pool_get_session_context(false)->command_success = true;
932 }
933 
934 /*
935  * Has the command in progress succeeded?
936  */
937 bool
pool_is_command_success(void)938 pool_is_command_success(void)
939 {
940 	return pool_get_session_context(false)->command_success;
941 }
942 
943 /*
944  * Copy send map
945  */
946 void
pool_copy_prep_where(bool * src,bool * dest)947 pool_copy_prep_where(bool *src, bool *dest)
948 {
949 	memcpy(dest, src, sizeof(bool) * MAX_NUM_BACKENDS);
950 }
951 #ifdef NOT_USED
952 /*
953  * Add to send map a PREPARED statement
954  */
955 void
pool_add_prep_where(char * name,bool * map)956 pool_add_prep_where(char *name, bool *map)
957 {
958 	int			i;
959 
960 	if (!session_context)
961 	{
962 		ereport(ERROR,
963 				(errmsg("pool_add_prep_where: session context is not initialized")));
964 		return;
965 	}
966 
967 	for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
968 	{
969 		if (*session_context->prep_where.name[i] == '\0')
970 		{
971 			strncpy(session_context->prep_where.name[i], name, POOL_MAX_PREPARED_NAME);
972 			pool_copy_prep_where(map, session_context->prep_where.where_to_send[i]);
973 			return;
974 		}
975 	}
976 	ereport(ERROR,
977 			(errmsg("pool_add_prep_where: no empty slot found")));
978 }
979 
980 /*
981  * Search send map by PREPARED statement name
982  */
983 bool *
pool_get_prep_where(char * name)984 pool_get_prep_where(char *name)
985 {
986 	int			i;
987 
988 	if (!session_context)
989 		ereport(ERROR,
990 				(errmsg("pool_get_prep_where: session context is not initialized")));
991 
992 
993 	for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
994 	{
995 		if (!strcmp(session_context->prep_where.name[i], name))
996 		{
997 			return session_context->prep_where.where_to_send[i];
998 		}
999 	}
1000 	return NULL;
1001 }
1002 
1003 /*
1004  * Remove PREPARED statement by name
1005  */
1006 void
pool_delete_prep_where(char * name)1007 pool_delete_prep_where(char *name)
1008 {
1009 	int			i;
1010 
1011 	if (!session_context)
1012 		ereport(ERROR,
1013 				(errmsg("pool_delete_prep_where: session context is not initialized")));
1014 
1015 	for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
1016 	{
1017 		if (!strcmp(session_context->prep_where.name[i], name))
1018 		{
1019 			memset(&session_context->prep_where.where_to_send[i], 0, sizeof(bool) * MAX_NUM_BACKENDS);
1020 			*session_context->prep_where.name[i] = '\0';
1021 			return;
1022 		}
1023 	}
1024 }
1025 #endif							/* NOT_USED */
1026 /*
1027  * Initialize sent message list
1028  */
1029 static void
init_sent_message_list(void)1030 init_sent_message_list(void)
1031 {
1032 	POOL_SENT_MESSAGE_LIST *msglist;
1033 
1034 	msglist = &session_context->message_list;
1035 	msglist->size = 0;
1036 	msglist->capacity = INIT_LIST_SIZE;
1037 
1038 	MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
1039 
1040 	msglist->sent_messages = palloc(sizeof(POOL_SENT_MESSAGE *) * INIT_LIST_SIZE);
1041 
1042 	MemoryContextSwitchTo(oldContext);
1043 }
1044 
1045 /*
1046  * Look for extended message list to check if given query context qc
1047  * is used. Returns true if it is not used.
1048  */
1049 bool
can_query_context_destroy(POOL_QUERY_CONTEXT * qc)1050 can_query_context_destroy(POOL_QUERY_CONTEXT * qc)
1051 {
1052 	int			i;
1053 	int			count = 0;
1054 	POOL_SENT_MESSAGE_LIST *msglist;
1055 	ListCell   *cell;
1056 	ListCell   *next;
1057 
1058 	msglist = &session_context->message_list;
1059 
1060 	for (i = 0; i < msglist->size; i++)
1061 	{
1062 		if (msglist->sent_messages[i]->query_context == qc)
1063 			count++;
1064 	}
1065 	if (count > 1)
1066 	{
1067 		ereport(DEBUG5,
1068 				(errmsg("checking if query context can be safely destroyed"),
1069 				 errdetail("query context %p is still used %d times in sent message list. query:\"%s\"",
1070 						   qc, count, qc->original_query)));
1071 		return false;
1072 	}
1073 
1074 	count = 0;
1075 
1076 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1077 	{
1078 		POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1079 
1080 		if (message->query_context == qc)
1081 		{
1082 			count++;
1083 		}
1084 		next = lnext(session_context->pending_messages, cell);
1085 	}
1086 
1087 	if (count >= 1)
1088 	{
1089 		ereport(DEBUG5,
1090 				(errmsg("checking if query context can be safely destroyed"),
1091 				 errdetail("query context %p is still used %d times in pending message list. query:%s", qc, count, qc->original_query)));
1092 		return false;
1093 	}
1094 
1095 	return true;
1096 }
1097 
1098 /*
1099  * Initialize pending message list
1100  */
1101 void
pool_pending_messages_init(void)1102 pool_pending_messages_init(void)
1103 {
1104 	if (!session_context)
1105 		ereport(ERROR,
1106 				(errmsg("pool_pending_message_init: session context is not initialized")));
1107 
1108 	session_context->pending_messages = NIL;
1109 }
1110 
1111 /*
1112  * Destroy pending message list
1113  */
1114 void
pool_pending_messages_destroy(void)1115 pool_pending_messages_destroy(void)
1116 {
1117 	ListCell   *cell;
1118 	POOL_PENDING_MESSAGE *msg;
1119 
1120 	if (!session_context)
1121 		ereport(ERROR,
1122 				(errmsg("pool_pending_message_destory: session context is not initialized")));
1123 
1124 	foreach(cell, session_context->pending_messages)
1125 	{
1126 		msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1127 		pfree(msg->contents);
1128 	}
1129 	list_free(session_context->pending_messages);
1130 }
1131 
1132 /*
1133  * Create one message.
1134  */
1135 POOL_PENDING_MESSAGE *
pool_pending_message_create(char kind,int len,char * contents)1136 pool_pending_message_create(char kind, int len, char *contents)
1137 {
1138 	POOL_PENDING_MESSAGE *msg;
1139 	MemoryContext old_context;
1140 
1141 	if (!session_context)
1142 		ereport(ERROR,
1143 				(errmsg("pool_pending_message_create: session context is not initialized")));
1144 
1145 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1146 	msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1147 
1148 	switch (kind)
1149 	{
1150 		case 'P':
1151 			msg->type = POOL_PARSE;
1152 			break;
1153 
1154 		case 'B':
1155 			msg->type = POOL_BIND;
1156 			break;
1157 
1158 		case 'E':
1159 			msg->type = POOL_EXECUTE;
1160 			break;
1161 
1162 		case 'D':
1163 			msg->type = POOL_DESCRIBE;
1164 			break;
1165 
1166 		case 'C':
1167 			msg->type = POOL_CLOSE;
1168 			break;
1169 
1170 		case 'S':
1171 			msg->type = POOL_SYNC;
1172 			break;
1173 
1174 		default:
1175 			ereport(ERROR,
1176 					(errmsg("pool_pending_message_create: unknow kind: %c", kind)));
1177 			break;
1178 	}
1179 
1180 	if (len > 0)
1181 	{
1182 		msg->contents = palloc(len);
1183 		memcpy(msg->contents, contents, len);
1184 	}
1185 	else
1186 		msg->contents = NULL;
1187 
1188 	msg->contents_len = len;
1189 	msg->query[0] = '\0';
1190 	msg->statement[0] = '\0';
1191 	msg->portal[0] = '\0';
1192 	msg->is_rows_returned = false;
1193 	msg->not_forward_to_frontend = false;
1194 	memset(msg->node_ids, false, sizeof(msg->node_ids));
1195 
1196 	MemoryContextSwitchTo(old_context);
1197 
1198 	return msg;
1199 }
1200 
1201 /*
1202  * Set node_ids field of message which indicates which backend nodes the
1203  * message was sent.
1204  */
1205 void
pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1206 pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1207 {
1208 	memcpy(message->node_ids, query_context->where_to_send, sizeof(message->node_ids));
1209 
1210 	message->query_context = query_context;
1211 
1212 	if (is_select_query(query_context->parse_tree, query_context->original_query))
1213 	{
1214 		message->is_rows_returned = true;
1215 	}
1216 }
1217 
1218 /*
1219  * Set where_to_send field in query_context from node_ids field of message
1220  * which indicates which backend nodes the message was sent.
1221  */
1222 void
pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1223 pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1224 {
1225 	int			i;
1226 
1227 	POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
1228 
1229 	/* Save where_to_send map */
1230 	memcpy(s->where_to_send_save, query_context->where_to_send, sizeof(s->where_to_send_save));
1231 	s->need_to_restore_where_to_send = true;
1232 
1233 	/* Rewrite where_to_send map */
1234 	memset(query_context->where_to_send, 0, sizeof(query_context->where_to_send));
1235 
1236 	for (i = 0; i < MAX_NUM_BACKENDS; i++)
1237 	{
1238 		if (message->node_ids[i])
1239 		{
1240 			query_context->where_to_send[i] = 1;
1241 		}
1242 	}
1243 }
1244 
1245 /*
1246  * Set query field of message.
1247  */
1248 void
pool_pending_message_query_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1249 pool_pending_message_query_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1250 {
1251 	StrNCpy(message->query, query_context->original_query, sizeof(message->query));
1252 }
1253 
1254 /*
1255  * Add one message to the tail of the list
1256  */
1257 void
pool_pending_message_add(POOL_PENDING_MESSAGE * message)1258 pool_pending_message_add(POOL_PENDING_MESSAGE * message)
1259 {
1260 	POOL_PENDING_MESSAGE *msg;
1261 	MemoryContext old_context;
1262 
1263 	if (!session_context)
1264 		ereport(ERROR,
1265 				(errmsg("pool_pending_message_add: session context is not initialized")));
1266 
1267 	switch (message->type)
1268 	{
1269 		case POOL_PARSE:
1270 			StrNCpy(message->statement, message->contents, sizeof(message->statement));
1271 			StrNCpy(message->query, message->contents + strlen(message->contents) + 1, sizeof(message->query));
1272 			break;
1273 
1274 		case POOL_BIND:
1275 			StrNCpy(message->portal, message->contents, sizeof(message->portal));
1276 			StrNCpy(message->statement, message->contents + strlen(message->contents) + 1, sizeof(message->statement));
1277 			break;
1278 
1279 		case POOL_EXECUTE:
1280 			StrNCpy(message->portal, message->contents, sizeof(message->portal));
1281 			break;
1282 
1283 		case POOL_CLOSE:
1284 		case POOL_DESCRIBE:
1285 			if (*message->contents == 'S')
1286 				StrNCpy(message->statement, message->contents + 1, sizeof(message->statement));
1287 			else
1288 				StrNCpy(message->portal, message->contents + 1, sizeof(message->portal));
1289 			break;
1290 
1291 		case POOL_SYNC:
1292 			break;
1293 
1294 		default:
1295 			ereport(ERROR,
1296 					(errmsg("pool_pending_message_add: unknown message type:%d", message->type)));
1297 			return;
1298 			break;
1299 	}
1300 
1301 	if (message->type != POOL_SYNC)
1302 		ereport(Elevel,
1303 				(errmsg("pool_pending_message_add: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1304 						pool_pending_message_type_to_string(message->type),
1305 						message->contents_len, message->query, message->statement, message->portal,
1306 						message->node_ids[0], message->node_ids[1])));
1307 	else
1308 		ereport(Elevel,
1309 				(errmsg("pool_pending_message_add: message type: sync")));
1310 
1311 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1312 	msg = copy_pending_message(message);
1313 	session_context->pending_messages = lappend(session_context->pending_messages, msg);
1314 	MemoryContextSwitchTo(old_context);
1315 }
1316 
1317 /*
1318  * Return the message from the head of the list.  If the list is not empty, a
1319  * copy of the message is returned. If the list is empty, returns NULL.
1320  */
1321 POOL_PENDING_MESSAGE *
pool_pending_message_head_message(void)1322 pool_pending_message_head_message(void)
1323 {
1324 	ListCell   *cell;
1325 	POOL_PENDING_MESSAGE *message;
1326 	POOL_PENDING_MESSAGE *m;
1327 	MemoryContext old_context;
1328 
1329 	if (!session_context)
1330 		ereport(ERROR,
1331 				(errmsg("pool_pending_message_head_message: session context is not initialized")));
1332 
1333 	if (list_length(session_context->pending_messages) == 0)
1334 	{
1335 		return NULL;
1336 	}
1337 
1338 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1339 
1340 	cell = list_head(session_context->pending_messages);
1341 	m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1342 	message = copy_pending_message(m);
1343 	ereport(Elevel,
1344 			(errmsg("pool_pending_message_head_message: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1345 					pool_pending_message_type_to_string(message->type),
1346 					message->contents_len, message->query, message->statement, message->portal,
1347 					message->node_ids[0], message->node_ids[1])));
1348 
1349 	MemoryContextSwitchTo(old_context);
1350 	return message;
1351 }
1352 
1353 
1354 /*
1355  * Remove one message from the head of the list.  If the list is not empty, a
1356  * copy of the message is returned and the message is removed the message
1357  * list. If the list is empty, returns NULL.
1358  */
1359 POOL_PENDING_MESSAGE *
pool_pending_message_pull_out(void)1360 pool_pending_message_pull_out(void)
1361 {
1362 	ListCell   *cell;
1363 	POOL_PENDING_MESSAGE *message;
1364 	POOL_PENDING_MESSAGE *m;
1365 	MemoryContext old_context;
1366 
1367 	if (!session_context)
1368 		ereport(ERROR,
1369 				(errmsg("pool_pending_message_pull_out: session context is not initialized")));
1370 
1371 	if (list_length(session_context->pending_messages) == 0)
1372 	{
1373 		return NULL;
1374 	}
1375 
1376 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1377 
1378 	cell = list_head(session_context->pending_messages);
1379 	m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1380 	message = copy_pending_message(m);
1381 	ereport(Elevel,
1382 			(errmsg("pool_pending_message_pull_out: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1383 					pool_pending_message_type_to_string(message->type),
1384 					message->contents_len, message->query, message->statement, message->portal,
1385 					message->node_ids[0], message->node_ids[1])));
1386 
1387 	pool_pending_message_free_pending_message(m);
1388 	session_context->pending_messages =
1389 		list_delete_cell(session_context->pending_messages, cell);
1390 
1391 	MemoryContextSwitchTo(old_context);
1392 	return message;
1393 }
1394 
1395 /*
1396  * Try to find the first message specified by the message type in the message
1397  * list. If found, a copy of the message is returned. If not, returns NULL.
1398  */
1399 POOL_PENDING_MESSAGE *
pool_pending_message_get(POOL_MESSAGE_TYPE type)1400 pool_pending_message_get(POOL_MESSAGE_TYPE type)
1401 {
1402 	ListCell   *cell;
1403 	ListCell   *next;
1404 	POOL_PENDING_MESSAGE *msg;
1405 	MemoryContext old_context;
1406 
1407 	if (!session_context)
1408 		ereport(ERROR,
1409 				(errmsg("pool_pending_message_remove: session context is not initialized")));
1410 
1411 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1412 
1413 	msg = NULL;
1414 
1415 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1416 	{
1417 		POOL_PENDING_MESSAGE *m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1418 
1419 		next = lnext(session_context->pending_messages, cell);
1420 
1421 		if (m->type == type)
1422 		{
1423 			msg = copy_pending_message(m);
1424 			break;
1425 		}
1426 	}
1427 
1428 	MemoryContextSwitchTo(old_context);
1429 	return msg;
1430 }
1431 
1432 /*
1433  * Get message specification (either statement ('S') or portal ('P')) from a
1434  * close message.
1435  */
1436 char
pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)1437 pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)
1438 {
1439 	return *msg->contents;
1440 }
1441 
1442 /*
1443  * Get statement or portal name from close message.
1444  * The returned pointer is within "msg".
1445  */
1446 char *
pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)1447 pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)
1448 {
1449 	return (msg->contents) + 1;
1450 }
1451 
1452 /*
1453  * Perform deep copy of POOL_PENDING_MESSAGE object in the current memory
1454  * context except the query context.
1455  */
copy_pending_message(POOL_PENDING_MESSAGE * message)1456 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * message)
1457 {
1458 	POOL_PENDING_MESSAGE *msg;
1459 
1460 	msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1461 	memcpy(msg, message, sizeof(POOL_PENDING_MESSAGE));
1462 	msg->contents = palloc(msg->contents_len);
1463 	memcpy(msg->contents, message->contents, msg->contents_len);
1464 
1465 	return msg;
1466 }
1467 
1468 /*
1469  * Free POOL_PENDING_MESSAGE object in the current memory
1470  * context except the query context.
1471  */
1472 void
pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)1473 pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)
1474 {
1475 	if (message == NULL)
1476 		return;
1477 
1478 	if (message->contents)
1479 		pfree(message->contents);
1480 
1481 	pfree(message);
1482 }
1483 
1484 /*
1485  * Reset previous message.
1486  */
1487 void
pool_pending_message_reset_previous_message(void)1488 pool_pending_message_reset_previous_message(void)
1489 {
1490 	if (!session_context)
1491 	{
1492 		ereport(ERROR,
1493 				(errmsg("pool_pending_message_reset_previous_message: session context is not initialized")));
1494 		return;
1495 	}
1496 	session_context->previous_message_exists = false;
1497 }
1498 
1499 /*
1500  * Set previous message.
1501  */
1502 void
pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)1503 pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)
1504 {
1505 	if (!session_context)
1506 	{
1507 		ereport(ERROR,
1508 				(errmsg("pool_pending_message_set_previous_message: session context is not initialized")));
1509 		return;
1510 	}
1511 	session_context->previous_message_exists = true;
1512 	memcpy(&session_context->previous_message, message, sizeof(POOL_PENDING_MESSAGE));
1513 }
1514 
1515 /*
1516  * Get previous message. This actually returns the address of memory. Do not
1517  * try to free using pool_pending_message_free_pending_message().
1518  */
1519 POOL_PENDING_MESSAGE *
pool_pending_message_get_previous_message(void)1520 pool_pending_message_get_previous_message(void)
1521 {
1522 	if (!session_context)
1523 	{
1524 		ereport(ERROR,
1525 				(errmsg("pool_pending_message_get_previous_message: session context is not initialized")));
1526 		return NULL;
1527 	}
1528 	if (session_context->previous_message_exists == false)
1529 		return NULL;
1530 
1531 	return &session_context->previous_message;
1532 }
1533 
1534 /*
1535  * Return true if there's any pending message.
1536  */
1537 bool
pool_pending_message_exists(void)1538 pool_pending_message_exists(void)
1539 {
1540 	return list_length(session_context->pending_messages) > 0;
1541 }
1542 
1543 /*
1544  * Convert enum pending message type to string. The returned string must not
1545  * be modified or freed.
1546  */
1547 const char *
pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)1548 pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)
1549 {
1550 	static const char *pending_msg_string[] = {"Parse", "Bind", "Execute",
1551 	"Descripbe", "Close", "Sync"};
1552 
1553 	if (type < 0 || type > POOL_SYNC)
1554 		return "unknown type";
1555 	return pending_msg_string[type];
1556 }
1557 
1558 /*
1559  * Check consistency the message type and backend message kind.
1560  * This function is intended to be used for debugging.
1561  */
1562 void
pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type,char kind)1563 pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type, char kind)
1564 {
1565 	/*
1566 	 * Backend response message sorted by POOL_MESSAGE_TYPE
1567 	 */
1568 	static char backend_response_kind[] = {
1569 		'1',					/* POOL_PARSE, parse complete */
1570 		'2',					/* POOL_BIND, bind complete */
1571 		'*',					/* POOL_EXECUTE, skip checking */
1572 		'*',					/* POOL_DESCRIBE, skip checking */
1573 		'3',					/* POOL_CLOSE, close complete */
1574 		'Z'						/* POOL_SYNC, ready for query */
1575 	};
1576 
1577 	if (type < POOL_PARSE || type > POOL_SYNC)
1578 	{
1579 		ereport(DEBUG5,
1580 				(errmsg("pool_check_pending_message_and_reply: type out of range: %d", type)));
1581 		return;
1582 	}
1583 
1584 	if (backend_response_kind[type] == '*')
1585 	{
1586 		return;
1587 	}
1588 
1589 	if (backend_response_kind[type] != kind)
1590 	{
1591 		ereport(DEBUG5,
1592 				(errmsg("pool_check_pending_message_and_reply: type: %s but is kind: %c",
1593 						pool_pending_message_type_to_string(type), kind)));
1594 	}
1595 	return;
1596 }
1597 
1598 /*
1599  * Find the latest pending message having specified query context.  The
1600  * returned message is a pointer to the message list. So do not free it using
1601  * pool_pending_message_free_pending_message.
1602  */
1603 POOL_PENDING_MESSAGE *
pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)1604 pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)
1605 {
1606 	List	   *msgs;
1607 	POOL_PENDING_MESSAGE *msg;
1608 	int			len;
1609 	ListCell   *cell;
1610 
1611 	if (!session_context)
1612 	{
1613 		ereport(ERROR,
1614 				(errmsg("pool_pending_message_find_lastest_by_query_context: session context is not initialized")));
1615 	}
1616 
1617 	if (!session_context->pending_messages)
1618 		return NULL;
1619 
1620 	msgs = session_context->pending_messages;
1621 
1622 	len = list_length(msgs);
1623 	if (len <= 0)
1624 		return NULL;
1625 
1626 	ereport(DEBUG5,
1627 			(errmsg("pool_pending_message_find_lastest_by_query_context: num messages: %d",
1628 					len)));
1629 
1630 	while (len--)
1631 	{
1632 		cell = list_nth_cell(msgs, len);
1633 		if (cell)
1634 		{
1635 			msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1636 			if (msg->query_context == qc)
1637 			{
1638 				ereport(DEBUG5,
1639 						(errmsg("pool_pending_message_find_lastest_by_query_context: msg found. type: %s",
1640 								pool_pending_message_type_to_string(msg->type))));
1641 				return msg;
1642 			}
1643 			ereport(DEBUG5,
1644 					(errmsg("pool_pending_message_find_lastest_by_query_context: type: %s",
1645 							pool_pending_message_type_to_string(msg->type))));
1646 		}
1647 	}
1648 	return NULL;
1649 }
1650 
1651 /*
1652  * Get target backend id from pending message assuming that the destination for
1653  * the pending message is one of primary or standby node.
1654  */
1655 int
pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)1656 pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)
1657 {
1658 	int			backend_id = -1;
1659 	int			i;
1660 
1661 	for (i = 0; i < MAX_NUM_BACKENDS; i++)
1662 	{
1663 		if (msg->node_ids[i])
1664 		{
1665 			backend_id = i;
1666 			break;
1667 		}
1668 	}
1669 
1670 	if (backend_id == -1)
1671 		ereport(ERROR,
1672 				(errmsg("pool_pending_message_get_target_backend_id: no target backend id found")));
1673 
1674 	return backend_id;
1675 }
1676 
1677 /*
1678  * Get number of pending message list entries of which target backend is same as specified one.
1679  */
1680 int
pool_pending_message_get_message_num_by_backend_id(int backend_id)1681 pool_pending_message_get_message_num_by_backend_id(int backend_id)
1682 {
1683 	ListCell   *cell;
1684 	ListCell   *next;
1685 	int        cnt = 0;
1686 	int        i;
1687 
1688 	if (!session_context)
1689 	{
1690 		ereport(ERROR,
1691 				(errmsg("pool_pending_message_get_message_num_by_backend_id: session context is not initialized")));
1692 		return 0;
1693 	}
1694 
1695 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1696 	{
1697 		POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1698 
1699 		for (i = 0; i < MAX_NUM_BACKENDS; i++)
1700 		{
1701 			if (msg->node_ids[i] && i == backend_id)
1702 			{
1703 				cnt++;
1704 				break;
1705 			}
1706 		}
1707 
1708 		next = lnext(session_context->pending_messages, cell);
1709 	}
1710 	return cnt;
1711 }
1712 
1713 /*
1714  * Dump whole pending message list
1715  */
1716 void
dump_pending_message(void)1717 dump_pending_message(void)
1718 {
1719 	ListCell   *cell;
1720 	ListCell   *next;
1721 
1722 	if (!session_context)
1723 	{
1724 		ereport(ERROR,
1725 				(errmsg("dump_pending_message: session context is not initialized")));
1726 		return;
1727 	}
1728 
1729 	ereport(DEBUG5,
1730 			(errmsg("start dumping pending message list")));
1731 
1732 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1733 	{
1734 		POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1735 
1736 		ereport(DEBUG5,
1737 				(errmsg("pool_pending_message_dump: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1738 						message->type, message->contents_len, message->query, message->statement, message->portal,
1739 						message->node_ids[0], message->node_ids[1])));
1740 
1741 		next = lnext(session_context->pending_messages, cell);
1742 	}
1743 
1744 	ereport(DEBUG5,
1745 			(errmsg("end dumping pending message list")));
1746 }
1747 
1748 /*
1749  * Set protocol major version number
1750  */
1751 void
pool_set_major_version(int major)1752 pool_set_major_version(int major)
1753 {
1754 	if (session_context)
1755 	{
1756 		session_context->major = major;
1757 	}
1758 }
1759 
1760 /*
1761  * Get protocol major version number
1762  */
1763 int
pool_get_major_version(void)1764 pool_get_major_version(void)
1765 {
1766 	if (session_context)
1767 	{
1768 		return session_context->major;
1769 	}
1770 	return PROTO_MAJOR_V3;
1771 }
1772 
1773 /*
1774  * Set protocol minor version number
1775  */
1776 void
pool_set_minor_version(int minor)1777 pool_set_minor_version(int minor)
1778 {
1779 	if (session_context)
1780 	{
1781 		session_context->minor = minor;
1782 	}
1783 }
1784 
1785 /*
1786  * Get protocol minor version number
1787  */
1788 int
pool_get_minor_version(void)1789 pool_get_minor_version(void)
1790 {
1791 	if (session_context)
1792 	{
1793 		return session_context->minor;
1794 	}
1795 	return 0;
1796 }
1797 
1798 /*
1799  * Is suspend_reading_from_frontend flag set?
1800  */
1801 bool
pool_is_suspend_reading_from_frontend(void)1802 pool_is_suspend_reading_from_frontend(void)
1803 {
1804 	return session_context->suspend_reading_from_frontend;
1805 }
1806 
1807 /*
1808  * Set suspend_reading_from_frontend flag.
1809  */
1810 void
pool_set_suspend_reading_from_frontend(void)1811 pool_set_suspend_reading_from_frontend(void)
1812 {
1813 	session_context->suspend_reading_from_frontend = true;
1814 }
1815 
1816 /*
1817  * Unset suspend_reading_from_frontend flag.
1818  */
1819 void
pool_unset_suspend_reading_from_frontend(void)1820 pool_unset_suspend_reading_from_frontend(void)
1821 {
1822 	session_context->suspend_reading_from_frontend = false;
1823 }
1824 
1825 #ifdef NOT_USED
1826 /*
1827  * Set preferred "main" node id.
1828  * Only used for SimpleForwardToFrontend.
1829  */
1830 void
pool_set_preferred_main_node_id(int node_id)1831 pool_set_preferred_main_node_id(int node_id)
1832 {
1833 	session_context->preferred_main_node_id = node_id;
1834 }
1835 
1836 /*
1837  * Return preferred "main" node id.
1838  * Only used for SimpleForwardToFrontend.
1839  */
1840 int
pool_get_preferred_main_node_id(void)1841 pool_get_preferred_main_node_id(void)
1842 {
1843 	return session_context->preferred_main_node_id;
1844 }
1845 
1846 /*
1847  * Reset preferred "main" node id.
1848  * Only used for SimpleForwardToFrontend.
1849  */
1850 void
pool_reset_preferred_main_node_id(void)1851 pool_reset_preferred_main_node_id(void)
1852 {
1853 	session_context->preferred_main_node_id = -1;
1854 }
1855 #endif
1856 
1857 /*-----------------------------------------------------------------------
1858  * Temporary table list management modules.
1859  *-----------------------------------------------------------------------
1860  */
1861 
1862 /*
1863  * Initialize temp table list
1864  */
1865 void
pool_temp_tables_init(void)1866 pool_temp_tables_init(void)
1867 {
1868 	if (!session_context)
1869 		ereport(ERROR,
1870 				(errmsg("pool_temp_tables_init: session context is not initialized")));
1871 
1872 	session_context->temp_tables = NIL;
1873 }
1874 
1875 /*
1876  * Destroy temp table list
1877  */
1878 void
pool_temp_tables_destroy(void)1879 pool_temp_tables_destroy(void)
1880 {
1881 	if (!session_context)
1882 		ereport(ERROR,
1883 				(errmsg("pool_temp_tables_destory: session context is not initialized")));
1884 
1885 	list_free(session_context->temp_tables);
1886 }
1887 
1888 /*
1889  * Add a temp table to the tail of the list.
1890  * If the table already exists, just replace state.
1891  */
1892 void
pool_temp_tables_add(char * tablename,POOL_TEMP_TABLE_STATE state)1893 pool_temp_tables_add(char * tablename, POOL_TEMP_TABLE_STATE state)
1894 {
1895 	MemoryContext old_context;
1896 	POOL_TEMP_TABLE * table;
1897 
1898 	if (!session_context)
1899 		ereport(ERROR,
1900 				(errmsg("pool_temp_tables_add: session context is not initialized")));
1901 
1902 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1903 
1904 	table = pool_temp_tables_find(tablename);
1905 	if (table)
1906 	{
1907 		/* Table already exists. Just replace state. */
1908 		table->state = state;
1909 	}
1910 	else
1911 	{
1912 		table = palloc(sizeof(POOL_TEMP_TABLE));
1913 		StrNCpy(table->tablename, tablename, sizeof(table->tablename));
1914 		table->state = state;
1915 		session_context->temp_tables = lappend(session_context->temp_tables, table);
1916 	}
1917 
1918 	MemoryContextSwitchTo(old_context);
1919 }
1920 
1921 /*
1922  * Returns pointer to the table cell if specified tablename is in the temp table list.
1923  */
1924 
1925 POOL_TEMP_TABLE *
pool_temp_tables_find(char * tablename)1926 pool_temp_tables_find(char * tablename)
1927 {
1928 	ListCell   *cell;
1929 
1930 	if (!session_context)
1931 		ereport(ERROR,
1932 				(errmsg("pool_temp_tables_find: session context is not initialized")));
1933 
1934 	foreach(cell, session_context->temp_tables)
1935 	{
1936 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
1937 		if (strcmp(tablename, table->tablename) == 0)
1938 			return table;
1939 	}
1940 	return NULL;
1941 }
1942 
1943 /*
1944  * If requested state or table state is TEMP_TABLE_DROP_COMMITTED, removes the
1945  * temp table entry from the list.  Otherwise just set the requested state to
1946  * the table state.
1947  */
1948 void
pool_temp_tables_delete(char * tablename,POOL_TEMP_TABLE_STATE state)1949 pool_temp_tables_delete(char * tablename, POOL_TEMP_TABLE_STATE state)
1950 {
1951 	POOL_TEMP_TABLE * table;
1952 	MemoryContext old_context;
1953 
1954 	if (!session_context)
1955 		ereport(ERROR,
1956 				(errmsg("pool_temp_tables_delete: session context is not initialized")));
1957 
1958 	ereport(LOG,
1959 			(errmsg("pool_temp_tables_delete: table: %s state: %d", tablename, state)));
1960 
1961 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1962 
1963 	table = pool_temp_tables_find(tablename);
1964 
1965 	if (table)
1966 	{
1967 		if (table->state == TEMP_TABLE_DROP_COMMITTED)
1968 		{
1969 			ereport(DEBUG1,
1970 					(errmsg("pool_temp_tables_delete: remove %s. previous state: %d requested state: %d",
1971 							table->tablename, table->state, state)));
1972 
1973 			session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
1974 		}
1975 		else
1976 		{
1977 			ereport(DEBUG1,
1978 					(errmsg("pool_temp_tables_delete: set state %s. previous state: %d requested state: %d",
1979 							table->tablename, table->state, state)));
1980 			table->state = state;
1981 		}
1982 	}
1983 
1984 	MemoryContextSwitchTo(old_context);
1985 }
1986 
1987 /*
1988  * Commits creating entries. Also remove dropping entries. This is supposed to
1989  * be called when an explicit transaction commits.
1990  */
1991 void
pool_temp_tables_commit_pending(void)1992 pool_temp_tables_commit_pending(void)
1993 {
1994 	ListCell   *cell;
1995 	MemoryContext old_context;
1996 
1997 	if (!session_context)
1998 		ereport(ERROR,
1999 				(errmsg("pool_temp_tables_commit_pending: session context is not initialized")));
2000 
2001 	old_context = MemoryContextSwitchTo(session_context->memory_context);
2002 
2003 	pool_temp_tables_dump();
2004 
2005 Retry:
2006 	foreach(cell, session_context->temp_tables)
2007 	{
2008 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2009 
2010 		if (table->state == TEMP_TABLE_CREATING)
2011 		{
2012 			ereport(DEBUG1,
2013 					(errmsg("pool_temp_tables_commit_pending: commit: %s", table->tablename)));
2014 
2015 			table->state = TEMP_TABLE_CREATE_COMMITTED;
2016 		}
2017 		else if (table->state == TEMP_TABLE_DROPPING)
2018 		{
2019 			ereport(DEBUG1,
2020 					(errmsg("pool_temp_tables_commit_pending: remove: %s", table->tablename)));
2021 			session_context->temp_tables = list_delete_cell(session_context->temp_tables, cell);
2022 			pool_temp_tables_dump();
2023 			goto Retry;
2024 		}
2025 	}
2026 
2027 	MemoryContextSwitchTo(old_context);
2028 }
2029 
2030 /*
2031  * Removes all ongoing creating or dropping entries. This is supposed to be
2032  * called when an explicit transaction aborts.
2033  */
2034 void
pool_temp_tables_remove_pending(void)2035 pool_temp_tables_remove_pending(void)
2036 {
2037 	ListCell   *cell;
2038 	MemoryContext old_context;
2039 
2040 	if (!session_context)
2041 		ereport(ERROR,
2042 				(errmsg("pool_temp_tables_remove_pending: session context is not initialized")));
2043 
2044 	old_context = MemoryContextSwitchTo(session_context->memory_context);
2045 
2046 	pool_temp_tables_dump();
2047 
2048 Retry:
2049 	foreach(cell, session_context->temp_tables)
2050 	{
2051 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2052 
2053 		if (table->state == TEMP_TABLE_CREATING || table->state == TEMP_TABLE_DROPPING)
2054 		{
2055 			ereport(DEBUG1,
2056 					(errmsg("pool_temp_tables_remove_pending: remove: %s", table->tablename)));
2057 
2058 			session_context->temp_tables = list_delete_cell(session_context->temp_tables, cell);
2059 			pool_temp_tables_dump();
2060 			goto Retry;
2061 		}
2062 	}
2063 
2064 	MemoryContextSwitchTo(old_context);
2065 }
2066 
2067 void
pool_temp_tables_dump(void)2068 pool_temp_tables_dump(void)
2069 {
2070 #ifdef TEMP_TABLES_DEBUG
2071 	ListCell   *cell;
2072 
2073 	if (!session_context)
2074 		ereport(ERROR,
2075 				(errmsg("pool_temp_tables_dump: session context is not initialized")));
2076 
2077 	foreach(cell, session_context->temp_tables)
2078 	{
2079 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2080 		ereport(DEBUG1,
2081 				(errmsg("pool_temp_tables_dump: table %s state: %d",
2082 						table->tablename, table->state)));
2083 	}
2084 #endif
2085 
2086 }
2087