1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2021 PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
20 *
21 *---------------------------------------------------------------------
22 * This file contains modules which process the "Command Complete" and "Empty
23 * query response" message sent from backend. The main function is
24 * "CommandComplete".
25 *---------------------------------------------------------------------
26 */
27 #include <string.h>
28 #include <arpa/inet.h>
29
30 #include "pool.h"
31 #include "protocol/pool_proto_modules.h"
32 #include "protocol/pool_process_query.h"
33 #include "parser/pg_config_manual.h"
34 #include "parser/pool_string.h"
35 #include "pool_config.h"
36 #include "context/pool_session_context.h"
37 #include "context/pool_query_context.h"
38 #include "utils/elog.h"
39 #include "utils/palloc.h"
40 #include "utils/memutils.h"
41 #include "utils/pool_stream.h"
42
43 static int extract_ntuples(char *message);
44 static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete);
45 static int foward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen);
46 static int foward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen);
47 static int foward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen);
48
49 POOL_STATUS
CommandComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,bool command_complete)50 CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool command_complete)
51 {
52 int len,
53 len1;
54 char *p,
55 *p1;
56 int i;
57 POOL_SESSION_CONTEXT *session_context;
58 POOL_CONNECTION *con;
59
60 p1 = NULL;
61 len1 = 0;
62
63 /* Get session context */
64 session_context = pool_get_session_context(false);
65
66 /*
67 * Handle misc process which is neccessary when query context exists.
68 */
69 if (session_context->query_context != NULL && (!SL_MODE || (SL_MODE && !pool_is_doing_extended_query_message())))
70 handle_query_context(backend);
71
72 /*
73 * If operated in streaming replication mode and doing an extended query,
74 * read backend message according to the query context. Also we set the
75 * transaction state at this point.
76 */
77 if (SL_MODE && pool_is_doing_extended_query_message())
78 {
79 p1 = NULL;
80
81 for (i = 0; i < NUM_BACKENDS; i++)
82 {
83 if (VALID_BACKEND(i))
84 {
85 con = CONNECTION(backend, i);
86
87 if (pool_read(con, &len, sizeof(len)) < 0)
88 {
89 if (p1 != NULL)
90 pfree(p1);
91 return POOL_END;
92 }
93
94 len = ntohl(len);
95 len -= 4;
96 len1 = len;
97
98 p = pool_read2(con, len);
99 if (p == NULL)
100 {
101 if (p1 != NULL)
102 pfree(p1);
103 return POOL_END;
104 }
105 if (p1 != NULL)
106 pfree(p1);
107 p1 = palloc(len);
108 memcpy(p1, p, len);
109
110 if (session_context->query_context &&
111 session_context->query_context->parse_tree &&
112 is_start_transaction_query(session_context->query_context->parse_tree))
113 TSTATE(backend, i) = 'T'; /* we are inside a transaction */
114 {
115 ereport(DEBUG1,
116 (errmsg("processing command complete"),
117 errdetail("set transaction state to T")));
118 }
119 }
120 }
121 }
122
123 /*
124 * Otherwise just read from main node.
125 */
126 else
127 {
128 con = MAIN(backend);
129
130 if (pool_read(con, &len, sizeof(len)) < 0)
131 return POOL_END;
132
133 len = ntohl(len);
134 len -= 4;
135 len1 = len;
136
137 p = pool_read2(con, len);
138 if (p == NULL)
139 return POOL_END;
140 p1 = palloc(len);
141 memcpy(p1, p, len);
142 }
143
144 /*
145 * If operated in streaming replication mode and extended query mode, just
146 * forward the packet to frontend and we are done. Otherwise, we need to
147 * do mismatch tuples process (forwarding to frontend is done in
148 * handle_mismatch_tuples().
149 */
150 if (SL_MODE && pool_is_doing_extended_query_message())
151 {
152 int status;
153
154 if (p1 == NULL)
155 {
156 elog(WARNING, "CommandComplete: expected p1 is not NULL");
157 return POOL_END;
158 }
159
160 if (command_complete)
161 status = foward_command_complete(frontend, p1, len1);
162 else
163 status = foward_empty_query(frontend, p1, len1);
164
165 if (status < 0)
166 return POOL_END;
167 }
168 else
169 {
170 if (handle_mismatch_tuples(frontend, backend, p1, len1, command_complete) != POOL_CONTINUE)
171 return POOL_END;
172 }
173
174 /* Save the received result to buffer for each kind */
175 if (pool_config->memory_cache_enabled)
176 {
177 if (pool_is_cache_safe() && !pool_is_cache_exceeded())
178 {
179 memqcache_register('C', frontend, p1, len1);
180 }
181
182 /*
183 * If we are doing extended query, register the SELECT result to temp
184 * query cache now.
185 */
186 if (pool_is_doing_extended_query_message())
187 {
188 char *query;
189 Node *node;
190 char state;
191
192 if (session_context->query_context == NULL)
193 {
194 elog(WARNING, "expected query_contex is NULL");
195 return POOL_END;
196 }
197 query = session_context->query_context->query_w_hex;
198 node = pool_get_parse_tree();
199 state = TSTATE(backend, MAIN_NODE_ID);
200 pool_handle_query_cache(backend, query, node, state);
201 }
202 }
203
204 pfree(p1);
205
206 if (pool_is_doing_extended_query_message() && pool_is_query_in_progress())
207 {
208 pool_set_query_state(session_context->query_context, POOL_EXECUTE_COMPLETE);
209 }
210
211 /*
212 * If we are in streaming replication mode and we are doing extended
213 * query, reset query in progress flag and prevoius pending message.
214 */
215 if (SL_MODE && pool_is_doing_extended_query_message())
216 {
217 pool_at_command_success(frontend, backend);
218 pool_unset_query_in_progress();
219 pool_pending_message_reset_previous_message();
220
221 if (session_context->query_context == NULL)
222 {
223 elog(WARNING, "At command complete there's no query context");
224 }
225 else
226 {
227 /*
228 * Destroy query context if it is not referenced from sent
229 * messages and pending messages except bound to named statements
230 * or portals. Named statements and portals should remain until
231 * they are explicitly closed.
232 */
233 if (can_query_context_destroy(session_context->query_context))
234
235 {
236 POOL_SENT_MESSAGE * msg = pool_get_sent_message_by_query_context(session_context->query_context);
237
238 if (!msg || (msg && *msg->name == '\0'))
239 {
240 pool_zap_query_context_in_sent_messages(session_context->query_context);
241 pool_query_context_destroy(session_context->query_context);
242 }
243 }
244 }
245 }
246
247 return POOL_CONTINUE;
248 }
249
250 /*
251 * Handle misc process which is neccessary when query context exists.
252 */
253 void
handle_query_context(POOL_CONNECTION_POOL * backend)254 handle_query_context(POOL_CONNECTION_POOL * backend)
255 {
256 POOL_SESSION_CONTEXT *session_context;
257 Node *node;
258
259 /* Get session context */
260 session_context = pool_get_session_context(false);
261
262 node = session_context->query_context->parse_tree;
263
264 if (IsA(node, PrepareStmt))
265 {
266 if (session_context->uncompleted_message)
267 {
268 pool_add_sent_message(session_context->uncompleted_message);
269 session_context->uncompleted_message = NULL;
270 }
271 }
272 else if (IsA(node, DeallocateStmt))
273 {
274 char *name;
275
276 name = ((DeallocateStmt *) node)->name;
277 if (name == NULL)
278 {
279 pool_remove_sent_messages('Q');
280 pool_remove_sent_messages('P');
281 }
282 else
283 {
284 pool_remove_sent_message('Q', name);
285 pool_remove_sent_message('P', name);
286 }
287 }
288 else if (IsA(node, DiscardStmt))
289 {
290 DiscardStmt *stmt = (DiscardStmt *) node;
291
292 if (stmt->target == DISCARD_PLANS)
293 {
294 pool_remove_sent_messages('Q');
295 pool_remove_sent_messages('P');
296 }
297 else if (stmt->target == DISCARD_ALL)
298 {
299 pool_clear_sent_message_list();
300 }
301 }
302
303 /*
304 * JDBC driver sends "BEGIN" query internally if setAutoCommit(false).
305 * But it does not send Sync message after "BEGIN" query. In extended
306 * query protocol, PostgreSQL returns ReadyForQuery when a client sends
307 * Sync message. Problem is, pgpool can't know the transaction state
308 * without receiving ReadyForQuery. So we remember that we need to send
309 * Sync message internally afterward, whenever we receive BEGIN in
310 * extended protocol.
311 */
312 else if (IsA(node, TransactionStmt))
313 {
314 TransactionStmt *stmt = (TransactionStmt *) node;
315
316 if (stmt->kind == TRANS_STMT_BEGIN || stmt->kind == TRANS_STMT_START)
317 {
318 int i;
319
320 for (i = 0; i < NUM_BACKENDS; i++)
321 {
322 if (!VALID_BACKEND(i))
323 continue;
324
325 TSTATE(backend, i) = 'T';
326 }
327
328 if (pool_config->disable_load_balance_on_write != DLBOW_TRANS_TRANSACTION)
329 pool_unset_writing_transaction();
330
331 pool_unset_failed_transaction();
332 pool_unset_transaction_isolation();
333 }
334 else if (stmt->kind == TRANS_STMT_COMMIT)
335 {
336 /* Commit ongoing CRETAE/DROP temp table status */
337 pool_temp_tables_commit_pending();
338 }
339 else if (stmt->kind == TRANS_STMT_ROLLBACK)
340 {
341 /* Remove ongoing CRETAE/DROP temp table status */
342 pool_temp_tables_remove_pending();
343 }
344 }
345 else if (IsA(node, CreateStmt))
346 {
347 CreateStmt *stmt = (CreateStmt *) node;
348 POOL_TEMP_TABLE_STATE state;
349
350 /* Is this a temporary table? */
351 if (stmt->relation->relpersistence == 't')
352 {
353 if (TSTATE(backend, MAIN_NODE_ID ) == 'T') /* Are we inside a transaction? */
354 {
355 state = TEMP_TABLE_CREATING;
356 }
357 else
358 {
359 state = TEMP_TABLE_CREATE_COMMITTED;
360 }
361 ereport(DEBUG1,
362 (errmsg("Creating temp table: %s. commit status: %d",
363 stmt->relation->relname, state)));
364 pool_temp_tables_add(stmt->relation->relname, state);
365 }
366 }
367 else if (IsA(node, DropStmt))
368 {
369 DropStmt *stmt = (DropStmt *) node;
370 POOL_TEMP_TABLE_STATE state;
371
372 if (stmt->removeType == OBJECT_TABLE)
373 {
374 /* Loop through stmt->objects */
375 ListCell *cell;
376 ListCell *next;
377
378 if (TSTATE(backend, MAIN_NODE_ID ) == 'T') /* Are we inside a transaction? */
379 {
380 state = TEMP_TABLE_DROPPING;
381 }
382 else
383 {
384 state = TEMP_TABLE_DROP_COMMITTED;
385 }
386
387 for (cell = list_head(session_context->temp_tables); cell; cell = next)
388 {
389 char *tablename = (char *)lfirst(cell);
390 ereport(DEBUG1,
391 (errmsg("Dropping temp table: %s", tablename)));
392 pool_temp_tables_delete(tablename, state);
393
394 if (!session_context->temp_tables)
395 return;
396
397 next = lnext(session_context->temp_tables, cell);
398 }
399 }
400 }
401 }
402
/*
 * Extract the number of tuples from CommandComplete message
 *
 * "message" is the NUL terminated command tag. The tags understood are
 * "UPDATE rows", "DELETE rows" and "INSERT oid rows"; the tag keyword is
 * searched anywhere in the string. Returns the row count, or 0 if the
 * message contains none of these tags or carries no count.
 */
static int
extract_ntuples(char *message)
{
	/* "UPDATE", "DELETE" and "INSERT" are all 6 characters long */
	const size_t keyword_len = 6;
	char	   *rows;

	if ((rows = strstr(message, "UPDATE")) || (rows = strstr(message, "DELETE")))
	{
		rows += keyword_len;

		/*
		 * Step over the separator only if there is one, so that a tag with
		 * nothing after the keyword never moves us past the terminator.
		 */
		if (*rows != '\0')
			rows++;
	}
	else if ((rows = strstr(message, "INSERT")))
	{
		rows += keyword_len;
		if (*rows != '\0')
			rows++;

		/* Skip the oid field which precedes the row count. */
		while (*rows && *rows != ' ')
			rows++;
	}
	else
		return 0;

	/* atoi() skips the leading blank left in front of the INSERT count. */
	return atoi(rows);
}
424
425 /*
426 * Handle mismatch tuples
427 */
handle_mismatch_tuples(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * packet,int packetlen,bool command_complete)428 static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete)
429 {
430 POOL_SESSION_CONTEXT *session_context;
431
432 int rows;
433 int i;
434 int len;
435 char *p;
436
437 /* Get session context */
438 session_context = pool_get_session_context(false);
439
440 rows = extract_ntuples(packet);
441
442 /*
443 * Save number of affected tuples of main node.
444 */
445 session_context->ntuples[MAIN_NODE_ID] = rows;
446
447
448 for (i = 0; i < NUM_BACKENDS; i++)
449 {
450 if (!IS_MAIN_NODE_ID(i))
451 {
452 if (!VALID_BACKEND(i))
453 {
454 session_context->ntuples[i] = -1;
455 continue;
456 }
457
458 pool_read(CONNECTION(backend, i), &len, sizeof(len));
459
460 len = ntohl(len);
461 len -= 4;
462
463 p = pool_read2(CONNECTION(backend, i), len);
464 if (p == NULL)
465 return POOL_END;
466
467 if (len != packetlen)
468 {
469 ereport(DEBUG1,
470 (errmsg("processing command complete"),
471 errdetail("length does not match between backends main(%d) %d th backend(%d)",
472 len, i, packetlen)));
473 }
474
475 int n = extract_ntuples(p);
476
477 /*
478 * Save number of affected tuples.
479 */
480 session_context->ntuples[i] = n;
481
482 if (rows != n)
483 {
484 /*
485 * Remember that we have different number of UPDATE/DELETE
486 * affected tuples in backends.
487 */
488 session_context->mismatch_ntuples = true;
489 }
490 }
491 }
492
493 if (session_context->mismatch_ntuples)
494 {
495 char msgbuf[128];
496
497 String *msg = init_string("pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \"");
498
499 string_append_char(msg, query_string_buffer);
500 string_append_char(msg, "\"");
501 pool_send_error_message(frontend, MAJOR(backend),
502 "XX001", msg->data, "",
503 "check data consistency between main and other db node", __FILE__, __LINE__);
504 ereport(LOG,
505 (errmsg("%s", msg->data)));
506 free_string(msg);
507
508 msg = init_string("CommandComplete: Number of affected tuples are:");
509
510 for (i = 0; i < NUM_BACKENDS; i++)
511 {
512 snprintf(msgbuf, sizeof(msgbuf), " %d", session_context->ntuples[i]);
513 string_append_char(msg, msgbuf);
514 }
515 ereport(LOG,
516 (errmsg("processing command complete"),
517 errdetail("%s", msg->data)));
518
519 free_string(msg);
520 }
521 else
522 {
523 if (command_complete)
524 {
525 if (foward_command_complete(frontend, packet, packetlen) < 0)
526 return POOL_END;
527 }
528 else
529 {
530 if (foward_empty_query(frontend, packet, packetlen) < 0)
531 return POOL_END;
532 }
533 }
534
535 return POOL_CONTINUE;
536 }
537
538 /*
539 * Forward Command complete packet to frontend
540 */
541 static int
foward_command_complete(POOL_CONNECTION * frontend,char * packet,int packetlen)542 foward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen)
543 {
544 return foward_packet_to_frontend(frontend, 'C', packet, packetlen);
545 }
546
547 /*
548 * Forward Empty query response to frontend
549 */
550 static int
foward_empty_query(POOL_CONNECTION * frontend,char * packet,int packetlen)551 foward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen)
552 {
553 return foward_packet_to_frontend(frontend, 'I', packet, packetlen);
554 }
555
556 /*
557 * Forward packet to frontend
558 */
559 static int
foward_packet_to_frontend(POOL_CONNECTION * frontend,char kind,char * packet,int packetlen)560 foward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen)
561 {
562 int sendlen;
563
564 if (pool_write(frontend, &kind, 1) < 0)
565 return -1;
566
567 sendlen = htonl(packetlen + 4);
568 if (pool_write(frontend, &sendlen, sizeof(sendlen)) < 0)
569 return -1;
570
571 pool_write_and_flush(frontend, packet, packetlen);
572
573 return 0;
574 }
575