1 /* -*-pgsql-c-*- */
2 /*
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2019	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  */
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24 
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 #include "utils/elog.h"
29 #include "pool_config.h"
30 #include "context/pool_session_context.h"
31 #include "protocol/pool_proto_modules.h"
32 
33 static POOL_SESSION_CONTEXT session_context_d;
34 static POOL_SESSION_CONTEXT * session_context = NULL;
35 static void GetTranIsolationErrorCb(void *arg);
36 static void init_sent_message_list(void);
37 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * messag);
38 static void dump_sent_message(char *caller, POOL_SENT_MESSAGE * m);
39 
40 #ifdef PENDING_MESSAGE_DEBUG
41 static int	Elevel = LOG;
42 #else
43 static int	Elevel = DEBUG2;
44 #endif
45 
46 /*
47  * Initialize per session context
48  */
49 void
pool_init_session_context(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)50 pool_init_session_context(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
51 {
52 	session_context = &session_context_d;
53 	ProcessInfo *process_info;
54 	int			node_id;
55 	int			i;
56 
57 	/* Clear session context memory */
58 	memset(&session_context_d, 0, sizeof(session_context_d));
59 
60 	/* Get Process context */
61 	session_context->process_context = pool_get_process_context();
62 	if (!session_context->process_context)
63 		ereport(ERROR,
64 				(errmsg("failed to get process context")));
65 
66 	/* Set connection info */
67 	session_context->frontend = frontend;
68 	session_context->backend = backend;
69 
70 	/* Initialize query context */
71 	session_context->query_context = NULL;
72 
73 	/* Initialize local session id */
74 	pool_incremnet_local_session_id();
75 
76 	/* Create memory context */
77 	/* TODO re-think about the parent for this context ?? */
78 	session_context->memory_context = AllocSetContextCreate(ProcessLoopContext,
79 															"SessionContext",
80 															ALLOCSET_SMALL_MINSIZE,
81 															ALLOCSET_SMALL_INITSIZE,
82 															ALLOCSET_SMALL_MAXSIZE);
83 
84 	/* Initialize sent message list */
85 	init_sent_message_list();
86 
87 	process_info = pool_get_my_process_info();
88 
89 	if (!process_info)
90 		ereport(ERROR,
91 				(errmsg("failed to get process info for current process")));
92 
93 	/* Choose load balancing node if necessary */
94 	if (!RAW_MODE && pool_config->load_balance_mode)
95 	{
96 		node_id = select_load_balancing_node();
97 	}
98 	else
99 	{
100 		node_id = SL_MODE ? PRIMARY_NODE_ID : MASTER_NODE_ID;
101 	}
102 
103 	session_context->load_balance_node_id = node_id;
104 
105 	for (i = 0; i < NUM_BACKENDS; i++)
106 	{
107 		pool_coninfo(session_context->process_context->proc_id,
108 					 pool_pool_index(), i)->load_balancing_node = node_id;
109 	}
110 
111 	ereport(DEBUG5,
112 			(errmsg("initializing session context"),
113 			 errdetail("selected load balancing node: %d", node_id)));
114 
115 	/* Unset query is in progress */
116 	pool_unset_query_in_progress();
117 
118 	/* The command in progress has not succeeded yet */
119 	pool_unset_command_success();
120 
121 	/* We don't have a write query in this transaction yet */
122 	pool_unset_writing_transaction();
123 
124 	/* Error doesn't occur in this transaction yet */
125 	pool_unset_failed_transaction();
126 
127 	/* Forget transaction isolation mode */
128 	pool_unset_transaction_isolation();
129 
130 	/* We don't skip reading from backends */
131 	pool_unset_skip_reading_from_backends();
132 
133 	/* Backends have not ignored messages yet */
134 	pool_unset_ignore_till_sync();
135 
136 	/* Unset suspend reading from frontend flag */
137 	pool_unset_suspend_reading_from_frontend();
138 
139 	/* Initialize where to send map for PREPARE statements */
140 #ifdef NOT_USED
141 	memset(&session_context->prep_where, 0, sizeof(session_context->prep_where));
142 	session_context->prep_where.nelem = POOL_MAX_PREPARED_STATEMENTS;
143 #endif							/* NOT_USED */
144 
145 	/*
146 	 * Reset flag to indicate difference in number of affected tuples in
147 	 * UPDATE/DELETE.
148 	 */
149 	session_context->mismatch_ntuples = false;
150 
151 	if (pool_config->memory_cache_enabled)
152 	{
153 		session_context->query_cache_array = pool_create_query_cache_array();
154 		session_context->num_selects = 0;
155 	}
156 
157 	/* Initialize pending message list */
158 	pool_pending_messages_init();
159 
160 	/* Initialize previous pending message */
161 	pool_pending_message_reset_previous_message();
162 
163 	/* Initialize temp tables */
164 	pool_temp_tables_init();
165 
166 #ifdef NOT_USED
167 	/* Initialize preferred master node id */
168 	pool_reset_preferred_master_node_id();
169 #endif
170 }
171 
172 /*
173  * Destroy session context.
174  */
175 void
pool_session_context_destroy(void)176 pool_session_context_destroy(void)
177 {
178 	if (session_context)
179 	{
180 		pool_clear_sent_message_list();
181 		pfree(session_context->message_list.sent_messages);
182 		if (pool_config->memory_cache_enabled)
183 		{
184 			pool_discard_query_cache_array(session_context->query_cache_array);
185 			session_context->num_selects = 0;
186 		}
187 
188 		if (session_context->query_context)
189 			pool_query_context_destroy(session_context->query_context);
190 		MemoryContextDelete(session_context->memory_context);
191 	}
192 	/* XXX For now, just zap memory */
193 	memset(&session_context_d, 0, sizeof(session_context_d));
194 	session_context = NULL;
195 }
196 
197 /*
198  * Return session context
199  */
200 POOL_SESSION_CONTEXT *
pool_get_session_context(bool noerror)201 pool_get_session_context(bool noerror)
202 {
203 	if (!session_context && !noerror)
204 	{
205 		ereport(FATAL,
206 				(return_code(2),
207 				 errmsg("unable to get session context")));
208 	}
209 	return session_context;
210 }
211 
212 /*
213  * Return local session id
214  */
215 int
pool_get_local_session_id(void)216 pool_get_local_session_id(void)
217 {
218 	return pool_get_session_context(false)->process_context->local_session_id;
219 }
220 
221 /*
222  * Return true if query is in progress
223  */
224 bool
pool_is_query_in_progress(void)225 pool_is_query_in_progress(void)
226 {
227 	return pool_get_session_context(false)->in_progress;
228 }
229 
230 /*
231  * Set query is in progress
232  */
233 void
pool_set_query_in_progress(void)234 pool_set_query_in_progress(void)
235 {
236 	ereport(DEBUG5,
237 			(errmsg("session context: setting query in progress. DONE")));
238 
239 	pool_get_session_context(false)->in_progress = true;
240 }
241 
242 /*
243  * Unset query is in progress
244  */
245 void
pool_unset_query_in_progress(void)246 pool_unset_query_in_progress(void)
247 {
248 	POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
249 
250 	ereport(DEBUG5,
251 			(errmsg("session context: unsetting query in progress. DONE")));
252 
253 	s->in_progress = false;
254 
255 	/* Restore where_to_send map if necessary */
256 	if (s->need_to_restore_where_to_send)
257 	{
258 		memcpy(s->query_context->where_to_send, s->where_to_send_save, sizeof(s->where_to_send_save));
259 	}
260 	s->need_to_restore_where_to_send = false;
261 }
262 
263 /*
264  * Return true if we skip reading from backends
265  */
266 bool
pool_is_skip_reading_from_backends(void)267 pool_is_skip_reading_from_backends(void)
268 {
269 	return pool_get_session_context(false)->skip_reading_from_backends;
270 }
271 
272 /*
273  * Set skip_reading_from_backends
274  */
275 void
pool_set_skip_reading_from_backends(void)276 pool_set_skip_reading_from_backends(void)
277 {
278 	ereport(DEBUG5,
279 			(errmsg("session context: setting skip reading from backends. DONE")));
280 
281 
282 	pool_get_session_context(false)->skip_reading_from_backends = true;
283 }
284 
285 /*
286  * Unset skip_reading_from_backends
287  */
288 void
pool_unset_skip_reading_from_backends(void)289 pool_unset_skip_reading_from_backends(void)
290 {
291 	ereport(DEBUG5,
292 			(errmsg("session context: clearing skip reading from backends. DONE")));
293 
294 	pool_get_session_context(false)->skip_reading_from_backends = false;
295 }
296 
297 /*
298  * Return true if we are doing extended query message
299  */
300 bool
pool_is_doing_extended_query_message(void)301 pool_is_doing_extended_query_message(void)
302 {
303 	return pool_get_session_context(false)->doing_extended_query_message;
304 }
305 
306 /*
307  * Set doing_extended_query_message
308  */
309 void
pool_set_doing_extended_query_message(void)310 pool_set_doing_extended_query_message(void)
311 {
312 	ereport(DEBUG5,
313 			(errmsg("session context: setting doing extended query messaging. DONE")));
314 
315 	pool_get_session_context(false)->doing_extended_query_message = true;
316 }
317 
318 /*
319  * Unset doing_extended_query_message
320  */
321 void
pool_unset_doing_extended_query_message(void)322 pool_unset_doing_extended_query_message(void)
323 {
324 	ereport(DEBUG5,
325 			(errmsg("session context: clearing doing extended query messaging. DONE")));
326 
327 	pool_get_session_context(false)->doing_extended_query_message = false;
328 }
329 
330 /*
331  * Return true if backends ignore extended query message
332  */
333 bool
pool_is_ignore_till_sync(void)334 pool_is_ignore_till_sync(void)
335 {
336 	return pool_get_session_context(false)->ignore_till_sync;
337 }
338 
339 /*
340  * Set ignore_till_sync
341  */
342 void
pool_set_ignore_till_sync(void)343 pool_set_ignore_till_sync(void)
344 {
345 	ereport(DEBUG5,
346 			(errmsg("session context: setting ignore till sync. DONE")));
347 
348 	pool_get_session_context(false)->ignore_till_sync = true;
349 }
350 
351 /*
352  * Unset ignore_till_sync
353  */
354 void
pool_unset_ignore_till_sync(void)355 pool_unset_ignore_till_sync(void)
356 {
357 	ereport(DEBUG5,
358 			(errmsg("session context: clearing ignore till sync. DONE")));
359 
360 	pool_get_session_context(false)->ignore_till_sync = false;
361 }
362 
363 /*
364  * Remove a sent message
365  */
366 bool
pool_remove_sent_message(char kind,const char * name)367 pool_remove_sent_message(char kind, const char *name)
368 {
369 	int			i;
370 	POOL_SENT_MESSAGE_LIST *msglist;
371 
372 	if (kind == 0 || name == NULL)
373 		return false;
374 
375 	msglist = &pool_get_session_context(false)->message_list;
376 
377 	for (i = 0; i < msglist->size; i++)
378 	{
379 		if (msglist->sent_messages[i]->kind == kind &&
380 			!strcmp(msglist->sent_messages[i]->name, name))
381 		{
382 			pool_sent_message_destroy(msglist->sent_messages[i]);
383 			break;
384 		}
385 	}
386 
387 	/* sent message not found */
388 	if (i == msglist->size)
389 		return false;
390 
391 	if (i != msglist->size - 1)
392 	{
393 		memmove(&msglist->sent_messages[i], &msglist->sent_messages[i + 1],
394 				sizeof(POOL_SENT_MESSAGE *) * (msglist->size - i - 1));
395 	}
396 
397 	msglist->size--;
398 
399 	return true;
400 }
401 
402 /*
403  * Remove same kind of sent messages
404  */
405 void
pool_remove_sent_messages(char kind)406 pool_remove_sent_messages(char kind)
407 {
408 	int			i;
409 	POOL_SENT_MESSAGE_LIST *msglist;
410 
411 	msglist = &pool_get_session_context(false)->message_list;
412 
413 	for (i = 0; i < msglist->size; i++)
414 	{
415 		if (msglist->sent_messages[i]->kind == kind)
416 		{
417 			if (pool_remove_sent_message(kind, msglist->sent_messages[i]->name))
418 				i--;			/* for relocation by removing */
419 		}
420 	}
421 }
422 
423 /*
424  * Destroy sent message
425  */
426 void
pool_sent_message_destroy(POOL_SENT_MESSAGE * message)427 pool_sent_message_destroy(POOL_SENT_MESSAGE * message)
428 {
429 	bool		in_progress;
430 	POOL_QUERY_CONTEXT *qc = NULL;
431 
432 	dump_sent_message("pool_sent_message_destroy", message);
433 
434 	in_progress = pool_is_query_in_progress();
435 
436 	if (message)
437 	{
438 		if (message->contents)
439 			pfree(message->contents);
440 
441 		if (message->name)
442 			pfree(message->name);
443 
444 		if (message->query_context)
445 		{
446 			if (session_context->query_context != message->query_context)
447 				qc = session_context->query_context;
448 
449 			if (can_query_context_destroy(message->query_context))
450 			{
451 				pool_query_context_destroy(message->query_context);
452 
453 				/*
454 				 * set in_progress flag, because pool_query_context_destroy()
455 				 * unsets in_progress flag
456 				 */
457 				if (in_progress)
458 					pool_set_query_in_progress();
459 
460 				/*
461 				 * set query_context of session_context, because
462 				 * pool_query_context_destroy() sets it to NULL.
463 				 */
464 				if (qc)
465 					session_context->query_context = qc;
466 			}
467 		}
468 
469 		if (session_context->memory_context)
470 			pfree(message);
471 	}
472 }
473 
474 /*
475  * Clear sent message list
476  */
477 void
pool_clear_sent_message_list(void)478 pool_clear_sent_message_list(void)
479 {
480 	POOL_SENT_MESSAGE_LIST *msglist;
481 
482 	msglist = &pool_get_session_context(false)->message_list;
483 
484 	while (msglist->size > 0)
485 	{
486 		pool_remove_sent_messages(msglist->sent_messages[0]->kind);
487 	}
488 }
489 
490 /*
491  * Zap query context info in sent messages to indicate that the query context
492  * has been already removed.
493  */
494 void
pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT * query_context)495 pool_zap_query_context_in_sent_messages(POOL_QUERY_CONTEXT *query_context)
496 {
497 	int			i;
498 	POOL_SENT_MESSAGE_LIST *msglist;
499 
500 	msglist = &pool_get_session_context(false)->message_list;
501 
502 	for (i = 0; i < msglist->size; i++)
503 	{
504 		elog(DEBUG5, "checking zapping sent message: %p query_context: %p",
505 			 &msglist->sent_messages[i], msglist->sent_messages[i]->query_context);
506 		if (msglist->sent_messages[i]->query_context == query_context)
507 		{
508 			msglist->sent_messages[i]->query_context = NULL;
509 			elog(DEBUG5, "Zap sent message: %p", &msglist->sent_messages[i]);
510 		}
511 	}
512 }
513 
514 static void
dump_sent_message(char * caller,POOL_SENT_MESSAGE * m)515 dump_sent_message(char *caller, POOL_SENT_MESSAGE * m)
516 {
517 	ereport(DEBUG5,
518 			(errmsg("called by %s: sent message: address: %p kind: %c name: =%s= state:%d",
519 					caller, m, m->kind, m->name, m->state)));
520 }
521 
522 /*
523  * Create a sent message.
524  * kind: one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE)
525  * len: message length in host order
526  * contents: message contents
527  * num_tsparams: number of timestamp parameters
528  * name: prepared statement name or portal name
529  */
530 POOL_SENT_MESSAGE *
pool_create_sent_message(char kind,int len,char * contents,int num_tsparams,const char * name,POOL_QUERY_CONTEXT * query_context)531 pool_create_sent_message(char kind, int len, char *contents,
532 						 int num_tsparams, const char *name,
533 						 POOL_QUERY_CONTEXT * query_context)
534 {
535 	POOL_SENT_MESSAGE *msg;
536 
537 	if (!session_context)
538 		ereport(ERROR,
539 				(errmsg("unable to create message"),
540 				 errdetail("cannot get the session context")));
541 
542 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
543 
544 	msg = palloc(sizeof(POOL_SENT_MESSAGE));
545 	msg->kind = kind;
546 	msg->len = len;
547 	msg->contents = palloc(len);
548 	memcpy(msg->contents, contents, len);
549 	msg->state = POOL_SENT_MESSAGE_CREATED;
550 	msg->num_tsparams = num_tsparams;
551 	msg->name = pstrdup(name);
552 	msg->query_context = query_context;
553 	MemoryContextSwitchTo(old_context);
554 
555 	return msg;
556 }
557 
558 /*
559  * Add a sent message to sent message list
560  */
561 void
pool_add_sent_message(POOL_SENT_MESSAGE * message)562 pool_add_sent_message(POOL_SENT_MESSAGE * message)
563 {
564 	POOL_SENT_MESSAGE *old_msg;
565 	POOL_SENT_MESSAGE_LIST *msglist;
566 
567 	dump_sent_message("pool_add_sent_message", message);
568 
569 	if (!message)
570 	{
571 		ereport(DEBUG5,
572 				(errmsg("adding sent message to list"),
573 				 errdetail("message is null")));
574 		return;
575 	}
576 
577 	old_msg = pool_get_sent_message(message->kind, message->name, POOL_SENT_MESSAGE_CREATED);
578 
579 	if (old_msg == message)
580 	{
581 		/*
582 		 * It is likely caller tries to add the exact same message previously
583 		 * added. We should ignore this because pool_remove_sent_message()
584 		 * will free memory allocated in the message.
585 		 */
586 		ereport(DEBUG5,
587 				(errmsg("adding sent message to list"),
588 				 errdetail("adding exactly same message is prohibited")));
589 		return;
590 	}
591 
592 	msglist = &session_context->message_list;
593 
594 	if (old_msg)
595 	{
596 		if (message->kind == 'B')
597 			ereport(DEBUG5,
598 					(errmsg("adding sent message to list"),
599 					 errdetail("portal \"%s\" already exists", message->name)));
600 		else
601 			ereport(DEBUG5,
602 					(errmsg("adding sent message to list"),
603 					 errdetail("prepared statement \"%s\" already exists", message->name)));
604 
605 		if (*message->name == '\0')
606 			pool_remove_sent_message(old_msg->kind, old_msg->name);
607 		else
608 			return;
609 	}
610 
611 	if (msglist->size == msglist->capacity)
612 	{
613 		msglist->capacity *= 2;
614 
615 		MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
616 
617 		msglist->sent_messages = repalloc(msglist->sent_messages,
618 										  sizeof(POOL_SENT_MESSAGE *) * msglist->capacity);
619 
620 		MemoryContextSwitchTo(oldContext);
621 	}
622 
623 	msglist->sent_messages[msglist->size++] = message;
624 }
625 
626 /*
627  * Find a sent message by kind and name.
628  */
629 POOL_SENT_MESSAGE *
pool_get_sent_message(char kind,const char * name,POOL_SENT_MESSAGE_STATE state)630 pool_get_sent_message(char kind, const char *name, POOL_SENT_MESSAGE_STATE state)
631 {
632 	int			i;
633 	POOL_SENT_MESSAGE_LIST *msglist;
634 
635 	msglist = &pool_get_session_context(false)->message_list;
636 
637 	if (kind == 0 || name == NULL)
638 		return NULL;
639 
640 	for (i = 0; i < msglist->size; i++)
641 	{
642 		if (msglist->sent_messages[i]->kind == kind &&
643 			!strcmp(msglist->sent_messages[i]->name, name) &&
644 			msglist->sent_messages[i]->state == state)
645 			return msglist->sent_messages[i];
646 	}
647 
648 	return NULL;
649 }
650 
651 /*
652  * Find a sent message by query context.
653  */
654 POOL_SENT_MESSAGE *
pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)655 pool_get_sent_message_by_query_context(POOL_QUERY_CONTEXT * query_context)
656 {
657 	int			i;
658 	POOL_SENT_MESSAGE_LIST *msglist;
659 
660 	msglist = &pool_get_session_context(false)->message_list;
661 
662 	if (query_context == NULL)
663 		return NULL;
664 
665 	for (i = 0; i < msglist->size; i++)
666 	{
667 		if (msglist->sent_messages[i]->query_context == query_context)
668 			return msglist->sent_messages[i];
669 	}
670 
671 	return NULL;
672 }
673 
674 /*
675  * Set message state to POOL_SENT_MESSAGE_STATE to POOL_SENT_MESSAGE_CLOSED.
676  */
677 void
pool_set_sent_message_state(POOL_SENT_MESSAGE * message)678 pool_set_sent_message_state(POOL_SENT_MESSAGE * message)
679 {
680 	ereport(DEBUG5,
681 			(errmsg("pool_set_sent_message_state: name:%s kind:%c previous state: %d",
682 					message->name, message->kind, message->state)));
683 	message->state = POOL_SENT_MESSAGE_CLOSED;
684 }
685 
686 /*
687  * We don't have a write query in this transaction yet.
688  */
689 void
pool_unset_writing_transaction(void)690 pool_unset_writing_transaction(void)
691 {
692 	/*
693 	 * If disable_transaction_on_write is 'always', then never turn off
694 	 * writing transaction flag.
695 	 */
696 	if (pool_config->disable_load_balance_on_write != DLBOW_ALWAYS)
697 	{
698 		pool_get_session_context(false)->writing_transaction = false;
699 		ereport(DEBUG5,
700 				(errmsg("session context: clearing writing transaction. DONE")));
701 	}
702 }
703 
704 /*
705  * We have a write query in this transaction.
706  */
707 void
pool_set_writing_transaction(void)708 pool_set_writing_transaction(void)
709 {
710 	/*
711 	 * If disable_transaction_on_write is 'off', then never turn on writing
712 	 * transaction flag.
713 	 */
714 	if (pool_config->disable_load_balance_on_write != DLBOW_OFF)
715 	{
716 		pool_get_session_context(false)->writing_transaction = true;
717 		ereport(DEBUG5,
718 				(errmsg("session context: setting writing transaction. DONE")));
719 	}
720 }
721 
722 /*
723  * Do we have a write query in this transaction?
724  */
725 bool
pool_is_writing_transaction(void)726 pool_is_writing_transaction(void)
727 {
728 	return pool_get_session_context(false)->writing_transaction;
729 }
730 
731 /*
732  * Error doesn't occur in this transaction yet.
733  */
734 void
pool_unset_failed_transaction(void)735 pool_unset_failed_transaction(void)
736 {
737 	ereport(DEBUG5,
738 			(errmsg("session context: clearing failed transaction. DONE")));
739 
740 	pool_get_session_context(false)->failed_transaction = false;
741 }
742 
743 /*
744  * Error occurred in this transaction.
745  */
746 void
pool_set_failed_transaction(void)747 pool_set_failed_transaction(void)
748 {
749 	ereport(DEBUG5,
750 			(errmsg("session context: setting failed transaction. DONE")));
751 
752 	pool_get_session_context(false)->failed_transaction = true;
753 }
754 
755 /*
756  * Did error occur in this transaction?
757  */
758 bool
pool_is_failed_transaction(void)759 pool_is_failed_transaction(void)
760 {
761 	return pool_get_session_context(false)->failed_transaction;
762 }
763 
764 /*
765  * Forget transaction isolation mode
766  */
767 void
pool_unset_transaction_isolation(void)768 pool_unset_transaction_isolation(void)
769 {
770 	ereport(DEBUG5,
771 			(errmsg("session context: clearing failed transaction. DONE")));
772 	pool_get_session_context(false)->transaction_isolation = POOL_UNKNOWN;
773 }
774 
775 /*
776  * Set transaction isolation mode
777  */
778 void
pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)779 pool_set_transaction_isolation(POOL_TRANSACTION_ISOLATION isolation_level)
780 {
781 	ereport(DEBUG5,
782 			(errmsg("session context: setting transaction isolation. DONE")));
783 	pool_get_session_context(false)->transaction_isolation = isolation_level;
784 }
785 
786 /*
787  * Get or return cached transaction isolation mode
788  */
789 POOL_TRANSACTION_ISOLATION
pool_get_transaction_isolation(void)790 pool_get_transaction_isolation(void)
791 {
792 	POOL_SELECT_RESULT *res;
793 	POOL_TRANSACTION_ISOLATION ret;
794 	ErrorContextCallback callback;
795 
796 	if (!session_context)
797 	{
798 		ereport(WARNING,
799 				(errmsg("error while getting transaction isolation, session context is not initialized")));
800 		return POOL_UNKNOWN;
801 	}
802 
803 	/* It seems cached result is usable. Return it. */
804 	if (session_context->transaction_isolation != POOL_UNKNOWN)
805 		return session_context->transaction_isolation;
806 
807 	/*
808 	 * Register a error context callback to throw proper context message
809 	 */
810 	callback.callback = GetTranIsolationErrorCb;
811 	callback.arg = NULL;
812 	callback.previous = error_context_stack;
813 	error_context_stack = &callback;
814 
815 	/* No cached data is available. Ask backend. */
816 
817 	do_query(MASTER(session_context->backend),
818 			 "SELECT current_setting('transaction_isolation')", &res, MAJOR(session_context->backend));
819 
820 	error_context_stack = callback.previous;
821 
822 	if (res->numrows <= 0)
823 	{
824 		ereport(WARNING,
825 				(errmsg("error while getting transaction isolation, do_query returns no rows")));
826 		free_select_result(res);
827 		return POOL_UNKNOWN;
828 	}
829 	if (res->data[0] == NULL)
830 	{
831 		ereport(WARNING,
832 				(errmsg("error while getting transaction isolation, do_query returns no data")));
833 
834 		free_select_result(res);
835 		return POOL_UNKNOWN;
836 	}
837 	if (res->nullflags[0] == -1)
838 	{
839 		ereport(WARNING,
840 				(errmsg("error while getting transaction isolation, do_query returns NULL")));
841 		free_select_result(res);
842 		return POOL_UNKNOWN;
843 	}
844 
845 	if (!strcmp(res->data[0], "read uncommitted"))
846 		ret = POOL_READ_UNCOMMITTED;
847 	else if (!strcmp(res->data[0], "read committed"))
848 		ret = POOL_READ_COMMITTED;
849 	else if (!strcmp(res->data[0], "repeatable read"))
850 		ret = POOL_REPEATABLE_READ;
851 	else if (!strcmp(res->data[0], "serializable"))
852 		ret = POOL_SERIALIZABLE;
853 	else
854 	{
855 		ereport(WARNING,
856 				(errmsg("error while getting transaction isolation, unknown transaction isolation level:%s", res->data[0])));
857 
858 		ret = POOL_UNKNOWN;
859 	}
860 
861 	free_select_result(res);
862 
863 	if (ret != POOL_UNKNOWN)
864 		session_context->transaction_isolation = ret;
865 
866 	return ret;
867 }
868 
869 static void
GetTranIsolationErrorCb(void * arg)870 GetTranIsolationErrorCb(void *arg)
871 {
872 	errcontext("while getting transaction isolation");
873 }
874 
875 
876 /*
877  * The command in progress has not succeeded yet.
878  */
879 void
pool_unset_command_success(void)880 pool_unset_command_success(void)
881 {
882 	ereport(DEBUG5,
883 			(errmsg("session context: clearing transaction isolation. DONE")));
884 	pool_get_session_context(false)->command_success = false;
885 }
886 
887 /*
888  * The command in progress has succeeded.
889  */
890 void
pool_set_command_success(void)891 pool_set_command_success(void)
892 {
893 	ereport(DEBUG5,
894 			(errmsg("session context: setting command success. DONE")));
895 
896 	pool_get_session_context(false)->command_success = true;
897 }
898 
899 /*
900  * Has the command in progress succeeded?
901  */
902 bool
pool_is_command_success(void)903 pool_is_command_success(void)
904 {
905 	return pool_get_session_context(false)->command_success;
906 }
907 
908 /*
909  * Copy send map
910  */
911 void
pool_copy_prep_where(bool * src,bool * dest)912 pool_copy_prep_where(bool *src, bool *dest)
913 {
914 	memcpy(dest, src, sizeof(bool) * MAX_NUM_BACKENDS);
915 }
916 #ifdef NOT_USED
917 /*
918  * Add to send map a PREPARED statement
919  */
920 void
pool_add_prep_where(char * name,bool * map)921 pool_add_prep_where(char *name, bool *map)
922 {
923 	int			i;
924 
925 	if (!session_context)
926 	{
927 		ereport(ERROR,
928 				(errmsg("pool_add_prep_where: session context is not initialized")));
929 		return;
930 	}
931 
932 	for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
933 	{
934 		if (*session_context->prep_where.name[i] == '\0')
935 		{
936 			strncpy(session_context->prep_where.name[i], name, POOL_MAX_PREPARED_NAME);
937 			pool_copy_prep_where(map, session_context->prep_where.where_to_send[i]);
938 			return;
939 		}
940 	}
941 	ereport(ERROR,
942 			(errmsg("pool_add_prep_where: no empty slot found")));
943 }
944 
945 /*
946  * Search send map by PREPARED statement name
947  */
948 bool *
pool_get_prep_where(char * name)949 pool_get_prep_where(char *name)
950 {
951 	int			i;
952 
953 	if (!session_context)
954 		ereport(ERROR,
955 				(errmsg("pool_get_prep_where: session context is not initialized")));
956 
957 
958 	for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
959 	{
960 		if (!strcmp(session_context->prep_where.name[i], name))
961 		{
962 			return session_context->prep_where.where_to_send[i];
963 		}
964 	}
965 	return NULL;
966 }
967 
968 /*
969  * Remove PREPARED statement by name
970  */
971 void
pool_delete_prep_where(char * name)972 pool_delete_prep_where(char *name)
973 {
974 	int			i;
975 
976 	if (!session_context)
977 		ereport(ERROR,
978 				(errmsg("pool_delete_prep_where: session context is not initialized")));
979 
980 	for (i = 0; i < POOL_MAX_PREPARED_STATEMENTS; i++)
981 	{
982 		if (!strcmp(session_context->prep_where.name[i], name))
983 		{
984 			memset(&session_context->prep_where.where_to_send[i], 0, sizeof(bool) * MAX_NUM_BACKENDS);
985 			*session_context->prep_where.name[i] = '\0';
986 			return;
987 		}
988 	}
989 }
990 #endif							/* NOT_USED */
991 /*
992  * Initialize sent message list
993  */
994 static void
init_sent_message_list(void)995 init_sent_message_list(void)
996 {
997 	POOL_SENT_MESSAGE_LIST *msglist;
998 
999 	msglist = &session_context->message_list;
1000 	msglist->size = 0;
1001 	msglist->capacity = INIT_LIST_SIZE;
1002 
1003 	MemoryContext oldContext = MemoryContextSwitchTo(session_context->memory_context);
1004 
1005 	msglist->sent_messages = palloc(sizeof(POOL_SENT_MESSAGE *) * INIT_LIST_SIZE);
1006 
1007 	MemoryContextSwitchTo(oldContext);
1008 }
1009 
1010 /*
1011  * Look for extended message list to check if given query context qc
1012  * is used. Returns true if it is not used.
1013  */
1014 bool
can_query_context_destroy(POOL_QUERY_CONTEXT * qc)1015 can_query_context_destroy(POOL_QUERY_CONTEXT * qc)
1016 {
1017 	int			i;
1018 	int			count = 0;
1019 	POOL_SENT_MESSAGE_LIST *msglist;
1020 	ListCell   *cell;
1021 	ListCell   *next;
1022 
1023 	msglist = &session_context->message_list;
1024 
1025 	for (i = 0; i < msglist->size; i++)
1026 	{
1027 		if (msglist->sent_messages[i]->query_context == qc)
1028 			count++;
1029 	}
1030 	if (count > 1)
1031 	{
1032 		ereport(DEBUG5,
1033 				(errmsg("checking if query context can be safely destroyed"),
1034 				 errdetail("query context %p is still used %d times in sent message list. query:\"%s\"",
1035 						   qc, count, qc->original_query)));
1036 		return false;
1037 	}
1038 
1039 	count = 0;
1040 
1041 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1042 	{
1043 		POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1044 
1045 		if (message->query_context == qc)
1046 		{
1047 			count++;
1048 		}
1049 		next = lnext(cell);
1050 	}
1051 
1052 	if (count >= 1)
1053 	{
1054 		ereport(DEBUG5,
1055 				(errmsg("checking if query context can be safely destroyed"),
1056 				 errdetail("query context %p is still used %d times in pending message list. query:%s", qc, count, qc->original_query)));
1057 		return false;
1058 	}
1059 
1060 	return true;
1061 }
1062 
1063 /*
1064  * Initialize pending message list
1065  */
1066 void
pool_pending_messages_init(void)1067 pool_pending_messages_init(void)
1068 {
1069 	if (!session_context)
1070 		ereport(ERROR,
1071 				(errmsg("pool_pending_message_init: session context is not initialized")));
1072 
1073 	session_context->pending_messages = NIL;
1074 }
1075 
1076 /*
1077  * Destroy pending message list
1078  */
1079 void
pool_pending_messages_destroy(void)1080 pool_pending_messages_destroy(void)
1081 {
1082 	ListCell   *cell;
1083 	POOL_PENDING_MESSAGE *msg;
1084 
1085 	if (!session_context)
1086 		ereport(ERROR,
1087 				(errmsg("pool_pending_message_destory: session context is not initialized")));
1088 
1089 	foreach(cell, session_context->pending_messages)
1090 	{
1091 		msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1092 		pfree(msg->contents);
1093 	}
1094 	list_free(session_context->pending_messages);
1095 }
1096 
1097 /*
1098  * Create one message.
1099  */
1100 POOL_PENDING_MESSAGE *
pool_pending_message_create(char kind,int len,char * contents)1101 pool_pending_message_create(char kind, int len, char *contents)
1102 {
1103 	POOL_PENDING_MESSAGE *msg;
1104 	MemoryContext old_context;
1105 
1106 	if (!session_context)
1107 		ereport(ERROR,
1108 				(errmsg("pool_pending_message_create: session context is not initialized")));
1109 
1110 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1111 	msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1112 
1113 	switch (kind)
1114 	{
1115 		case 'P':
1116 			msg->type = POOL_PARSE;
1117 			break;
1118 
1119 		case 'B':
1120 			msg->type = POOL_BIND;
1121 			break;
1122 
1123 		case 'E':
1124 			msg->type = POOL_EXECUTE;
1125 			break;
1126 
1127 		case 'D':
1128 			msg->type = POOL_DESCRIBE;
1129 			break;
1130 
1131 		case 'C':
1132 			msg->type = POOL_CLOSE;
1133 			break;
1134 
1135 		case 'S':
1136 			msg->type = POOL_SYNC;
1137 			break;
1138 
1139 		default:
1140 			ereport(ERROR,
1141 					(errmsg("pool_pending_message_create: unknow kind: %c", kind)));
1142 			break;
1143 	}
1144 
1145 	if (len > 0)
1146 	{
1147 		msg->contents = palloc(len);
1148 		memcpy(msg->contents, contents, len);
1149 	}
1150 	else
1151 		msg->contents = NULL;
1152 
1153 	msg->contents_len = len;
1154 	msg->query[0] = '\0';
1155 	msg->statement[0] = '\0';
1156 	msg->portal[0] = '\0';
1157 	msg->is_rows_returned = false;
1158 	msg->not_forward_to_frontend = false;
1159 	memset(msg->node_ids, false, sizeof(msg->node_ids));
1160 
1161 	MemoryContextSwitchTo(old_context);
1162 
1163 	return msg;
1164 }
1165 
1166 /*
1167  * Set node_ids field of message which indicates which backend nodes the
1168  * message was sent.
1169  */
1170 void
pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1171 pool_pending_message_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1172 {
1173 	memcpy(message->node_ids, query_context->where_to_send, sizeof(message->node_ids));
1174 
1175 	message->query_context = query_context;
1176 
1177 	if (is_select_query(query_context->parse_tree, query_context->original_query))
1178 	{
1179 		message->is_rows_returned = true;
1180 	}
1181 }
1182 
1183 /*
1184  * Set where_to_send field in query_context from node_ids field of message
1185  * which indicates which backend nodes the message was sent.
1186  */
1187 void
pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1188 pool_pending_message_query_context_dest_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1189 {
1190 	int			i;
1191 
1192 	POOL_SESSION_CONTEXT *s = pool_get_session_context(false);
1193 
1194 	/* Save where_to_send map */
1195 	memcpy(s->where_to_send_save, query_context->where_to_send, sizeof(s->where_to_send_save));
1196 	s->need_to_restore_where_to_send = true;
1197 
1198 	/* Rewrite where_to_send map */
1199 	memset(query_context->where_to_send, 0, sizeof(query_context->where_to_send));
1200 
1201 	for (i = 0; i < MAX_NUM_BACKENDS; i++)
1202 	{
1203 		if (message->node_ids[i])
1204 		{
1205 			query_context->where_to_send[i] = 1;
1206 		}
1207 	}
1208 }
1209 
1210 /*
1211  * Set query field of message.
1212  */
1213 void
pool_pending_message_query_set(POOL_PENDING_MESSAGE * message,POOL_QUERY_CONTEXT * query_context)1214 pool_pending_message_query_set(POOL_PENDING_MESSAGE * message, POOL_QUERY_CONTEXT * query_context)
1215 {
1216 	StrNCpy(message->query, query_context->original_query, sizeof(message->query));
1217 }
1218 
1219 /*
1220  * Add one message to the tail of the list
1221  */
1222 void
pool_pending_message_add(POOL_PENDING_MESSAGE * message)1223 pool_pending_message_add(POOL_PENDING_MESSAGE * message)
1224 {
1225 	POOL_PENDING_MESSAGE *msg;
1226 	MemoryContext old_context;
1227 
1228 	if (!session_context)
1229 		ereport(ERROR,
1230 				(errmsg("pool_pending_message_add: session context is not initialized")));
1231 
1232 	switch (message->type)
1233 	{
1234 		case POOL_PARSE:
1235 			StrNCpy(message->statement, message->contents, sizeof(message->statement));
1236 			StrNCpy(message->query, message->contents + strlen(message->contents) + 1, sizeof(message->query));
1237 			break;
1238 
1239 		case POOL_BIND:
1240 			StrNCpy(message->portal, message->contents, sizeof(message->portal));
1241 			StrNCpy(message->statement, message->contents + strlen(message->contents) + 1, sizeof(message->statement));
1242 			break;
1243 
1244 		case POOL_EXECUTE:
1245 			StrNCpy(message->portal, message->contents, sizeof(message->portal));
1246 			break;
1247 
1248 		case POOL_CLOSE:
1249 		case POOL_DESCRIBE:
1250 			if (*message->contents == 'S')
1251 				StrNCpy(message->statement, message->contents + 1, sizeof(message->statement));
1252 			else
1253 				StrNCpy(message->portal, message->contents + 1, sizeof(message->portal));
1254 			break;
1255 
1256 		case POOL_SYNC:
1257 			break;
1258 
1259 		default:
1260 			ereport(ERROR,
1261 					(errmsg("pool_pending_message_add: unknown message type:%d", message->type)));
1262 			return;
1263 			break;
1264 	}
1265 
1266 	if (message->type != POOL_SYNC)
1267 		ereport(Elevel,
1268 				(errmsg("pool_pending_message_add: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1269 						pool_pending_message_type_to_string(message->type),
1270 						message->contents_len, message->query, message->statement, message->portal,
1271 						message->node_ids[0], message->node_ids[1])));
1272 	else
1273 		ereport(Elevel,
1274 				(errmsg("pool_pending_message_add: message type: sync")));
1275 
1276 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1277 	msg = copy_pending_message(message);
1278 	session_context->pending_messages = lappend(session_context->pending_messages, msg);
1279 	MemoryContextSwitchTo(old_context);
1280 }
1281 
1282 /*
1283  * Return the message from the head of the list.  If the list is not empty, a
1284  * copy of the message is returned. If the list is empty, returns NULL.
1285  */
1286 POOL_PENDING_MESSAGE *
pool_pending_message_head_message(void)1287 pool_pending_message_head_message(void)
1288 {
1289 	ListCell   *cell;
1290 	POOL_PENDING_MESSAGE *message;
1291 	POOL_PENDING_MESSAGE *m;
1292 	MemoryContext old_context;
1293 
1294 	if (!session_context)
1295 		ereport(ERROR,
1296 				(errmsg("pool_pending_message_head_message: session context is not initialized")));
1297 
1298 	if (list_length(session_context->pending_messages) == 0)
1299 	{
1300 		return NULL;
1301 	}
1302 
1303 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1304 
1305 	cell = list_head(session_context->pending_messages);
1306 	m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1307 	message = copy_pending_message(m);
1308 	ereport(Elevel,
1309 			(errmsg("pool_pending_message_head_message: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1310 					pool_pending_message_type_to_string(message->type),
1311 					message->contents_len, message->query, message->statement, message->portal,
1312 					message->node_ids[0], message->node_ids[1])));
1313 
1314 	MemoryContextSwitchTo(old_context);
1315 	return message;
1316 }
1317 
1318 
1319 /*
1320  * Remove one message from the head of the list.  If the list is not empty, a
1321  * copy of the message is returned and the message is removed the message
1322  * list. If the list is empty, returns NULL.
1323  */
1324 POOL_PENDING_MESSAGE *
pool_pending_message_pull_out(void)1325 pool_pending_message_pull_out(void)
1326 {
1327 	ListCell   *cell;
1328 	POOL_PENDING_MESSAGE *message;
1329 	POOL_PENDING_MESSAGE *m;
1330 	MemoryContext old_context;
1331 
1332 	if (!session_context)
1333 		ereport(ERROR,
1334 				(errmsg("pool_pending_message_pull_out: session context is not initialized")));
1335 
1336 	if (list_length(session_context->pending_messages) == 0)
1337 	{
1338 		return NULL;
1339 	}
1340 
1341 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1342 
1343 	cell = list_head(session_context->pending_messages);
1344 	m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1345 	message = copy_pending_message(m);
1346 	ereport(Elevel,
1347 			(errmsg("pool_pending_message_pull_out: message type:%s message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1348 					pool_pending_message_type_to_string(message->type),
1349 					message->contents_len, message->query, message->statement, message->portal,
1350 					message->node_ids[0], message->node_ids[1])));
1351 
1352 	pool_pending_message_free_pending_message(m);
1353 	session_context->pending_messages =
1354 		list_delete_cell(session_context->pending_messages, cell, NULL);
1355 
1356 	MemoryContextSwitchTo(old_context);
1357 	return message;
1358 }
1359 
1360 /*
1361  * Try to find the first message specified by the message type in the message
1362  * list. If found, a copy of the message is returned. If not, returns NULL.
1363  */
1364 POOL_PENDING_MESSAGE *
pool_pending_message_get(POOL_MESSAGE_TYPE type)1365 pool_pending_message_get(POOL_MESSAGE_TYPE type)
1366 {
1367 	ListCell   *cell;
1368 	ListCell   *next;
1369 	POOL_PENDING_MESSAGE *msg;
1370 	MemoryContext old_context;
1371 
1372 	if (!session_context)
1373 		ereport(ERROR,
1374 				(errmsg("pool_pending_message_remove: session context is not initialized")));
1375 
1376 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1377 
1378 	msg = NULL;
1379 
1380 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1381 	{
1382 		POOL_PENDING_MESSAGE *m = (POOL_PENDING_MESSAGE *) lfirst(cell);
1383 
1384 		next = lnext(cell);
1385 
1386 		if (m->type == type)
1387 		{
1388 			msg = copy_pending_message(m);
1389 			break;
1390 		}
1391 	}
1392 
1393 	MemoryContextSwitchTo(old_context);
1394 	return msg;
1395 }
1396 
1397 /*
1398  * Get message specification (either statement ('S') or portal ('P')) from a
1399  * close message.
1400  */
1401 char
pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)1402 pool_get_close_message_spec(POOL_PENDING_MESSAGE * msg)
1403 {
1404 	return *msg->contents;
1405 }
1406 
1407 /*
1408  * Get statement or portal name from close message.
1409  * The returned pointer is within "msg".
1410  */
1411 char *
pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)1412 pool_get_close_message_name(POOL_PENDING_MESSAGE * msg)
1413 {
1414 	return (msg->contents) + 1;
1415 }
1416 
1417 /*
1418  * Perform deep copy of POOL_PENDING_MESSAGE object in the current memory
1419  * context except the query context.
1420  */
copy_pending_message(POOL_PENDING_MESSAGE * message)1421 static POOL_PENDING_MESSAGE * copy_pending_message(POOL_PENDING_MESSAGE * message)
1422 {
1423 	POOL_PENDING_MESSAGE *msg;
1424 
1425 	msg = palloc(sizeof(POOL_PENDING_MESSAGE));
1426 	memcpy(msg, message, sizeof(POOL_PENDING_MESSAGE));
1427 	msg->contents = palloc(msg->contents_len);
1428 	memcpy(msg->contents, message->contents, msg->contents_len);
1429 
1430 	return msg;
1431 }
1432 
1433 /*
1434  * Free POOL_PENDING_MESSAGE object in the current memory
1435  * context except the query context.
1436  */
1437 void
pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)1438 pool_pending_message_free_pending_message(POOL_PENDING_MESSAGE * message)
1439 {
1440 	if (message == NULL)
1441 		return;
1442 
1443 	if (message->contents)
1444 		pfree(message->contents);
1445 
1446 	pfree(message);
1447 }
1448 
1449 /*
1450  * Reset previous message.
1451  */
1452 void
pool_pending_message_reset_previous_message(void)1453 pool_pending_message_reset_previous_message(void)
1454 {
1455 	if (!session_context)
1456 	{
1457 		ereport(ERROR,
1458 				(errmsg("pool_pending_message_reset_previous_message: session context is not initialized")));
1459 		return;
1460 	}
1461 	session_context->previous_message_exists = false;
1462 }
1463 
1464 /*
1465  * Set previous message.
1466  */
1467 void
pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)1468 pool_pending_message_set_previous_message(POOL_PENDING_MESSAGE * message)
1469 {
1470 	if (!session_context)
1471 	{
1472 		ereport(ERROR,
1473 				(errmsg("pool_pending_message_set_previous_message: session context is not initialized")));
1474 		return;
1475 	}
1476 	session_context->previous_message_exists = true;
1477 	memcpy(&session_context->previous_message, message, sizeof(POOL_PENDING_MESSAGE));
1478 }
1479 
1480 /*
1481  * Get previous message. This actually returns the address of memory. Do not
1482  * try to free using pool_pending_message_free_pending_message().
1483  */
1484 POOL_PENDING_MESSAGE *
pool_pending_message_get_previous_message(void)1485 pool_pending_message_get_previous_message(void)
1486 {
1487 	if (!session_context)
1488 	{
1489 		ereport(ERROR,
1490 				(errmsg("pool_pending_message_get_previous_message: session context is not initialized")));
1491 		return NULL;
1492 	}
1493 	if (session_context->previous_message_exists == false)
1494 		return NULL;
1495 
1496 	return &session_context->previous_message;
1497 }
1498 
1499 /*
1500  * Return true if there's any pending message.
1501  */
1502 bool
pool_pending_message_exists(void)1503 pool_pending_message_exists(void)
1504 {
1505 	return list_length(session_context->pending_messages) > 0;
1506 }
1507 
1508 /*
1509  * Convert enum pending message type to string. The returned string must not
1510  * be modified or freed.
1511  */
1512 const char *
pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)1513 pool_pending_message_type_to_string(POOL_MESSAGE_TYPE type)
1514 {
1515 	static const char *pending_msg_string[] = {"Parse", "Bind", "Execute",
1516 	"Descripbe", "Close", "Sync"};
1517 
1518 	if (type < 0 || type > POOL_SYNC)
1519 		return "unknown type";
1520 	return pending_msg_string[type];
1521 }
1522 
1523 /*
1524  * Check consistency the message type and backend message kind.
1525  * This function is intended to be used for debugging.
1526  */
1527 void
pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type,char kind)1528 pool_check_pending_message_and_reply(POOL_MESSAGE_TYPE type, char kind)
1529 {
1530 	/*
1531 	 * Backend response message sorted by POOL_MESSAGE_TYPE
1532 	 */
1533 	static char backend_response_kind[] = {
1534 		'1',					/* POOL_PARSE, parse complete */
1535 		'2',					/* POOL_BIND, bind complete */
1536 		'*',					/* POOL_EXECUTE, skip checking */
1537 		'*',					/* POOL_DESCRIBE, skip checking */
1538 		'3',					/* POOL_CLOSE, close complete */
1539 		'Z'						/* POOL_SYNC, ready for query */
1540 	};
1541 
1542 	if (type < POOL_PARSE || type > POOL_SYNC)
1543 	{
1544 		ereport(DEBUG5,
1545 				(errmsg("pool_check_pending_message_and_reply: type out of range: %d", type)));
1546 		return;
1547 	}
1548 
1549 	if (backend_response_kind[type] == '*')
1550 	{
1551 		return;
1552 	}
1553 
1554 	if (backend_response_kind[type] != kind)
1555 	{
1556 		ereport(DEBUG5,
1557 				(errmsg("pool_check_pending_message_and_reply: type: %s but is kind: %c",
1558 						pool_pending_message_type_to_string(type), kind)));
1559 	}
1560 	return;
1561 }
1562 
1563 /*
1564  * Find the latest pending message having specified query context.  The
1565  * returned message is a pointer to the message list. So do not free it using
1566  * pool_pending_message_free_pending_message.
1567  */
1568 POOL_PENDING_MESSAGE *
pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)1569 pool_pending_message_find_lastest_by_query_context(POOL_QUERY_CONTEXT * qc)
1570 {
1571 	List	   *msgs;
1572 	POOL_PENDING_MESSAGE *msg;
1573 	int			len;
1574 	ListCell   *cell;
1575 
1576 	if (!session_context)
1577 	{
1578 		ereport(ERROR,
1579 				(errmsg("pool_pending_message_find_lastest_by_query_context: session context is not initialized")));
1580 	}
1581 
1582 	if (!session_context->pending_messages)
1583 		return NULL;
1584 
1585 	msgs = session_context->pending_messages;
1586 
1587 	len = list_length(msgs);
1588 	if (len <= 0)
1589 		return NULL;
1590 
1591 	ereport(DEBUG5,
1592 			(errmsg("pool_pending_message_find_lastest_by_query_context: num messages: %d",
1593 					len)));
1594 
1595 	while (len--)
1596 	{
1597 		cell = list_nth_cell(msgs, len);
1598 		if (cell)
1599 		{
1600 			msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1601 			if (msg->query_context == qc)
1602 			{
1603 				ereport(DEBUG5,
1604 						(errmsg("pool_pending_message_find_lastest_by_query_context: msg found. type: %s",
1605 								pool_pending_message_type_to_string(msg->type))));
1606 				return msg;
1607 			}
1608 			ereport(DEBUG5,
1609 					(errmsg("pool_pending_message_find_lastest_by_query_context: type: %s",
1610 							pool_pending_message_type_to_string(msg->type))));
1611 		}
1612 	}
1613 	return NULL;
1614 }
1615 
1616 /*
1617  * Get target backend id from pending message assuming that the destination for
1618  * the pending message is one of primary or standby node.
1619  */
1620 int
pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)1621 pool_pending_message_get_target_backend_id(POOL_PENDING_MESSAGE * msg)
1622 {
1623 	int			backend_id = -1;
1624 	int			i;
1625 
1626 	for (i = 0; i < MAX_NUM_BACKENDS; i++)
1627 	{
1628 		if (msg->node_ids[i])
1629 		{
1630 			backend_id = i;
1631 			break;
1632 		}
1633 	}
1634 
1635 	if (backend_id == -1)
1636 		ereport(ERROR,
1637 				(errmsg("pool_pending_message_get_target_backend_id: no target backend id found")));
1638 
1639 	return backend_id;
1640 }
1641 
1642 /*
1643  * Get number of pending message list entries of which target backend is same as specified one.
1644  */
1645 int
pool_pending_message_get_message_num_by_backend_id(int backend_id)1646 pool_pending_message_get_message_num_by_backend_id(int backend_id)
1647 {
1648 	ListCell   *cell;
1649 	ListCell   *next;
1650 	int        cnt = 0;
1651 	int        i;
1652 
1653 	if (!session_context)
1654 	{
1655 		ereport(ERROR,
1656 				(errmsg("pool_pending_message_get_message_num_by_backend_id: session context is not initialized")));
1657 		return 0;
1658 	}
1659 
1660 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1661 	{
1662 		POOL_PENDING_MESSAGE *msg = (POOL_PENDING_MESSAGE *) lfirst(cell);
1663 
1664 		for (i = 0; i < MAX_NUM_BACKENDS; i++)
1665 		{
1666 			if (msg->node_ids[i] && i == backend_id)
1667 			{
1668 				cnt++;
1669 				break;
1670 			}
1671 		}
1672 
1673 		next = lnext(cell);
1674 	}
1675 	return cnt;
1676 }
1677 
1678 /*
1679  * Dump whole pending message list
1680  */
1681 void
dump_pending_message(void)1682 dump_pending_message(void)
1683 {
1684 	ListCell   *cell;
1685 	ListCell   *next;
1686 
1687 	if (!session_context)
1688 	{
1689 		ereport(ERROR,
1690 				(errmsg("dump_pending_message: session context is not initialized")));
1691 		return;
1692 	}
1693 
1694 	ereport(DEBUG5,
1695 			(errmsg("start dumping pending message list")));
1696 
1697 	for (cell = list_head(session_context->pending_messages); cell; cell = next)
1698 	{
1699 		POOL_PENDING_MESSAGE *message = (POOL_PENDING_MESSAGE *) lfirst(cell);
1700 
1701 		ereport(DEBUG5,
1702 				(errmsg("pool_pending_message_dump: message type:%d message len:%d query:%s statement:%s portal:%s node_ids[0]:%d node_ids[1]:%d",
1703 						message->type, message->contents_len, message->query, message->statement, message->portal,
1704 						message->node_ids[0], message->node_ids[1])));
1705 
1706 		next = lnext(cell);
1707 	}
1708 
1709 	ereport(DEBUG5,
1710 			(errmsg("end dumping pending message list")));
1711 }
1712 
1713 /*
1714  * Set protocol major version number
1715  */
1716 void
pool_set_major_version(int major)1717 pool_set_major_version(int major)
1718 {
1719 	if (session_context)
1720 	{
1721 		session_context->major = major;
1722 	}
1723 }
1724 
1725 /*
1726  * Get protocol major version number
1727  */
1728 int
pool_get_major_version(void)1729 pool_get_major_version(void)
1730 {
1731 	if (session_context)
1732 	{
1733 		return session_context->major;
1734 	}
1735 	return PROTO_MAJOR_V3;
1736 }
1737 
1738 /*
1739  * Set protocol minor version number
1740  */
1741 void
pool_set_minor_version(int minor)1742 pool_set_minor_version(int minor)
1743 {
1744 	if (session_context)
1745 	{
1746 		session_context->minor = minor;
1747 	}
1748 }
1749 
1750 /*
1751  * Get protocol minor version number
1752  */
1753 int
pool_get_minor_version(void)1754 pool_get_minor_version(void)
1755 {
1756 	if (session_context)
1757 	{
1758 		return session_context->minor;
1759 	}
1760 	return 0;
1761 }
1762 
1763 /*
1764  * Is suspend_reading_from_frontend flag set?
1765  */
1766 bool
pool_is_suspend_reading_from_frontend(void)1767 pool_is_suspend_reading_from_frontend(void)
1768 {
1769 	return session_context->suspend_reading_from_frontend;
1770 }
1771 
1772 /*
1773  * Set suspend_reading_from_frontend flag.
1774  */
1775 void
pool_set_suspend_reading_from_frontend(void)1776 pool_set_suspend_reading_from_frontend(void)
1777 {
1778 	session_context->suspend_reading_from_frontend = true;
1779 }
1780 
1781 /*
1782  * Unset suspend_reading_from_frontend flag.
1783  */
1784 void
pool_unset_suspend_reading_from_frontend(void)1785 pool_unset_suspend_reading_from_frontend(void)
1786 {
1787 	session_context->suspend_reading_from_frontend = false;
1788 }
1789 
1790 #ifdef NOT_USED
1791 /*
1792  * Set preferred "master" node id.
1793  * Only used for SimpleForwardToFrontend.
1794  */
1795 void
pool_set_preferred_master_node_id(int node_id)1796 pool_set_preferred_master_node_id(int node_id)
1797 {
1798 	session_context->preferred_master_node_id = node_id;
1799 }
1800 
1801 /*
1802  * Return preferred "master" node id.
1803  * Only used for SimpleForwardToFrontend.
1804  */
1805 int
pool_get_preferred_master_node_id(void)1806 pool_get_preferred_master_node_id(void)
1807 {
1808 	return session_context->preferred_master_node_id;
1809 }
1810 
1811 /*
1812  * Reset preferred "master" node id.
1813  * Only used for SimpleForwardToFrontend.
1814  */
1815 void
pool_reset_preferred_master_node_id(void)1816 pool_reset_preferred_master_node_id(void)
1817 {
1818 	session_context->preferred_master_node_id = -1;
1819 }
1820 #endif
1821 
1822 /*-----------------------------------------------------------------------
1823  * Temporary table list management modules.
1824  *-----------------------------------------------------------------------
1825  */
1826 
1827 /*
1828  * Initialize temp table list
1829  */
1830 void
pool_temp_tables_init(void)1831 pool_temp_tables_init(void)
1832 {
1833 	if (!session_context)
1834 		ereport(ERROR,
1835 				(errmsg("pool_temp_tables_init: session context is not initialized")));
1836 
1837 	session_context->temp_tables = NIL;
1838 }
1839 
1840 /*
1841  * Destroy temp table list
1842  */
1843 void
pool_temp_tables_destroy(void)1844 pool_temp_tables_destroy(void)
1845 {
1846 	if (!session_context)
1847 		ereport(ERROR,
1848 				(errmsg("pool_temp_tables_destory: session context is not initialized")));
1849 
1850 	list_free(session_context->temp_tables);
1851 }
1852 
1853 /*
1854  * Add a temp table to the tail of the list.
1855  * If the table already exists, just replace state.
1856  */
1857 void
pool_temp_tables_add(char * tablename,POOL_TEMP_TABLE_STATE state)1858 pool_temp_tables_add(char * tablename, POOL_TEMP_TABLE_STATE state)
1859 {
1860 	MemoryContext old_context;
1861 	POOL_TEMP_TABLE * table;
1862 
1863 	if (!session_context)
1864 		ereport(ERROR,
1865 				(errmsg("pool_temp_tables_add: session context is not initialized")));
1866 
1867 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1868 
1869 	table = pool_temp_tables_find(tablename);
1870 	if (table)
1871 	{
1872 		/* Table already exists. Just replace state. */
1873 		table->state = state;
1874 	}
1875 	else
1876 	{
1877 		table = palloc(sizeof(POOL_TEMP_TABLE));
1878 		StrNCpy(table->tablename, tablename, sizeof(table->tablename));
1879 		table->state = state;
1880 		session_context->temp_tables = lappend(session_context->temp_tables, table);
1881 	}
1882 
1883 	MemoryContextSwitchTo(old_context);
1884 }
1885 
1886 /*
1887  * Returns pointer to the table cell if specified tablename is in the temp table list.
1888  */
1889 
1890 POOL_TEMP_TABLE *
pool_temp_tables_find(char * tablename)1891 pool_temp_tables_find(char * tablename)
1892 {
1893 	ListCell   *cell;
1894 
1895 	if (!session_context)
1896 		ereport(ERROR,
1897 				(errmsg("pool_temp_tables_find: session context is not initialized")));
1898 
1899 	foreach(cell, session_context->temp_tables)
1900 	{
1901 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
1902 		if (strcmp(tablename, table->tablename) == 0)
1903 			return table;
1904 	}
1905 	return NULL;
1906 }
1907 
1908 /*
1909  * If requested state or table state is TEMP_TABLE_DROP_COMMITTED, removes the
1910  * temp table entry from the list.  Otherwise just set the requested state to
1911  * the table state.
1912  */
1913 void
pool_temp_tables_delete(char * tablename,POOL_TEMP_TABLE_STATE state)1914 pool_temp_tables_delete(char * tablename, POOL_TEMP_TABLE_STATE state)
1915 {
1916 	POOL_TEMP_TABLE * table;
1917 	MemoryContext old_context;
1918 
1919 	if (!session_context)
1920 		ereport(ERROR,
1921 				(errmsg("pool_temp_tables_delete: session context is not initialized")));
1922 
1923 	ereport(LOG,
1924 			(errmsg("pool_temp_tables_delete: table: %s state: %d", tablename, state)));
1925 
1926 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1927 
1928 	table = pool_temp_tables_find(tablename);
1929 
1930 	if (table)
1931 	{
1932 		if (table->state == TEMP_TABLE_DROP_COMMITTED)
1933 		{
1934 			ereport(DEBUG1,
1935 					(errmsg("pool_temp_tables_delete: remove %s. previous state: %d requested state: %d",
1936 							table->tablename, table->state, state)));
1937 
1938 			session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
1939 		}
1940 		else
1941 		{
1942 			ereport(DEBUG1,
1943 					(errmsg("pool_temp_tables_delete: set state %s. previous state: %d requested state: %d",
1944 							table->tablename, table->state, state)));
1945 			table->state = state;
1946 		}
1947 	}
1948 
1949 	MemoryContextSwitchTo(old_context);
1950 }
1951 
1952 /*
1953  * Commits creating entries. Also remove dropping entries. This is supposed to
1954  * be called when an explicit transaction commits.
1955  */
1956 void
pool_temp_tables_commit_pending(void)1957 pool_temp_tables_commit_pending(void)
1958 {
1959 	ListCell   *cell;
1960 	MemoryContext old_context;
1961 
1962 	if (!session_context)
1963 		ereport(ERROR,
1964 				(errmsg("pool_temp_tables_commit_pending: session context is not initialized")));
1965 
1966 	old_context = MemoryContextSwitchTo(session_context->memory_context);
1967 
1968 	pool_temp_tables_dump();
1969 
1970 Retry:
1971 	foreach(cell, session_context->temp_tables)
1972 	{
1973 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
1974 
1975 		if (table->state == TEMP_TABLE_CREATING)
1976 		{
1977 			ereport(DEBUG1,
1978 					(errmsg("pool_temp_tables_commit_pending: commit: %s", table->tablename)));
1979 
1980 			table->state = TEMP_TABLE_CREATE_COMMITTED;
1981 		}
1982 		else if (table->state == TEMP_TABLE_DROPPING)
1983 		{
1984 			ereport(DEBUG1,
1985 					(errmsg("pool_temp_tables_commit_pending: remove: %s", table->tablename)));
1986 			session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
1987 			pool_temp_tables_dump();
1988 			goto Retry;
1989 		}
1990 	}
1991 
1992 	MemoryContextSwitchTo(old_context);
1993 }
1994 
1995 /*
1996  * Removes all ongoing creating or dropping entries. This is supposed to be
1997  * called when an explicit transaction aborts.
1998  */
1999 void
pool_temp_tables_remove_pending(void)2000 pool_temp_tables_remove_pending(void)
2001 {
2002 	ListCell   *cell;
2003 	MemoryContext old_context;
2004 
2005 	if (!session_context)
2006 		ereport(ERROR,
2007 				(errmsg("pool_temp_tables_remove_pending: session context is not initialized")));
2008 
2009 	old_context = MemoryContextSwitchTo(session_context->memory_context);
2010 
2011 	pool_temp_tables_dump();
2012 
2013 Retry:
2014 	foreach(cell, session_context->temp_tables)
2015 	{
2016 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2017 
2018 		if (table->state == TEMP_TABLE_CREATING || table->state == TEMP_TABLE_DROPPING)
2019 		{
2020 			ereport(DEBUG1,
2021 					(errmsg("pool_temp_tables_remove_pending: remove: %s", table->tablename)));
2022 
2023 			session_context->temp_tables = list_delete_ptr(session_context->temp_tables, table);
2024 			pool_temp_tables_dump();
2025 			goto Retry;
2026 		}
2027 	}
2028 
2029 	MemoryContextSwitchTo(old_context);
2030 }
2031 
2032 void
pool_temp_tables_dump(void)2033 pool_temp_tables_dump(void)
2034 {
2035 #ifdef TEMP_TABLES_DEBUG
2036 	ListCell   *cell;
2037 
2038 	if (!session_context)
2039 		ereport(ERROR,
2040 				(errmsg("pool_temp_tables_dump: session context is not initialized")));
2041 
2042 	foreach(cell, session_context->temp_tables)
2043 	{
2044 		POOL_TEMP_TABLE * table = (POOL_TEMP_TABLE *)lfirst(cell);
2045 		ereport(DEBUG1,
2046 				(errmsg("pool_temp_tables_dump: table %s state: %d",
2047 						table->tablename, table->state)));
2048 	}
2049 #endif
2050 }
2051