1 /*-------------------------------------------------------------------------
2  *
3  *	scripts_parallel.c
4  *		Parallel support for bin/scripts/
5  *
6  *
7  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/bin/scripts/scripts_parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #ifdef WIN32
16 #define FD_SETSIZE 1024			/* must set before winsock2.h is included */
17 #endif
18 
19 #include "postgres_fe.h"
20 
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
24 
25 #include "common.h"
26 #include "common/logging.h"
27 #include "fe_utils/cancel.h"
28 #include "scripts_parallel.h"
29 
30 static void init_slot(ParallelSlot *slot, PGconn *conn);
31 static int	select_loop(int maxFd, fd_set *workerset);
32 
33 static void
34 init_slot(ParallelSlot *slot, PGconn *conn)
35 {
36 	slot->connection = conn;
37 	/* Initially assume connection is idle */
38 	slot->isFree = true;
39 }
40 
41 /*
42  * Wait until a file descriptor from the given set becomes readable.
43  *
44  * Returns the number of ready descriptors, or -1 on failure (including
45  * getting a cancel request).
46  */
47 static int
48 select_loop(int maxFd, fd_set *workerset)
49 {
50 	int			i;
51 	fd_set		saveSet = *workerset;
52 
53 	if (CancelRequested)
54 		return -1;
55 
56 	for (;;)
57 	{
58 		/*
59 		 * On Windows, we need to check once in a while for cancel requests;
60 		 * on other platforms we rely on select() returning when interrupted.
61 		 */
62 		struct timeval *tvp;
63 #ifdef WIN32
64 		struct timeval tv = {0, 1000000};
65 
66 		tvp = &tv;
67 #else
68 		tvp = NULL;
69 #endif
70 
71 		*workerset = saveSet;
72 		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
73 
74 #ifdef WIN32
75 		if (i == SOCKET_ERROR)
76 		{
77 			i = -1;
78 
79 			if (WSAGetLastError() == WSAEINTR)
80 				errno = EINTR;
81 		}
82 #endif
83 
84 		if (i < 0 && errno == EINTR)
85 			continue;			/* ignore this */
86 		if (i < 0 || CancelRequested)
87 			return -1;			/* but not this */
88 		if (i == 0)
89 			continue;			/* timeout (Win32 only) */
90 		break;
91 	}
92 
93 	return i;
94 }
95 
96 /*
97  * ParallelSlotsGetIdle
98  *		Return a connection slot that is ready to execute a command.
99  *
100  * This returns the first slot we find that is marked isFree, if one is;
101  * otherwise, we loop on select() until one socket becomes available.  When
102  * this happens, we read the whole set and mark as free all sockets that
103  * become available.  If an error occurs, NULL is returned.
104  */
105 ParallelSlot *
106 ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
107 {
108 	int			i;
109 	int			firstFree = -1;
110 
111 	/*
112 	 * Look for any connection currently free.  If there is one, mark it as
113 	 * taken and let the caller know the slot to use.
114 	 */
115 	for (i = 0; i < numslots; i++)
116 	{
117 		if (slots[i].isFree)
118 		{
119 			slots[i].isFree = false;
120 			return slots + i;
121 		}
122 	}
123 
124 	/*
125 	 * No free slot found, so wait until one of the connections has finished
126 	 * its task and return the available slot.
127 	 */
128 	while (firstFree < 0)
129 	{
130 		fd_set		slotset;
131 		int			maxFd = 0;
132 
133 		/* We must reconstruct the fd_set for each call to select_loop */
134 		FD_ZERO(&slotset);
135 
136 		for (i = 0; i < numslots; i++)
137 		{
138 			int			sock = PQsocket(slots[i].connection);
139 
140 			/*
141 			 * We don't really expect any connections to lose their sockets
142 			 * after startup, but just in case, cope by ignoring them.
143 			 */
144 			if (sock < 0)
145 				continue;
146 
147 			FD_SET(sock, &slotset);
148 			if (sock > maxFd)
149 				maxFd = sock;
150 		}
151 
152 		SetCancelConn(slots->connection);
153 		i = select_loop(maxFd, &slotset);
154 		ResetCancelConn();
155 
156 		/* failure? */
157 		if (i < 0)
158 			return NULL;
159 
160 		for (i = 0; i < numslots; i++)
161 		{
162 			int			sock = PQsocket(slots[i].connection);
163 
164 			if (sock >= 0 && FD_ISSET(sock, &slotset))
165 			{
166 				/* select() says input is available, so consume it */
167 				PQconsumeInput(slots[i].connection);
168 			}
169 
170 			/* Collect result(s) as long as any are available */
171 			while (!PQisBusy(slots[i].connection))
172 			{
173 				PGresult   *result = PQgetResult(slots[i].connection);
174 
175 				if (result != NULL)
176 				{
177 					/* Check and discard the command result */
178 					if (!processQueryResult(slots[i].connection, result))
179 						return NULL;
180 				}
181 				else
182 				{
183 					/* This connection has become idle */
184 					slots[i].isFree = true;
185 					if (firstFree < 0)
186 						firstFree = i;
187 					break;
188 				}
189 			}
190 		}
191 	}
192 
193 	slots[firstFree].isFree = false;
194 	return slots + firstFree;
195 }
196 
197 /*
198  * ParallelSlotsSetup
199  *		Prepare a set of parallel slots to use on a given database.
200  *
201  * This creates and initializes a set of connections to the database
202  * using the information given by the caller, marking all parallel slots
203  * as free and ready to use.  "conn" is an initial connection set up
204  * by the caller and is associated with the first slot in the parallel
205  * set.
206  */
207 ParallelSlot *
208 ParallelSlotsSetup(const ConnParams *cparams,
209 				   const char *progname, bool echo,
210 				   PGconn *conn, int numslots)
211 {
212 	ParallelSlot *slots;
213 	int			i;
214 
215 	Assert(conn != NULL);
216 
217 	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
218 	init_slot(slots, conn);
219 	if (numslots > 1)
220 	{
221 		for (i = 1; i < numslots; i++)
222 		{
223 			conn = connectDatabase(cparams, progname, echo, false, true);
224 
225 			/*
226 			 * Fail and exit immediately if trying to use a socket in an
227 			 * unsupported range.  POSIX requires open(2) to use the lowest
228 			 * unused file descriptor and the hint given relies on that.
229 			 */
230 			if (PQsocket(conn) >= FD_SETSIZE)
231 			{
232 				pg_log_fatal("too many jobs for this platform -- try %d", i);
233 				exit(1);
234 			}
235 
236 			init_slot(slots + i, conn);
237 		}
238 	}
239 
240 	return slots;
241 }
242 
243 /*
244  * ParallelSlotsTerminate
245  *		Clean up a set of parallel slots
246  *
247  * Iterate through all connections in a given set of ParallelSlots and
248  * terminate all connections.
249  */
250 void
251 ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
252 {
253 	int			i;
254 
255 	for (i = 0; i < numslots; i++)
256 	{
257 		PGconn	   *conn = slots[i].connection;
258 
259 		if (conn == NULL)
260 			continue;
261 
262 		disconnectDatabase(conn);
263 	}
264 }
265 
266 /*
267  * ParallelSlotsWaitCompletion
268  *
269  * Wait for all connections to finish, returning false if at least one
270  * error has been found on the way.
271  */
272 bool
273 ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
274 {
275 	int			i;
276 
277 	for (i = 0; i < numslots; i++)
278 	{
279 		if (!consumeQueryResult((slots + i)->connection))
280 			return false;
281 	}
282 
283 	return true;
284 }
285