1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <access/htup_details.h>
8 #include <access/xact.h>
9 #include <utils/hsearch.h>
10 #include <utils/builtins.h>
11 #include <utils/memutils.h>
12 #include <utils/syscache.h>
13
14 #include "dist_txn.h"
15 #include "connection.h"
16 #include "async.h"
17 #include "txn.h"
18 #include "txn_store.h"
19 #include "guc.h"
20
21 #ifdef DEBUG
22
23 static const DistTransactionEventHandler *event_handler = NULL;
24 static const char *eventnames[MAX_DTXN_EVENT] = {
25 [DTXN_EVENT_ANY] = "any",
26 [DTXN_EVENT_PRE_COMMIT] = "pre-commit",
27 [DTXN_EVENT_WAIT_COMMIT] = "waiting-commit",
28 [DTXN_EVENT_PRE_ABORT] = "pre-abort",
29 [DTXN_EVENT_PRE_PREPARE] = "pre-prepare-transaction",
30 [DTXN_EVENT_WAIT_PREPARE] = "waiting-prepare-transaction",
31 [DTXN_EVENT_POST_PREPARE] = "post-prepare-transaction",
32 [DTXN_EVENT_PRE_COMMIT_PREPARED] = "pre-commit-prepared",
33 [DTXN_EVENT_WAIT_COMMIT_PREPARED] = "waiting-commit-prepared",
34 [DTXN_EVENT_SUB_XACT_ABORT] = "subxact-abort",
35 };
36
37 void
remote_dist_txn_set_event_handler(const DistTransactionEventHandler * handler)38 remote_dist_txn_set_event_handler(const DistTransactionEventHandler *handler)
39 {
40 event_handler = handler;
41 }
42
43 static inline void
eventcallback(const DistTransactionEvent event)44 eventcallback(const DistTransactionEvent event)
45 {
46 if (NULL != event_handler && NULL != event_handler->handler)
47 event_handler->handler(event, event_handler->data);
48 }
49
50 DistTransactionEvent
remote_dist_txn_event_from_name(const char * eventname)51 remote_dist_txn_event_from_name(const char *eventname)
52 {
53 int i;
54
55 for (i = 0; i < MAX_DTXN_EVENT; i++)
56 {
57 if (strcmp(eventname, eventnames[i]) == 0)
58 return i;
59 }
60
61 elog(ERROR, "invalid event name");
62 pg_unreachable();
63 }
64
65 const char *
remote_dist_txn_event_name(const DistTransactionEvent event)66 remote_dist_txn_event_name(const DistTransactionEvent event)
67 {
68 return eventnames[event];
69 }
70
71 #else
/* In non-DEBUG builds the event hooks compile away to a no-op statement. */
#define eventcallback(event) ((void) 0)
76 #endif
77
/*
 * Store of the remote transactions used by the current local transaction,
 * keyed by connection id. It is created lazily by
 * remote_dist_txn_get_connection() and destroyed (and reset to NULL) by the
 * end-of-transaction cleanup. NULL therefore means that no data node
 * connection has been touched in the current transaction.
 */
static RemoteTxnStore *store = NULL;
79
80 /*
81 * Get a connection which can be used to execute queries on the remote PostgreSQL
82 * data node with the user's authorization. A new connection is established
83 * if we don't already have a suitable one, and a transaction is opened at
84 * the right subtransaction nesting depth if we didn't do that already.
85 *
86 * will_prep_stmt must be true if caller intends to create any prepared
87 * statements. Since those don't go away automatically at transaction end
88 * (not even on error), we need this flag to cue manual cleanup.
89 */
90 TSConnection *
remote_dist_txn_get_connection(TSConnectionId id,RemoteTxnPrepStmtOption prep_stmt_opt)91 remote_dist_txn_get_connection(TSConnectionId id, RemoteTxnPrepStmtOption prep_stmt_opt)
92 {
93 bool found;
94 RemoteTxn *remote_txn;
95
96 /* First time through, initialize the remote_txn_store */
97 if (store == NULL)
98 store = remote_txn_store_create(TopTransactionContext);
99
100 remote_txn = remote_txn_store_get(store, id, &found);
101 remote_txn_begin(remote_txn, GetCurrentTransactionNestLevel());
102 remote_txn_set_will_prep_statement(remote_txn, prep_stmt_opt);
103
104 return remote_txn_get_connection(remote_txn);
105 }
106
107 /* This potentially deallocates prepared statements that were created in a subtxn
108 * that aborted before it deallocated the statement.
109 */
110 static void
dist_txn_deallocate_prepared_stmts_if_needed()111 dist_txn_deallocate_prepared_stmts_if_needed()
112 {
113 RemoteTxn *remote_txn;
114
115 /* below deallocate only happens on error so not worth making async */
116 remote_txn_store_foreach(store, remote_txn)
117 {
118 remote_txn_deallocate_prepared_stmts_if_needed(remote_txn);
119 }
120 }
121
122 /* Perform actions on one-phase pre-commit.
123 * Mainly just send a COMMIT to all remote nodes and wait for successes.
124 */
125 static void
dist_txn_xact_callback_1pc_pre_commit()126 dist_txn_xact_callback_1pc_pre_commit()
127 {
128 RemoteTxn *remote_txn;
129 AsyncRequestSet *ars = async_request_set_create();
130
131 eventcallback(DTXN_EVENT_PRE_COMMIT);
132
133 /* send a commit to all connections */
134 remote_txn_store_foreach(store, remote_txn)
135 {
136 Assert(remote_connection_xact_depth_get(remote_txn_get_connection(remote_txn)) > 0);
137
138 /* Commit all remote transactions during pre-commit */
139 async_request_set_add(ars, remote_txn_async_send_commit(remote_txn));
140 }
141
142 eventcallback(DTXN_EVENT_WAIT_COMMIT);
143
144 /* async collect all the replies */
145 async_request_set_wait_all_ok_commands(ars);
146 dist_txn_deallocate_prepared_stmts_if_needed();
147 }
148
149 /*
150 * Abort on the access node.
151 *
152 * The access node needs to send aborts to all of the remote endpoints. This
153 * code should not throw errors itself, since we are already in abort due to a
154 * previous error. Instead, we try to emit errors as warnings. For safety, we
155 * should probaby try-catch and swallow any potential lower-layer errors given
156 * that we're doing remote calls over the network. But the semantics for
157 * capturing and proceeding after such recursive errors are unclear.
158 */
159 static void
dist_txn_xact_callback_abort()160 dist_txn_xact_callback_abort()
161 {
162 RemoteTxn *remote_txn;
163
164 eventcallback(DTXN_EVENT_PRE_ABORT);
165
166 remote_txn_store_foreach(store, remote_txn)
167 {
168 if (remote_txn_is_ongoing(remote_txn) && !remote_txn_abort(remote_txn))
169 elog(WARNING,
170 "transaction rollback on data node \"%s\" failed",
171 remote_connection_node_name(remote_txn_get_connection(remote_txn)));
172 }
173 }
174
175 /*
176 * Reject transactions that didn't successfully complete a transaction
177 * transition at some point.
178 */
179 static void
reject_transaction_with_incomplete_transition(RemoteTxn * remote_txn)180 reject_transaction_with_incomplete_transition(RemoteTxn *remote_txn)
181 {
182 const TSConnection *conn = remote_txn_get_connection(remote_txn);
183
184 if (remote_connection_xact_is_transitioning(conn))
185 {
186 NameData nodename;
187
188 namestrcpy(&nodename, remote_connection_node_name(conn));
189 remote_txn_store_remove(store, remote_txn_get_connection_id(remote_txn));
190
191 ereport(ERROR,
192 (errcode(ERRCODE_CONNECTION_EXCEPTION),
193 errmsg("connection to data node \"%s\" was lost", NameStr(nodename))));
194 }
195 }
196
197 static void
reject_transactions_with_incomplete_transitions(void)198 reject_transactions_with_incomplete_transitions(void)
199 {
200 RemoteTxn *remote_txn;
201
202 remote_txn_store_foreach(store, remote_txn)
203 {
204 reject_transaction_with_incomplete_transition(remote_txn);
205 }
206 }
207
208 static void
cleanup_at_end_of_transaction(void)209 cleanup_at_end_of_transaction(void)
210 {
211 RemoteTxn *remote_txn;
212
213 remote_txn_store_foreach(store, remote_txn)
214 {
215 TSConnection *conn = remote_txn_get_connection(remote_txn);
216
217 /* The connection could have failed at START TRANSACTION, in which
218 * case the depth is 0. Otherwise, we'd expect depth 1. */
219 if (remote_connection_xact_depth_get(conn) > 0)
220 {
221 PGconn *pgconn = remote_connection_get_pg_conn(conn);
222
223 /* Indicate we're out of the transaction */
224 Assert(remote_connection_xact_depth_get(conn) == 1);
225 remote_connection_xact_depth_dec(conn);
226
227 /* Cleanup connections with failed transactions */
228 if (PQstatus(pgconn) != CONNECTION_OK || PQtransactionStatus(pgconn) != PQTRANS_IDLE ||
229 remote_connection_xact_is_transitioning(conn))
230 {
231 elog(DEBUG3, "discarding connection %p", conn);
232 remote_txn_store_remove(store, remote_txn_get_connection_id(remote_txn));
233 }
234 }
235 }
236
237 remote_txn_store_destroy(store);
238 store = NULL;
239
240 /*
241 * cursor are per-connection and txn so it's safe to reset at the end of
242 * the txn.
243 */
244 remote_connection_reset_cursor_number();
245 }
246
247 /*
248 * Transaction callback for one-phase commits.
249 *
250 * With one-phase commits, we send a remote commit during local pre-commit or
251 * a remote abort during local abort.
252 */
253 static void
dist_txn_xact_callback_1pc(XactEvent event,void * arg)254 dist_txn_xact_callback_1pc(XactEvent event, void *arg)
255 {
256 switch (event)
257 {
258 case XACT_EVENT_PRE_COMMIT:
259 case XACT_EVENT_PARALLEL_PRE_COMMIT:
260 reject_transactions_with_incomplete_transitions();
261 dist_txn_xact_callback_1pc_pre_commit();
262 break;
263 case XACT_EVENT_PRE_PREPARE:
264
265 /*
266 * Cannot prepare stuff on the access node.
267 */
268 ereport(ERROR,
269 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
270 errmsg("cannot prepare a transaction that modified "
271 "remote tables")));
272 break;
273 case XACT_EVENT_PARALLEL_COMMIT:
274 case XACT_EVENT_COMMIT:
275 case XACT_EVENT_PREPARE:
276 /* Pre-commit should have closed the open transaction in 1pc */
277 elog(ERROR, "missed cleaning up connection during pre-commit");
278 break;
279 case XACT_EVENT_PARALLEL_ABORT:
280 case XACT_EVENT_ABORT:
281 dist_txn_xact_callback_abort();
282 break;
283 }
284
285 /* In one-phase commit, we're done irrespective of event */
286 cleanup_at_end_of_transaction();
287 }
288
289 static void
dist_txn_send_prepare_transaction()290 dist_txn_send_prepare_transaction()
291 {
292 RemoteTxn *remote_txn;
293 AsyncRequestSet *ars = async_request_set_create();
294 AsyncResponse *error_response = NULL;
295 AsyncResponse *res;
296
297 eventcallback(DTXN_EVENT_PRE_PREPARE);
298
299 /* send a prepare transaction to all connections */
300 remote_txn_store_foreach(store, remote_txn)
301 {
302 AsyncRequest *req;
303
304 remote_txn_write_persistent_record(remote_txn);
305 req = remote_txn_async_send_prepare_transaction(remote_txn);
306 async_request_set_add(ars, req);
307 }
308
309 eventcallback(DTXN_EVENT_WAIT_PREPARE);
310
311 /*
312 * async collect the replies. Since errors in PREPARE TRANSACTION are not
313 * uncommon, handle them gracefully: delay throwing errors in results
314 * until all responses collected since you need to mark
315 * changing_xact_state correctly. So throw errors on connection errors but
316 * not errors in results.
317 */
318 error_response = NULL;
319 while ((res = async_request_set_wait_any_response(ars)))
320 {
321 switch (async_response_get_type(res))
322 {
323 case RESPONSE_COMMUNICATION_ERROR:
324 case RESPONSE_ERROR:
325 case RESPONSE_ROW:
326 case RESPONSE_TIMEOUT:
327 elog(DEBUG3, "error during second phase of two-phase commit");
328 async_response_report_error(res, ERROR);
329 continue;
330 case RESPONSE_RESULT:
331 {
332 AsyncResponseResult *response_result = (AsyncResponseResult *) res;
333 bool success =
334 PQresultStatus(async_response_result_get_pg_result(response_result)) ==
335 PGRES_COMMAND_OK;
336
337 if (!success)
338 {
339 /* save first error, warn about subsequent errors */
340 if (error_response == NULL)
341 error_response = (AsyncResponse *) response_result;
342 else
343 async_response_report_error((AsyncResponse *) response_result, WARNING);
344 }
345 else
346 async_response_close(res);
347 break;
348 }
349 }
350 }
351
352 if (error_response != NULL)
353 async_response_report_error(error_response, ERROR);
354
355 eventcallback(DTXN_EVENT_POST_PREPARE);
356 }
357
358 static void
dist_txn_send_commit_prepared_transaction()359 dist_txn_send_commit_prepared_transaction()
360 {
361 RemoteTxn *remote_txn;
362 AsyncRequestSet *ars = async_request_set_create();
363 AsyncResponse *res;
364
365 /*
366 * send a commit transaction to all connections and asynchronously collect
367 * the replies
368 */
369 remote_txn_store_foreach(store, remote_txn)
370 {
371 AsyncRequest *req;
372
373 req = remote_txn_async_send_commit_prepared(remote_txn);
374
375 if (req == NULL)
376 {
377 elog(DEBUG3, "error during second phase of two-phase commit");
378 continue;
379 }
380
381 async_request_set_add(ars, req);
382 }
383
384 eventcallback(DTXN_EVENT_WAIT_COMMIT_PREPARED);
385
386 /* async collect the replies */
387 while ((res = async_request_set_wait_any_response(ars)))
388 {
389 /* throw WARNINGS not ERRORS here */
390 /*
391 * NOTE: warnings make sure that all data nodes get a commit prepared.
392 * But, there is arguably some weirdness here in terms of RYOW if
393 * there is an error.
394 */
395 AsyncResponseResult *response_result;
396
397 switch (async_response_get_type(res))
398 {
399 case RESPONSE_COMMUNICATION_ERROR:
400 case RESPONSE_ERROR:
401 case RESPONSE_ROW:
402 case RESPONSE_TIMEOUT:
403 elog(DEBUG3, "error during second phase of two-phase commit");
404 async_response_report_error(res, WARNING);
405 continue;
406 case RESPONSE_RESULT:
407 response_result = (AsyncResponseResult *) res;
408 if (PQresultStatus(async_response_result_get_pg_result(response_result)) !=
409 PGRES_COMMAND_OK)
410 async_response_report_error(res, WARNING);
411 else
412 async_response_close(res);
413 break;
414 }
415 }
416 }
417
418 /*
419 * Transaction callback for two-phase commit.
420 *
421 * With two-phase commits, we write a persistent record and send a remote
422 * PREPARE TRANSACTION during local pre-commit. After commit we send a remote
423 * COMMIT TRANSACTION.
424 */
425 static void
dist_txn_xact_callback_2pc(XactEvent event,void * arg)426 dist_txn_xact_callback_2pc(XactEvent event, void *arg)
427 {
428 switch (event)
429 {
430 case XACT_EVENT_PARALLEL_PRE_COMMIT:
431 case XACT_EVENT_PRE_COMMIT:
432 reject_transactions_with_incomplete_transitions();
433 dist_txn_send_prepare_transaction();
434 dist_txn_deallocate_prepared_stmts_if_needed();
435 break;
436 case XACT_EVENT_PRE_PREPARE:
437 case XACT_EVENT_PREPARE:
438
439 /*
440 * Cannot prepare stuff on the access node.
441 */
442 ereport(ERROR,
443 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
444 errmsg("cannot prepare a transaction that modified "
445 "remote tables")));
446 break;
447 case XACT_EVENT_PARALLEL_COMMIT:
448 case XACT_EVENT_COMMIT:
449 eventcallback(DTXN_EVENT_PRE_COMMIT_PREPARED);
450
451 /*
452 * We send a commit here so that future commands on this
453 * connection get read-your-own-writes semantics. Later, we can
454 * optimize latency on connections by doing this in a background
455 * process and using IPC to assure RYOW
456 */
457 dist_txn_send_commit_prepared_transaction();
458
459 /*
460 * NOTE: You cannot delete the remote_txn_persistent_record here
461 * because you are out of transaction. Therefore cleanup of those
462 * entries has to happen in a background process or manually.
463 */
464 cleanup_at_end_of_transaction();
465 break;
466 case XACT_EVENT_PARALLEL_ABORT:
467 case XACT_EVENT_ABORT:
468 dist_txn_xact_callback_abort();
469 cleanup_at_end_of_transaction();
470 break;
471 }
472 }
473
474 static void
dist_txn_xact_callback(XactEvent event,void * arg)475 dist_txn_xact_callback(XactEvent event, void *arg)
476 {
477 /* Quick exit if no connections were touched in this transaction. */
478 if (store == NULL)
479 return;
480
481 if (ts_guc_enable_2pc)
482 dist_txn_xact_callback_2pc(event, arg);
483 else
484 dist_txn_xact_callback_1pc(event, arg);
485 }
486
487 /*
488 * Subtransaction callback handler.
489 *
490 * If the subtxn was committed, send a RELEASE SAVEPOINT to the remote nodes.
491 * If the subtxn was aborted, send a ROLLBACK SAVEPOINT and set a deferred
492 * error if that fails.
493 */
494 static void
dist_txn_subxact_callback(SubXactEvent event,SubTransactionId mySubid,SubTransactionId parentSubid,void * arg)495 dist_txn_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
496 SubTransactionId parentSubid, void *arg)
497 {
498 RemoteTxn *remote_txn;
499 int curlevel;
500
501 /* Quick exit if no connections were touched in this transaction. */
502 if (store == NULL)
503 return;
504
505 switch (event)
506 {
507 case SUBXACT_EVENT_START_SUB:
508 case SUBXACT_EVENT_COMMIT_SUB:
509 /* Nothing to do at subxact start, nor after commit. */
510 return;
511 case SUBXACT_EVENT_PRE_COMMIT_SUB:
512 reject_transactions_with_incomplete_transitions();
513 break;
514 case SUBXACT_EVENT_ABORT_SUB:
515 eventcallback(DTXN_EVENT_SUB_XACT_ABORT);
516 break;
517 }
518
519 curlevel = GetCurrentTransactionNestLevel();
520
521 remote_txn_store_foreach(store, remote_txn)
522 {
523 TSConnection *conn = remote_txn_get_connection(remote_txn);
524
525 if (!remote_txn_is_at_sub_txn_level(remote_txn, curlevel))
526 continue;
527
528 if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
529 {
530 reject_transaction_with_incomplete_transition(remote_txn);
531 remote_txn_sub_txn_pre_commit(remote_txn, curlevel);
532 }
533 else
534 {
535 Assert(event == SUBXACT_EVENT_ABORT_SUB);
536 remote_txn_sub_txn_abort(remote_txn, curlevel);
537 }
538
539 remote_connection_xact_depth_dec(conn);
540 }
541 }
542
543 void
_remote_dist_txn_init()544 _remote_dist_txn_init()
545 {
546 RegisterXactCallback(dist_txn_xact_callback, NULL);
547 RegisterSubXactCallback(dist_txn_subxact_callback, NULL);
548 }
549
550 void
_remote_dist_txn_fini()551 _remote_dist_txn_fini()
552 {
553 /* can't unregister callbacks */
554 if (NULL != store)
555 {
556 remote_txn_store_destroy(store);
557 store = NULL;
558 }
559 }
560