1 /*-------------------------------------------------------------------------
2  *
3  * remote_commands.c
 *   Helpers to make it easier to execute commands on remote nodes.
5  *
6  * Copyright (c) Citus Data, Inc.
7  *
8  *-------------------------------------------------------------------------
9  */
10 
11 #include "postgres.h"
12 #include "pgstat.h"
13 
14 #include "libpq-fe.h"
15 
16 #include "distributed/connection_management.h"
17 #include "distributed/errormessage.h"
18 #include "distributed/listutils.h"
19 #include "distributed/log_utils.h"
20 #include "distributed/remote_commands.h"
21 #include "distributed/cancel_utils.h"
22 #include "lib/stringinfo.h"
23 #include "miscadmin.h"
24 #include "storage/latch.h"
25 #include "utils/palloc.h"
26 
27 
/*
 * Setting, in bytes, for how much COPY data libpq may hold in its internal
 * output buffer before PutRemoteCopyData() forces a flush. Default: 8MB.
 */
int RemoteCopyFlushThreshold = 8 << 20;


/* GUC: when true, LogRemoteCommand() emits a NOTICE for every remote statement */
bool LogRemoteCommands = false;
37 
38 
39 static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
40 								 bool discardWarnings);
41 static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
42 static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
43 										int totalConnectionCount,
44 										int pendingConnectionsStartIndex);
45 
46 
47 /* simple helpers */
48 
49 /*
50  * IsResponseOK checks whether the result is a successful one.
51  */
52 bool
IsResponseOK(PGresult * result)53 IsResponseOK(PGresult *result)
54 {
55 	ExecStatusType resultStatus = PQresultStatus(result);
56 
57 	if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
58 		resultStatus == PGRES_COMMAND_OK)
59 	{
60 		return true;
61 	}
62 
63 	return false;
64 }
65 
66 
67 /*
68  * ForgetResults clears a connection from pending activity.
69  *
70  * Note that this might require network IO. If that's not acceptable, use
71  * ClearResultsIfReady().
72  *
73  * ClearResults is variant of this function which can also raise errors.
74  */
75 void
ForgetResults(MultiConnection * connection)76 ForgetResults(MultiConnection *connection)
77 {
78 	ClearResults(connection, false);
79 }
80 
81 
82 /*
83  * ClearResultsInternal clears a connection from pending activity,
84  * returns true if all pending commands return success. It raises
85  * error if raiseErrors flag is set, any command fails and transaction
86  * is marked critical.
87  *
88  * Note that this might require network IO. If that's not acceptable, use
89  * ClearResultsIfReady().
90  */
91 bool
ClearResults(MultiConnection * connection,bool raiseErrors)92 ClearResults(MultiConnection *connection, bool raiseErrors)
93 {
94 	return ClearResultsInternal(connection, raiseErrors, false);
95 }
96 
97 
98 /*
99  * ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't
100  * emit warnings.
101  */
102 bool
ClearResultsDiscardWarnings(MultiConnection * connection,bool raiseErrors)103 ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors)
104 {
105 	return ClearResultsInternal(connection, raiseErrors, true);
106 }
107 
108 
109 /*
110  * ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings.
111  */
112 static bool
ClearResultsInternal(MultiConnection * connection,bool raiseErrors,bool discardWarnings)113 ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings)
114 {
115 	bool success = true;
116 
117 	while (true)
118 	{
119 		PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
120 		if (result == NULL)
121 		{
122 			break;
123 		}
124 
125 		/*
126 		 * End any pending copy operation. Transaction will be marked
127 		 * as failed by the following part.
128 		 */
129 		if (PQresultStatus(result) == PGRES_COPY_IN)
130 		{
131 			PQputCopyEnd(connection->pgConn, NULL);
132 		}
133 
134 		if (!IsResponseOK(result))
135 		{
136 			if (!discardWarnings)
137 			{
138 				ReportResultError(connection, result, WARNING);
139 			}
140 
141 			MarkRemoteTransactionFailed(connection, raiseErrors);
142 
143 			success = false;
144 
145 			/* an error happened, there is nothing we can do more */
146 			if (PQresultStatus(result) == PGRES_FATAL_ERROR)
147 			{
148 				PQclear(result);
149 
150 				break;
151 			}
152 		}
153 
154 		PQclear(result);
155 	}
156 
157 	return success;
158 }
159 
160 
161 /*
162  * ClearResultsIfReady clears a connection from pending activity if doing
163  * so does not require network IO. Returns true if successful, false
164  * otherwise.
165  */
166 bool
ClearResultsIfReady(MultiConnection * connection)167 ClearResultsIfReady(MultiConnection *connection)
168 {
169 	PGconn *pgConn = connection->pgConn;
170 
171 	if (PQstatus(pgConn) != CONNECTION_OK)
172 	{
173 		return false;
174 	}
175 
176 	Assert(PQisnonblocking(pgConn));
177 
178 	while (true)
179 	{
180 		/*
181 		 * If busy, there might still be results already received and buffered
182 		 * by the OS. As connection is in non-blocking mode, we can check for
183 		 * that without blocking.
184 		 */
185 		if (PQisBusy(pgConn))
186 		{
187 			if (PQflush(pgConn) == -1)
188 			{
189 				/* write failed */
190 				return false;
191 			}
192 			if (PQconsumeInput(pgConn) == 0)
193 			{
194 				/* some low-level failure */
195 				return false;
196 			}
197 		}
198 
199 		/* clearing would require blocking IO, return */
200 		if (PQisBusy(pgConn))
201 		{
202 			return false;
203 		}
204 
205 		PGresult *result = PQgetResult(pgConn);
206 		if (result == NULL)
207 		{
208 			/* no more results available */
209 			return true;
210 		}
211 
212 		ExecStatusType resultStatus = PQresultStatus(result);
213 
214 		/* only care about the status, can clear now */
215 		PQclear(result);
216 
217 		if (resultStatus == PGRES_COPY_IN || resultStatus == PGRES_COPY_OUT)
218 		{
219 			/* in copy, can't reliably recover without blocking */
220 			return false;
221 		}
222 
223 		if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
224 			  resultStatus == PGRES_COMMAND_OK))
225 		{
226 			/* an error occurred just when we were aborting */
227 			return false;
228 		}
229 
230 		/* check if there are more results to consume */
231 	}
232 
233 	pg_unreachable();
234 }
235 
236 
237 /* report errors & warnings */
238 
239 /*
240  * Report libpq failure that's not associated with a result.
241  */
242 void
ReportConnectionError(MultiConnection * connection,int elevel)243 ReportConnectionError(MultiConnection *connection, int elevel)
244 {
245 	char *nodeName = connection->hostname;
246 	int nodePort = connection->port;
247 	PGconn *pgConn = connection->pgConn;
248 	char *messageDetail = NULL;
249 
250 	if (pgConn != NULL)
251 	{
252 		messageDetail = pchomp(PQerrorMessage(pgConn));
253 	}
254 
255 	if (messageDetail)
256 	{
257 		/*
258 		 * We don't use ApplyLogRedaction(messageDetail) as we expect any error
259 		 * detail that requires log reduction should have done it locally.
260 		 */
261 		ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
262 						 errmsg("connection to the remote node %s:%d failed with the "
263 								"following error: %s", nodeName, nodePort,
264 								messageDetail)));
265 	}
266 	else
267 	{
268 		ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
269 						 errmsg("connection to the remote node %s:%d failed",
270 								nodeName, nodePort)));
271 	}
272 }
273 
274 
275 /*
276  * ReportResultError reports libpq failure associated with a result.
277  */
278 void
ReportResultError(MultiConnection * connection,PGresult * result,int elevel)279 ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
280 {
281 	/* we release PQresult when throwing an error because the caller can't */
282 	PG_TRY();
283 	{
284 		char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
285 		char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
286 		char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
287 		char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
288 		char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);
289 
290 		char *nodeName = connection->hostname;
291 		int nodePort = connection->port;
292 		int sqlState = ERRCODE_INTERNAL_ERROR;
293 
294 		if (sqlStateString != NULL)
295 		{
296 			sqlState = MAKE_SQLSTATE(sqlStateString[0],
297 									 sqlStateString[1],
298 									 sqlStateString[2],
299 									 sqlStateString[3],
300 									 sqlStateString[4]);
301 		}
302 
303 		/*
304 		 * If the PGresult did not contain a message, the connection may provide a
305 		 * suitable top level one. At worst, this is an empty string.
306 		 */
307 		if (messagePrimary == NULL)
308 		{
309 			messagePrimary = pchomp(PQerrorMessage(connection->pgConn));
310 		}
311 
312 		ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
313 						 messageDetail ?
314 						 errdetail("%s", ApplyLogRedaction(messageDetail)) : 0,
315 						 messageHint ? errhint("%s", messageHint) : 0,
316 						 messageContext ? errcontext("%s", messageContext) : 0,
317 						 errcontext("while executing command on %s:%d",
318 									nodeName, nodePort)));
319 	}
320 	PG_CATCH();
321 	{
322 		PQclear(result);
323 		PG_RE_THROW();
324 	}
325 	PG_END_TRY();
326 }
327 
328 
329 /* *INDENT-ON* */
330 
331 
332 /*
333  * LogRemoteCommand logs commands send to remote nodes if
334  * citus.log_remote_commands wants us to do so.
335  */
336 void
LogRemoteCommand(MultiConnection * connection,const char * command)337 LogRemoteCommand(MultiConnection *connection, const char *command)
338 {
339 	if (!LogRemoteCommands)
340 	{
341 		return;
342 	}
343 
344 	ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)),
345 					 errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
346 							   connection->hostname,
347 							   connection->port, connection->connectionId)));
348 }
349 
350 
351 /* wrappers around libpq functions, with command logging support */
352 
353 
354 /*
355  * ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every
356  * command in the commandList.
357  */
358 void
ExecuteCriticalRemoteCommandList(MultiConnection * connection,List * commandList)359 ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList)
360 {
361 	const char *command = NULL;
362 	foreach_ptr(command, commandList)
363 	{
364 		ExecuteCriticalRemoteCommand(connection, command);
365 	}
366 }
367 
368 
369 /*
370  * ExecuteCriticalRemoteCommand executes a remote command that is critical
371  * to the transaction. If the command fails then the transaction aborts.
372  */
373 void
ExecuteCriticalRemoteCommand(MultiConnection * connection,const char * command)374 ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
375 {
376 	bool raiseInterrupts = true;
377 
378 	int querySent = SendRemoteCommand(connection, command);
379 	if (querySent == 0)
380 	{
381 		ReportConnectionError(connection, ERROR);
382 	}
383 
384 	PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
385 	if (!IsResponseOK(result))
386 	{
387 		ReportResultError(connection, result, ERROR);
388 	}
389 
390 	PQclear(result);
391 	ForgetResults(connection);
392 }
393 
394 
395 /*
396  * ExecuteOptionalRemoteCommand executes a remote command. If the command fails a WARNING
397  * is emitted but execution continues.
398  *
399  * could return 0, QUERY_SEND_FAILED, or RESPONSE_NOT_OKAY
400  * result is only set if there was no error
401  */
402 int
ExecuteOptionalRemoteCommand(MultiConnection * connection,const char * command,PGresult ** result)403 ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
404 							 PGresult **result)
405 {
406 	bool raiseInterrupts = true;
407 
408 	int querySent = SendRemoteCommand(connection, command);
409 	if (querySent == 0)
410 	{
411 		ReportConnectionError(connection, WARNING);
412 		return QUERY_SEND_FAILED;
413 	}
414 
415 	PGresult *localResult = GetRemoteCommandResult(connection, raiseInterrupts);
416 	if (!IsResponseOK(localResult))
417 	{
418 		ReportResultError(connection, localResult, WARNING);
419 		PQclear(localResult);
420 		ForgetResults(connection);
421 		return RESPONSE_NOT_OKAY;
422 	}
423 
424 	/*
425 	 * store result if result has been set, when the user is not interested in the result
426 	 * a NULL pointer could be passed and the result will be cleared.
427 	 */
428 	if (result != NULL)
429 	{
430 		*result = localResult;
431 	}
432 	else
433 	{
434 		PQclear(localResult);
435 		ForgetResults(connection);
436 	}
437 
438 	return RESPONSE_OKAY;
439 }
440 
441 
442 /*
443  * SendRemoteCommandParams is a PQsendQueryParams wrapper that logs remote commands,
444  * and accepts a MultiConnection instead of a plain PGconn. It makes sure it can
445  * send commands asynchronously without blocking (at the potential expense of
446  * an additional memory allocation). The command string can only include a single
447  * command since PQsendQueryParams() supports only that.
448  */
449 int
SendRemoteCommandParams(MultiConnection * connection,const char * command,int parameterCount,const Oid * parameterTypes,const char * const * parameterValues,bool binaryResults)450 SendRemoteCommandParams(MultiConnection *connection, const char *command,
451 						int parameterCount, const Oid *parameterTypes,
452 						const char *const *parameterValues, bool binaryResults)
453 {
454 	PGconn *pgConn = connection->pgConn;
455 
456 	LogRemoteCommand(connection, command);
457 
458 	/*
459 	 * Don't try to send command if connection is entirely gone
460 	 * (PQisnonblocking() would crash).
461 	 */
462 	if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
463 	{
464 		return 0;
465 	}
466 
467 	Assert(PQisnonblocking(pgConn));
468 
469 	int rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes,
470 							   parameterValues, NULL, NULL, binaryResults ? 1 : 0);
471 
472 	return rc;
473 }
474 
475 
476 /*
477  * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
478  * accepts a MultiConnection instead of a plain PGconn. It makes sure it can
479  * send commands asynchronously without blocking (at the potential expense of
480  * an additional memory allocation). The command string can include multiple
481  * commands since PQsendQuery() supports that.
482  */
483 int
SendRemoteCommand(MultiConnection * connection,const char * command)484 SendRemoteCommand(MultiConnection *connection, const char *command)
485 {
486 	PGconn *pgConn = connection->pgConn;
487 
488 	LogRemoteCommand(connection, command);
489 
490 	/*
491 	 * Don't try to send command if connection is entirely gone
492 	 * (PQisnonblocking() would crash).
493 	 */
494 	if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
495 	{
496 		return 0;
497 	}
498 
499 	Assert(PQisnonblocking(pgConn));
500 
501 	int rc = PQsendQuery(pgConn, command);
502 
503 	return rc;
504 }
505 
506 
507 /*
508  * ReadFirstColumnAsText reads the first column of result tuples from the given
509  * PGresult struct and returns them in a StringInfo list.
510  */
511 List *
ReadFirstColumnAsText(PGresult * queryResult)512 ReadFirstColumnAsText(PGresult *queryResult)
513 {
514 	List *resultRowList = NIL;
515 	const int columnIndex = 0;
516 	int64 rowCount = 0;
517 
518 	ExecStatusType status = PQresultStatus(queryResult);
519 	if (status == PGRES_TUPLES_OK)
520 	{
521 		rowCount = PQntuples(queryResult);
522 	}
523 
524 	for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
525 	{
526 		char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex);
527 
528 		StringInfo rowValueString = makeStringInfo();
529 		appendStringInfoString(rowValueString, rowValue);
530 
531 		resultRowList = lappend(resultRowList, rowValueString);
532 	}
533 
534 	return resultRowList;
535 }
536 
537 
538 /*
539  * GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts.
540  *
541  * If raiseInterrupts is true and an interrupt arrives, e.g. the query is
542  * being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws
543  * an error.
544  *
545  * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
546  * an error, GetRemoteCommandResult returns NULL, and the transaction is
547  * marked as having failed. While that's not a perfect way to signal failure,
548  * callers will usually treat that as an error, and it's easy to use.
549  *
550  * Handling of interrupts is important to allow queries being cancelled while
551  * waiting on remote nodes. In a distributed deadlock scenario cancelling
552  * might be the only way to resolve the deadlock.
553  */
554 PGresult *
GetRemoteCommandResult(MultiConnection * connection,bool raiseInterrupts)555 GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
556 {
557 	PGconn *pgConn = connection->pgConn;
558 
559 	/*
560 	 * Short circuit tests around the more expensive parts of this
561 	 * routine. This'd also trigger a return in the, unlikely, case of a
562 	 * failed/nonexistant connection.
563 	 */
564 	if (!PQisBusy(pgConn))
565 	{
566 		return PQgetResult(connection->pgConn);
567 	}
568 
569 	if (!FinishConnectionIO(connection, raiseInterrupts))
570 	{
571 		/* some error(s) happened while doing the I/O, signal the callers */
572 		if (PQstatus(pgConn) == CONNECTION_BAD)
573 		{
574 			return PQmakeEmptyPGresult(pgConn, PGRES_FATAL_ERROR);
575 		}
576 
577 		return NULL;
578 	}
579 
580 	/* no IO should be necessary to get result */
581 	Assert(!PQisBusy(pgConn));
582 
583 	PGresult *result = PQgetResult(connection->pgConn);
584 
585 	return result;
586 }
587 
588 
589 /*
590  * PutRemoteCopyData is a wrapper around PQputCopyData() that handles
591  * interrupts.
592  *
593  * Returns false if PQputCopyData() failed, true otherwise.
594  */
595 bool
PutRemoteCopyData(MultiConnection * connection,const char * buffer,int nbytes)596 PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
597 {
598 	PGconn *pgConn = connection->pgConn;
599 	bool allowInterrupts = true;
600 
601 	if (PQstatus(pgConn) != CONNECTION_OK)
602 	{
603 		return false;
604 	}
605 
606 	Assert(PQisnonblocking(pgConn));
607 
608 	int copyState = PQputCopyData(pgConn, buffer, nbytes);
609 	if (copyState == -1)
610 	{
611 		return false;
612 	}
613 
614 	/*
615 	 * PQputCopyData may have queued up part of the data even if it managed
616 	 * to send some of it succesfully. We provide back pressure by waiting
617 	 * until the socket is writable to prevent the internal libpq buffers
618 	 * from growing excessively.
619 	 *
620 	 * We currently allow the internal buffer to grow to 8MB before
621 	 * providing back pressure based on experimentation that showed
622 	 * throughput get worse at 4MB and lower due to the number of CPU
623 	 * cycles spent in networking system calls.
624 	 */
625 
626 	connection->copyBytesWrittenSinceLastFlush += nbytes;
627 	if (connection->copyBytesWrittenSinceLastFlush > RemoteCopyFlushThreshold)
628 	{
629 		connection->copyBytesWrittenSinceLastFlush = 0;
630 		return FinishConnectionIO(connection, allowInterrupts);
631 	}
632 
633 	return true;
634 }
635 
636 
637 /*
638  * PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles
639  * interrupts.
640  *
641  * Returns false if PQputCopyEnd() failed, true otherwise.
642  */
643 bool
PutRemoteCopyEnd(MultiConnection * connection,const char * errormsg)644 PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
645 {
646 	PGconn *pgConn = connection->pgConn;
647 	bool allowInterrupts = true;
648 
649 	if (PQstatus(pgConn) != CONNECTION_OK)
650 	{
651 		return false;
652 	}
653 
654 	Assert(PQisnonblocking(pgConn));
655 
656 	int copyState = PQputCopyEnd(pgConn, errormsg);
657 	if (copyState == -1)
658 	{
659 		return false;
660 	}
661 
662 	/* see PutRemoteCopyData() */
663 
664 	connection->copyBytesWrittenSinceLastFlush = 0;
665 
666 	return FinishConnectionIO(connection, allowInterrupts);
667 }
668 
669 
670 /*
671  * FinishConnectionIO performs pending IO for the connection, while accepting
672  * interrupts.
673  *
674  * See GetRemoteCommandResult() for documentation of interrupt handling
675  * behaviour.
676  *
677  * Returns true if IO was successfully completed, false otherwise.
678  */
679 static bool
FinishConnectionIO(MultiConnection * connection,bool raiseInterrupts)680 FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
681 {
682 	PGconn *pgConn = connection->pgConn;
683 	int sock = PQsocket(pgConn);
684 
685 	Assert(pgConn);
686 	Assert(PQisnonblocking(pgConn));
687 
688 	if (raiseInterrupts)
689 	{
690 		CHECK_FOR_INTERRUPTS();
691 	}
692 
693 	/* perform the necessary IO */
694 	while (true)
695 	{
696 		int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
697 
698 		/* try to send all pending data */
699 		int sendStatus = PQflush(pgConn);
700 
701 		/* if sending failed, there's nothing more we can do */
702 		if (sendStatus == -1)
703 		{
704 			return false;
705 		}
706 		else if (sendStatus == 1)
707 		{
708 			waitFlags |= WL_SOCKET_WRITEABLE;
709 		}
710 
711 		/* if reading fails, there's not much we can do */
712 		if (PQconsumeInput(pgConn) == 0)
713 		{
714 			return false;
715 		}
716 		if (PQisBusy(pgConn))
717 		{
718 			waitFlags |= WL_SOCKET_READABLE;
719 		}
720 
721 		if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0)
722 		{
723 			/* no IO necessary anymore, we're done */
724 			return true;
725 		}
726 
727 		int rc = WaitLatchOrSocket(MyLatch, waitFlags, sock, 0, PG_WAIT_EXTENSION);
728 		if (rc & WL_POSTMASTER_DEATH)
729 		{
730 			ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
731 		}
732 
733 		if (rc & WL_LATCH_SET)
734 		{
735 			ResetLatch(MyLatch);
736 
737 			/* if allowed raise errors */
738 			if (raiseInterrupts)
739 			{
740 				CHECK_FOR_INTERRUPTS();
741 			}
742 
743 			/*
744 			 * If raising errors allowed, or called within in a section with
745 			 * interrupts held, return instead, and mark the transaction as
746 			 * failed.
747 			 */
748 			if (IsHoldOffCancellationReceived())
749 			{
750 				connection->remoteTransaction.transactionFailed = true;
751 				break;
752 			}
753 		}
754 	}
755 
756 	return false;
757 }
758 
759 
760 /*
761  * WaitForAllConnections blocks until all connections in the list are no
762  * longer busy, meaning the pending command has either finished or failed.
763  */
764 void
WaitForAllConnections(List * connectionList,bool raiseInterrupts)765 WaitForAllConnections(List *connectionList, bool raiseInterrupts)
766 {
767 	int totalConnectionCount = list_length(connectionList);
768 	int pendingConnectionsStartIndex = 0;
769 	int connectionIndex = 0;
770 
771 	MultiConnection **allConnections =
772 		palloc(totalConnectionCount * sizeof(MultiConnection *));
773 	WaitEvent *events = palloc(totalConnectionCount * sizeof(WaitEvent));
774 	bool *connectionReady = palloc(totalConnectionCount * sizeof(bool));
775 	WaitEventSet *waitEventSet = NULL;
776 
777 	/* convert connection list to an array such that we can move items around */
778 	MultiConnection *connectionItem = NULL;
779 	foreach_ptr(connectionItem, connectionList)
780 	{
781 		allConnections[connectionIndex] = connectionItem;
782 		connectionReady[connectionIndex] = false;
783 		connectionIndex++;
784 	}
785 
786 	/* make an initial pass to check for failed and idle connections */
787 	for (connectionIndex = 0; connectionIndex < totalConnectionCount; connectionIndex++)
788 	{
789 		MultiConnection *connection = allConnections[connectionIndex];
790 
791 		if (PQstatus(connection->pgConn) == CONNECTION_BAD ||
792 			!PQisBusy(connection->pgConn))
793 		{
794 			/* connection is already done; keep non-ready connections at the end */
795 			allConnections[connectionIndex] =
796 				allConnections[pendingConnectionsStartIndex];
797 			pendingConnectionsStartIndex++;
798 		}
799 	}
800 
801 	PG_TRY();
802 	{
803 		bool rebuildWaitEventSet = true;
804 
805 		while (pendingConnectionsStartIndex < totalConnectionCount)
806 		{
807 			bool cancellationReceived = false;
808 			int eventIndex = 0;
809 			long timeout = -1;
810 			int pendingConnectionCount = totalConnectionCount -
811 										 pendingConnectionsStartIndex;
812 
813 			/* rebuild the WaitEventSet whenever connections are ready */
814 			if (rebuildWaitEventSet)
815 			{
816 				if (waitEventSet != NULL)
817 				{
818 					FreeWaitEventSet(waitEventSet);
819 				}
820 
821 				waitEventSet = BuildWaitEventSet(allConnections, totalConnectionCount,
822 												 pendingConnectionsStartIndex);
823 
824 				rebuildWaitEventSet = false;
825 			}
826 
827 			/* wait for I/O events */
828 			int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
829 											  pendingConnectionCount,
830 											  WAIT_EVENT_CLIENT_READ);
831 
832 			/* process I/O events */
833 			for (; eventIndex < eventCount; eventIndex++)
834 			{
835 				WaitEvent *event = &events[eventIndex];
836 				bool connectionIsReady = false;
837 
838 				if (event->events & WL_POSTMASTER_DEATH)
839 				{
840 					ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
841 				}
842 
843 				if (event->events & WL_LATCH_SET)
844 				{
845 					ResetLatch(MyLatch);
846 
847 					if (raiseInterrupts)
848 					{
849 						CHECK_FOR_INTERRUPTS();
850 					}
851 
852 					if (IsHoldOffCancellationReceived())
853 					{
854 						/*
855 						 * Break out of event loop immediately in case of cancellation.
856 						 * We cannot use "return" here inside a PG_TRY() block since
857 						 * then the exception stack won't be reset.
858 						 */
859 						cancellationReceived = true;
860 						break;
861 					}
862 
863 					continue;
864 				}
865 
866 				MultiConnection *connection = (MultiConnection *) event->user_data;
867 
868 				if (event->events & WL_SOCKET_WRITEABLE)
869 				{
870 					int sendStatus = PQflush(connection->pgConn);
871 					if (sendStatus == -1)
872 					{
873 						/* send failed, done with this connection */
874 						connectionIsReady = true;
875 					}
876 					else if (sendStatus == 0)
877 					{
878 						/* done writing, only wait for read events */
879 						ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE,
880 										NULL);
881 					}
882 				}
883 
884 				/*
885 				 * Check whether the connection is done is the socket is either readable
886 				 * or writable. If it was only writable, we performed a PQflush which
887 				 * might have read from the socket, meaning we may not see the socket
888 				 * becoming readable again, so better to check it now.
889 				 */
890 				if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
891 				{
892 					int receiveStatus = PQconsumeInput(connection->pgConn);
893 					if (receiveStatus == 0)
894 					{
895 						/* receive failed, done with this connection */
896 						connectionIsReady = true;
897 					}
898 					else if (!PQisBusy(connection->pgConn))
899 					{
900 						/* result was received */
901 						connectionIsReady = true;
902 					}
903 				}
904 
905 				if (connectionIsReady)
906 				{
907 					/*
908 					 * All pending connections are kept at the end of the allConnections
909 					 * array and the connectionReady array matches the allConnections
910 					 * array. The wait event set corresponds to the pending connections
911 					 * subarray so we can get the index in the allConnections array by
912 					 * taking the event index + the offset of the subarray.
913 					 */
914 					connectionIndex = event->pos + pendingConnectionsStartIndex;
915 
916 					connectionReady[connectionIndex] = true;
917 
918 					/*
919 					 * When a connection is ready, we should build a new wait event
920 					 * set that excludes this connection.
921 					 */
922 					rebuildWaitEventSet = true;
923 				}
924 			}
925 
926 			if (cancellationReceived)
927 			{
928 				break;
929 			}
930 
931 			/* move non-ready connections to the back of the array */
932 			for (connectionIndex = pendingConnectionsStartIndex;
933 				 connectionIndex < totalConnectionCount; connectionIndex++)
934 			{
935 				if (connectionReady[connectionIndex])
936 				{
937 					/*
938 					 * Replace the ready connection with a connection from
939 					 * the start of the pending connections subarray. This
940 					 * may be the connection itself, in which case this is
941 					 * a noop.
942 					 */
943 					allConnections[connectionIndex] =
944 						allConnections[pendingConnectionsStartIndex];
945 
946 					/* offset of the pending connections subarray is now 1 higher */
947 					pendingConnectionsStartIndex++;
948 
949 					/*
950 					 * We've moved a pending connection into this position,
951 					 * so we must reset the ready flag. Otherwise, we'd
952 					 * falsely interpret it as ready in the next round.
953 					 */
954 					connectionReady[connectionIndex] = false;
955 				}
956 			}
957 		}
958 
959 		if (waitEventSet != NULL)
960 		{
961 			FreeWaitEventSet(waitEventSet);
962 			waitEventSet = NULL;
963 		}
964 
965 		pfree(allConnections);
966 		pfree(events);
967 		pfree(connectionReady);
968 	}
969 	PG_CATCH();
970 	{
971 		/* make sure the epoll file descriptor is always closed */
972 		if (waitEventSet != NULL)
973 		{
974 			FreeWaitEventSet(waitEventSet);
975 			waitEventSet = NULL;
976 		}
977 
978 		pfree(allConnections);
979 		pfree(events);
980 		pfree(connectionReady);
981 
982 		PG_RE_THROW();
983 	}
984 	PG_END_TRY();
985 }
986 
987 
988 /*
989  * BuildWaitEventSet creates a WaitEventSet for the given array of connections
990  * which can be used to wait for any of the sockets to become read-ready or
991  * write-ready.
992  */
993 static WaitEventSet *
BuildWaitEventSet(MultiConnection ** allConnections,int totalConnectionCount,int pendingConnectionsStartIndex)994 BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
995 				  int pendingConnectionsStartIndex)
996 {
997 	int pendingConnectionCount = totalConnectionCount - pendingConnectionsStartIndex;
998 
999 	/*
1000 	 * subtract 3 to make room for WL_POSTMASTER_DEATH, WL_LATCH_SET, and
1001 	 * pgwin32_signal_event.
1002 	 */
1003 	if (pendingConnectionCount > FD_SETSIZE - 3)
1004 	{
1005 		pendingConnectionCount = FD_SETSIZE - 3;
1006 	}
1007 
1008 	/* allocate pending connections + 2 for the signal latch and postmaster death */
1009 	/* (CreateWaitEventSet makes room for pgwin32_signal_event automatically) */
1010 	WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext,
1011 													pendingConnectionCount + 2);
1012 
1013 	for (int connectionIndex = 0; connectionIndex < pendingConnectionCount;
1014 		 connectionIndex++)
1015 	{
1016 		MultiConnection *connection = allConnections[pendingConnectionsStartIndex +
1017 													 connectionIndex];
1018 		int sock = PQsocket(connection->pgConn);
1019 
1020 		/*
1021 		 * Always start by polling for both readability (server sent bytes)
1022 		 * and writeability (server is ready to receive bytes).
1023 		 */
1024 		int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
1025 
1026 		AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection);
1027 	}
1028 
1029 	/*
1030 	 * Put the wait events for the signal latch and postmaster death at the end such that
1031 	 * event index + pendingConnectionsStartIndex = the connection index in the array.
1032 	 */
1033 	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
1034 	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
1035 
1036 	return waitEventSet;
1037 }
1038 
1039 
1040 /*
1041  * SendCancelationRequest sends a cancelation request on the given connection.
1042  * Return value indicates whether the cancelation request was sent successfully.
1043  */
1044 bool
SendCancelationRequest(MultiConnection * connection)1045 SendCancelationRequest(MultiConnection *connection)
1046 {
1047 	char errorBuffer[ERROR_BUFFER_SIZE] = { 0 };
1048 
1049 	PGcancel *cancelObject = PQgetCancel(connection->pgConn);
1050 	if (cancelObject == NULL)
1051 	{
1052 		/* this can happen if connection is invalid */
1053 		return false;
1054 	}
1055 
1056 	bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
1057 	if (!cancelSent)
1058 	{
1059 		ereport(WARNING, (errmsg("could not issue cancel request"),
1060 						  errdetail("Client error: %s", errorBuffer)));
1061 	}
1062 
1063 	PQfreeCancel(cancelObject);
1064 
1065 	return cancelSent;
1066 }
1067