1 /* -*-pgsql-c-*- */
2 /*
3  * $Header$
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2021	PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  *---------------------------------------------------------------------
22  * This file contains modules which process the "Command Complete" and "Empty
23  * query response" message sent from backend. The main function is
24  * "CommandComplete".
25  *---------------------------------------------------------------------
26  */
27 #include <string.h>
28 #include <arpa/inet.h>
29 
30 #include "pool.h"
31 #include "protocol/pool_proto_modules.h"
32 #include "protocol/pool_process_query.h"
33 #include "parser/pg_config_manual.h"
34 #include "parser/pool_string.h"
35 #include "pool_config.h"
36 #include "context/pool_session_context.h"
37 #include "context/pool_query_context.h"
38 #include "utils/elog.h"
39 #include "utils/palloc.h"
40 #include "utils/memutils.h"
41 #include "utils/pool_stream.h"
42 
43 static int	extract_ntuples(char *message);
44 static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete);
45 static int	foward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen);
46 static int	foward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen);
47 static int	foward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen);
48 
49 POOL_STATUS
CommandComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,bool command_complete)50 CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool command_complete)
51 {
52 	int			len,
53 				len1;
54 	char	   *p,
55 			   *p1;
56 	int			i;
57 	POOL_SESSION_CONTEXT *session_context;
58 	POOL_CONNECTION *con;
59 
60 	p1 = NULL;
61 	len1 = 0;
62 
63 	/* Get session context */
64 	session_context = pool_get_session_context(false);
65 
66 	/*
67 	 * Handle misc process which is neccessary when query context exists.
68 	 */
69 	if (session_context->query_context != NULL && (!SL_MODE || (SL_MODE && !pool_is_doing_extended_query_message())))
70 		handle_query_context(backend);
71 
72 	/*
73 	 * If operated in streaming replication mode and doing an extended query,
74 	 * read backend message according to the query context. Also we set the
75 	 * transaction state at this point.
76 	 */
77 	if (SL_MODE && pool_is_doing_extended_query_message())
78 	{
79 		p1 = NULL;
80 
81 		for (i = 0; i < NUM_BACKENDS; i++)
82 		{
83 			if (VALID_BACKEND(i))
84 			{
85 				con = CONNECTION(backend, i);
86 
87 				if (pool_read(con, &len, sizeof(len)) < 0)
88 				{
89 					if (p1 != NULL)
90 						pfree(p1);
91 					return POOL_END;
92 				}
93 
94 				len = ntohl(len);
95 				len -= 4;
96 				len1 = len;
97 
98 				p = pool_read2(con, len);
99 				if (p == NULL)
100 				{
101 					if (p1 != NULL)
102 						pfree(p1);
103 					return POOL_END;
104 				}
105 				if (p1 != NULL)
106 					pfree(p1);
107 				p1 = palloc(len);
108 				memcpy(p1, p, len);
109 
110 				if (session_context->query_context &&
111 					session_context->query_context->parse_tree &&
112 					is_start_transaction_query(session_context->query_context->parse_tree))
113 					TSTATE(backend, i) = 'T';	/* we are inside a transaction */
114 				{
115 					ereport(DEBUG1,
116 							(errmsg("processing command complete"),
117 							 errdetail("set transaction state to T")));
118 				}
119 			}
120 		}
121 	}
122 
123 	/*
124 	 * Otherwise just read from main node.
125 	 */
126 	else
127 	{
128 		con = MAIN(backend);
129 
130 		if (pool_read(con, &len, sizeof(len)) < 0)
131 			return POOL_END;
132 
133 		len = ntohl(len);
134 		len -= 4;
135 		len1 = len;
136 
137 		p = pool_read2(con, len);
138 		if (p == NULL)
139 			return POOL_END;
140 		p1 = palloc(len);
141 		memcpy(p1, p, len);
142 	}
143 
144 	/*
145 	 * If operated in streaming replication mode and extended query mode, just
146 	 * forward the packet to frontend and we are done. Otherwise, we need to
147 	 * do mismatch tuples process (forwarding to frontend is done in
148 	 * handle_mismatch_tuples().
149 	 */
150 	if (SL_MODE && pool_is_doing_extended_query_message())
151 	{
152 		int			status;
153 
154 		if (p1 == NULL)
155 		{
156 			elog(WARNING, "CommandComplete: expected p1 is not NULL");
157 			return POOL_END;
158 		}
159 
160 		if (command_complete)
161 			status = foward_command_complete(frontend, p1, len1);
162 		else
163 			status = foward_empty_query(frontend, p1, len1);
164 
165 		if (status < 0)
166 			return POOL_END;
167 	}
168 	else
169 	{
170 		if (handle_mismatch_tuples(frontend, backend, p1, len1, command_complete) != POOL_CONTINUE)
171 			return POOL_END;
172 	}
173 
174 	/* Save the received result to buffer for each kind */
175 	if (pool_config->memory_cache_enabled)
176 	{
177 		if (pool_is_cache_safe() && !pool_is_cache_exceeded())
178 		{
179 			memqcache_register('C', frontend, p1, len1);
180 		}
181 
182 		/*
183 		 * If we are doing extended query, register the SELECT result to temp
184 		 * query cache now.
185 		 */
186 		if (pool_is_doing_extended_query_message())
187 		{
188 			char	   *query;
189 			Node	   *node;
190 			char		state;
191 
192 			if (session_context->query_context == NULL)
193 			{
194 				elog(WARNING, "expected query_contex is NULL");
195 				return POOL_END;
196 			}
197 			query = session_context->query_context->query_w_hex;
198 			node = pool_get_parse_tree();
199 			state = TSTATE(backend, MAIN_NODE_ID);
200 			pool_handle_query_cache(backend, query, node, state);
201 		}
202 	}
203 
204 	pfree(p1);
205 
206 	if (pool_is_doing_extended_query_message() && pool_is_query_in_progress())
207 	{
208 		pool_set_query_state(session_context->query_context, POOL_EXECUTE_COMPLETE);
209 	}
210 
211 	/*
212 	 * If we are in streaming replication mode and we are doing extended
213 	 * query, reset query in progress flag and prevoius pending message.
214 	 */
215 	if (SL_MODE && pool_is_doing_extended_query_message())
216 	{
217 		pool_at_command_success(frontend, backend);
218 		pool_unset_query_in_progress();
219 		pool_pending_message_reset_previous_message();
220 
221 		if (session_context->query_context == NULL)
222 		{
223 			elog(WARNING, "At command complete there's no query context");
224 		}
225 		else
226 		{
227 			/*
228 			 * Destroy query context if it is not referenced from sent
229 			 * messages and pending messages except bound to named statements
230 			 * or portals.  Named statements and portals should remain until
231 			 * they are explicitly closed.
232 			 */
233 			if (can_query_context_destroy(session_context->query_context))
234 
235 			{
236 				POOL_SENT_MESSAGE * msg = pool_get_sent_message_by_query_context(session_context->query_context);
237 
238 				if (!msg || (msg && *msg->name == '\0'))
239 				{
240 					pool_zap_query_context_in_sent_messages(session_context->query_context);
241 					pool_query_context_destroy(session_context->query_context);
242 				}
243 			}
244 		}
245 	}
246 
247 	return POOL_CONTINUE;
248 }
249 
250 /*
251  * Handle misc process which is neccessary when query context exists.
252  */
253 void
handle_query_context(POOL_CONNECTION_POOL * backend)254 handle_query_context(POOL_CONNECTION_POOL * backend)
255 {
256 	POOL_SESSION_CONTEXT *session_context;
257 	Node	   *node;
258 
259 	/* Get session context */
260 	session_context = pool_get_session_context(false);
261 
262 	node = session_context->query_context->parse_tree;
263 
264 	if (IsA(node, PrepareStmt))
265 	{
266 		if (session_context->uncompleted_message)
267 		{
268 			pool_add_sent_message(session_context->uncompleted_message);
269 			session_context->uncompleted_message = NULL;
270 		}
271 	}
272 	else if (IsA(node, DeallocateStmt))
273 	{
274 		char	   *name;
275 
276 		name = ((DeallocateStmt *) node)->name;
277 		if (name == NULL)
278 		{
279 			pool_remove_sent_messages('Q');
280 			pool_remove_sent_messages('P');
281 		}
282 		else
283 		{
284 			pool_remove_sent_message('Q', name);
285 			pool_remove_sent_message('P', name);
286 		}
287 	}
288 	else if (IsA(node, DiscardStmt))
289 	{
290 		DiscardStmt *stmt = (DiscardStmt *) node;
291 
292 		if (stmt->target == DISCARD_PLANS)
293 		{
294 			pool_remove_sent_messages('Q');
295 			pool_remove_sent_messages('P');
296 		}
297 		else if (stmt->target == DISCARD_ALL)
298 		{
299 			pool_clear_sent_message_list();
300 		}
301 	}
302 
303 	/*
304 	 * JDBC driver sends "BEGIN" query internally if setAutoCommit(false).
305 	 * But it does not send Sync message after "BEGIN" query.  In extended
306 	 * query protocol, PostgreSQL returns ReadyForQuery when a client sends
307 	 * Sync message.  Problem is, pgpool can't know the transaction state
308 	 * without receiving ReadyForQuery. So we remember that we need to send
309 	 * Sync message internally afterward, whenever we receive BEGIN in
310 	 * extended protocol.
311 	 */
312 	else if (IsA(node, TransactionStmt))
313 	{
314 		TransactionStmt *stmt = (TransactionStmt *) node;
315 
316 		if (stmt->kind == TRANS_STMT_BEGIN || stmt->kind == TRANS_STMT_START)
317 		{
318 			int			i;
319 
320 			for (i = 0; i < NUM_BACKENDS; i++)
321 			{
322 				if (!VALID_BACKEND(i))
323 					continue;
324 
325 				TSTATE(backend, i) = 'T';
326 			}
327 
328 			if (pool_config->disable_load_balance_on_write != DLBOW_TRANS_TRANSACTION)
329 				pool_unset_writing_transaction();
330 
331 			pool_unset_failed_transaction();
332 			pool_unset_transaction_isolation();
333 		}
334 		else if (stmt->kind == 	TRANS_STMT_COMMIT)
335 		{
336 			/* Commit ongoing CRETAE/DROP temp table status */
337 			pool_temp_tables_commit_pending();
338 		}
339 		else if (stmt->kind == TRANS_STMT_ROLLBACK)
340 		{
341 			/* Remove ongoing CRETAE/DROP temp table status */
342 			pool_temp_tables_remove_pending();
343 		}
344 	}
345 	else if (IsA(node, CreateStmt))
346 	{
347 		CreateStmt *stmt = (CreateStmt *) node;
348 		POOL_TEMP_TABLE_STATE	state;
349 
350 		/* Is this a temporary table? */
351 		if (stmt->relation->relpersistence == 't')
352 		{
353 			if (TSTATE(backend, MAIN_NODE_ID ) == 'T')	/* Are we inside a transaction? */
354 			{
355 				state = TEMP_TABLE_CREATING;
356 			}
357 			else
358 			{
359 				state = TEMP_TABLE_CREATE_COMMITTED;
360 			}
361 			ereport(DEBUG1,
362 					(errmsg("Creating temp table: %s. commit status: %d",
363 							stmt->relation->relname, state)));
364 			pool_temp_tables_add(stmt->relation->relname, state);
365 		}
366 	}
367 	else if (IsA(node, DropStmt))
368 	{
369 		DropStmt *stmt = (DropStmt *) node;
370 		POOL_TEMP_TABLE_STATE	state;
371 
372 		if (stmt->removeType == OBJECT_TABLE)
373 		{
374 			/* Loop through stmt->objects */
375 			ListCell   *cell;
376 			ListCell   *next;
377 
378 			if (TSTATE(backend, MAIN_NODE_ID ) == 'T')	/* Are we inside a transaction? */
379 			{
380 				state = TEMP_TABLE_DROPPING;
381 			}
382 			else
383 			{
384 				state = TEMP_TABLE_DROP_COMMITTED;
385 			}
386 
387 			for (cell = list_head(session_context->temp_tables); cell; cell = next)
388 			{
389 				char *tablename = (char *)lfirst(cell);
390 				ereport(DEBUG1,
391 						(errmsg("Dropping temp table: %s", tablename)));
392 				pool_temp_tables_delete(tablename, state);
393 
394 				if (!session_context->temp_tables)
395 					return;
396 
397 				next = lnext(session_context->temp_tables, cell);
398 			}
399 		}
400 	}
401 }
402 
/*
 * Extract the number of tuples from CommandComplete message.
 *
 * "message" is the NUL terminated command tag, e.g. "UPDATE 5", "DELETE 3"
 * or "INSERT 0 7" (for INSERT the first number is skipped and the second one
 * is returned).
 *
 * Returns the number parsed with atoi(), or 0 if message is NULL, the tag is
 * not an UPDATE/DELETE/INSERT tag, or the tag carries no count.
 */
static int
extract_ntuples(char *message)
{
	/* All three keywords have the same length */
	const size_t keyword_len = sizeof("UPDATE") - 1;
	char	   *rows;
	int			skip_first_number = 0;

	if (message == NULL)
		return 0;

	rows = strstr(message, "UPDATE");
	if (rows == NULL)
		rows = strstr(message, "DELETE");
	if (rows == NULL)
	{
		rows = strstr(message, "INSERT");
		skip_first_number = 1;
	}
	if (rows == NULL)
		return 0;

	/*
	 * A bare keyword with nothing after it has no count.  Stepping over the
	 * keyword plus separator would run past the terminating NUL.
	 */
	if (rows[keyword_len] == '\0')
		return 0;

	/* Skip the keyword and the separator following it */
	rows += keyword_len + 1;

	if (skip_first_number)
	{
		/* "INSERT oid rows": step over the first number */
		while (*rows && *rows != ' ')
			rows++;
	}

	return atoi(rows);
}
424 
425 /*
426  * Handle mismatch tuples
427  */
handle_mismatch_tuples(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * packet,int packetlen,bool command_complete)428 static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete)
429 {
430 	POOL_SESSION_CONTEXT *session_context;
431 
432 	int			rows;
433 	int			i;
434 	int			len;
435 	char	   *p;
436 
437 	/* Get session context */
438 	session_context = pool_get_session_context(false);
439 
440 	rows = extract_ntuples(packet);
441 
442 	/*
443 	 * Save number of affected tuples of main node.
444 	 */
445 	session_context->ntuples[MAIN_NODE_ID] = rows;
446 
447 
448 	for (i = 0; i < NUM_BACKENDS; i++)
449 	{
450 		if (!IS_MAIN_NODE_ID(i))
451 		{
452 			if (!VALID_BACKEND(i))
453 			{
454 				session_context->ntuples[i] = -1;
455 				continue;
456 			}
457 
458 			pool_read(CONNECTION(backend, i), &len, sizeof(len));
459 
460 			len = ntohl(len);
461 			len -= 4;
462 
463 			p = pool_read2(CONNECTION(backend, i), len);
464 			if (p == NULL)
465 				return POOL_END;
466 
467 			if (len != packetlen)
468 			{
469 				ereport(DEBUG1,
470 						(errmsg("processing command complete"),
471 						 errdetail("length does not match between backends main(%d) %d th backend(%d)",
472 								   len, i, packetlen)));
473 			}
474 
475 			int			n = extract_ntuples(p);
476 
477 			/*
478 			 * Save number of affected tuples.
479 			 */
480 			session_context->ntuples[i] = n;
481 
482 			if (rows != n)
483 			{
484 				/*
485 				 * Remember that we have different number of UPDATE/DELETE
486 				 * affected tuples in backends.
487 				 */
488 				session_context->mismatch_ntuples = true;
489 			}
490 		}
491 	}
492 
493 	if (session_context->mismatch_ntuples)
494 	{
495 		char		msgbuf[128];
496 
497 		String	   *msg = init_string("pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \"");
498 
499 		string_append_char(msg, query_string_buffer);
500 		string_append_char(msg, "\"");
501 		pool_send_error_message(frontend, MAJOR(backend),
502 								"XX001", msg->data, "",
503 								"check data consistency between main and other db node", __FILE__, __LINE__);
504 		ereport(LOG,
505 				(errmsg("%s", msg->data)));
506 		free_string(msg);
507 
508 		msg = init_string("CommandComplete: Number of affected tuples are:");
509 
510 		for (i = 0; i < NUM_BACKENDS; i++)
511 		{
512 			snprintf(msgbuf, sizeof(msgbuf), " %d", session_context->ntuples[i]);
513 			string_append_char(msg, msgbuf);
514 		}
515 		ereport(LOG,
516 				(errmsg("processing command complete"),
517 				 errdetail("%s", msg->data)));
518 
519 		free_string(msg);
520 	}
521 	else
522 	{
523 		if (command_complete)
524 		{
525 			if (foward_command_complete(frontend, packet, packetlen) < 0)
526 				return POOL_END;
527 		}
528 		else
529 		{
530 			if (foward_empty_query(frontend, packet, packetlen) < 0)
531 				return POOL_END;
532 		}
533 	}
534 
535 	return POOL_CONTINUE;
536 }
537 
538 /*
539  * Forward Command complete packet to frontend
540  */
541 static int
foward_command_complete(POOL_CONNECTION * frontend,char * packet,int packetlen)542 foward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen)
543 {
544 	return foward_packet_to_frontend(frontend, 'C', packet, packetlen);
545 }
546 
547 /*
548  * Forward Empty query response to frontend
549  */
550 static int
foward_empty_query(POOL_CONNECTION * frontend,char * packet,int packetlen)551 foward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen)
552 {
553 	return foward_packet_to_frontend(frontend, 'I', packet, packetlen);
554 }
555 
556 /*
557  * Forward packet to frontend
558  */
559 static int
foward_packet_to_frontend(POOL_CONNECTION * frontend,char kind,char * packet,int packetlen)560 foward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen)
561 {
562 	int			sendlen;
563 
564 	if (pool_write(frontend, &kind, 1) < 0)
565 		return -1;
566 
567 	sendlen = htonl(packetlen + 4);
568 	if (pool_write(frontend, &sendlen, sizeof(sendlen)) < 0)
569 		return -1;
570 
571 	pool_write_and_flush(frontend, packet, packetlen);
572 
573 	return 0;
574 }
575