1 /*-------------------------------------------------------------------------
2 *
3 * scripts_parallel.c
4 * Parallel support for bin/scripts/
5 *
6 *
7 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * src/bin/scripts/scripts_parallel.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #ifdef WIN32
16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
17 #endif
18
19 #include "postgres_fe.h"
20
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
24
25 #include "common.h"
26 #include "common/logging.h"
27 #include "fe_utils/cancel.h"
28 #include "scripts_parallel.h"
29
30 static void init_slot(ParallelSlot *slot, PGconn *conn);
31 static int select_loop(int maxFd, fd_set *workerset);
32
33 static void
init_slot(ParallelSlot * slot,PGconn * conn)34 init_slot(ParallelSlot *slot, PGconn *conn)
35 {
36 slot->connection = conn;
37 /* Initially assume connection is idle */
38 slot->isFree = true;
39 }
40
41 /*
42 * Wait until a file descriptor from the given set becomes readable.
43 *
44 * Returns the number of ready descriptors, or -1 on failure (including
45 * getting a cancel request).
46 */
47 static int
select_loop(int maxFd,fd_set * workerset)48 select_loop(int maxFd, fd_set *workerset)
49 {
50 int i;
51 fd_set saveSet = *workerset;
52
53 if (CancelRequested)
54 return -1;
55
56 for (;;)
57 {
58 /*
59 * On Windows, we need to check once in a while for cancel requests;
60 * on other platforms we rely on select() returning when interrupted.
61 */
62 struct timeval *tvp;
63 #ifdef WIN32
64 struct timeval tv = {0, 1000000};
65
66 tvp = &tv;
67 #else
68 tvp = NULL;
69 #endif
70
71 *workerset = saveSet;
72 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
73
74 #ifdef WIN32
75 if (i == SOCKET_ERROR)
76 {
77 i = -1;
78
79 if (WSAGetLastError() == WSAEINTR)
80 errno = EINTR;
81 }
82 #endif
83
84 if (i < 0 && errno == EINTR)
85 continue; /* ignore this */
86 if (i < 0 || CancelRequested)
87 return -1; /* but not this */
88 if (i == 0)
89 continue; /* timeout (Win32 only) */
90 break;
91 }
92
93 return i;
94 }
95
96 /*
97 * ParallelSlotsGetIdle
98 * Return a connection slot that is ready to execute a command.
99 *
100 * This returns the first slot we find that is marked isFree, if one is;
101 * otherwise, we loop on select() until one socket becomes available. When
102 * this happens, we read the whole set and mark as free all sockets that
103 * become available. If an error occurs, NULL is returned.
104 */
105 ParallelSlot *
ParallelSlotsGetIdle(ParallelSlot * slots,int numslots)106 ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
107 {
108 int i;
109 int firstFree = -1;
110
111 /*
112 * Look for any connection currently free. If there is one, mark it as
113 * taken and let the caller know the slot to use.
114 */
115 for (i = 0; i < numslots; i++)
116 {
117 if (slots[i].isFree)
118 {
119 slots[i].isFree = false;
120 return slots + i;
121 }
122 }
123
124 /*
125 * No free slot found, so wait until one of the connections has finished
126 * its task and return the available slot.
127 */
128 while (firstFree < 0)
129 {
130 fd_set slotset;
131 int maxFd = 0;
132
133 /* We must reconstruct the fd_set for each call to select_loop */
134 FD_ZERO(&slotset);
135
136 for (i = 0; i < numslots; i++)
137 {
138 int sock = PQsocket(slots[i].connection);
139
140 /*
141 * We don't really expect any connections to lose their sockets
142 * after startup, but just in case, cope by ignoring them.
143 */
144 if (sock < 0)
145 continue;
146
147 FD_SET(sock, &slotset);
148 if (sock > maxFd)
149 maxFd = sock;
150 }
151
152 SetCancelConn(slots->connection);
153 i = select_loop(maxFd, &slotset);
154 ResetCancelConn();
155
156 /* failure? */
157 if (i < 0)
158 return NULL;
159
160 for (i = 0; i < numslots; i++)
161 {
162 int sock = PQsocket(slots[i].connection);
163
164 if (sock >= 0 && FD_ISSET(sock, &slotset))
165 {
166 /* select() says input is available, so consume it */
167 PQconsumeInput(slots[i].connection);
168 }
169
170 /* Collect result(s) as long as any are available */
171 while (!PQisBusy(slots[i].connection))
172 {
173 PGresult *result = PQgetResult(slots[i].connection);
174
175 if (result != NULL)
176 {
177 /* Check and discard the command result */
178 if (!processQueryResult(slots[i].connection, result))
179 return NULL;
180 }
181 else
182 {
183 /* This connection has become idle */
184 slots[i].isFree = true;
185 if (firstFree < 0)
186 firstFree = i;
187 break;
188 }
189 }
190 }
191 }
192
193 slots[firstFree].isFree = false;
194 return slots + firstFree;
195 }
196
197 /*
198 * ParallelSlotsSetup
199 * Prepare a set of parallel slots to use on a given database.
200 *
201 * This creates and initializes a set of connections to the database
202 * using the information given by the caller, marking all parallel slots
203 * as free and ready to use. "conn" is an initial connection set up
204 * by the caller and is associated with the first slot in the parallel
205 * set.
206 */
207 ParallelSlot *
ParallelSlotsSetup(const ConnParams * cparams,const char * progname,bool echo,PGconn * conn,int numslots)208 ParallelSlotsSetup(const ConnParams *cparams,
209 const char *progname, bool echo,
210 PGconn *conn, int numslots)
211 {
212 ParallelSlot *slots;
213 int i;
214
215 Assert(conn != NULL);
216
217 slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
218 init_slot(slots, conn);
219 if (numslots > 1)
220 {
221 for (i = 1; i < numslots; i++)
222 {
223 conn = connectDatabase(cparams, progname, echo, false, true);
224
225 /*
226 * Fail and exit immediately if trying to use a socket in an
227 * unsupported range. POSIX requires open(2) to use the lowest
228 * unused file descriptor and the hint given relies on that.
229 */
230 if (PQsocket(conn) >= FD_SETSIZE)
231 {
232 pg_log_fatal("too many jobs for this platform -- try %d", i);
233 exit(1);
234 }
235
236 init_slot(slots + i, conn);
237 }
238 }
239
240 return slots;
241 }
242
243 /*
244 * ParallelSlotsTerminate
245 * Clean up a set of parallel slots
246 *
247 * Iterate through all connections in a given set of ParallelSlots and
248 * terminate all connections.
249 */
250 void
ParallelSlotsTerminate(ParallelSlot * slots,int numslots)251 ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
252 {
253 int i;
254
255 for (i = 0; i < numslots; i++)
256 {
257 PGconn *conn = slots[i].connection;
258
259 if (conn == NULL)
260 continue;
261
262 disconnectDatabase(conn);
263 }
264 }
265
266 /*
267 * ParallelSlotsWaitCompletion
268 *
269 * Wait for all connections to finish, returning false if at least one
270 * error has been found on the way.
271 */
272 bool
ParallelSlotsWaitCompletion(ParallelSlot * slots,int numslots)273 ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
274 {
275 int i;
276
277 for (i = 0; i < numslots; i++)
278 {
279 if (!consumeQueryResult((slots + i)->connection))
280 return false;
281 }
282
283 return true;
284 }
285