1 /*
2  *	parallel.c
3  *
4  *	multi-process support
5  *
6  *	Copyright (c) 2010-2020, PostgreSQL Global Development Group
7  *	src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include <sys/wait.h>
13 #ifdef WIN32
14 #include <io.h>
15 #endif
16 
17 #include "pg_upgrade.h"
18 
/* Count of workers (child processes, or threads on Windows) not yet reaped. */
static int	parallel_jobs = 0;
20 
#ifdef WIN32
/*
 *	Handles of all active worker threads, packed densely in slots
 *	0 .. parallel_jobs - 1.  There can't be any gaps/zeros, because the
 *	array is passed directly to WaitForMultipleObjects().  The per-thread
 *	argument structs are kept in separate arrays (below) so that this array
 *	holds nothing but handles and can be handed to that API as-is.
 */
HANDLE	   *thread_handles;

/* Arguments for one win32_exec_prog() worker thread. */
typedef struct
{
	char	   *log_file;
	char	   *opt_log_file;
	char	   *cmd;
} exec_thread_arg;

/* Arguments for one win32_transfer_all_new_dbs() worker thread. */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char	   *old_pgdata;
	char	   *new_pgdata;
	char	   *old_tablespace;
} transfer_thread_arg;

/*
 * Per-slot argument structs, allocated once (user_opts.jobs entries) on first
 * use and then reused.  The struct in slot i belongs to the thread whose
 * handle is in thread_handles[i]; reap_child() keeps the two in step.
 */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/*
 * Track current thread_args struct so reap_child() can be used for all cases:
 * points at whichever of the two arrays above was used most recently.
 */
void	  **cur_thread_args;

/* Thread entry points started via _beginthreadex(). */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
54 
55 /*
56  *	parallel_exec_prog
57  *
58  *	This has the same API as exec_prog, except it does parallel execution,
59  *	and therefore must throw errors and doesn't return an error status.
60  */
61 void
parallel_exec_prog(const char * log_file,const char * opt_log_file,const char * fmt,...)62 parallel_exec_prog(const char *log_file, const char *opt_log_file,
63 				   const char *fmt,...)
64 {
65 	va_list		args;
66 	char		cmd[MAX_STRING];
67 
68 #ifndef WIN32
69 	pid_t		child;
70 #else
71 	HANDLE		child;
72 	exec_thread_arg *new_arg;
73 #endif
74 
75 	va_start(args, fmt);
76 	vsnprintf(cmd, sizeof(cmd), fmt, args);
77 	va_end(args);
78 
79 	if (user_opts.jobs <= 1)
80 		/* exit_on_error must be true to allow jobs */
81 		exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82 	else
83 	{
84 		/* parallel */
85 #ifdef WIN32
86 		if (thread_handles == NULL)
87 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88 
89 		if (exec_thread_args == NULL)
90 		{
91 			int			i;
92 
93 			exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94 
95 			/*
96 			 * For safety and performance, we keep the args allocated during
97 			 * the entire life of the process, and we don't free the args in a
98 			 * thread different from the one that allocated it.
99 			 */
100 			for (i = 0; i < user_opts.jobs; i++)
101 				exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102 		}
103 
104 		cur_thread_args = (void **) exec_thread_args;
105 #endif
106 		/* harvest any dead children */
107 		while (reap_child(false) == true)
108 			;
109 
110 		/* must we wait for a dead child? */
111 		if (parallel_jobs >= user_opts.jobs)
112 			reap_child(true);
113 
114 		/* set this before we start the job */
115 		parallel_jobs++;
116 
117 		/* Ensure stdio state is quiesced before forking */
118 		fflush(NULL);
119 
120 #ifndef WIN32
121 		child = fork();
122 		if (child == 0)
123 			/* use _exit to skip atexit() functions */
124 			_exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
125 		else if (child < 0)
126 			/* fork failed */
127 			pg_fatal("could not create worker process: %s\n", strerror(errno));
128 #else
129 		/* empty array element are always at the end */
130 		new_arg = exec_thread_args[parallel_jobs - 1];
131 
132 		/* Can only pass one pointer into the function, so use a struct */
133 		if (new_arg->log_file)
134 			pg_free(new_arg->log_file);
135 		new_arg->log_file = pg_strdup(log_file);
136 		if (new_arg->opt_log_file)
137 			pg_free(new_arg->opt_log_file);
138 		new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
139 		if (new_arg->cmd)
140 			pg_free(new_arg->cmd);
141 		new_arg->cmd = pg_strdup(cmd);
142 
143 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
144 										new_arg, 0, NULL);
145 		if (child == 0)
146 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
147 
148 		thread_handles[parallel_jobs - 1] = child;
149 #endif
150 	}
151 }
152 
153 
#ifdef WIN32
/*
 * Thread body for a Windows parallel_exec_prog() worker.  Runs the command
 * stored in *args and turns exec_prog()'s success flag into the thread exit
 * code: 0 on success, 1 on failure.  reap_child() reads this code.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	bool		succeeded = exec_prog(args->log_file, args->opt_log_file,
									  true, true, "%s", args->cmd);

	/* returning from the entry point terminates the thread */
	return succeeded ? 0 : 1;
}
#endif
166 
167 
168 /*
169  *	parallel_transfer_all_new_dbs
170  *
171  *	This has the same API as transfer_all_new_dbs, except it does parallel execution
172  *	by transferring multiple tablespaces in parallel
173  */
174 void
parallel_transfer_all_new_dbs(DbInfoArr * old_db_arr,DbInfoArr * new_db_arr,char * old_pgdata,char * new_pgdata,char * old_tablespace)175 parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
176 							  char *old_pgdata, char *new_pgdata,
177 							  char *old_tablespace)
178 {
179 #ifndef WIN32
180 	pid_t		child;
181 #else
182 	HANDLE		child;
183 	transfer_thread_arg *new_arg;
184 #endif
185 
186 	if (user_opts.jobs <= 1)
187 		transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
188 	else
189 	{
190 		/* parallel */
191 #ifdef WIN32
192 		if (thread_handles == NULL)
193 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
194 
195 		if (transfer_thread_args == NULL)
196 		{
197 			int			i;
198 
199 			transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
200 
201 			/*
202 			 * For safety and performance, we keep the args allocated during
203 			 * the entire life of the process, and we don't free the args in a
204 			 * thread different from the one that allocated it.
205 			 */
206 			for (i = 0; i < user_opts.jobs; i++)
207 				transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
208 		}
209 
210 		cur_thread_args = (void **) transfer_thread_args;
211 #endif
212 		/* harvest any dead children */
213 		while (reap_child(false) == true)
214 			;
215 
216 		/* must we wait for a dead child? */
217 		if (parallel_jobs >= user_opts.jobs)
218 			reap_child(true);
219 
220 		/* set this before we start the job */
221 		parallel_jobs++;
222 
223 		/* Ensure stdio state is quiesced before forking */
224 		fflush(NULL);
225 
226 #ifndef WIN32
227 		child = fork();
228 		if (child == 0)
229 		{
230 			transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
231 								 old_tablespace);
232 			/* if we take another exit path, it will be non-zero */
233 			/* use _exit to skip atexit() functions */
234 			_exit(0);
235 		}
236 		else if (child < 0)
237 			/* fork failed */
238 			pg_fatal("could not create worker process: %s\n", strerror(errno));
239 #else
240 		/* empty array element are always at the end */
241 		new_arg = transfer_thread_args[parallel_jobs - 1];
242 
243 		/* Can only pass one pointer into the function, so use a struct */
244 		new_arg->old_db_arr = old_db_arr;
245 		new_arg->new_db_arr = new_db_arr;
246 		if (new_arg->old_pgdata)
247 			pg_free(new_arg->old_pgdata);
248 		new_arg->old_pgdata = pg_strdup(old_pgdata);
249 		if (new_arg->new_pgdata)
250 			pg_free(new_arg->new_pgdata);
251 		new_arg->new_pgdata = pg_strdup(new_pgdata);
252 		if (new_arg->old_tablespace)
253 			pg_free(new_arg->old_tablespace);
254 		new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
255 
256 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
257 										new_arg, 0, NULL);
258 		if (child == 0)
259 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
260 
261 		thread_handles[parallel_jobs - 1] = child;
262 #endif
263 	}
264 }
265 
266 
#ifdef WIN32
/*
 * Thread body for a Windows parallel_transfer_all_new_dbs() worker.  Runs the
 * transfer described by *args and always reports exit code 0.  reap_child()
 * treats any nonzero thread exit code as a failure.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	DbInfoArr  *src_dbs = args->old_db_arr;
	DbInfoArr  *dst_dbs = args->new_db_arr;

	transfer_all_new_dbs(src_dbs, dst_dbs,
						 args->old_pgdata, args->new_pgdata,
						 args->old_tablespace);

	return 0;					/* returning terminates the thread */
}
#endif
278 
279 
280 /*
281  *	collect status from a completed worker child
282  */
283 bool
reap_child(bool wait_for_child)284 reap_child(bool wait_for_child)
285 {
286 #ifndef WIN32
287 	int			work_status;
288 	pid_t		child;
289 #else
290 	int			thread_num;
291 	DWORD		res;
292 #endif
293 
294 	if (user_opts.jobs <= 1 || parallel_jobs == 0)
295 		return false;
296 
297 #ifndef WIN32
298 	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
299 	if (child == (pid_t) -1)
300 		pg_fatal("waitpid() failed: %s\n", strerror(errno));
301 	if (child == 0)
302 		return false;			/* no children, or no dead children */
303 	if (work_status != 0)
304 		pg_fatal("child process exited abnormally: status %d\n", work_status);
305 #else
306 	/* wait for one to finish */
307 	thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
308 										false, wait_for_child ? INFINITE : 0);
309 
310 	if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
311 		return false;
312 
313 	/* compute thread index in active_threads */
314 	thread_num -= WAIT_OBJECT_0;
315 
316 	/* get the result */
317 	GetExitCodeThread(thread_handles[thread_num], &res);
318 	if (res != 0)
319 		pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
320 
321 	/* dispose of handle to stop leaks */
322 	CloseHandle(thread_handles[thread_num]);
323 
324 	/* Move last slot into dead child's position */
325 	if (thread_num != parallel_jobs - 1)
326 	{
327 		void	   *tmp_args;
328 
329 		thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
330 
331 		/*
332 		 * Move last active thread arg struct into the now-dead slot, and the
333 		 * now-dead slot to the end for reuse by the next thread. Though the
334 		 * thread struct is in use by another thread, we can safely swap the
335 		 * struct pointers within the array.
336 		 */
337 		tmp_args = cur_thread_args[thread_num];
338 		cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
339 		cur_thread_args[parallel_jobs - 1] = tmp_args;
340 	}
341 #endif
342 
343 	/* do this after job has been removed */
344 	parallel_jobs--;
345 
346 	return true;
347 }
348