1 /*
2  *	parallel.c
3  *
4  *	multi-process support
5  *
6  *	Copyright (c) 2010-2018, PostgreSQL Global Development Group
7  *	src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include <sys/wait.h>
13 #ifdef WIN32
14 #include <io.h>
15 #endif
16 
17 #include "pg_upgrade.h"
18 
19 
/* workers that have been started but not yet collected by reap_child() */
static int	parallel_jobs;

#ifdef WIN32
/*
 *	Handles of the running worker threads.  Live entries are always kept
 *	packed at the front of the array (no gaps, no zero entries), so the
 *	first parallel_jobs elements can be handed straight to
 *	WaitForMultipleObjects().  Thread arguments live in the separate arrays
 *	declared below, which keeps this one a plain array of HANDLEs.
 */
HANDLE	   *thread_handles;

/*
 * Argument bundle for a win32_exec_prog() thread; _beginthreadex() passes
 * only a single pointer to the thread routine.
 */
typedef struct
{
	char	   *log_file;
	char	   *opt_log_file;
	char	   *cmd;
} exec_thread_arg;

/* Argument bundle for a win32_transfer_all_new_dbs() thread. */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char	   *old_pgdata;
	char	   *new_pgdata;
	char	   *old_tablespace;
} transfer_thread_arg;

/*
 * One argument struct per job slot.  They are allocated once and reused for
 * the rest of the run rather than being freed by the worker threads.
 */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/*
 * Points at whichever of the two arrays above the latest parallel_* call is
 * using, so reap_child() can reorder slots without knowing the struct type.
 */
void	  **cur_thread_args;

/* thread start routines launched through _beginthreadex() */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
55 
56 /*
57  *	parallel_exec_prog
58  *
59  *	This has the same API as exec_prog, except it does parallel execution,
60  *	and therefore must throw errors and doesn't return an error status.
61  */
62 void
parallel_exec_prog(const char * log_file,const char * opt_log_file,const char * fmt,...)63 parallel_exec_prog(const char *log_file, const char *opt_log_file,
64 				   const char *fmt,...)
65 {
66 	va_list		args;
67 	char		cmd[MAX_STRING];
68 
69 #ifndef WIN32
70 	pid_t		child;
71 #else
72 	HANDLE		child;
73 	exec_thread_arg *new_arg;
74 #endif
75 
76 	va_start(args, fmt);
77 	vsnprintf(cmd, sizeof(cmd), fmt, args);
78 	va_end(args);
79 
80 	if (user_opts.jobs <= 1)
81 		/* exit_on_error must be true to allow jobs */
82 		exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
83 	else
84 	{
85 		/* parallel */
86 #ifdef WIN32
87 		if (thread_handles == NULL)
88 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
89 
90 		if (exec_thread_args == NULL)
91 		{
92 			int			i;
93 
94 			exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
95 
96 			/*
97 			 * For safety and performance, we keep the args allocated during
98 			 * the entire life of the process, and we don't free the args in a
99 			 * thread different from the one that allocated it.
100 			 */
101 			for (i = 0; i < user_opts.jobs; i++)
102 				exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
103 		}
104 
105 		cur_thread_args = (void **) exec_thread_args;
106 #endif
107 		/* harvest any dead children */
108 		while (reap_child(false) == true)
109 			;
110 
111 		/* must we wait for a dead child? */
112 		if (parallel_jobs >= user_opts.jobs)
113 			reap_child(true);
114 
115 		/* set this before we start the job */
116 		parallel_jobs++;
117 
118 		/* Ensure stdio state is quiesced before forking */
119 		fflush(NULL);
120 
121 #ifndef WIN32
122 		child = fork();
123 		if (child == 0)
124 			/* use _exit to skip atexit() functions */
125 			_exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
126 		else if (child < 0)
127 			/* fork failed */
128 			pg_fatal("could not create worker process: %s\n", strerror(errno));
129 #else
130 		/* empty array element are always at the end */
131 		new_arg = exec_thread_args[parallel_jobs - 1];
132 
133 		/* Can only pass one pointer into the function, so use a struct */
134 		if (new_arg->log_file)
135 			pg_free(new_arg->log_file);
136 		new_arg->log_file = pg_strdup(log_file);
137 		if (new_arg->opt_log_file)
138 			pg_free(new_arg->opt_log_file);
139 		new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
140 		if (new_arg->cmd)
141 			pg_free(new_arg->cmd);
142 		new_arg->cmd = pg_strdup(cmd);
143 
144 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
145 										new_arg, 0, NULL);
146 		if (child == 0)
147 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
148 
149 		thread_handles[parallel_jobs - 1] = child;
150 #endif
151 	}
152 
153 	return;
154 }
155 
156 
#ifdef WIN32
/*
 *	win32_exec_prog
 *
 *	Thread start routine used by parallel_exec_prog() on Windows.  Runs the
 *	command described by *args through exec_prog().  The returned value
 *	becomes the thread's exit code: 0 when exec_prog() reports success,
 *	1 otherwise.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	if (exec_prog(args->log_file, args->opt_log_file, true, true,
				  "%s", args->cmd))
		return 0;

	return 1;
}
#endif
169 
170 
171 /*
172  *	parallel_transfer_all_new_dbs
173  *
174  *	This has the same API as transfer_all_new_dbs, except it does parallel execution
175  *	by transferring multiple tablespaces in parallel
176  */
177 void
parallel_transfer_all_new_dbs(DbInfoArr * old_db_arr,DbInfoArr * new_db_arr,char * old_pgdata,char * new_pgdata,char * old_tablespace)178 parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
179 							  char *old_pgdata, char *new_pgdata,
180 							  char *old_tablespace)
181 {
182 #ifndef WIN32
183 	pid_t		child;
184 #else
185 	HANDLE		child;
186 	transfer_thread_arg *new_arg;
187 #endif
188 
189 	if (user_opts.jobs <= 1)
190 		transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
191 	else
192 	{
193 		/* parallel */
194 #ifdef WIN32
195 		if (thread_handles == NULL)
196 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
197 
198 		if (transfer_thread_args == NULL)
199 		{
200 			int			i;
201 
202 			transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
203 
204 			/*
205 			 * For safety and performance, we keep the args allocated during
206 			 * the entire life of the process, and we don't free the args in a
207 			 * thread different from the one that allocated it.
208 			 */
209 			for (i = 0; i < user_opts.jobs; i++)
210 				transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
211 		}
212 
213 		cur_thread_args = (void **) transfer_thread_args;
214 #endif
215 		/* harvest any dead children */
216 		while (reap_child(false) == true)
217 			;
218 
219 		/* must we wait for a dead child? */
220 		if (parallel_jobs >= user_opts.jobs)
221 			reap_child(true);
222 
223 		/* set this before we start the job */
224 		parallel_jobs++;
225 
226 		/* Ensure stdio state is quiesced before forking */
227 		fflush(NULL);
228 
229 #ifndef WIN32
230 		child = fork();
231 		if (child == 0)
232 		{
233 			transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
234 								 old_tablespace);
235 			/* if we take another exit path, it will be non-zero */
236 			/* use _exit to skip atexit() functions */
237 			_exit(0);
238 		}
239 		else if (child < 0)
240 			/* fork failed */
241 			pg_fatal("could not create worker process: %s\n", strerror(errno));
242 #else
243 		/* empty array element are always at the end */
244 		new_arg = transfer_thread_args[parallel_jobs - 1];
245 
246 		/* Can only pass one pointer into the function, so use a struct */
247 		new_arg->old_db_arr = old_db_arr;
248 		new_arg->new_db_arr = new_db_arr;
249 		if (new_arg->old_pgdata)
250 			pg_free(new_arg->old_pgdata);
251 		new_arg->old_pgdata = pg_strdup(old_pgdata);
252 		if (new_arg->new_pgdata)
253 			pg_free(new_arg->new_pgdata);
254 		new_arg->new_pgdata = pg_strdup(new_pgdata);
255 		if (new_arg->old_tablespace)
256 			pg_free(new_arg->old_tablespace);
257 		new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
258 
259 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
260 										new_arg, 0, NULL);
261 		if (child == 0)
262 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
263 
264 		thread_handles[parallel_jobs - 1] = child;
265 #endif
266 	}
267 
268 	return;
269 }
270 
271 
#ifdef WIN32
/*
 *	win32_transfer_all_new_dbs
 *
 *	Thread start routine used by parallel_transfer_all_new_dbs() on Windows.
 *	It unpacks *args and calls transfer_all_new_dbs().  The thread always
 *	exits with code 0, because transfer_all_new_dbs() returns no status to
 *	propagate.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	DbInfoArr  *old_dbs = args->old_db_arr;
	DbInfoArr  *new_dbs = args->new_db_arr;

	transfer_all_new_dbs(old_dbs, new_dbs,
						 args->old_pgdata, args->new_pgdata,
						 args->old_tablespace);

	return 0;
}
#endif
283 
284 
285 /*
286  *	collect status from a completed worker child
287  */
288 bool
reap_child(bool wait_for_child)289 reap_child(bool wait_for_child)
290 {
291 #ifndef WIN32
292 	int			work_status;
293 	pid_t		child;
294 #else
295 	int			thread_num;
296 	DWORD		res;
297 #endif
298 
299 	if (user_opts.jobs <= 1 || parallel_jobs == 0)
300 		return false;
301 
302 #ifndef WIN32
303 	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
304 	if (child == (pid_t) -1)
305 		pg_fatal("waitpid() failed: %s\n", strerror(errno));
306 	if (child == 0)
307 		return false;			/* no children, or no dead children */
308 	if (work_status != 0)
309 		pg_fatal("child process exited abnormally: status %d\n", work_status);
310 #else
311 	/* wait for one to finish */
312 	thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
313 										false, wait_for_child ? INFINITE : 0);
314 
315 	if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
316 		return false;
317 
318 	/* compute thread index in active_threads */
319 	thread_num -= WAIT_OBJECT_0;
320 
321 	/* get the result */
322 	GetExitCodeThread(thread_handles[thread_num], &res);
323 	if (res != 0)
324 		pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
325 
326 	/* dispose of handle to stop leaks */
327 	CloseHandle(thread_handles[thread_num]);
328 
329 	/* Move last slot into dead child's position */
330 	if (thread_num != parallel_jobs - 1)
331 	{
332 		void	   *tmp_args;
333 
334 		thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
335 
336 		/*
337 		 * Move last active thead arg struct into the now-dead slot, and the
338 		 * now-dead slot to the end for reuse by the next thread. Though the
339 		 * thread struct is in use by another thread, we can safely swap the
340 		 * struct pointers within the array.
341 		 */
342 		tmp_args = cur_thread_args[thread_num];
343 		cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
344 		cur_thread_args[parallel_jobs - 1] = tmp_args;
345 	}
346 #endif
347 
348 	/* do this after job has been removed */
349 	parallel_jobs--;
350 
351 	return true;
352 }
353