1 /*-------------------------------------------------------------------------
2  *
3  *	parallel_slot.c
4  *		Parallel support for front-end parallel database connections
5  *
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/fe_utils/parallel_slot.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #ifdef WIN32
16 #define FD_SETSIZE 1024			/* must set before winsock2.h is included */
17 #endif
18 
19 #include "postgres_fe.h"
20 
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
24 
25 #include "common/logging.h"
26 #include "fe_utils/cancel.h"
27 #include "fe_utils/parallel_slot.h"
28 #include "fe_utils/query_utils.h"
29 
30 #define ERRCODE_UNDEFINED_TABLE  "42P01"
31 
32 static int	select_loop(int maxFd, fd_set *workerset);
33 static bool processQueryResult(ParallelSlot *slot, PGresult *result);
34 
35 /*
36  * Process (and delete) a query result.  Returns true if there's no problem,
37  * false otherwise. It's up to the handler to decide what constitutes a
38  * problem.
39  */
40 static bool
processQueryResult(ParallelSlot * slot,PGresult * result)41 processQueryResult(ParallelSlot *slot, PGresult *result)
42 {
43 	Assert(slot->handler != NULL);
44 
45 	/* On failure, the handler should return NULL after freeing the result */
46 	if (!slot->handler(result, slot->connection, slot->handler_context))
47 		return false;
48 
49 	/* Ok, we have to free it ourself */
50 	PQclear(result);
51 	return true;
52 }
53 
54 /*
55  * Consume all the results generated for the given connection until
56  * nothing remains.  If at least one error is encountered, return false.
57  * Note that this will block if the connection is busy.
58  */
59 static bool
consumeQueryResult(ParallelSlot * slot)60 consumeQueryResult(ParallelSlot *slot)
61 {
62 	bool		ok = true;
63 	PGresult   *result;
64 
65 	SetCancelConn(slot->connection);
66 	while ((result = PQgetResult(slot->connection)) != NULL)
67 	{
68 		if (!processQueryResult(slot, result))
69 			ok = false;
70 	}
71 	ResetCancelConn();
72 	return ok;
73 }
74 
75 /*
76  * Wait until a file descriptor from the given set becomes readable.
77  *
78  * Returns the number of ready descriptors, or -1 on failure (including
79  * getting a cancel request).
80  */
81 static int
select_loop(int maxFd,fd_set * workerset)82 select_loop(int maxFd, fd_set *workerset)
83 {
84 	int			i;
85 	fd_set		saveSet = *workerset;
86 
87 	if (CancelRequested)
88 		return -1;
89 
90 	for (;;)
91 	{
92 		/*
93 		 * On Windows, we need to check once in a while for cancel requests;
94 		 * on other platforms we rely on select() returning when interrupted.
95 		 */
96 		struct timeval *tvp;
97 #ifdef WIN32
98 		struct timeval tv = {0, 1000000};
99 
100 		tvp = &tv;
101 #else
102 		tvp = NULL;
103 #endif
104 
105 		*workerset = saveSet;
106 		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
107 
108 #ifdef WIN32
109 		if (i == SOCKET_ERROR)
110 		{
111 			i = -1;
112 
113 			if (WSAGetLastError() == WSAEINTR)
114 				errno = EINTR;
115 		}
116 #endif
117 
118 		if (i < 0 && errno == EINTR)
119 			continue;			/* ignore this */
120 		if (i < 0 || CancelRequested)
121 			return -1;			/* but not this */
122 		if (i == 0)
123 			continue;			/* timeout (Win32 only) */
124 		break;
125 	}
126 
127 	return i;
128 }
129 
130 /*
131  * Return the offset of a suitable idle slot, or -1 if none are available.  If
132  * the given dbname is not null, only idle slots connected to the given
133  * database are considered suitable, otherwise all idle connected slots are
134  * considered suitable.
135  */
136 static int
find_matching_idle_slot(const ParallelSlotArray * sa,const char * dbname)137 find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
138 {
139 	int			i;
140 
141 	for (i = 0; i < sa->numslots; i++)
142 	{
143 		if (sa->slots[i].inUse)
144 			continue;
145 
146 		if (sa->slots[i].connection == NULL)
147 			continue;
148 
149 		if (dbname == NULL ||
150 			strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
151 			return i;
152 	}
153 	return -1;
154 }
155 
156 /*
157  * Return the offset of the first slot without a database connection, or -1 if
158  * all slots are connected.
159  */
160 static int
find_unconnected_slot(const ParallelSlotArray * sa)161 find_unconnected_slot(const ParallelSlotArray *sa)
162 {
163 	int			i;
164 
165 	for (i = 0; i < sa->numslots; i++)
166 	{
167 		if (sa->slots[i].inUse)
168 			continue;
169 
170 		if (sa->slots[i].connection == NULL)
171 			return i;
172 	}
173 
174 	return -1;
175 }
176 
177 /*
178  * Return the offset of the first idle slot, or -1 if all slots are busy.
179  */
180 static int
find_any_idle_slot(const ParallelSlotArray * sa)181 find_any_idle_slot(const ParallelSlotArray *sa)
182 {
183 	int			i;
184 
185 	for (i = 0; i < sa->numslots; i++)
186 		if (!sa->slots[i].inUse)
187 			return i;
188 
189 	return -1;
190 }
191 
192 /*
193  * Wait for any slot's connection to have query results, consume the results,
194  * and update the slot's status as appropriate.  Returns true on success,
195  * false on cancellation, on error, or if no slots are connected.
196  */
197 static bool
wait_on_slots(ParallelSlotArray * sa)198 wait_on_slots(ParallelSlotArray *sa)
199 {
200 	int			i;
201 	fd_set		slotset;
202 	int			maxFd = 0;
203 	PGconn	   *cancelconn = NULL;
204 
205 	/* We must reconstruct the fd_set for each call to select_loop */
206 	FD_ZERO(&slotset);
207 
208 	for (i = 0; i < sa->numslots; i++)
209 	{
210 		int			sock;
211 
212 		/* We shouldn't get here if we still have slots without connections */
213 		Assert(sa->slots[i].connection != NULL);
214 
215 		sock = PQsocket(sa->slots[i].connection);
216 
217 		/*
218 		 * We don't really expect any connections to lose their sockets after
219 		 * startup, but just in case, cope by ignoring them.
220 		 */
221 		if (sock < 0)
222 			continue;
223 
224 		/* Keep track of the first valid connection we see. */
225 		if (cancelconn == NULL)
226 			cancelconn = sa->slots[i].connection;
227 
228 		FD_SET(sock, &slotset);
229 		if (sock > maxFd)
230 			maxFd = sock;
231 	}
232 
233 	/*
234 	 * If we get this far with no valid connections, processing cannot
235 	 * continue.
236 	 */
237 	if (cancelconn == NULL)
238 		return false;
239 
240 	SetCancelConn(sa->slots->connection);
241 	i = select_loop(maxFd, &slotset);
242 	ResetCancelConn();
243 
244 	/* failure? */
245 	if (i < 0)
246 		return false;
247 
248 	for (i = 0; i < sa->numslots; i++)
249 	{
250 		int			sock;
251 
252 		sock = PQsocket(sa->slots[i].connection);
253 
254 		if (sock >= 0 && FD_ISSET(sock, &slotset))
255 		{
256 			/* select() says input is available, so consume it */
257 			PQconsumeInput(sa->slots[i].connection);
258 		}
259 
260 		/* Collect result(s) as long as any are available */
261 		while (!PQisBusy(sa->slots[i].connection))
262 		{
263 			PGresult   *result = PQgetResult(sa->slots[i].connection);
264 
265 			if (result != NULL)
266 			{
267 				/* Handle and discard the command result */
268 				if (!processQueryResult(&sa->slots[i], result))
269 					return false;
270 			}
271 			else
272 			{
273 				/* This connection has become idle */
274 				sa->slots[i].inUse = false;
275 				ParallelSlotClearHandler(&sa->slots[i]);
276 				break;
277 			}
278 		}
279 	}
280 	return true;
281 }
282 
283 /*
284  * Open a new database connection using the stored connection parameters and
285  * optionally a given dbname if not null, execute the stored initial command if
286  * any, and associate the new connection with the given slot.
287  */
288 static void
connect_slot(ParallelSlotArray * sa,int slotno,const char * dbname)289 connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
290 {
291 	const char *old_override;
292 	ParallelSlot *slot = &sa->slots[slotno];
293 
294 	old_override = sa->cparams->override_dbname;
295 	if (dbname)
296 		sa->cparams->override_dbname = dbname;
297 	slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
298 	sa->cparams->override_dbname = old_override;
299 
300 	if (PQsocket(slot->connection) >= FD_SETSIZE)
301 	{
302 		pg_log_fatal("too many jobs for this platform");
303 		exit(1);
304 	}
305 
306 	/* Setup the connection using the supplied command, if any. */
307 	if (sa->initcmd)
308 		executeCommand(slot->connection, sa->initcmd, sa->echo);
309 }
310 
311 /*
312  * ParallelSlotsGetIdle
313  *		Return a connection slot that is ready to execute a command.
314  *
315  * The slot returned is chosen as follows:
316  *
317  * If any idle slot already has an open connection, and if either dbname is
318  * null or the existing connection is to the given database, that slot will be
319  * returned allowing the connection to be reused.
320  *
321  * Otherwise, if any idle slot is not yet connected to any database, the slot
322  * will be returned with it's connection opened using the stored cparams and
323  * optionally the given dbname if not null.
324  *
325  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
326  * after having it's connection disconnected and reconnected using the stored
327  * cparams and optionally the given dbname if not null.
328  *
329  * Otherwise, if any slots have connections that are busy, we loop on select()
330  * until one socket becomes available.  When this happens, we read the whole
331  * set and mark as free all sockets that become available.  We then select a
332  * slot using the same rules as above.
333  *
334  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
335  *
336  * For any connection created, if the stored initcmd is not null, it will be
337  * executed as a command on the newly formed connection before the slot is
338  * returned.
339  *
340  * If an error occurs, NULL is returned.
341  */
342 ParallelSlot *
ParallelSlotsGetIdle(ParallelSlotArray * sa,const char * dbname)343 ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
344 {
345 	int			offset;
346 
347 	Assert(sa);
348 	Assert(sa->numslots > 0);
349 
350 	while (1)
351 	{
352 		/* First choice: a slot already connected to the desired database. */
353 		offset = find_matching_idle_slot(sa, dbname);
354 		if (offset >= 0)
355 		{
356 			sa->slots[offset].inUse = true;
357 			return &sa->slots[offset];
358 		}
359 
360 		/* Second choice: a slot not connected to any database. */
361 		offset = find_unconnected_slot(sa);
362 		if (offset >= 0)
363 		{
364 			connect_slot(sa, offset, dbname);
365 			sa->slots[offset].inUse = true;
366 			return &sa->slots[offset];
367 		}
368 
369 		/* Third choice: a slot connected to the wrong database. */
370 		offset = find_any_idle_slot(sa);
371 		if (offset >= 0)
372 		{
373 			disconnectDatabase(sa->slots[offset].connection);
374 			sa->slots[offset].connection = NULL;
375 			connect_slot(sa, offset, dbname);
376 			sa->slots[offset].inUse = true;
377 			return &sa->slots[offset];
378 		}
379 
380 		/*
381 		 * Fourth choice: block until one or more slots become available. If
382 		 * any slots hit a fatal error, we'll find out about that here and
383 		 * return NULL.
384 		 */
385 		if (!wait_on_slots(sa))
386 			return NULL;
387 	}
388 }
389 
390 /*
391  * ParallelSlotsSetup
392  *		Prepare a set of parallel slots but do not connect to any database.
393  *
394  * This creates and initializes a set of slots, marking all parallel slots as
395  * free and ready to use.  Establishing connections is delayed until requesting
396  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
397  * use and must remain valid for the lifetime of the returned array.
398  */
399 ParallelSlotArray *
ParallelSlotsSetup(int numslots,ConnParams * cparams,const char * progname,bool echo,const char * initcmd)400 ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
401 				   bool echo, const char *initcmd)
402 {
403 	ParallelSlotArray *sa;
404 
405 	Assert(numslots > 0);
406 	Assert(cparams != NULL);
407 	Assert(progname != NULL);
408 
409 	sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
410 									   numslots * sizeof(ParallelSlot));
411 
412 	sa->numslots = numslots;
413 	sa->cparams = cparams;
414 	sa->progname = progname;
415 	sa->echo = echo;
416 	sa->initcmd = initcmd;
417 
418 	return sa;
419 }
420 
421 /*
422  * ParallelSlotsAdoptConn
423  *		Assign an open connection to the slots array for reuse.
424  *
425  * This turns over ownership of an open connection to a slots array.  The
426  * caller should not further use or close the connection.  All the connection's
427  * parameters (user, host, port, etc.) except possibly dbname should match
428  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
429  * these parameters differ, subsequent behavior is undefined.
430  */
431 void
ParallelSlotsAdoptConn(ParallelSlotArray * sa,PGconn * conn)432 ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
433 {
434 	int			offset;
435 
436 	offset = find_unconnected_slot(sa);
437 	if (offset >= 0)
438 		sa->slots[offset].connection = conn;
439 	else
440 		disconnectDatabase(conn);
441 }
442 
443 /*
444  * ParallelSlotsTerminate
445  *		Clean up a set of parallel slots
446  *
447  * Iterate through all connections in a given set of ParallelSlots and
448  * terminate all connections.
449  */
450 void
ParallelSlotsTerminate(ParallelSlotArray * sa)451 ParallelSlotsTerminate(ParallelSlotArray *sa)
452 {
453 	int			i;
454 
455 	for (i = 0; i < sa->numslots; i++)
456 	{
457 		PGconn	   *conn = sa->slots[i].connection;
458 
459 		if (conn == NULL)
460 			continue;
461 
462 		disconnectDatabase(conn);
463 	}
464 }
465 
466 /*
467  * ParallelSlotsWaitCompletion
468  *
469  * Wait for all connections to finish, returning false if at least one
470  * error has been found on the way.
471  */
472 bool
ParallelSlotsWaitCompletion(ParallelSlotArray * sa)473 ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
474 {
475 	int			i;
476 
477 	for (i = 0; i < sa->numslots; i++)
478 	{
479 		if (sa->slots[i].connection == NULL)
480 			continue;
481 		if (!consumeQueryResult(&sa->slots[i]))
482 			return false;
483 	}
484 
485 	return true;
486 }
487 
488 /*
489  * TableCommandResultHandler
490  *
491  * ParallelSlotResultHandler for results of commands (not queries) against
492  * tables.
493  *
494  * Requires that the result status is either PGRES_COMMAND_OK or an error about
495  * a missing table.  This is useful for utilities that compile a list of tables
496  * to process and then run commands (vacuum, reindex, or whatever) against
497  * those tables, as there is a race condition between the time the list is
498  * compiled and the time the command attempts to open the table.
499  *
500  * For missing tables, logs an error but allows processing to continue.
501  *
502  * For all other errors, logs an error and terminates further processing.
503  *
504  * res: PGresult from the query executed on the slot's connection
505  * conn: connection belonging to the slot
506  * context: unused
507  */
508 bool
TableCommandResultHandler(PGresult * res,PGconn * conn,void * context)509 TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
510 {
511 	Assert(res != NULL);
512 	Assert(conn != NULL);
513 
514 	/*
515 	 * If it's an error, report it.  Errors about a missing table are harmless
516 	 * so we continue processing; but die for other errors.
517 	 */
518 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
519 	{
520 		char	   *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
521 
522 		pg_log_error("processing of database \"%s\" failed: %s",
523 					 PQdb(conn), PQerrorMessage(conn));
524 
525 		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
526 		{
527 			PQclear(res);
528 			return false;
529 		}
530 	}
531 
532 	return true;
533 }
534