1 /*
2  *	parallel.c
3  *
4  *	multi-process support
5  *
6  *	Copyright (c) 2010-2016, PostgreSQL Global Development Group
7  *	src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include "pg_upgrade.h"
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/wait.h>
18 
19 #ifdef WIN32
20 #include <io.h>
21 #endif
22 
/*
 * Number of worker children (processes, or threads on Windows) that have
 * been started by this file and not yet collected by reap_child().
 */
static int	parallel_jobs;
24 
#ifdef WIN32
/*
 *	Array holding the handles of all active threads.  There can't be any
 *	gaps/zeros so it can be passed to WaitForMultipleObjects().  The
 *	per-thread argument structs live in separate arrays (below), indexed
 *	the same way, so that this array contains nothing but handles.
 */
HANDLE	   *thread_handles;

/*
 * Arguments for a win32_exec_prog() worker thread.  The strings are private
 * copies made by parallel_exec_prog(); opt_log_file may be NULL.
 */
typedef struct
{
	char	   *log_file;
	char	   *opt_log_file;
	char	   *cmd;
} exec_thread_arg;

/*
 * Arguments for a win32_transfer_all_new_dbs() worker thread.  The DbInfoArr
 * pointers are the caller's pointers (not copied); the strings are private
 * copies made by parallel_transfer_all_new_dbs(), and old_tablespace may be
 * NULL.
 */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char	   *old_pgdata;
	char	   *new_pgdata;
	char	   *old_tablespace;
} transfer_thread_arg;

/* one pre-allocated argument struct per job slot, allocated on first use */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/* track current thread_args struct so reap_child() can be used for all cases */
void	  **cur_thread_args;

/* thread entry points handed to _beginthreadex() */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
58 
59 /*
60  *	parallel_exec_prog
61  *
62  *	This has the same API as exec_prog, except it does parallel execution,
63  *	and therefore must throw errors and doesn't return an error status.
64  */
65 void
parallel_exec_prog(const char * log_file,const char * opt_log_file,const char * fmt,...)66 parallel_exec_prog(const char *log_file, const char *opt_log_file,
67 				   const char *fmt,...)
68 {
69 	va_list		args;
70 	char		cmd[MAX_STRING];
71 
72 #ifndef WIN32
73 	pid_t		child;
74 #else
75 	HANDLE		child;
76 	exec_thread_arg *new_arg;
77 #endif
78 
79 	va_start(args, fmt);
80 	vsnprintf(cmd, sizeof(cmd), fmt, args);
81 	va_end(args);
82 
83 	if (user_opts.jobs <= 1)
84 		/* throw_error must be true to allow jobs */
85 		exec_prog(log_file, opt_log_file, true, "%s", cmd);
86 	else
87 	{
88 		/* parallel */
89 #ifdef WIN32
90 		if (thread_handles == NULL)
91 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
92 
93 		if (exec_thread_args == NULL)
94 		{
95 			int			i;
96 
97 			exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
98 
99 			/*
100 			 * For safety and performance, we keep the args allocated during
101 			 * the entire life of the process, and we don't free the args in a
102 			 * thread different from the one that allocated it.
103 			 */
104 			for (i = 0; i < user_opts.jobs; i++)
105 				exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
106 		}
107 
108 		cur_thread_args = (void **) exec_thread_args;
109 #endif
110 		/* harvest any dead children */
111 		while (reap_child(false) == true)
112 			;
113 
114 		/* must we wait for a dead child? */
115 		if (parallel_jobs >= user_opts.jobs)
116 			reap_child(true);
117 
118 		/* set this before we start the job */
119 		parallel_jobs++;
120 
121 		/* Ensure stdio state is quiesced before forking */
122 		fflush(NULL);
123 
124 #ifndef WIN32
125 		child = fork();
126 		if (child == 0)
127 			/* use _exit to skip atexit() functions */
128 			_exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
129 		else if (child < 0)
130 			/* fork failed */
131 			pg_fatal("could not create worker process: %s\n", strerror(errno));
132 #else
133 		/* empty array element are always at the end */
134 		new_arg = exec_thread_args[parallel_jobs - 1];
135 
136 		/* Can only pass one pointer into the function, so use a struct */
137 		if (new_arg->log_file)
138 			pg_free(new_arg->log_file);
139 		new_arg->log_file = pg_strdup(log_file);
140 		if (new_arg->opt_log_file)
141 			pg_free(new_arg->opt_log_file);
142 		new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
143 		if (new_arg->cmd)
144 			pg_free(new_arg->cmd);
145 		new_arg->cmd = pg_strdup(cmd);
146 
147 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
148 										new_arg, 0, NULL);
149 		if (child == 0)
150 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
151 
152 		thread_handles[parallel_jobs - 1] = child;
153 #endif
154 	}
155 
156 	return;
157 }
158 
159 
#ifdef WIN32
/*
 *	win32_exec_prog
 *
 *	Thread entry point used by parallel_exec_prog() on Windows.  Runs
 *	exec_prog() with the strings stored in *args.  The thread's exit code
 *	is the logical negation of exec_prog()'s result.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	/* returning from the entry point terminates the thread */
	return !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
}
#endif
172 
173 
174 /*
175  *	parallel_transfer_all_new_dbs
176  *
177  *	This has the same API as transfer_all_new_dbs, except it does parallel execution
178  *	by transferring multiple tablespaces in parallel
179  */
180 void
parallel_transfer_all_new_dbs(DbInfoArr * old_db_arr,DbInfoArr * new_db_arr,char * old_pgdata,char * new_pgdata,char * old_tablespace)181 parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
182 							  char *old_pgdata, char *new_pgdata,
183 							  char *old_tablespace)
184 {
185 #ifndef WIN32
186 	pid_t		child;
187 #else
188 	HANDLE		child;
189 	transfer_thread_arg *new_arg;
190 #endif
191 
192 	if (user_opts.jobs <= 1)
193 		/* throw_error must be true to allow jobs */
194 		transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
195 	else
196 	{
197 		/* parallel */
198 #ifdef WIN32
199 		if (thread_handles == NULL)
200 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
201 
202 		if (transfer_thread_args == NULL)
203 		{
204 			int			i;
205 
206 			transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
207 
208 			/*
209 			 * For safety and performance, we keep the args allocated during
210 			 * the entire life of the process, and we don't free the args in a
211 			 * thread different from the one that allocated it.
212 			 */
213 			for (i = 0; i < user_opts.jobs; i++)
214 				transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
215 		}
216 
217 		cur_thread_args = (void **) transfer_thread_args;
218 #endif
219 		/* harvest any dead children */
220 		while (reap_child(false) == true)
221 			;
222 
223 		/* must we wait for a dead child? */
224 		if (parallel_jobs >= user_opts.jobs)
225 			reap_child(true);
226 
227 		/* set this before we start the job */
228 		parallel_jobs++;
229 
230 		/* Ensure stdio state is quiesced before forking */
231 		fflush(NULL);
232 
233 #ifndef WIN32
234 		child = fork();
235 		if (child == 0)
236 		{
237 			transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
238 								 old_tablespace);
239 			/* if we take another exit path, it will be non-zero */
240 			/* use _exit to skip atexit() functions */
241 			_exit(0);
242 		}
243 		else if (child < 0)
244 			/* fork failed */
245 			pg_fatal("could not create worker process: %s\n", strerror(errno));
246 #else
247 		/* empty array element are always at the end */
248 		new_arg = transfer_thread_args[parallel_jobs - 1];
249 
250 		/* Can only pass one pointer into the function, so use a struct */
251 		new_arg->old_db_arr = old_db_arr;
252 		new_arg->new_db_arr = new_db_arr;
253 		if (new_arg->old_pgdata)
254 			pg_free(new_arg->old_pgdata);
255 		new_arg->old_pgdata = pg_strdup(old_pgdata);
256 		if (new_arg->new_pgdata)
257 			pg_free(new_arg->new_pgdata);
258 		new_arg->new_pgdata = pg_strdup(new_pgdata);
259 		if (new_arg->old_tablespace)
260 			pg_free(new_arg->old_tablespace);
261 		new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
262 
263 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
264 										new_arg, 0, NULL);
265 		if (child == 0)
266 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
267 
268 		thread_handles[parallel_jobs - 1] = child;
269 #endif
270 	}
271 
272 	return;
273 }
274 
275 
#ifdef WIN32
/*
 *	win32_transfer_all_new_dbs
 *
 *	Thread entry point used by parallel_transfer_all_new_dbs() on Windows.
 *	Unpacks *args into a transfer_all_new_dbs() call and always reports an
 *	exit code of 0 once that call returns.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	DbInfoArr  *old_dbs = args->old_db_arr;
	DbInfoArr  *new_dbs = args->new_db_arr;

	transfer_all_new_dbs(old_dbs, new_dbs,
						 args->old_pgdata, args->new_pgdata,
						 args->old_tablespace);

	/* returning from the entry point terminates the thread */
	return 0;
}
#endif
287 
288 
289 /*
290  *	collect status from a completed worker child
291  */
292 bool
reap_child(bool wait_for_child)293 reap_child(bool wait_for_child)
294 {
295 #ifndef WIN32
296 	int			work_status;
297 	pid_t		child;
298 #else
299 	int			thread_num;
300 	DWORD		res;
301 #endif
302 
303 	if (user_opts.jobs <= 1 || parallel_jobs == 0)
304 		return false;
305 
306 #ifndef WIN32
307 	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
308 	if (child == (pid_t) -1)
309 		pg_fatal("waitpid() failed: %s\n", strerror(errno));
310 	if (child == 0)
311 		return false;			/* no children, or no dead children */
312 	if (work_status != 0)
313 		pg_fatal("child process exited abnormally: status %d\n", work_status);
314 #else
315 	/* wait for one to finish */
316 	thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
317 										false, wait_for_child ? INFINITE : 0);
318 
319 	if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
320 		return false;
321 
322 	/* compute thread index in active_threads */
323 	thread_num -= WAIT_OBJECT_0;
324 
325 	/* get the result */
326 	GetExitCodeThread(thread_handles[thread_num], &res);
327 	if (res != 0)
328 		pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
329 
330 	/* dispose of handle to stop leaks */
331 	CloseHandle(thread_handles[thread_num]);
332 
333 	/* Move last slot into dead child's position */
334 	if (thread_num != parallel_jobs - 1)
335 	{
336 		void	   *tmp_args;
337 
338 		thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
339 
340 		/*
341 		 * Move last active thead arg struct into the now-dead slot, and the
342 		 * now-dead slot to the end for reuse by the next thread. Though the
343 		 * thread struct is in use by another thread, we can safely swap the
344 		 * struct pointers within the array.
345 		 */
346 		tmp_args = cur_thread_args[thread_num];
347 		cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
348 		cur_thread_args[parallel_jobs - 1] = tmp_args;
349 	}
350 #endif
351 
352 	/* do this after job has been removed */
353 	parallel_jobs--;
354 
355 	return true;
356 }
357