1 /*------------------------------------------------------------------------- 2 * 3 * scripts_parallel.c 4 * Parallel support for bin/scripts/ 5 * 6 * 7 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group 8 * Portions Copyright (c) 1994, Regents of the University of California 9 * 10 * src/bin/scripts/scripts_parallel.c 11 * 12 *------------------------------------------------------------------------- 13 */ 14 15 #ifdef WIN32 16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */ 17 #endif 18 19 #include "postgres_fe.h" 20 21 #ifdef HAVE_SYS_SELECT_H 22 #include <sys/select.h> 23 #endif 24 25 #include "common.h" 26 #include "common/logging.h" 27 #include "fe_utils/cancel.h" 28 #include "scripts_parallel.h" 29 30 static void init_slot(ParallelSlot *slot, PGconn *conn); 31 static int select_loop(int maxFd, fd_set *workerset); 32 33 static void 34 init_slot(ParallelSlot *slot, PGconn *conn) 35 { 36 slot->connection = conn; 37 /* Initially assume connection is idle */ 38 slot->isFree = true; 39 } 40 41 /* 42 * Wait until a file descriptor from the given set becomes readable. 43 * 44 * Returns the number of ready descriptors, or -1 on failure (including 45 * getting a cancel request). 46 */ 47 static int 48 select_loop(int maxFd, fd_set *workerset) 49 { 50 int i; 51 fd_set saveSet = *workerset; 52 53 if (CancelRequested) 54 return -1; 55 56 for (;;) 57 { 58 /* 59 * On Windows, we need to check once in a while for cancel requests; 60 * on other platforms we rely on select() returning when interrupted. 61 */ 62 struct timeval *tvp; 63 #ifdef WIN32 64 struct timeval tv = {0, 1000000}; 65 66 tvp = &tv; 67 #else 68 tvp = NULL; 69 #endif 70 71 *workerset = saveSet; 72 i = select(maxFd + 1, workerset, NULL, NULL, tvp); 73 74 #ifdef WIN32 75 if (i == SOCKET_ERROR) 76 { 77 i = -1; 78 79 if (WSAGetLastError() == WSAEINTR) 80 errno = EINTR; 81 } 82 #endif 83 84 if (i < 0 && errno == EINTR) 85 continue; /* ignore this */ 86 if (i < 0 || CancelRequested) 87 return -1; /* but not this */ 88 if (i == 0) 89 continue; /* timeout (Win32 only) */ 90 break; 91 } 92 93 return i; 94 } 95 96 /* 97 * ParallelSlotsGetIdle 98 * Return a connection slot that is ready to execute a command. 99 * 100 * This returns the first slot we find that is marked isFree, if one is; 101 * otherwise, we loop on select() until one socket becomes available. When 102 * this happens, we read the whole set and mark as free all sockets that 103 * become available. If an error occurs, NULL is returned. 104 */ 105 ParallelSlot * 106 ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) 107 { 108 int i; 109 int firstFree = -1; 110 111 /* 112 * Look for any connection currently free. If there is one, mark it as 113 * taken and let the caller know the slot to use. 114 */ 115 for (i = 0; i < numslots; i++) 116 { 117 if (slots[i].isFree) 118 { 119 slots[i].isFree = false; 120 return slots + i; 121 } 122 } 123 124 /* 125 * No free slot found, so wait until one of the connections has finished 126 * its task and return the available slot. 127 */ 128 while (firstFree < 0) 129 { 130 fd_set slotset; 131 int maxFd = 0; 132 133 /* We must reconstruct the fd_set for each call to select_loop */ 134 FD_ZERO(&slotset); 135 136 for (i = 0; i < numslots; i++) 137 { 138 int sock = PQsocket(slots[i].connection); 139 140 /* 141 * We don't really expect any connections to lose their sockets 142 * after startup, but just in case, cope by ignoring them. 143 */ 144 if (sock < 0) 145 continue; 146 147 FD_SET(sock, &slotset); 148 if (sock > maxFd) 149 maxFd = sock; 150 } 151 152 SetCancelConn(slots->connection); 153 i = select_loop(maxFd, &slotset); 154 ResetCancelConn(); 155 156 /* failure? */ 157 if (i < 0) 158 return NULL; 159 160 for (i = 0; i < numslots; i++) 161 { 162 int sock = PQsocket(slots[i].connection); 163 164 if (sock >= 0 && FD_ISSET(sock, &slotset)) 165 { 166 /* select() says input is available, so consume it */ 167 PQconsumeInput(slots[i].connection); 168 } 169 170 /* Collect result(s) as long as any are available */ 171 while (!PQisBusy(slots[i].connection)) 172 { 173 PGresult *result = PQgetResult(slots[i].connection); 174 175 if (result != NULL) 176 { 177 /* Check and discard the command result */ 178 if (!processQueryResult(slots[i].connection, result)) 179 return NULL; 180 } 181 else 182 { 183 /* This connection has become idle */ 184 slots[i].isFree = true; 185 if (firstFree < 0) 186 firstFree = i; 187 break; 188 } 189 } 190 } 191 } 192 193 slots[firstFree].isFree = false; 194 return slots + firstFree; 195 } 196 197 /* 198 * ParallelSlotsSetup 199 * Prepare a set of parallel slots to use on a given database. 200 * 201 * This creates and initializes a set of connections to the database 202 * using the information given by the caller, marking all parallel slots 203 * as free and ready to use. "conn" is an initial connection set up 204 * by the caller and is associated with the first slot in the parallel 205 * set. 206 */ 207 ParallelSlot * 208 ParallelSlotsSetup(const ConnParams *cparams, 209 const char *progname, bool echo, 210 PGconn *conn, int numslots) 211 { 212 ParallelSlot *slots; 213 int i; 214 215 Assert(conn != NULL); 216 217 slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots); 218 init_slot(slots, conn); 219 if (numslots > 1) 220 { 221 for (i = 1; i < numslots; i++) 222 { 223 conn = connectDatabase(cparams, progname, echo, false, true); 224 225 /* 226 * Fail and exit immediately if trying to use a socket in an 227 * unsupported range. POSIX requires open(2) to use the lowest 228 * unused file descriptor and the hint given relies on that. 229 */ 230 if (PQsocket(conn) >= FD_SETSIZE) 231 { 232 pg_log_fatal("too many jobs for this platform -- try %d", i); 233 exit(1); 234 } 235 236 init_slot(slots + i, conn); 237 } 238 } 239 240 return slots; 241 } 242 243 /* 244 * ParallelSlotsTerminate 245 * Clean up a set of parallel slots 246 * 247 * Iterate through all connections in a given set of ParallelSlots and 248 * terminate all connections. 249 */ 250 void 251 ParallelSlotsTerminate(ParallelSlot *slots, int numslots) 252 { 253 int i; 254 255 for (i = 0; i < numslots; i++) 256 { 257 PGconn *conn = slots[i].connection; 258 259 if (conn == NULL) 260 continue; 261 262 disconnectDatabase(conn); 263 } 264 } 265 266 /* 267 * ParallelSlotsWaitCompletion 268 * 269 * Wait for all connections to finish, returning false if at least one 270 * error has been found on the way. 271 */ 272 bool 273 ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) 274 { 275 int i; 276 277 for (i = 0; i < numslots; i++) 278 { 279 if (!consumeQueryResult((slots + i)->connection)) 280 return false; 281 } 282 283 return true; 284 } 285