1 /*-------------------------------------------------------------------------
2 *
3 * remote_commands.c
 * Helpers to make it easier to execute commands on remote nodes.
5 *
6 * Copyright (c) Citus Data, Inc.
7 *
8 *-------------------------------------------------------------------------
9 */
10
11 #include "postgres.h"
12 #include "pgstat.h"
13
14 #include "libpq-fe.h"
15
16 #include "distributed/connection_management.h"
17 #include "distributed/errormessage.h"
18 #include "distributed/listutils.h"
19 #include "distributed/log_utils.h"
20 #include "distributed/remote_commands.h"
21 #include "distributed/cancel_utils.h"
22 #include "lib/stringinfo.h"
23 #include "miscadmin.h"
24 #include "storage/latch.h"
25 #include "utils/palloc.h"
26
27
28 /*
29 * Setting that controls how many bytes of COPY data libpq is allowed to buffer
30 * internally before we force a flush.
31 */
32 int RemoteCopyFlushThreshold = 8 * 1024 * 1024;
33
34
35 /* GUC, determining whether statements sent to remote nodes are logged */
36 bool LogRemoteCommands = false;
37
38
39 static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
40 bool discardWarnings);
41 static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
42 static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
43 int totalConnectionCount,
44 int pendingConnectionsStartIndex);
45
46
47 /* simple helpers */
48
49 /*
50 * IsResponseOK checks whether the result is a successful one.
51 */
52 bool
IsResponseOK(PGresult * result)53 IsResponseOK(PGresult *result)
54 {
55 ExecStatusType resultStatus = PQresultStatus(result);
56
57 if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
58 resultStatus == PGRES_COMMAND_OK)
59 {
60 return true;
61 }
62
63 return false;
64 }
65
66
67 /*
68 * ForgetResults clears a connection from pending activity.
69 *
70 * Note that this might require network IO. If that's not acceptable, use
71 * ClearResultsIfReady().
72 *
73 * ClearResults is variant of this function which can also raise errors.
74 */
75 void
ForgetResults(MultiConnection * connection)76 ForgetResults(MultiConnection *connection)
77 {
78 ClearResults(connection, false);
79 }
80
81
82 /*
83 * ClearResultsInternal clears a connection from pending activity,
84 * returns true if all pending commands return success. It raises
85 * error if raiseErrors flag is set, any command fails and transaction
86 * is marked critical.
87 *
88 * Note that this might require network IO. If that's not acceptable, use
89 * ClearResultsIfReady().
90 */
91 bool
ClearResults(MultiConnection * connection,bool raiseErrors)92 ClearResults(MultiConnection *connection, bool raiseErrors)
93 {
94 return ClearResultsInternal(connection, raiseErrors, false);
95 }
96
97
98 /*
99 * ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't
100 * emit warnings.
101 */
102 bool
ClearResultsDiscardWarnings(MultiConnection * connection,bool raiseErrors)103 ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors)
104 {
105 return ClearResultsInternal(connection, raiseErrors, true);
106 }
107
108
109 /*
110 * ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings.
111 */
112 static bool
ClearResultsInternal(MultiConnection * connection,bool raiseErrors,bool discardWarnings)113 ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings)
114 {
115 bool success = true;
116
117 while (true)
118 {
119 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
120 if (result == NULL)
121 {
122 break;
123 }
124
125 /*
126 * End any pending copy operation. Transaction will be marked
127 * as failed by the following part.
128 */
129 if (PQresultStatus(result) == PGRES_COPY_IN)
130 {
131 PQputCopyEnd(connection->pgConn, NULL);
132 }
133
134 if (!IsResponseOK(result))
135 {
136 if (!discardWarnings)
137 {
138 ReportResultError(connection, result, WARNING);
139 }
140
141 MarkRemoteTransactionFailed(connection, raiseErrors);
142
143 success = false;
144
145 /* an error happened, there is nothing we can do more */
146 if (PQresultStatus(result) == PGRES_FATAL_ERROR)
147 {
148 PQclear(result);
149
150 break;
151 }
152 }
153
154 PQclear(result);
155 }
156
157 return success;
158 }
159
160
161 /*
162 * ClearResultsIfReady clears a connection from pending activity if doing
163 * so does not require network IO. Returns true if successful, false
164 * otherwise.
165 */
166 bool
ClearResultsIfReady(MultiConnection * connection)167 ClearResultsIfReady(MultiConnection *connection)
168 {
169 PGconn *pgConn = connection->pgConn;
170
171 if (PQstatus(pgConn) != CONNECTION_OK)
172 {
173 return false;
174 }
175
176 Assert(PQisnonblocking(pgConn));
177
178 while (true)
179 {
180 /*
181 * If busy, there might still be results already received and buffered
182 * by the OS. As connection is in non-blocking mode, we can check for
183 * that without blocking.
184 */
185 if (PQisBusy(pgConn))
186 {
187 if (PQflush(pgConn) == -1)
188 {
189 /* write failed */
190 return false;
191 }
192 if (PQconsumeInput(pgConn) == 0)
193 {
194 /* some low-level failure */
195 return false;
196 }
197 }
198
199 /* clearing would require blocking IO, return */
200 if (PQisBusy(pgConn))
201 {
202 return false;
203 }
204
205 PGresult *result = PQgetResult(pgConn);
206 if (result == NULL)
207 {
208 /* no more results available */
209 return true;
210 }
211
212 ExecStatusType resultStatus = PQresultStatus(result);
213
214 /* only care about the status, can clear now */
215 PQclear(result);
216
217 if (resultStatus == PGRES_COPY_IN || resultStatus == PGRES_COPY_OUT)
218 {
219 /* in copy, can't reliably recover without blocking */
220 return false;
221 }
222
223 if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
224 resultStatus == PGRES_COMMAND_OK))
225 {
226 /* an error occurred just when we were aborting */
227 return false;
228 }
229
230 /* check if there are more results to consume */
231 }
232
233 pg_unreachable();
234 }
235
236
237 /* report errors & warnings */
238
239 /*
240 * Report libpq failure that's not associated with a result.
241 */
242 void
ReportConnectionError(MultiConnection * connection,int elevel)243 ReportConnectionError(MultiConnection *connection, int elevel)
244 {
245 char *nodeName = connection->hostname;
246 int nodePort = connection->port;
247 PGconn *pgConn = connection->pgConn;
248 char *messageDetail = NULL;
249
250 if (pgConn != NULL)
251 {
252 messageDetail = pchomp(PQerrorMessage(pgConn));
253 }
254
255 if (messageDetail)
256 {
257 /*
258 * We don't use ApplyLogRedaction(messageDetail) as we expect any error
259 * detail that requires log reduction should have done it locally.
260 */
261 ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
262 errmsg("connection to the remote node %s:%d failed with the "
263 "following error: %s", nodeName, nodePort,
264 messageDetail)));
265 }
266 else
267 {
268 ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
269 errmsg("connection to the remote node %s:%d failed",
270 nodeName, nodePort)));
271 }
272 }
273
274
275 /*
276 * ReportResultError reports libpq failure associated with a result.
277 */
278 void
ReportResultError(MultiConnection * connection,PGresult * result,int elevel)279 ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
280 {
281 /* we release PQresult when throwing an error because the caller can't */
282 PG_TRY();
283 {
284 char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
285 char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
286 char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
287 char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
288 char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);
289
290 char *nodeName = connection->hostname;
291 int nodePort = connection->port;
292 int sqlState = ERRCODE_INTERNAL_ERROR;
293
294 if (sqlStateString != NULL)
295 {
296 sqlState = MAKE_SQLSTATE(sqlStateString[0],
297 sqlStateString[1],
298 sqlStateString[2],
299 sqlStateString[3],
300 sqlStateString[4]);
301 }
302
303 /*
304 * If the PGresult did not contain a message, the connection may provide a
305 * suitable top level one. At worst, this is an empty string.
306 */
307 if (messagePrimary == NULL)
308 {
309 messagePrimary = pchomp(PQerrorMessage(connection->pgConn));
310 }
311
312 ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
313 messageDetail ?
314 errdetail("%s", ApplyLogRedaction(messageDetail)) : 0,
315 messageHint ? errhint("%s", messageHint) : 0,
316 messageContext ? errcontext("%s", messageContext) : 0,
317 errcontext("while executing command on %s:%d",
318 nodeName, nodePort)));
319 }
320 PG_CATCH();
321 {
322 PQclear(result);
323 PG_RE_THROW();
324 }
325 PG_END_TRY();
326 }
327
328
329 /* *INDENT-ON* */
330
331
332 /*
333 * LogRemoteCommand logs commands send to remote nodes if
334 * citus.log_remote_commands wants us to do so.
335 */
336 void
LogRemoteCommand(MultiConnection * connection,const char * command)337 LogRemoteCommand(MultiConnection *connection, const char *command)
338 {
339 if (!LogRemoteCommands)
340 {
341 return;
342 }
343
344 ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)),
345 errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
346 connection->hostname,
347 connection->port, connection->connectionId)));
348 }
349
350
351 /* wrappers around libpq functions, with command logging support */
352
353
354 /*
355 * ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every
356 * command in the commandList.
357 */
358 void
ExecuteCriticalRemoteCommandList(MultiConnection * connection,List * commandList)359 ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList)
360 {
361 const char *command = NULL;
362 foreach_ptr(command, commandList)
363 {
364 ExecuteCriticalRemoteCommand(connection, command);
365 }
366 }
367
368
369 /*
370 * ExecuteCriticalRemoteCommand executes a remote command that is critical
371 * to the transaction. If the command fails then the transaction aborts.
372 */
373 void
ExecuteCriticalRemoteCommand(MultiConnection * connection,const char * command)374 ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
375 {
376 bool raiseInterrupts = true;
377
378 int querySent = SendRemoteCommand(connection, command);
379 if (querySent == 0)
380 {
381 ReportConnectionError(connection, ERROR);
382 }
383
384 PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
385 if (!IsResponseOK(result))
386 {
387 ReportResultError(connection, result, ERROR);
388 }
389
390 PQclear(result);
391 ForgetResults(connection);
392 }
393
394
395 /*
396 * ExecuteOptionalRemoteCommand executes a remote command. If the command fails a WARNING
397 * is emitted but execution continues.
398 *
399 * could return 0, QUERY_SEND_FAILED, or RESPONSE_NOT_OKAY
400 * result is only set if there was no error
401 */
402 int
ExecuteOptionalRemoteCommand(MultiConnection * connection,const char * command,PGresult ** result)403 ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
404 PGresult **result)
405 {
406 bool raiseInterrupts = true;
407
408 int querySent = SendRemoteCommand(connection, command);
409 if (querySent == 0)
410 {
411 ReportConnectionError(connection, WARNING);
412 return QUERY_SEND_FAILED;
413 }
414
415 PGresult *localResult = GetRemoteCommandResult(connection, raiseInterrupts);
416 if (!IsResponseOK(localResult))
417 {
418 ReportResultError(connection, localResult, WARNING);
419 PQclear(localResult);
420 ForgetResults(connection);
421 return RESPONSE_NOT_OKAY;
422 }
423
424 /*
425 * store result if result has been set, when the user is not interested in the result
426 * a NULL pointer could be passed and the result will be cleared.
427 */
428 if (result != NULL)
429 {
430 *result = localResult;
431 }
432 else
433 {
434 PQclear(localResult);
435 ForgetResults(connection);
436 }
437
438 return RESPONSE_OKAY;
439 }
440
441
442 /*
443 * SendRemoteCommandParams is a PQsendQueryParams wrapper that logs remote commands,
444 * and accepts a MultiConnection instead of a plain PGconn. It makes sure it can
445 * send commands asynchronously without blocking (at the potential expense of
446 * an additional memory allocation). The command string can only include a single
447 * command since PQsendQueryParams() supports only that.
448 */
449 int
SendRemoteCommandParams(MultiConnection * connection,const char * command,int parameterCount,const Oid * parameterTypes,const char * const * parameterValues,bool binaryResults)450 SendRemoteCommandParams(MultiConnection *connection, const char *command,
451 int parameterCount, const Oid *parameterTypes,
452 const char *const *parameterValues, bool binaryResults)
453 {
454 PGconn *pgConn = connection->pgConn;
455
456 LogRemoteCommand(connection, command);
457
458 /*
459 * Don't try to send command if connection is entirely gone
460 * (PQisnonblocking() would crash).
461 */
462 if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
463 {
464 return 0;
465 }
466
467 Assert(PQisnonblocking(pgConn));
468
469 int rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes,
470 parameterValues, NULL, NULL, binaryResults ? 1 : 0);
471
472 return rc;
473 }
474
475
476 /*
477 * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
478 * accepts a MultiConnection instead of a plain PGconn. It makes sure it can
479 * send commands asynchronously without blocking (at the potential expense of
480 * an additional memory allocation). The command string can include multiple
481 * commands since PQsendQuery() supports that.
482 */
483 int
SendRemoteCommand(MultiConnection * connection,const char * command)484 SendRemoteCommand(MultiConnection *connection, const char *command)
485 {
486 PGconn *pgConn = connection->pgConn;
487
488 LogRemoteCommand(connection, command);
489
490 /*
491 * Don't try to send command if connection is entirely gone
492 * (PQisnonblocking() would crash).
493 */
494 if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
495 {
496 return 0;
497 }
498
499 Assert(PQisnonblocking(pgConn));
500
501 int rc = PQsendQuery(pgConn, command);
502
503 return rc;
504 }
505
506
507 /*
508 * ReadFirstColumnAsText reads the first column of result tuples from the given
509 * PGresult struct and returns them in a StringInfo list.
510 */
511 List *
ReadFirstColumnAsText(PGresult * queryResult)512 ReadFirstColumnAsText(PGresult *queryResult)
513 {
514 List *resultRowList = NIL;
515 const int columnIndex = 0;
516 int64 rowCount = 0;
517
518 ExecStatusType status = PQresultStatus(queryResult);
519 if (status == PGRES_TUPLES_OK)
520 {
521 rowCount = PQntuples(queryResult);
522 }
523
524 for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
525 {
526 char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex);
527
528 StringInfo rowValueString = makeStringInfo();
529 appendStringInfoString(rowValueString, rowValue);
530
531 resultRowList = lappend(resultRowList, rowValueString);
532 }
533
534 return resultRowList;
535 }
536
537
538 /*
539 * GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts.
540 *
541 * If raiseInterrupts is true and an interrupt arrives, e.g. the query is
542 * being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws
543 * an error.
544 *
545 * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
546 * an error, GetRemoteCommandResult returns NULL, and the transaction is
547 * marked as having failed. While that's not a perfect way to signal failure,
548 * callers will usually treat that as an error, and it's easy to use.
549 *
550 * Handling of interrupts is important to allow queries being cancelled while
551 * waiting on remote nodes. In a distributed deadlock scenario cancelling
552 * might be the only way to resolve the deadlock.
553 */
554 PGresult *
GetRemoteCommandResult(MultiConnection * connection,bool raiseInterrupts)555 GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
556 {
557 PGconn *pgConn = connection->pgConn;
558
559 /*
560 * Short circuit tests around the more expensive parts of this
561 * routine. This'd also trigger a return in the, unlikely, case of a
562 * failed/nonexistant connection.
563 */
564 if (!PQisBusy(pgConn))
565 {
566 return PQgetResult(connection->pgConn);
567 }
568
569 if (!FinishConnectionIO(connection, raiseInterrupts))
570 {
571 /* some error(s) happened while doing the I/O, signal the callers */
572 if (PQstatus(pgConn) == CONNECTION_BAD)
573 {
574 return PQmakeEmptyPGresult(pgConn, PGRES_FATAL_ERROR);
575 }
576
577 return NULL;
578 }
579
580 /* no IO should be necessary to get result */
581 Assert(!PQisBusy(pgConn));
582
583 PGresult *result = PQgetResult(connection->pgConn);
584
585 return result;
586 }
587
588
589 /*
590 * PutRemoteCopyData is a wrapper around PQputCopyData() that handles
591 * interrupts.
592 *
593 * Returns false if PQputCopyData() failed, true otherwise.
594 */
595 bool
PutRemoteCopyData(MultiConnection * connection,const char * buffer,int nbytes)596 PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
597 {
598 PGconn *pgConn = connection->pgConn;
599 bool allowInterrupts = true;
600
601 if (PQstatus(pgConn) != CONNECTION_OK)
602 {
603 return false;
604 }
605
606 Assert(PQisnonblocking(pgConn));
607
608 int copyState = PQputCopyData(pgConn, buffer, nbytes);
609 if (copyState == -1)
610 {
611 return false;
612 }
613
614 /*
615 * PQputCopyData may have queued up part of the data even if it managed
616 * to send some of it succesfully. We provide back pressure by waiting
617 * until the socket is writable to prevent the internal libpq buffers
618 * from growing excessively.
619 *
620 * We currently allow the internal buffer to grow to 8MB before
621 * providing back pressure based on experimentation that showed
622 * throughput get worse at 4MB and lower due to the number of CPU
623 * cycles spent in networking system calls.
624 */
625
626 connection->copyBytesWrittenSinceLastFlush += nbytes;
627 if (connection->copyBytesWrittenSinceLastFlush > RemoteCopyFlushThreshold)
628 {
629 connection->copyBytesWrittenSinceLastFlush = 0;
630 return FinishConnectionIO(connection, allowInterrupts);
631 }
632
633 return true;
634 }
635
636
637 /*
638 * PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles
639 * interrupts.
640 *
641 * Returns false if PQputCopyEnd() failed, true otherwise.
642 */
643 bool
PutRemoteCopyEnd(MultiConnection * connection,const char * errormsg)644 PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
645 {
646 PGconn *pgConn = connection->pgConn;
647 bool allowInterrupts = true;
648
649 if (PQstatus(pgConn) != CONNECTION_OK)
650 {
651 return false;
652 }
653
654 Assert(PQisnonblocking(pgConn));
655
656 int copyState = PQputCopyEnd(pgConn, errormsg);
657 if (copyState == -1)
658 {
659 return false;
660 }
661
662 /* see PutRemoteCopyData() */
663
664 connection->copyBytesWrittenSinceLastFlush = 0;
665
666 return FinishConnectionIO(connection, allowInterrupts);
667 }
668
669
670 /*
671 * FinishConnectionIO performs pending IO for the connection, while accepting
672 * interrupts.
673 *
674 * See GetRemoteCommandResult() for documentation of interrupt handling
675 * behaviour.
676 *
677 * Returns true if IO was successfully completed, false otherwise.
678 */
679 static bool
FinishConnectionIO(MultiConnection * connection,bool raiseInterrupts)680 FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
681 {
682 PGconn *pgConn = connection->pgConn;
683 int sock = PQsocket(pgConn);
684
685 Assert(pgConn);
686 Assert(PQisnonblocking(pgConn));
687
688 if (raiseInterrupts)
689 {
690 CHECK_FOR_INTERRUPTS();
691 }
692
693 /* perform the necessary IO */
694 while (true)
695 {
696 int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
697
698 /* try to send all pending data */
699 int sendStatus = PQflush(pgConn);
700
701 /* if sending failed, there's nothing more we can do */
702 if (sendStatus == -1)
703 {
704 return false;
705 }
706 else if (sendStatus == 1)
707 {
708 waitFlags |= WL_SOCKET_WRITEABLE;
709 }
710
711 /* if reading fails, there's not much we can do */
712 if (PQconsumeInput(pgConn) == 0)
713 {
714 return false;
715 }
716 if (PQisBusy(pgConn))
717 {
718 waitFlags |= WL_SOCKET_READABLE;
719 }
720
721 if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0)
722 {
723 /* no IO necessary anymore, we're done */
724 return true;
725 }
726
727 int rc = WaitLatchOrSocket(MyLatch, waitFlags, sock, 0, PG_WAIT_EXTENSION);
728 if (rc & WL_POSTMASTER_DEATH)
729 {
730 ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
731 }
732
733 if (rc & WL_LATCH_SET)
734 {
735 ResetLatch(MyLatch);
736
737 /* if allowed raise errors */
738 if (raiseInterrupts)
739 {
740 CHECK_FOR_INTERRUPTS();
741 }
742
743 /*
744 * If raising errors allowed, or called within in a section with
745 * interrupts held, return instead, and mark the transaction as
746 * failed.
747 */
748 if (IsHoldOffCancellationReceived())
749 {
750 connection->remoteTransaction.transactionFailed = true;
751 break;
752 }
753 }
754 }
755
756 return false;
757 }
758
759
760 /*
761 * WaitForAllConnections blocks until all connections in the list are no
762 * longer busy, meaning the pending command has either finished or failed.
763 */
764 void
WaitForAllConnections(List * connectionList,bool raiseInterrupts)765 WaitForAllConnections(List *connectionList, bool raiseInterrupts)
766 {
767 int totalConnectionCount = list_length(connectionList);
768 int pendingConnectionsStartIndex = 0;
769 int connectionIndex = 0;
770
771 MultiConnection **allConnections =
772 palloc(totalConnectionCount * sizeof(MultiConnection *));
773 WaitEvent *events = palloc(totalConnectionCount * sizeof(WaitEvent));
774 bool *connectionReady = palloc(totalConnectionCount * sizeof(bool));
775 WaitEventSet *waitEventSet = NULL;
776
777 /* convert connection list to an array such that we can move items around */
778 MultiConnection *connectionItem = NULL;
779 foreach_ptr(connectionItem, connectionList)
780 {
781 allConnections[connectionIndex] = connectionItem;
782 connectionReady[connectionIndex] = false;
783 connectionIndex++;
784 }
785
786 /* make an initial pass to check for failed and idle connections */
787 for (connectionIndex = 0; connectionIndex < totalConnectionCount; connectionIndex++)
788 {
789 MultiConnection *connection = allConnections[connectionIndex];
790
791 if (PQstatus(connection->pgConn) == CONNECTION_BAD ||
792 !PQisBusy(connection->pgConn))
793 {
794 /* connection is already done; keep non-ready connections at the end */
795 allConnections[connectionIndex] =
796 allConnections[pendingConnectionsStartIndex];
797 pendingConnectionsStartIndex++;
798 }
799 }
800
801 PG_TRY();
802 {
803 bool rebuildWaitEventSet = true;
804
805 while (pendingConnectionsStartIndex < totalConnectionCount)
806 {
807 bool cancellationReceived = false;
808 int eventIndex = 0;
809 long timeout = -1;
810 int pendingConnectionCount = totalConnectionCount -
811 pendingConnectionsStartIndex;
812
813 /* rebuild the WaitEventSet whenever connections are ready */
814 if (rebuildWaitEventSet)
815 {
816 if (waitEventSet != NULL)
817 {
818 FreeWaitEventSet(waitEventSet);
819 }
820
821 waitEventSet = BuildWaitEventSet(allConnections, totalConnectionCount,
822 pendingConnectionsStartIndex);
823
824 rebuildWaitEventSet = false;
825 }
826
827 /* wait for I/O events */
828 int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
829 pendingConnectionCount,
830 WAIT_EVENT_CLIENT_READ);
831
832 /* process I/O events */
833 for (; eventIndex < eventCount; eventIndex++)
834 {
835 WaitEvent *event = &events[eventIndex];
836 bool connectionIsReady = false;
837
838 if (event->events & WL_POSTMASTER_DEATH)
839 {
840 ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
841 }
842
843 if (event->events & WL_LATCH_SET)
844 {
845 ResetLatch(MyLatch);
846
847 if (raiseInterrupts)
848 {
849 CHECK_FOR_INTERRUPTS();
850 }
851
852 if (IsHoldOffCancellationReceived())
853 {
854 /*
855 * Break out of event loop immediately in case of cancellation.
856 * We cannot use "return" here inside a PG_TRY() block since
857 * then the exception stack won't be reset.
858 */
859 cancellationReceived = true;
860 break;
861 }
862
863 continue;
864 }
865
866 MultiConnection *connection = (MultiConnection *) event->user_data;
867
868 if (event->events & WL_SOCKET_WRITEABLE)
869 {
870 int sendStatus = PQflush(connection->pgConn);
871 if (sendStatus == -1)
872 {
873 /* send failed, done with this connection */
874 connectionIsReady = true;
875 }
876 else if (sendStatus == 0)
877 {
878 /* done writing, only wait for read events */
879 ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE,
880 NULL);
881 }
882 }
883
884 /*
885 * Check whether the connection is done is the socket is either readable
886 * or writable. If it was only writable, we performed a PQflush which
887 * might have read from the socket, meaning we may not see the socket
888 * becoming readable again, so better to check it now.
889 */
890 if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
891 {
892 int receiveStatus = PQconsumeInput(connection->pgConn);
893 if (receiveStatus == 0)
894 {
895 /* receive failed, done with this connection */
896 connectionIsReady = true;
897 }
898 else if (!PQisBusy(connection->pgConn))
899 {
900 /* result was received */
901 connectionIsReady = true;
902 }
903 }
904
905 if (connectionIsReady)
906 {
907 /*
908 * All pending connections are kept at the end of the allConnections
909 * array and the connectionReady array matches the allConnections
910 * array. The wait event set corresponds to the pending connections
911 * subarray so we can get the index in the allConnections array by
912 * taking the event index + the offset of the subarray.
913 */
914 connectionIndex = event->pos + pendingConnectionsStartIndex;
915
916 connectionReady[connectionIndex] = true;
917
918 /*
919 * When a connection is ready, we should build a new wait event
920 * set that excludes this connection.
921 */
922 rebuildWaitEventSet = true;
923 }
924 }
925
926 if (cancellationReceived)
927 {
928 break;
929 }
930
931 /* move non-ready connections to the back of the array */
932 for (connectionIndex = pendingConnectionsStartIndex;
933 connectionIndex < totalConnectionCount; connectionIndex++)
934 {
935 if (connectionReady[connectionIndex])
936 {
937 /*
938 * Replace the ready connection with a connection from
939 * the start of the pending connections subarray. This
940 * may be the connection itself, in which case this is
941 * a noop.
942 */
943 allConnections[connectionIndex] =
944 allConnections[pendingConnectionsStartIndex];
945
946 /* offset of the pending connections subarray is now 1 higher */
947 pendingConnectionsStartIndex++;
948
949 /*
950 * We've moved a pending connection into this position,
951 * so we must reset the ready flag. Otherwise, we'd
952 * falsely interpret it as ready in the next round.
953 */
954 connectionReady[connectionIndex] = false;
955 }
956 }
957 }
958
959 if (waitEventSet != NULL)
960 {
961 FreeWaitEventSet(waitEventSet);
962 waitEventSet = NULL;
963 }
964
965 pfree(allConnections);
966 pfree(events);
967 pfree(connectionReady);
968 }
969 PG_CATCH();
970 {
971 /* make sure the epoll file descriptor is always closed */
972 if (waitEventSet != NULL)
973 {
974 FreeWaitEventSet(waitEventSet);
975 waitEventSet = NULL;
976 }
977
978 pfree(allConnections);
979 pfree(events);
980 pfree(connectionReady);
981
982 PG_RE_THROW();
983 }
984 PG_END_TRY();
985 }
986
987
988 /*
989 * BuildWaitEventSet creates a WaitEventSet for the given array of connections
990 * which can be used to wait for any of the sockets to become read-ready or
991 * write-ready.
992 */
993 static WaitEventSet *
BuildWaitEventSet(MultiConnection ** allConnections,int totalConnectionCount,int pendingConnectionsStartIndex)994 BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
995 int pendingConnectionsStartIndex)
996 {
997 int pendingConnectionCount = totalConnectionCount - pendingConnectionsStartIndex;
998
999 /*
1000 * subtract 3 to make room for WL_POSTMASTER_DEATH, WL_LATCH_SET, and
1001 * pgwin32_signal_event.
1002 */
1003 if (pendingConnectionCount > FD_SETSIZE - 3)
1004 {
1005 pendingConnectionCount = FD_SETSIZE - 3;
1006 }
1007
1008 /* allocate pending connections + 2 for the signal latch and postmaster death */
1009 /* (CreateWaitEventSet makes room for pgwin32_signal_event automatically) */
1010 WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext,
1011 pendingConnectionCount + 2);
1012
1013 for (int connectionIndex = 0; connectionIndex < pendingConnectionCount;
1014 connectionIndex++)
1015 {
1016 MultiConnection *connection = allConnections[pendingConnectionsStartIndex +
1017 connectionIndex];
1018 int sock = PQsocket(connection->pgConn);
1019
1020 /*
1021 * Always start by polling for both readability (server sent bytes)
1022 * and writeability (server is ready to receive bytes).
1023 */
1024 int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
1025
1026 AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection);
1027 }
1028
1029 /*
1030 * Put the wait events for the signal latch and postmaster death at the end such that
1031 * event index + pendingConnectionsStartIndex = the connection index in the array.
1032 */
1033 AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
1034 AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
1035
1036 return waitEventSet;
1037 }
1038
1039
1040 /*
1041 * SendCancelationRequest sends a cancelation request on the given connection.
1042 * Return value indicates whether the cancelation request was sent successfully.
1043 */
1044 bool
SendCancelationRequest(MultiConnection * connection)1045 SendCancelationRequest(MultiConnection *connection)
1046 {
1047 char errorBuffer[ERROR_BUFFER_SIZE] = { 0 };
1048
1049 PGcancel *cancelObject = PQgetCancel(connection->pgConn);
1050 if (cancelObject == NULL)
1051 {
1052 /* this can happen if connection is invalid */
1053 return false;
1054 }
1055
1056 bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
1057 if (!cancelSent)
1058 {
1059 ereport(WARNING, (errmsg("could not issue cancel request"),
1060 errdetail("Client error: %s", errorBuffer)));
1061 }
1062
1063 PQfreeCancel(cancelObject);
1064
1065 return cancelSent;
1066 }
1067