1 /*-------------------------------------------------------------------------
2 *
3 * worker_transaction.c
4 *
5 * Routines for performing transactions across all workers.
6 *
7 * Copyright (c) Citus Data, Inc.
8 *
9 * $Id$
10 *
11 *-------------------------------------------------------------------------
12 */
13
14 #include "postgres.h"
15 #include "miscadmin.h"
16 #include "libpq-fe.h"
17
18 #include <sys/stat.h>
19 #include <unistd.h>
20
21 #include "access/xact.h"
22 #include "distributed/connection_management.h"
23 #include "distributed/listutils.h"
24 #include "distributed/metadata_cache.h"
25 #include "distributed/resource_lock.h"
26 #include "distributed/metadata_sync.h"
27 #include "distributed/remote_commands.h"
28 #include "distributed/pg_dist_node.h"
29 #include "distributed/pg_dist_transaction.h"
30 #include "distributed/transaction_recovery.h"
31 #include "distributed/worker_manager.h"
32 #include "distributed/worker_transaction.h"
33 #include "utils/memutils.h"
34
35
36 static void SendCommandToMetadataWorkersParams(const char *command,
37 const char *user, int parameterCount,
38 const Oid *parameterTypes,
39 const char *const *parameterValues);
40 static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
41 const char *command, const char *user,
42 int parameterCount,
43 const Oid *parameterTypes,
44 const char *const *parameterValues);
45 static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
46 static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,
47 const char *user);
48 static void GetConnectionsResults(List *connectionList, bool failOnError);
49 static void SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet,
50 const char *command, const char *user,
51 bool
52 failOnError);
53
54 /*
55 * SendCommandToWorker sends a command to a particular worker as part of the
56 * 2PC.
57 */
58 void
SendCommandToWorker(const char * nodeName,int32 nodePort,const char * command)59 SendCommandToWorker(const char *nodeName, int32 nodePort, const char *command)
60 {
61 const char *nodeUser = CurrentUserName();
62 SendCommandToWorkerAsUser(nodeName, nodePort, nodeUser, command);
63 }
64
65
66 /*
67 * SendCommandToWorkersAsUser sends a command to targetWorkerSet as a particular user
68 * as part of the 2PC.
69 */
70 void
SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,const char * nodeUser,const char * command)71 SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const char *nodeUser,
72 const char *command)
73 {
74 List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
75
76 /* run commands serially */
77 WorkerNode *workerNode = NULL;
78 foreach_ptr(workerNode, workerNodeList)
79 {
80 const char *nodeName = workerNode->workerName;
81 int nodePort = workerNode->workerPort;
82
83 SendCommandToWorkerAsUser(nodeName, nodePort, nodeUser, command);
84 }
85 }
86
87
88 /*
89 * SendCommandToWorkerAsUser sends a command to a particular worker as a particular user
90 * as part of the 2PC.
91 */
92 void
SendCommandToWorkerAsUser(const char * nodeName,int32 nodePort,const char * nodeUser,const char * command)93 SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *nodeUser,
94 const char *command)
95 {
96 uint32 connectionFlags = 0;
97
98 UseCoordinatedTransaction();
99 Use2PCForCoordinatedTransaction();
100
101 MultiConnection *transactionConnection = GetNodeUserDatabaseConnection(
102 connectionFlags, nodeName,
103 nodePort,
104 nodeUser, NULL);
105
106 MarkRemoteTransactionCritical(transactionConnection);
107 RemoteTransactionBeginIfNecessary(transactionConnection);
108 ExecuteCriticalRemoteCommand(transactionConnection, command);
109 }
110
111
112 /*
113 * SendCommandToWorkers sends a command to all workers in
114 * parallel. Commands are committed on the workers when the local
115 * transaction commits. The connection are made as the extension
116 * owner to ensure write access to the Citus metadata tables.
117 */
118 void
SendCommandToWorkersWithMetadata(const char * command)119 SendCommandToWorkersWithMetadata(const char *command)
120 {
121 SendCommandToMetadataWorkersParams(command, CurrentUserName(),
122 0, NULL, NULL);
123 }
124
125
126 /*
127 * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
128 * TargetWorkerSet.
129 */
130 List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet,LOCKMODE lockMode)131 TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
132 {
133 List *workerNodeList = NIL;
134 if (targetWorkerSet == ALL_SHARD_NODES)
135 {
136 workerNodeList = ActivePrimaryNodeList(lockMode);
137 }
138 else
139 {
140 workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
141 }
142 List *result = NIL;
143
144 WorkerNode *workerNode = NULL;
145 foreach_ptr(workerNode, workerNodeList)
146 {
147 if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata)
148 {
149 continue;
150 }
151
152 result = lappend(result, workerNode);
153 }
154
155 return result;
156 }
157
158
159 /*
160 * SendBareCommandListToMetadataWorkers sends a list of commands to metadata
161 * workers in serial. Commands are committed immediately: new connections are
162 * always used and no transaction block is used (hence "bare"). The connections
163 * are made as the extension owner to ensure write access to the Citus metadata
164 * tables. Primarly useful for INDEX commands using CONCURRENTLY.
165 */
166 void
SendBareCommandListToMetadataWorkers(List * commandList)167 SendBareCommandListToMetadataWorkers(List *commandList)
168 {
169 TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
170 List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
171 char *nodeUser = CurrentUserName();
172
173 ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
174
175 /* run commands serially */
176 WorkerNode *workerNode = NULL;
177 foreach_ptr(workerNode, workerNodeList)
178 {
179 const char *nodeName = workerNode->workerName;
180 int nodePort = workerNode->workerPort;
181 int connectionFlags = FORCE_NEW_CONNECTION;
182
183 MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
184 nodeName,
185 nodePort,
186 nodeUser, NULL);
187
188 /* iterate over the commands and execute them in the same connection */
189 const char *commandString = NULL;
190 foreach_ptr(commandString, commandList)
191 {
192 ExecuteCriticalRemoteCommand(workerConnection, commandString);
193 }
194
195 CloseConnection(workerConnection);
196 }
197 }
198
199
200 /*
201 * SendCommandToMetadataWorkersParams is a wrapper around
202 * SendCommandToWorkersParamsInternal() enforcing some extra checks.
203 */
204 static void
SendCommandToMetadataWorkersParams(const char * command,const char * user,int parameterCount,const Oid * parameterTypes,const char * const * parameterValues)205 SendCommandToMetadataWorkersParams(const char *command,
206 const char *user, int parameterCount,
207 const Oid *parameterTypes,
208 const char *const *parameterValues)
209 {
210 List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
211 ShareLock);
212
213 ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
214
215 SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
216 parameterCount, parameterTypes,
217 parameterValues);
218 }
219
220
221 /*
222 * SendCommandToWorkersOptionalInParallel sends the given command to workers in parallel.
223 * It does error if there is a problem while sending the query, but it doesn't error
224 * if there is a problem while executing the query.
225 */
226 void
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet,const char * command,const char * user)227 SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
228 char *command,
229 const char *user)
230 {
231 bool failOnError = false;
232 SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
233 failOnError);
234 }
235
236
237 /*
238 * SendCommandToWorkersInParallel sends the given command to workers in parallel.
239 * It does error if there is a problem while sending the query, it errors if there
240 * was any problem when sending/receiving.
241 */
242 void
SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet,const char * command,const char * user)243 SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
244 char *command,
245 const char *user)
246 {
247 bool failOnError = true;
248 SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
249 failOnError);
250 }
251
252
253 /*
254 * SendCommandToWorkersOutsideTransaction sends the given command to workers in parallel.
255 */
256 static void
SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet,const char * command,const char * user,bool failOnError)257 SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const
258 char *command, const char *user, bool
259 failOnError)
260 {
261 List *connectionList = OpenConnectionsToWorkersInParallel(targetWorkerSet, user);
262
263 /* finish opening connections */
264 FinishConnectionListEstablishment(connectionList);
265
266 /* send commands in parallel */
267 MultiConnection *connection = NULL;
268 foreach_ptr(connection, connectionList)
269 {
270 int querySent = SendRemoteCommand(connection, command);
271 if (failOnError && querySent == 0)
272 {
273 ReportConnectionError(connection, ERROR);
274 }
275 }
276
277 GetConnectionsResults(connectionList, failOnError);
278 }
279
280
281 /*
282 * OpenConnectionsToWorkersInParallel opens connections to the given target worker set in parallel,
283 * as the given user.
284 */
285 static List *
OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,const char * user)286 OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user)
287 {
288 List *connectionList = NIL;
289
290 List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
291
292 WorkerNode *workerNode = NULL;
293 foreach_ptr(workerNode, workerNodeList)
294 {
295 const char *nodeName = workerNode->workerName;
296 int nodePort = workerNode->workerPort;
297 int32 connectionFlags = OUTSIDE_TRANSACTION;
298
299 MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
300 nodeName, nodePort,
301 user, NULL);
302
303 /*
304 * connection can only be NULL for optional connections, which we don't
305 * support in this codepath.
306 */
307 Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
308 Assert(connection != NULL);
309 connectionList = lappend(connectionList, connection);
310 }
311 return connectionList;
312 }
313
314
315 /*
316 * GetConnectionsResults gets remote command results
317 * for the given connections. It raises any error if failOnError is true.
318 */
319 static void
GetConnectionsResults(List * connectionList,bool failOnError)320 GetConnectionsResults(List *connectionList, bool failOnError)
321 {
322 MultiConnection *connection = NULL;
323 foreach_ptr(connection, connectionList)
324 {
325 bool raiseInterrupt = false;
326 PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt);
327
328 bool isResponseOK = result != NULL && IsResponseOK(result);
329 if (failOnError && !isResponseOK)
330 {
331 ReportResultError(connection, result, ERROR);
332 }
333
334 PQclear(result);
335
336 if (isResponseOK)
337 {
338 ForgetResults(connection);
339 }
340 }
341 }
342
343
344 /*
345 * SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
346 * Commands are committed on the workers when the local transaction commits. The
347 * connection are made as the extension owner to ensure write access to the Citus
348 * metadata tables. Parameters can be specified as for PQexecParams, except that
349 * paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0
350 * respectively.
351 */
352 static void
SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,const char * command,const char * user,int parameterCount,const Oid * parameterTypes,const char * const * parameterValues)353 SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command,
354 const char *user, int parameterCount,
355 const Oid *parameterTypes,
356 const char *const *parameterValues)
357 {
358 List *connectionList = NIL;
359 List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
360
361 UseCoordinatedTransaction();
362 Use2PCForCoordinatedTransaction();
363
364 /* open connections in parallel */
365 WorkerNode *workerNode = NULL;
366 foreach_ptr(workerNode, workerNodeList)
367 {
368 const char *nodeName = workerNode->workerName;
369 int nodePort = workerNode->workerPort;
370 int32 connectionFlags = 0;
371
372 MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
373 nodeName, nodePort,
374 user, NULL);
375
376 /*
377 * connection can only be NULL for optional connections, which we don't
378 * support in this codepath.
379 */
380 Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
381 Assert(connection != NULL);
382 MarkRemoteTransactionCritical(connection);
383
384 connectionList = lappend(connectionList, connection);
385 }
386
387 /* finish opening connections */
388 FinishConnectionListEstablishment(connectionList);
389
390 RemoteTransactionsBeginIfNecessary(connectionList);
391
392 /* send commands in parallel */
393 MultiConnection *connection = NULL;
394 foreach_ptr(connection, connectionList)
395 {
396 int querySent = SendRemoteCommandParams(connection, command, parameterCount,
397 parameterTypes, parameterValues, false);
398 if (querySent == 0)
399 {
400 ReportConnectionError(connection, ERROR);
401 }
402 }
403
404 /* get results */
405 foreach_ptr(connection, connectionList)
406 {
407 PGresult *result = GetRemoteCommandResult(connection, true);
408 if (!IsResponseOK(result))
409 {
410 ReportResultError(connection, result, ERROR);
411 }
412
413 PQclear(result);
414
415 ForgetResults(connection);
416 }
417 }
418
419
420 /*
421 * EnsureNoModificationsHaveBeenDone reports an error if we have performed any
422 * modification in the current transaction to prevent opening a connection is such cases.
423 */
424 void
EnsureNoModificationsHaveBeenDone()425 EnsureNoModificationsHaveBeenDone()
426 {
427 if (XactModificationLevel > XACT_MODIFICATION_NONE)
428 {
429 ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
430 errmsg("cannot open new connections after the first modification "
431 "command within a transaction")));
432 }
433 }
434
435
436 /*
437 * SendCommandListToWorkerOutsideTransaction forces to open a new connection
438 * to the node with the given nodeName and nodePort. Then, the connection starts
439 * a transaction on the remote node and executes the commands in the transaction.
440 * The function raises error if any of the queries fails.
441 */
442 void
SendCommandListToWorkerOutsideTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)443 SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
444 const char *nodeUser, List *commandList)
445 {
446 int connectionFlags = FORCE_NEW_CONNECTION;
447
448 MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
449 nodeName, nodePort,
450 nodeUser, NULL);
451
452 MarkRemoteTransactionCritical(workerConnection);
453 RemoteTransactionBegin(workerConnection);
454
455 /* iterate over the commands and execute them in the same connection */
456 const char *commandString = NULL;
457 foreach_ptr(commandString, commandList)
458 {
459 ExecuteCriticalRemoteCommand(workerConnection, commandString);
460 }
461
462 RemoteTransactionCommit(workerConnection);
463 CloseConnection(workerConnection);
464 }
465
466
467 /*
468 * SendCommandListToWorkerInCoordinatedTransaction opens connection to the node
469 * with the given nodeName and nodePort. The commands are sent as part of the
470 * coordinated transaction. Any failures aborts the coordinated transaction.
471 */
472 void
SendCommandListToWorkerInCoordinatedTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)473 SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort,
474 const char *nodeUser, List *commandList)
475 {
476 int connectionFlags = 0;
477
478 UseCoordinatedTransaction();
479
480 MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
481 nodeName, nodePort,
482 nodeUser, NULL);
483
484 MarkRemoteTransactionCritical(workerConnection);
485 RemoteTransactionBeginIfNecessary(workerConnection);
486
487 /* iterate over the commands and execute them in the same connection */
488 const char *commandString = NULL;
489 foreach_ptr(commandString, commandList)
490 {
491 ExecuteCriticalRemoteCommand(workerConnection, commandString);
492 }
493 }
494
495
496 /*
497 * SendOptionalCommandListToWorkerOutsideTransaction sends the given command
498 * list to the given worker in a single transaction that is outside of the
499 * coordinated tranaction. If any of the commands fail, it rollbacks the
500 * transaction, and otherwise commits.
501 */
502 bool
SendOptionalCommandListToWorkerOutsideTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)503 SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
504 const char *nodeUser, List *commandList)
505 {
506 int connectionFlags = FORCE_NEW_CONNECTION;
507 bool failed = false;
508
509 MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
510 nodeName, nodePort,
511 nodeUser, NULL);
512 if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
513 {
514 return false;
515 }
516 RemoteTransactionBegin(workerConnection);
517
518 /* iterate over the commands and execute them in the same connection */
519 const char *commandString = NULL;
520 foreach_ptr(commandString, commandList)
521 {
522 if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != 0)
523 {
524 failed = true;
525 break;
526 }
527 }
528
529 if (failed)
530 {
531 RemoteTransactionAbort(workerConnection);
532 }
533 else
534 {
535 RemoteTransactionCommit(workerConnection);
536 }
537
538 CloseConnection(workerConnection);
539
540 return !failed;
541 }
542
543
544 /*
545 * SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given
546 * command list to the given worker as part of the coordinated transaction.
547 * If any of the commands fail, the function returns false.
548 */
549 bool
SendOptionalCommandListToWorkerInCoordinatedTransaction(const char * nodeName,int32 nodePort,const char * nodeUser,List * commandList)550 SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32
551 nodePort,
552 const char *nodeUser,
553 List *commandList)
554 {
555 int connectionFlags = 0;
556 bool failed = false;
557
558 UseCoordinatedTransaction();
559
560 MultiConnection *workerConnection =
561 GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
562 nodeUser, NULL);
563 if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
564 {
565 return false;
566 }
567
568 RemoteTransactionsBeginIfNecessary(list_make1(workerConnection));
569
570 /* iterate over the commands and execute them in the same connection */
571 const char *commandString = NULL;
572 foreach_ptr(commandString, commandList)
573 {
574 if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) !=
575 RESPONSE_OKAY)
576 {
577 failed = true;
578
579 bool raiseErrors = false;
580 MarkRemoteTransactionFailed(workerConnection, raiseErrors);
581 break;
582 }
583 }
584
585 return !failed;
586 }
587
588
589 /*
590 * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
591 * metadata nodes are out of sync. It is safer to avoid metadata changing
592 * commands (e.g. DDL or node addition) until all metadata nodes have
593 * been synced.
594 *
595 * An example of we could get in a bad situation without doing so is:
596 * 1. Create a reference table
597 * 2. After the node becomes out of sync, add a new active node
598 * 3. Insert into the reference table from the out of sync node
599 *
600 * Since the out-of-sync might not know about the new node, it won't propagate
601 * the changes to the new node and replicas will be in an inconsistent state.
602 */
603 static void
ErrorIfAnyMetadataNodeOutOfSync(List * metadataNodeList)604 ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList)
605 {
606 WorkerNode *metadataNode = NULL;
607 foreach_ptr(metadataNode, metadataNodeList)
608 {
609 Assert(metadataNode->hasMetadata);
610
611 if (!metadataNode->metadataSynced)
612 {
613 const char *workerName = metadataNode->workerName;
614 int workerPort = metadataNode->workerPort;
615 ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
616 errmsg("%s:%d is a metadata node, but is out of sync",
617 workerName, workerPort),
618 errhint("If the node is up, wait until metadata"
619 " gets synced to it and try again.")));
620 }
621 }
622 }
623