1 /*-------------------------------------------------------------------------
2  *
3  * worker_transaction.c
4  *
5  * Routines for performing transactions across all workers.
6  *
7  * Copyright (c) Citus Data, Inc.
8  *
9  * $Id$
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 #include "miscadmin.h"
16 #include "libpq-fe.h"
17 
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include "access/xact.h"
22 #include "distributed/connection_management.h"
23 #include "distributed/listutils.h"
24 #include "distributed/metadata_cache.h"
25 #include "distributed/resource_lock.h"
26 #include "distributed/metadata_sync.h"
27 #include "distributed/remote_commands.h"
28 #include "distributed/pg_dist_node.h"
29 #include "distributed/pg_dist_transaction.h"
30 #include "distributed/transaction_recovery.h"
31 #include "distributed/worker_manager.h"
32 #include "distributed/worker_transaction.h"
33 #include "utils/memutils.h"
34 
35 
36 static void SendCommandToMetadataWorkersParams(const char *command,
37 											   const char *user, int parameterCount,
38 											   const Oid *parameterTypes,
39 											   const char *const *parameterValues);
40 static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
41 											   const char *command, const char *user,
42 											   int parameterCount,
43 											   const Oid *parameterTypes,
44 											   const char *const *parameterValues);
45 static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
46 static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,
47 												 const char *user);
48 static void GetConnectionsResults(List *connectionList, bool failOnError);
49 static void SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet,
50 												   const char *command, const char *user,
51 												   bool
52 												   failOnError);
53 
54 /*
55  * SendCommandToWorker sends a command to a particular worker as part of the
56  * 2PC.
57  */
58 void
SendCommandToWorker(const char * nodeName,int32 nodePort,const char * command)59 SendCommandToWorker(const char *nodeName, int32 nodePort, const char *command)
60 {
61 	const char *nodeUser = CurrentUserName();
62 	SendCommandToWorkerAsUser(nodeName, nodePort, nodeUser, command);
63 }
64 
65 
66 /*
67  * SendCommandToWorkersAsUser sends a command to targetWorkerSet as a particular user
68  * as part of the 2PC.
69  */
70 void
SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,const char * nodeUser,const char * command)71 SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const char *nodeUser,
72 						   const char *command)
73 {
74 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
75 
76 	/* run commands serially */
77 	WorkerNode *workerNode = NULL;
78 	foreach_ptr(workerNode, workerNodeList)
79 	{
80 		const char *nodeName = workerNode->workerName;
81 		int nodePort = workerNode->workerPort;
82 
83 		SendCommandToWorkerAsUser(nodeName, nodePort, nodeUser, command);
84 	}
85 }
86 
87 
88 /*
89  * SendCommandToWorkerAsUser sends a command to a particular worker as a particular user
90  * as part of the 2PC.
91  */
92 void
SendCommandToWorkerAsUser(const char * nodeName,int32 nodePort,const char * nodeUser,const char * command)93 SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *nodeUser,
94 						  const char *command)
95 {
96 	uint32 connectionFlags = 0;
97 
98 	UseCoordinatedTransaction();
99 	Use2PCForCoordinatedTransaction();
100 
101 	MultiConnection *transactionConnection = GetNodeUserDatabaseConnection(
102 		connectionFlags, nodeName,
103 		nodePort,
104 		nodeUser, NULL);
105 
106 	MarkRemoteTransactionCritical(transactionConnection);
107 	RemoteTransactionBeginIfNecessary(transactionConnection);
108 	ExecuteCriticalRemoteCommand(transactionConnection, command);
109 }
110 
111 
112 /*
113  * SendCommandToWorkers sends a command to all workers in
114  * parallel. Commands are committed on the workers when the local
115  * transaction commits. The connection are made as the extension
116  * owner to ensure write access to the Citus metadata tables.
117  */
118 void
SendCommandToWorkersWithMetadata(const char * command)119 SendCommandToWorkersWithMetadata(const char *command)
120 {
121 	SendCommandToMetadataWorkersParams(command, CurrentUserName(),
122 									   0, NULL, NULL);
123 }
124 
125 
126 /*
127  * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
128  * TargetWorkerSet.
129  */
130 List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet,LOCKMODE lockMode)131 TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
132 {
133 	List *workerNodeList = NIL;
134 	if (targetWorkerSet == ALL_SHARD_NODES)
135 	{
136 		workerNodeList = ActivePrimaryNodeList(lockMode);
137 	}
138 	else
139 	{
140 		workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
141 	}
142 	List *result = NIL;
143 
144 	WorkerNode *workerNode = NULL;
145 	foreach_ptr(workerNode, workerNodeList)
146 	{
147 		if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata)
148 		{
149 			continue;
150 		}
151 
152 		result = lappend(result, workerNode);
153 	}
154 
155 	return result;
156 }
157 
158 
159 /*
160  * SendBareCommandListToMetadataWorkers sends a list of commands to metadata
161  * workers in serial. Commands are committed immediately: new connections are
162  * always used and no transaction block is used (hence "bare"). The connections
163  * are made as the extension owner to ensure write access to the Citus metadata
164  * tables. Primarly useful for INDEX commands using CONCURRENTLY.
165  */
166 void
SendBareCommandListToMetadataWorkers(List * commandList)167 SendBareCommandListToMetadataWorkers(List *commandList)
168 {
169 	TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
170 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
171 	char *nodeUser = CurrentUserName();
172 
173 	ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
174 
175 	/* run commands serially */
176 	WorkerNode *workerNode = NULL;
177 	foreach_ptr(workerNode, workerNodeList)
178 	{
179 		const char *nodeName = workerNode->workerName;
180 		int nodePort = workerNode->workerPort;
181 		int connectionFlags = FORCE_NEW_CONNECTION;
182 
183 		MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
184 																		  nodeName,
185 																		  nodePort,
186 																		  nodeUser, NULL);
187 
188 		/* iterate over the commands and execute them in the same connection */
189 		const char *commandString = NULL;
190 		foreach_ptr(commandString, commandList)
191 		{
192 			ExecuteCriticalRemoteCommand(workerConnection, commandString);
193 		}
194 
195 		CloseConnection(workerConnection);
196 	}
197 }
198 
199 
200 /*
201  * SendCommandToMetadataWorkersParams is a wrapper around
202  * SendCommandToWorkersParamsInternal() enforcing some extra checks.
203  */
204 static void
SendCommandToMetadataWorkersParams(const char * command,const char * user,int parameterCount,const Oid * parameterTypes,const char * const * parameterValues)205 SendCommandToMetadataWorkersParams(const char *command,
206 								   const char *user, int parameterCount,
207 								   const Oid *parameterTypes,
208 								   const char *const *parameterValues)
209 {
210 	List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
211 												   ShareLock);
212 
213 	ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
214 
215 	SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
216 									   parameterCount, parameterTypes,
217 									   parameterValues);
218 }
219 
220 
221 /*
222  * SendCommandToWorkersOptionalInParallel sends the given command to workers in parallel.
223  * It does error if there is a problem while sending the query, but it doesn't error
224  * if there is a problem while executing the query.
225  */
226 void
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet,const char * command,const char * user)227 SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
228 									   char *command,
229 									   const char *user)
230 {
231 	bool failOnError = false;
232 	SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
233 										   failOnError);
234 }
235 
236 
237 /*
238  * SendCommandToWorkersInParallel sends the given command to workers in parallel.
239  * It does error if there is a problem while sending the query, it errors if there
240  * was any problem when sending/receiving.
241  */
242 void
SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet,const char * command,const char * user)243 SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
244 							   char *command,
245 							   const char *user)
246 {
247 	bool failOnError = true;
248 	SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
249 										   failOnError);
250 }
251 
252 
253 /*
254  * SendCommandToWorkersOutsideTransaction sends the given command to workers in parallel.
255  */
256 static void
SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet,const char * command,const char * user,bool failOnError)257 SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const
258 									   char *command, const char *user, bool
259 									   failOnError)
260 {
261 	List *connectionList = OpenConnectionsToWorkersInParallel(targetWorkerSet, user);
262 
263 	/* finish opening connections */
264 	FinishConnectionListEstablishment(connectionList);
265 
266 	/* send commands in parallel */
267 	MultiConnection *connection = NULL;
268 	foreach_ptr(connection, connectionList)
269 	{
270 		int querySent = SendRemoteCommand(connection, command);
271 		if (failOnError && querySent == 0)
272 		{
273 			ReportConnectionError(connection, ERROR);
274 		}
275 	}
276 
277 	GetConnectionsResults(connectionList, failOnError);
278 }
279 
280 
281 /*
282  * OpenConnectionsToWorkersInParallel opens connections to the given target worker set in parallel,
283  * as the given user.
284  */
285 static List *
OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,const char * user)286 OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user)
287 {
288 	List *connectionList = NIL;
289 
290 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
291 
292 	WorkerNode *workerNode = NULL;
293 	foreach_ptr(workerNode, workerNodeList)
294 	{
295 		const char *nodeName = workerNode->workerName;
296 		int nodePort = workerNode->workerPort;
297 		int32 connectionFlags = OUTSIDE_TRANSACTION;
298 
299 		MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
300 																	  nodeName, nodePort,
301 																	  user, NULL);
302 
303 		/*
304 		 * connection can only be NULL for optional connections, which we don't
305 		 * support in this codepath.
306 		 */
307 		Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
308 		Assert(connection != NULL);
309 		connectionList = lappend(connectionList, connection);
310 	}
311 	return connectionList;
312 }
313 
314 
315 /*
316  * GetConnectionsResults gets remote command results
317  * for the given connections. It raises any error if failOnError is true.
318  */
319 static void
GetConnectionsResults(List * connectionList,bool failOnError)320 GetConnectionsResults(List *connectionList, bool failOnError)
321 {
322 	MultiConnection *connection = NULL;
323 	foreach_ptr(connection, connectionList)
324 	{
325 		bool raiseInterrupt = false;
326 		PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt);
327 
328 		bool isResponseOK = result != NULL && IsResponseOK(result);
329 		if (failOnError && !isResponseOK)
330 		{
331 			ReportResultError(connection, result, ERROR);
332 		}
333 
334 		PQclear(result);
335 
336 		if (isResponseOK)
337 		{
338 			ForgetResults(connection);
339 		}
340 	}
341 }
342 
343 
344 /*
345  * SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
346  * Commands are committed on the workers when the local transaction commits. The
347  * connection are made as the extension owner to ensure write access to the Citus
348  * metadata tables. Parameters can be specified as for PQexecParams, except that
349  * paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0
350  * respectively.
351  */
352 static void
SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,const char * command,const char * user,int parameterCount,const Oid * parameterTypes,const char * const * parameterValues)353 SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command,
354 								   const char *user, int parameterCount,
355 								   const Oid *parameterTypes,
356 								   const char *const *parameterValues)
357 {
358 	List *connectionList = NIL;
359 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
360 
361 	UseCoordinatedTransaction();
362 	Use2PCForCoordinatedTransaction();
363 
364 	/* open connections in parallel */
365 	WorkerNode *workerNode = NULL;
366 	foreach_ptr(workerNode, workerNodeList)
367 	{
368 		const char *nodeName = workerNode->workerName;
369 		int nodePort = workerNode->workerPort;
370 		int32 connectionFlags = 0;
371 
372 		MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
373 																	  nodeName, nodePort,
374 																	  user, NULL);
375 
376 		/*
377 		 * connection can only be NULL for optional connections, which we don't
378 		 * support in this codepath.
379 		 */
380 		Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
381 		Assert(connection != NULL);
382 		MarkRemoteTransactionCritical(connection);
383 
384 		connectionList = lappend(connectionList, connection);
385 	}
386 
387 	/* finish opening connections */
388 	FinishConnectionListEstablishment(connectionList);
389 
390 	RemoteTransactionsBeginIfNecessary(connectionList);
391 
392 	/* send commands in parallel */
393 	MultiConnection *connection = NULL;
394 	foreach_ptr(connection, connectionList)
395 	{
396 		int querySent = SendRemoteCommandParams(connection, command, parameterCount,
397 												parameterTypes, parameterValues, false);
398 		if (querySent == 0)
399 		{
400 			ReportConnectionError(connection, ERROR);
401 		}
402 	}
403 
404 	/* get results */
405 	foreach_ptr(connection, connectionList)
406 	{
407 		PGresult *result = GetRemoteCommandResult(connection, true);
408 		if (!IsResponseOK(result))
409 		{
410 			ReportResultError(connection, result, ERROR);
411 		}
412 
413 		PQclear(result);
414 
415 		ForgetResults(connection);
416 	}
417 }
418 
419 
420 /*
421  * EnsureNoModificationsHaveBeenDone reports an error if we have performed any
422  * modification in the current transaction to prevent opening a connection is such cases.
423  */
424 void
EnsureNoModificationsHaveBeenDone()425 EnsureNoModificationsHaveBeenDone()
426 {
427 	if (XactModificationLevel > XACT_MODIFICATION_NONE)
428 	{
429 		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
430 						errmsg("cannot open new connections after the first modification "
431 							   "command within a transaction")));
432 	}
433 }
434 
435 
436 /*
437  * SendCommandListToWorkerOutsideTransaction forces to open a new connection
438  * to the node with the given nodeName and nodePort. Then, the connection starts
439  * a transaction on the remote node and executes the commands in the transaction.
440  * The function raises error if any of the queries fails.
441  */
442 void
SendCommandListToWorkerOutsideTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)443 SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
444 										  const char *nodeUser, List *commandList)
445 {
446 	int connectionFlags = FORCE_NEW_CONNECTION;
447 
448 	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
449 																	  nodeName, nodePort,
450 																	  nodeUser, NULL);
451 
452 	MarkRemoteTransactionCritical(workerConnection);
453 	RemoteTransactionBegin(workerConnection);
454 
455 	/* iterate over the commands and execute them in the same connection */
456 	const char *commandString = NULL;
457 	foreach_ptr(commandString, commandList)
458 	{
459 		ExecuteCriticalRemoteCommand(workerConnection, commandString);
460 	}
461 
462 	RemoteTransactionCommit(workerConnection);
463 	CloseConnection(workerConnection);
464 }
465 
466 
467 /*
468  * SendCommandListToWorkerInCoordinatedTransaction opens connection to the node
469  * with the given nodeName and nodePort. The commands are sent as part of the
470  * coordinated transaction. Any failures aborts the coordinated transaction.
471  */
472 void
SendCommandListToWorkerInCoordinatedTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)473 SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort,
474 												const char *nodeUser, List *commandList)
475 {
476 	int connectionFlags = 0;
477 
478 	UseCoordinatedTransaction();
479 
480 	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
481 																	  nodeName, nodePort,
482 																	  nodeUser, NULL);
483 
484 	MarkRemoteTransactionCritical(workerConnection);
485 	RemoteTransactionBeginIfNecessary(workerConnection);
486 
487 	/* iterate over the commands and execute them in the same connection */
488 	const char *commandString = NULL;
489 	foreach_ptr(commandString, commandList)
490 	{
491 		ExecuteCriticalRemoteCommand(workerConnection, commandString);
492 	}
493 }
494 
495 
496 /*
497  * SendOptionalCommandListToWorkerOutsideTransaction sends the given command
498  * list to the given worker in a single transaction that is outside of the
499  * coordinated tranaction. If any of the commands fail, it rollbacks the
500  * transaction, and otherwise commits.
501  */
502 bool
SendOptionalCommandListToWorkerOutsideTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)503 SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
504 												  const char *nodeUser, List *commandList)
505 {
506 	int connectionFlags = FORCE_NEW_CONNECTION;
507 	bool failed = false;
508 
509 	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
510 																	  nodeName, nodePort,
511 																	  nodeUser, NULL);
512 	if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
513 	{
514 		return false;
515 	}
516 	RemoteTransactionBegin(workerConnection);
517 
518 	/* iterate over the commands and execute them in the same connection */
519 	const char *commandString = NULL;
520 	foreach_ptr(commandString, commandList)
521 	{
522 		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
523 		{
524 			failed = true;
525 			break;
526 		}
527 	}
528 
529 	if (failed)
530 	{
531 		RemoteTransactionAbort(workerConnection);
532 	}
533 	else
534 	{
535 		RemoteTransactionCommit(workerConnection);
536 	}
537 
538 	CloseConnection(workerConnection);
539 
540 	return !failed;
541 }
542 
543 
544 /*
545  * SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given
546  * command list to the given worker as part of the coordinated transaction.
547  * If any of the commands fail, the function returns false.
548  */
549 bool
SendOptionalCommandListToWorkerInCoordinatedTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)550 SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32
551 														nodePort,
552 														const char *nodeUser,
553 														List *commandList)
554 {
555 	int connectionFlags = 0;
556 	bool failed = false;
557 
558 	UseCoordinatedTransaction();
559 
560 	MultiConnection *workerConnection =
561 		GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
562 									  nodeUser, NULL);
563 	if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
564 	{
565 		return false;
566 	}
567 
568 	RemoteTransactionsBeginIfNecessary(list_make1(workerConnection));
569 
570 	/* iterate over the commands and execute them in the same connection */
571 	const char *commandString = NULL;
572 	foreach_ptr(commandString, commandList)
573 	{
574 		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) !=
575 			RESPONSE_OKAY)
576 		{
577 			failed = true;
578 
579 			bool raiseErrors = false;
580 			MarkRemoteTransactionFailed(workerConnection, raiseErrors);
581 			break;
582 		}
583 	}
584 
585 	return !failed;
586 }
587 
588 
589 /*
590  * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
591  * metadata nodes are out of sync. It is safer to avoid metadata changing
592  * commands (e.g. DDL or node addition) until all metadata nodes have
593  * been synced.
594  *
595  * An example of we could get in a bad situation without doing so is:
596  *  1. Create a reference table
597  *  2. After the node becomes out of sync, add a new active node
598  *  3. Insert into the reference table from the out of sync node
599  *
600  * Since the out-of-sync might not know about the new node, it won't propagate
601  * the changes to the new node and replicas will be in an inconsistent state.
602  */
603 static void
ErrorIfAnyMetadataNodeOutOfSync(List * metadataNodeList)604 ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList)
605 {
606 	WorkerNode *metadataNode = NULL;
607 	foreach_ptr(metadataNode, metadataNodeList)
608 	{
609 		Assert(metadataNode->hasMetadata);
610 
611 		if (!metadataNode->metadataSynced)
612 		{
613 			const char *workerName = metadataNode->workerName;
614 			int workerPort = metadataNode->workerPort;
615 			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
616 							errmsg("%s:%d is a metadata node, but is out of sync",
617 								   workerName, workerPort),
618 							errhint("If the node is up, wait until metadata"
619 									" gets synced to it and try again.")));
620 		}
621 	}
622 }
623