1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <access/htup_details.h>
8 #include <access/xact.h>
9 #include <utils/hsearch.h>
10 #include <utils/builtins.h>
11 #include <utils/memutils.h>
12 #include <utils/syscache.h>
13 
14 #include "dist_txn.h"
15 #include "connection.h"
16 #include "async.h"
17 #include "txn.h"
18 #include "txn_store.h"
19 #include "guc.h"
20 
21 #ifdef DEBUG
22 
23 static const DistTransactionEventHandler *event_handler = NULL;
24 static const char *eventnames[MAX_DTXN_EVENT] = {
25 	[DTXN_EVENT_ANY] = "any",
26 	[DTXN_EVENT_PRE_COMMIT] = "pre-commit",
27 	[DTXN_EVENT_WAIT_COMMIT] = "waiting-commit",
28 	[DTXN_EVENT_PRE_ABORT] = "pre-abort",
29 	[DTXN_EVENT_PRE_PREPARE] = "pre-prepare-transaction",
30 	[DTXN_EVENT_WAIT_PREPARE] = "waiting-prepare-transaction",
31 	[DTXN_EVENT_POST_PREPARE] = "post-prepare-transaction",
32 	[DTXN_EVENT_PRE_COMMIT_PREPARED] = "pre-commit-prepared",
33 	[DTXN_EVENT_WAIT_COMMIT_PREPARED] = "waiting-commit-prepared",
34 	[DTXN_EVENT_SUB_XACT_ABORT] = "subxact-abort",
35 };
36 
37 void
remote_dist_txn_set_event_handler(const DistTransactionEventHandler * handler)38 remote_dist_txn_set_event_handler(const DistTransactionEventHandler *handler)
39 {
40 	event_handler = handler;
41 }
42 
43 static inline void
eventcallback(const DistTransactionEvent event)44 eventcallback(const DistTransactionEvent event)
45 {
46 	if (NULL != event_handler && NULL != event_handler->handler)
47 		event_handler->handler(event, event_handler->data);
48 }
49 
50 DistTransactionEvent
remote_dist_txn_event_from_name(const char * eventname)51 remote_dist_txn_event_from_name(const char *eventname)
52 {
53 	int i;
54 
55 	for (i = 0; i < MAX_DTXN_EVENT; i++)
56 	{
57 		if (strcmp(eventname, eventnames[i]) == 0)
58 			return i;
59 	}
60 
61 	elog(ERROR, "invalid event name");
62 	pg_unreachable();
63 }
64 
65 const char *
remote_dist_txn_event_name(const DistTransactionEvent event)66 remote_dist_txn_event_name(const DistTransactionEvent event)
67 {
68 	return eventnames[event];
69 }
70 
71 #else
/* Event hooks compile to nothing outside DEBUG builds; the argument is discarded unevaluated. */
#define eventcallback(event) ((void) 0)
76 #endif
77 
78 static RemoteTxnStore *store = NULL;
79 
80 /*
81  * Get a connection which can be used to execute queries on the remote PostgreSQL
82  * data node with the user's authorization.  A new connection is established
83  * if we don't already have a suitable one, and a transaction is opened at
84  * the right subtransaction nesting depth if we didn't do that already.
85  *
86  * will_prep_stmt must be true if caller intends to create any prepared
87  * statements.  Since those don't go away automatically at transaction end
88  * (not even on error), we need this flag to cue manual cleanup.
89  */
90 TSConnection *
remote_dist_txn_get_connection(TSConnectionId id,RemoteTxnPrepStmtOption prep_stmt_opt)91 remote_dist_txn_get_connection(TSConnectionId id, RemoteTxnPrepStmtOption prep_stmt_opt)
92 {
93 	bool found;
94 	RemoteTxn *remote_txn;
95 
96 	/* First time through, initialize the remote_txn_store */
97 	if (store == NULL)
98 		store = remote_txn_store_create(TopTransactionContext);
99 
100 	remote_txn = remote_txn_store_get(store, id, &found);
101 	remote_txn_begin(remote_txn, GetCurrentTransactionNestLevel());
102 	remote_txn_set_will_prep_statement(remote_txn, prep_stmt_opt);
103 
104 	return remote_txn_get_connection(remote_txn);
105 }
106 
107 /* This potentially deallocates prepared statements that were created in a subtxn
108  * that aborted before it deallocated the statement.
109  */
110 static void
dist_txn_deallocate_prepared_stmts_if_needed()111 dist_txn_deallocate_prepared_stmts_if_needed()
112 {
113 	RemoteTxn *remote_txn;
114 
115 	/* below deallocate only happens on error so not worth making async */
116 	remote_txn_store_foreach(store, remote_txn)
117 	{
118 		remote_txn_deallocate_prepared_stmts_if_needed(remote_txn);
119 	}
120 }
121 
122 /* Perform actions on one-phase pre-commit.
123  * Mainly just send a COMMIT to all remote nodes and wait for successes.
124  */
125 static void
dist_txn_xact_callback_1pc_pre_commit()126 dist_txn_xact_callback_1pc_pre_commit()
127 {
128 	RemoteTxn *remote_txn;
129 	AsyncRequestSet *ars = async_request_set_create();
130 
131 	eventcallback(DTXN_EVENT_PRE_COMMIT);
132 
133 	/* send a commit to all connections */
134 	remote_txn_store_foreach(store, remote_txn)
135 	{
136 		Assert(remote_connection_xact_depth_get(remote_txn_get_connection(remote_txn)) > 0);
137 
138 		/* Commit all remote transactions during pre-commit */
139 		async_request_set_add(ars, remote_txn_async_send_commit(remote_txn));
140 	}
141 
142 	eventcallback(DTXN_EVENT_WAIT_COMMIT);
143 
144 	/* async collect all the replies */
145 	async_request_set_wait_all_ok_commands(ars);
146 	dist_txn_deallocate_prepared_stmts_if_needed();
147 }
148 
149 /*
150  * Abort on the access node.
151  *
152  * The access node needs to send aborts to all of the remote endpoints. This
153  * code should not throw errors itself, since we are already in abort due to a
154  * previous error. Instead, we try to emit errors as warnings. For safety, we
155  * should probaby try-catch and swallow any potential lower-layer errors given
156  * that we're doing remote calls over the network. But the semantics for
157  * capturing and proceeding after such recursive errors are unclear.
158  */
159 static void
dist_txn_xact_callback_abort()160 dist_txn_xact_callback_abort()
161 {
162 	RemoteTxn *remote_txn;
163 
164 	eventcallback(DTXN_EVENT_PRE_ABORT);
165 
166 	remote_txn_store_foreach(store, remote_txn)
167 	{
168 		if (remote_txn_is_ongoing(remote_txn) && !remote_txn_abort(remote_txn))
169 			elog(WARNING,
170 				 "transaction rollback on data node \"%s\" failed",
171 				 remote_connection_node_name(remote_txn_get_connection(remote_txn)));
172 	}
173 }
174 
175 /*
176  * Reject transactions that didn't successfully complete a transaction
177  * transition at some point.
178  */
179 static void
reject_transaction_with_incomplete_transition(RemoteTxn * remote_txn)180 reject_transaction_with_incomplete_transition(RemoteTxn *remote_txn)
181 {
182 	const TSConnection *conn = remote_txn_get_connection(remote_txn);
183 
184 	if (remote_connection_xact_is_transitioning(conn))
185 	{
186 		NameData nodename;
187 
188 		namestrcpy(&nodename, remote_connection_node_name(conn));
189 		remote_txn_store_remove(store, remote_txn_get_connection_id(remote_txn));
190 
191 		ereport(ERROR,
192 				(errcode(ERRCODE_CONNECTION_EXCEPTION),
193 				 errmsg("connection to data node \"%s\" was lost", NameStr(nodename))));
194 	}
195 }
196 
197 static void
reject_transactions_with_incomplete_transitions(void)198 reject_transactions_with_incomplete_transitions(void)
199 {
200 	RemoteTxn *remote_txn;
201 
202 	remote_txn_store_foreach(store, remote_txn)
203 	{
204 		reject_transaction_with_incomplete_transition(remote_txn);
205 	}
206 }
207 
208 static void
cleanup_at_end_of_transaction(void)209 cleanup_at_end_of_transaction(void)
210 {
211 	RemoteTxn *remote_txn;
212 
213 	remote_txn_store_foreach(store, remote_txn)
214 	{
215 		TSConnection *conn = remote_txn_get_connection(remote_txn);
216 
217 		/* The connection could have failed at START TRANSACTION, in which
218 		 * case the depth is 0. Otherwise, we'd expect depth 1. */
219 		if (remote_connection_xact_depth_get(conn) > 0)
220 		{
221 			PGconn *pgconn = remote_connection_get_pg_conn(conn);
222 
223 			/* Indicate we're out of the transaction */
224 			Assert(remote_connection_xact_depth_get(conn) == 1);
225 			remote_connection_xact_depth_dec(conn);
226 
227 			/* Cleanup connections with failed transactions */
228 			if (PQstatus(pgconn) != CONNECTION_OK || PQtransactionStatus(pgconn) != PQTRANS_IDLE ||
229 				remote_connection_xact_is_transitioning(conn))
230 			{
231 				elog(DEBUG3, "discarding connection %p", conn);
232 				remote_txn_store_remove(store, remote_txn_get_connection_id(remote_txn));
233 			}
234 		}
235 	}
236 
237 	remote_txn_store_destroy(store);
238 	store = NULL;
239 
240 	/*
241 	 * cursor are per-connection and txn so it's safe to reset at the end of
242 	 * the txn.
243 	 */
244 	remote_connection_reset_cursor_number();
245 }
246 
247 /*
248  * Transaction callback for one-phase commits.
249  *
250  * With one-phase commits, we send a remote commit during local pre-commit or
251  * a remote abort during local abort.
252  */
253 static void
dist_txn_xact_callback_1pc(XactEvent event,void * arg)254 dist_txn_xact_callback_1pc(XactEvent event, void *arg)
255 {
256 	switch (event)
257 	{
258 		case XACT_EVENT_PRE_COMMIT:
259 		case XACT_EVENT_PARALLEL_PRE_COMMIT:
260 			reject_transactions_with_incomplete_transitions();
261 			dist_txn_xact_callback_1pc_pre_commit();
262 			break;
263 		case XACT_EVENT_PRE_PREPARE:
264 
265 			/*
266 			 * Cannot prepare stuff on the access node.
267 			 */
268 			ereport(ERROR,
269 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
270 					 errmsg("cannot prepare a transaction that modified "
271 							"remote tables")));
272 			break;
273 		case XACT_EVENT_PARALLEL_COMMIT:
274 		case XACT_EVENT_COMMIT:
275 		case XACT_EVENT_PREPARE:
276 			/* Pre-commit should have closed the open transaction in 1pc */
277 			elog(ERROR, "missed cleaning up connection during pre-commit");
278 			break;
279 		case XACT_EVENT_PARALLEL_ABORT:
280 		case XACT_EVENT_ABORT:
281 			dist_txn_xact_callback_abort();
282 			break;
283 	}
284 
285 	/* In one-phase commit, we're done irrespective of event */
286 	cleanup_at_end_of_transaction();
287 }
288 
289 static void
dist_txn_send_prepare_transaction()290 dist_txn_send_prepare_transaction()
291 {
292 	RemoteTxn *remote_txn;
293 	AsyncRequestSet *ars = async_request_set_create();
294 	AsyncResponse *error_response = NULL;
295 	AsyncResponse *res;
296 
297 	eventcallback(DTXN_EVENT_PRE_PREPARE);
298 
299 	/* send a prepare transaction to all connections */
300 	remote_txn_store_foreach(store, remote_txn)
301 	{
302 		AsyncRequest *req;
303 
304 		remote_txn_write_persistent_record(remote_txn);
305 		req = remote_txn_async_send_prepare_transaction(remote_txn);
306 		async_request_set_add(ars, req);
307 	}
308 
309 	eventcallback(DTXN_EVENT_WAIT_PREPARE);
310 
311 	/*
312 	 * async collect the replies. Since errors in PREPARE TRANSACTION are not
313 	 * uncommon, handle them gracefully: delay throwing errors in results
314 	 * until all responses collected since you need to mark
315 	 * changing_xact_state correctly. So throw errors on connection errors but
316 	 * not errors in results.
317 	 */
318 	error_response = NULL;
319 	while ((res = async_request_set_wait_any_response(ars)))
320 	{
321 		switch (async_response_get_type(res))
322 		{
323 			case RESPONSE_COMMUNICATION_ERROR:
324 			case RESPONSE_ERROR:
325 			case RESPONSE_ROW:
326 			case RESPONSE_TIMEOUT:
327 				elog(DEBUG3, "error during second phase of two-phase commit");
328 				async_response_report_error(res, ERROR);
329 				continue;
330 			case RESPONSE_RESULT:
331 			{
332 				AsyncResponseResult *response_result = (AsyncResponseResult *) res;
333 				bool success =
334 					PQresultStatus(async_response_result_get_pg_result(response_result)) ==
335 					PGRES_COMMAND_OK;
336 
337 				if (!success)
338 				{
339 					/* save first error, warn about subsequent errors */
340 					if (error_response == NULL)
341 						error_response = (AsyncResponse *) response_result;
342 					else
343 						async_response_report_error((AsyncResponse *) response_result, WARNING);
344 				}
345 				else
346 					async_response_close(res);
347 				break;
348 			}
349 		}
350 	}
351 
352 	if (error_response != NULL)
353 		async_response_report_error(error_response, ERROR);
354 
355 	eventcallback(DTXN_EVENT_POST_PREPARE);
356 }
357 
358 static void
dist_txn_send_commit_prepared_transaction()359 dist_txn_send_commit_prepared_transaction()
360 {
361 	RemoteTxn *remote_txn;
362 	AsyncRequestSet *ars = async_request_set_create();
363 	AsyncResponse *res;
364 
365 	/*
366 	 * send a commit transaction to all connections and asynchronously collect
367 	 * the replies
368 	 */
369 	remote_txn_store_foreach(store, remote_txn)
370 	{
371 		AsyncRequest *req;
372 
373 		req = remote_txn_async_send_commit_prepared(remote_txn);
374 
375 		if (req == NULL)
376 		{
377 			elog(DEBUG3, "error during second phase of two-phase commit");
378 			continue;
379 		}
380 
381 		async_request_set_add(ars, req);
382 	}
383 
384 	eventcallback(DTXN_EVENT_WAIT_COMMIT_PREPARED);
385 
386 	/* async collect the replies */
387 	while ((res = async_request_set_wait_any_response(ars)))
388 	{
389 		/* throw WARNINGS not ERRORS here */
390 		/*
391 		 * NOTE: warnings make sure that all data nodes get a commit prepared.
392 		 * But, there is arguably some weirdness here in terms of RYOW if
393 		 * there is an error.
394 		 */
395 		AsyncResponseResult *response_result;
396 
397 		switch (async_response_get_type(res))
398 		{
399 			case RESPONSE_COMMUNICATION_ERROR:
400 			case RESPONSE_ERROR:
401 			case RESPONSE_ROW:
402 			case RESPONSE_TIMEOUT:
403 				elog(DEBUG3, "error during second phase of two-phase commit");
404 				async_response_report_error(res, WARNING);
405 				continue;
406 			case RESPONSE_RESULT:
407 				response_result = (AsyncResponseResult *) res;
408 				if (PQresultStatus(async_response_result_get_pg_result(response_result)) !=
409 					PGRES_COMMAND_OK)
410 					async_response_report_error(res, WARNING);
411 				else
412 					async_response_close(res);
413 				break;
414 		}
415 	}
416 }
417 
418 /*
419  * Transaction callback for two-phase commit.
420  *
421  * With two-phase commits, we write a persistent record and send a remote
422  * PREPARE TRANSACTION during local pre-commit. After commit we send a remote
423  * COMMIT TRANSACTION.
424  */
425 static void
dist_txn_xact_callback_2pc(XactEvent event,void * arg)426 dist_txn_xact_callback_2pc(XactEvent event, void *arg)
427 {
428 	switch (event)
429 	{
430 		case XACT_EVENT_PARALLEL_PRE_COMMIT:
431 		case XACT_EVENT_PRE_COMMIT:
432 			reject_transactions_with_incomplete_transitions();
433 			dist_txn_send_prepare_transaction();
434 			dist_txn_deallocate_prepared_stmts_if_needed();
435 			break;
436 		case XACT_EVENT_PRE_PREPARE:
437 		case XACT_EVENT_PREPARE:
438 
439 			/*
440 			 * Cannot prepare stuff on the access node.
441 			 */
442 			ereport(ERROR,
443 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
444 					 errmsg("cannot prepare a transaction that modified "
445 							"remote tables")));
446 			break;
447 		case XACT_EVENT_PARALLEL_COMMIT:
448 		case XACT_EVENT_COMMIT:
449 			eventcallback(DTXN_EVENT_PRE_COMMIT_PREPARED);
450 
451 			/*
452 			 * We send a commit here so that future commands on this
453 			 * connection get read-your-own-writes semantics. Later, we can
454 			 * optimize latency on connections by doing this in a background
455 			 * process and using IPC to assure RYOW
456 			 */
457 			dist_txn_send_commit_prepared_transaction();
458 
459 			/*
460 			 * NOTE: You cannot delete the remote_txn_persistent_record here
461 			 * because you are out of transaction. Therefore cleanup of those
462 			 * entries has to happen in a background process or manually.
463 			 */
464 			cleanup_at_end_of_transaction();
465 			break;
466 		case XACT_EVENT_PARALLEL_ABORT:
467 		case XACT_EVENT_ABORT:
468 			dist_txn_xact_callback_abort();
469 			cleanup_at_end_of_transaction();
470 			break;
471 	}
472 }
473 
474 static void
dist_txn_xact_callback(XactEvent event,void * arg)475 dist_txn_xact_callback(XactEvent event, void *arg)
476 {
477 	/* Quick exit if no connections were touched in this transaction. */
478 	if (store == NULL)
479 		return;
480 
481 	if (ts_guc_enable_2pc)
482 		dist_txn_xact_callback_2pc(event, arg);
483 	else
484 		dist_txn_xact_callback_1pc(event, arg);
485 }
486 
487 /*
488  * Subtransaction callback handler.
489  *
490  * If the subtxn was committed, send a RELEASE SAVEPOINT to the remote nodes.
491  * If the subtxn was aborted, send a ROLLBACK SAVEPOINT and set a deferred
492  * error if that fails.
493  */
494 static void
dist_txn_subxact_callback(SubXactEvent event,SubTransactionId mySubid,SubTransactionId parentSubid,void * arg)495 dist_txn_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
496 						  SubTransactionId parentSubid, void *arg)
497 {
498 	RemoteTxn *remote_txn;
499 	int curlevel;
500 
501 	/* Quick exit if no connections were touched in this transaction. */
502 	if (store == NULL)
503 		return;
504 
505 	switch (event)
506 	{
507 		case SUBXACT_EVENT_START_SUB:
508 		case SUBXACT_EVENT_COMMIT_SUB:
509 			/* Nothing to do at subxact start, nor after commit. */
510 			return;
511 		case SUBXACT_EVENT_PRE_COMMIT_SUB:
512 			reject_transactions_with_incomplete_transitions();
513 			break;
514 		case SUBXACT_EVENT_ABORT_SUB:
515 			eventcallback(DTXN_EVENT_SUB_XACT_ABORT);
516 			break;
517 	}
518 
519 	curlevel = GetCurrentTransactionNestLevel();
520 
521 	remote_txn_store_foreach(store, remote_txn)
522 	{
523 		TSConnection *conn = remote_txn_get_connection(remote_txn);
524 
525 		if (!remote_txn_is_at_sub_txn_level(remote_txn, curlevel))
526 			continue;
527 
528 		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
529 		{
530 			reject_transaction_with_incomplete_transition(remote_txn);
531 			remote_txn_sub_txn_pre_commit(remote_txn, curlevel);
532 		}
533 		else
534 		{
535 			Assert(event == SUBXACT_EVENT_ABORT_SUB);
536 			remote_txn_sub_txn_abort(remote_txn, curlevel);
537 		}
538 
539 		remote_connection_xact_depth_dec(conn);
540 	}
541 }
542 
543 void
_remote_dist_txn_init()544 _remote_dist_txn_init()
545 {
546 	RegisterXactCallback(dist_txn_xact_callback, NULL);
547 	RegisterSubXactCallback(dist_txn_subxact_callback, NULL);
548 }
549 
550 void
_remote_dist_txn_fini()551 _remote_dist_txn_fini()
552 {
553 	/* can't unregister callbacks */
554 	if (NULL != store)
555 	{
556 		remote_txn_store_destroy(store);
557 		store = NULL;
558 	}
559 }
560