1 /* 2 * parallel.c 3 * 4 * multi-process support 5 * 6 * Copyright (c) 2010-2018, PostgreSQL Global Development Group 7 * src/bin/pg_upgrade/parallel.c 8 */ 9 10 #include "postgres_fe.h" 11 12 #include <sys/wait.h> 13 #ifdef WIN32 14 #include <io.h> 15 #endif 16 17 #include "pg_upgrade.h" 18 19 20 static int parallel_jobs; 21 22 #ifdef WIN32 23 /* 24 * Array holding all active threads. There can't be any gaps/zeros so 25 * it can be passed to WaitForMultipleObjects(). We use two arrays 26 * so the thread_handles array can be passed to WaitForMultipleObjects(). 27 */ 28 HANDLE *thread_handles; 29 30 typedef struct 31 { 32 char *log_file; 33 char *opt_log_file; 34 char *cmd; 35 } exec_thread_arg; 36 37 typedef struct 38 { 39 DbInfoArr *old_db_arr; 40 DbInfoArr *new_db_arr; 41 char *old_pgdata; 42 char *new_pgdata; 43 char *old_tablespace; 44 } transfer_thread_arg; 45 46 exec_thread_arg **exec_thread_args; 47 transfer_thread_arg **transfer_thread_args; 48 49 /* track current thread_args struct so reap_child() can be used for all cases */ 50 void **cur_thread_args; 51 52 DWORD win32_exec_prog(exec_thread_arg *args); 53 DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args); 54 #endif 55 56 /* 57 * parallel_exec_prog 58 * 59 * This has the same API as exec_prog, except it does parallel execution, 60 * and therefore must throw errors and doesn't return an error status. 61 */ 62 void 63 parallel_exec_prog(const char *log_file, const char *opt_log_file, 64 const char *fmt,...) 65 { 66 va_list args; 67 char cmd[MAX_STRING]; 68 69 #ifndef WIN32 70 pid_t child; 71 #else 72 HANDLE child; 73 exec_thread_arg *new_arg; 74 #endif 75 76 va_start(args, fmt); 77 vsnprintf(cmd, sizeof(cmd), fmt, args); 78 va_end(args); 79 80 if (user_opts.jobs <= 1) 81 /* exit_on_error must be true to allow jobs */ 82 exec_prog(log_file, opt_log_file, true, true, "%s", cmd); 83 else 84 { 85 /* parallel */ 86 #ifdef WIN32 87 if (thread_handles == NULL) 88 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); 89 90 if (exec_thread_args == NULL) 91 { 92 int i; 93 94 exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *)); 95 96 /* 97 * For safety and performance, we keep the args allocated during 98 * the entire life of the process, and we don't free the args in a 99 * thread different from the one that allocated it. 100 */ 101 for (i = 0; i < user_opts.jobs; i++) 102 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg)); 103 } 104 105 cur_thread_args = (void **) exec_thread_args; 106 #endif 107 /* harvest any dead children */ 108 while (reap_child(false) == true) 109 ; 110 111 /* must we wait for a dead child? */ 112 if (parallel_jobs >= user_opts.jobs) 113 reap_child(true); 114 115 /* set this before we start the job */ 116 parallel_jobs++; 117 118 /* Ensure stdio state is quiesced before forking */ 119 fflush(NULL); 120 121 #ifndef WIN32 122 child = fork(); 123 if (child == 0) 124 /* use _exit to skip atexit() functions */ 125 _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd)); 126 else if (child < 0) 127 /* fork failed */ 128 pg_fatal("could not create worker process: %s\n", strerror(errno)); 129 #else 130 /* empty array element are always at the end */ 131 new_arg = exec_thread_args[parallel_jobs - 1]; 132 133 /* Can only pass one pointer into the function, so use a struct */ 134 if (new_arg->log_file) 135 pg_free(new_arg->log_file); 136 new_arg->log_file = pg_strdup(log_file); 137 if (new_arg->opt_log_file) 138 pg_free(new_arg->opt_log_file); 139 new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL; 140 if (new_arg->cmd) 141 pg_free(new_arg->cmd); 142 new_arg->cmd = pg_strdup(cmd); 143 144 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, 145 new_arg, 0, NULL); 146 if (child == 0) 147 pg_fatal("could not create worker thread: %s\n", strerror(errno)); 148 149 thread_handles[parallel_jobs - 1] = child; 150 #endif 151 } 152 153 return; 154 } 155 156 157 #ifdef WIN32 158 DWORD 159 win32_exec_prog(exec_thread_arg *args) 160 { 161 int ret; 162 163 ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd); 164 165 /* terminates thread */ 166 return ret; 167 } 168 #endif 169 170 171 /* 172 * parallel_transfer_all_new_dbs 173 * 174 * This has the same API as transfer_all_new_dbs, except it does parallel execution 175 * by transferring multiple tablespaces in parallel 176 */ 177 void 178 parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, 179 char *old_pgdata, char *new_pgdata, 180 char *old_tablespace) 181 { 182 #ifndef WIN32 183 pid_t child; 184 #else 185 HANDLE child; 186 transfer_thread_arg *new_arg; 187 #endif 188 189 if (user_opts.jobs <= 1) 190 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL); 191 else 192 { 193 /* parallel */ 194 #ifdef WIN32 195 if (thread_handles == NULL) 196 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); 197 198 if (transfer_thread_args == NULL) 199 { 200 int i; 201 202 transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *)); 203 204 /* 205 * For safety and performance, we keep the args allocated during 206 * the entire life of the process, and we don't free the args in a 207 * thread different from the one that allocated it. 208 */ 209 for (i = 0; i < user_opts.jobs; i++) 210 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg)); 211 } 212 213 cur_thread_args = (void **) transfer_thread_args; 214 #endif 215 /* harvest any dead children */ 216 while (reap_child(false) == true) 217 ; 218 219 /* must we wait for a dead child? */ 220 if (parallel_jobs >= user_opts.jobs) 221 reap_child(true); 222 223 /* set this before we start the job */ 224 parallel_jobs++; 225 226 /* Ensure stdio state is quiesced before forking */ 227 fflush(NULL); 228 229 #ifndef WIN32 230 child = fork(); 231 if (child == 0) 232 { 233 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, 234 old_tablespace); 235 /* if we take another exit path, it will be non-zero */ 236 /* use _exit to skip atexit() functions */ 237 _exit(0); 238 } 239 else if (child < 0) 240 /* fork failed */ 241 pg_fatal("could not create worker process: %s\n", strerror(errno)); 242 #else 243 /* empty array element are always at the end */ 244 new_arg = transfer_thread_args[parallel_jobs - 1]; 245 246 /* Can only pass one pointer into the function, so use a struct */ 247 new_arg->old_db_arr = old_db_arr; 248 new_arg->new_db_arr = new_db_arr; 249 if (new_arg->old_pgdata) 250 pg_free(new_arg->old_pgdata); 251 new_arg->old_pgdata = pg_strdup(old_pgdata); 252 if (new_arg->new_pgdata) 253 pg_free(new_arg->new_pgdata); 254 new_arg->new_pgdata = pg_strdup(new_pgdata); 255 if (new_arg->old_tablespace) 256 pg_free(new_arg->old_tablespace); 257 new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL; 258 259 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs, 260 new_arg, 0, NULL); 261 if (child == 0) 262 pg_fatal("could not create worker thread: %s\n", strerror(errno)); 263 264 thread_handles[parallel_jobs - 1] = child; 265 #endif 266 } 267 268 return; 269 } 270 271 272 #ifdef WIN32 273 DWORD 274 win32_transfer_all_new_dbs(transfer_thread_arg *args) 275 { 276 transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata, 277 args->new_pgdata, args->old_tablespace); 278 279 /* terminates thread */ 280 return 0; 281 } 282 #endif 283 284 285 /* 286 * collect status from a completed worker child 287 */ 288 bool 289 reap_child(bool wait_for_child) 290 { 291 #ifndef WIN32 292 int work_status; 293 pid_t child; 294 #else 295 int thread_num; 296 DWORD res; 297 #endif 298 299 if (user_opts.jobs <= 1 || parallel_jobs == 0) 300 return false; 301 302 #ifndef WIN32 303 child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG); 304 if (child == (pid_t) -1) 305 pg_fatal("waitpid() failed: %s\n", strerror(errno)); 306 if (child == 0) 307 return false; /* no children, or no dead children */ 308 if (work_status != 0) 309 pg_fatal("child process exited abnormally: status %d\n", work_status); 310 #else 311 /* wait for one to finish */ 312 thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles, 313 false, wait_for_child ? INFINITE : 0); 314 315 if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED) 316 return false; 317 318 /* compute thread index in active_threads */ 319 thread_num -= WAIT_OBJECT_0; 320 321 /* get the result */ 322 GetExitCodeThread(thread_handles[thread_num], &res); 323 if (res != 0) 324 pg_fatal("child worker exited abnormally: %s\n", strerror(errno)); 325 326 /* dispose of handle to stop leaks */ 327 CloseHandle(thread_handles[thread_num]); 328 329 /* Move last slot into dead child's position */ 330 if (thread_num != parallel_jobs - 1) 331 { 332 void *tmp_args; 333 334 thread_handles[thread_num] = thread_handles[parallel_jobs - 1]; 335 336 /* 337 * Move last active thead arg struct into the now-dead slot, and the 338 * now-dead slot to the end for reuse by the next thread. Though the 339 * thread struct is in use by another thread, we can safely swap the 340 * struct pointers within the array. 341 */ 342 tmp_args = cur_thread_args[thread_num]; 343 cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1]; 344 cur_thread_args[parallel_jobs - 1] = tmp_args; 345 } 346 #endif 347 348 /* do this after job has been removed */ 349 parallel_jobs--; 350 351 return true; 352 } 353