1 /* ----------------------------------------------------------------------
2  * scheduler.c
3  *
4  *	Event scheduling subsystem for slon.
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  * ----------------------------------------------------------------------
11  */
12 
13 
14 #include <pthread.h>
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <string.h>
20 #include <errno.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 
24 #ifndef WIN32
25 #include <unistd.h>
26 #include <sys/time.h>
27 #endif
28 
29 #include "slon.h"
30 
31 
32 /*
33  * If PF_LOCAL is not defined, use the old BSD name PF_UNIX
34  */
35 #ifndef PF_LOCAL
36 #define PF_LOCAL PF_UNIX
37 #endif
38 
/* ----------
 * Static data
 *
 * State shared between the scheduler thread and the threads that call
 * into this file.
 * ----------
 */

/* Scheduler run state; sched_mainloop() keeps looping while it is SCHED_STATUS_OK */
static ScheduleStatus sched_status = SCHED_STATUS_OK;

/* nfds argument handed to select(2): highest registered descriptor plus one */
static int	sched_numfd = 0;

/* Master descriptor sets; sched_mainloop() copies them before every select(2) */
static fd_set sched_fdset_read;
static fd_set sched_fdset_write;

/* Doubly linked list (DLLIST_* macros) of connections currently waiting */
static SlonConn *sched_waitqueue_head = NULL;
static SlonConn *sched_waitqueue_tail = NULL;

/* Thread that called sched_start_mainloop() */
static pthread_t sched_main_thread;

/* Thread running sched_mainloop(); joined by sched_wait_mainloop() */
static pthread_t sched_scheduler_thread;

/* Mutex taken around accesses to the wait queue, fd sets and sched_status */
static pthread_mutex_t sched_master_lock;

/* Signaled once by sched_mainloop() when its initialization is done */
static pthread_cond_t sched_master_cond;


/* ----------
 * Local functions
 * ----------
 */
static void *sched_mainloop(void *);
static void sched_add_fdset(int fd, fd_set *fds);
static void sched_remove_fdset(int fd, fd_set *fds);
65 
66 /* static void sched_shutdown(); */
67 
68 
69 /* ----------
70  * sched_start_mainloop
71  *
72  * Called from SlonMain() before starting up any worker thread.
73  *
74  * This will spawn the event scheduling thread that does the central select(2)
75  * system call.
76  * ----------
77  */
78 int
sched_start_mainloop(void)79 sched_start_mainloop(void)
80 {
81 	sched_status = SCHED_STATUS_OK;
82 	sched_waitqueue_head = NULL;
83 	sched_waitqueue_tail = NULL;
84 	sched_numfd = 0;
85 	FD_ZERO(&sched_fdset_read);
86 	FD_ZERO(&sched_fdset_write);
87 
88 	/*
89 	 * Remember the main threads identifier
90 	 */
91 	sched_main_thread = pthread_self();
92 
93 	/*
94 	 * Initialize the master lock and condition variables
95 	 */
96 	if (pthread_mutex_init(&sched_master_lock, NULL) < 0)
97 	{
98 		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_init() - %s\n",
99 				 strerror(errno));
100 		return -1;
101 	}
102 	if (pthread_cond_init(&sched_master_cond, NULL) < 0)
103 	{
104 		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_cond_init() - %s\n",
105 				 strerror(errno));
106 		return -1;
107 	}
108 
109 	/*
110 	 * Grab the scheduler master lock
111 	 */
112 	if (pthread_mutex_lock(&sched_master_lock) < 0)
113 	{
114 		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_lock() - %s\n",
115 				 strerror(errno));
116 		return -1;
117 	}
118 
119 	/*
120 	 * Start the scheduler thread
121 	 */
122 	if (pthread_create(&sched_scheduler_thread, NULL, sched_mainloop, NULL) < 0)
123 	{
124 		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_create() - %s\n",
125 				 strerror(errno));
126 		return -1;
127 	}
128 
129 	/*
130 	 * When the scheduler is ready, he'll signal the scheduler cond
131 	 */
132 	if (pthread_cond_wait(&sched_master_cond, &sched_master_lock) < 0)
133 	{
134 		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_cond_wait() - %s\n",
135 				 strerror(errno));
136 		return -1;
137 	}
138 
139 	/*
140 	 * Release the scheduler lock
141 	 */
142 	if (pthread_mutex_unlock(&sched_master_lock) < 0)
143 	{
144 		slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_unlock() - %s\n",
145 				 strerror(errno));
146 		return -1;
147 	}
148 
149 	/*
150 	 * Check for errors
151 	 */
152 	if (sched_status != SCHED_STATUS_OK)
153 		return -1;
154 
155 	/*
156 	 * Scheduler started successfully
157 	 */
158 	return 0;
159 }
160 
161 
162 /* ----------
163  * sched_wait_mainloop
164  *
165  * Called from main() after all working threads according to the initial
166  * configuration are started. Will wait until the scheduler mainloop
167  * terminates.
168  * ----------
169  */
170 int
sched_wait_mainloop(void)171 sched_wait_mainloop(void)
172 {
173 	/*
174 	 * Wait for the scheduler to finish.
175 	 */
176 	if (pthread_join(sched_scheduler_thread, NULL) < 0)
177 	{
178 		perror("sched_wait_mainloop: pthread_join()");
179 		return -1;
180 	}
181 	return 0;
182 }
183 
184 
185 /* ----------
186  * sched_wait_conn
187  *
188  * Assumes that the thread holds the lock on conn->conn_lock.
189  *
190  * Adds the connection to the central wait queue and wakes up the scheduler
191  * thread to reloop onto the select(2) call.
192  * ----------
193  */
194 int
sched_wait_conn(SlonConn * conn,int condition)195 sched_wait_conn(SlonConn * conn, int condition)
196 {
197 	ScheduleStatus rc;
198 	int			fds;
199 
200 	/*
201 	 * Grab the master lock and check that we're in normal runmode
202 	 */
203 	pthread_mutex_lock(&sched_master_lock);
204 	if (sched_status != SCHED_STATUS_OK)
205 	{
206 		pthread_mutex_unlock(&sched_master_lock);
207 		return -1;
208 	}
209 
210 	/*
211 	 * Remember the event we're waiting for and add the database connection to
212 	 * the fdset(s)
213 	 */
214 	conn->condition = condition;
215 	if (condition & SCHED_WAIT_SOCK_READ)
216 	{
217 		fds = PQsocket(conn->dbconn);
218 		sched_add_fdset(fds, &sched_fdset_read);
219 	}
220 	if (condition & SCHED_WAIT_SOCK_WRITE)
221 	{
222 		fds = PQsocket(conn->dbconn);
223 		sched_add_fdset(fds, &sched_fdset_write);
224 	}
225 
226 	/*
227 	 * Add the connection to the wait queue
228 	 */
229 	DLLIST_ADD_HEAD(sched_waitqueue_head, sched_waitqueue_tail, conn);
230 
231 	/*
232 	 * Give the scheduler thread a heads up, release the master lock and wait
233 	 * for it to tell us that the event we're waiting for happened.
234 	 */
235 	if (pipewrite(sched_wakeuppipe[1], "x", 1) < 0)
236 	{
237 		perror("sched_wait_conn: write()");
238 		exit(-1);
239 	}
240 	pthread_mutex_unlock(&sched_master_lock);
241 	pthread_cond_wait(&(conn->conn_cond), &(conn->conn_lock));
242 
243 	/*
244 	 * Determine the return code
245 	 */
246 	pthread_mutex_lock(&sched_master_lock);
247 	if ((rc = sched_status) == SCHED_STATUS_OK)
248 	{
249 		if (conn->condition & SCHED_WAIT_CANCEL)
250 		{
251 			conn->condition &= ~(SCHED_WAIT_CANCEL);
252 			rc = SCHED_STATUS_CANCEL;
253 		}
254 	}
255 	pthread_mutex_unlock(&sched_master_lock);
256 
257 	return rc;
258 }
259 
260 
261 /* ----------
262  * sched_wait_time
263  *
264  * Assumes that the thread holds the lock on conn->conn_lock.
265  *
266  * Like sched_wait_conn() but with a timeout. Can be called without any
267  * read/write condition to wait for to resemble a pure timeout mechanism.
268  * ----------
269  */
270 int
sched_wait_time(SlonConn * conn,int condition,int msec)271 sched_wait_time(SlonConn * conn, int condition, int msec)
272 {
273 	struct timeval *tv = &(conn->timeout);
274 
275 	/*
276 	 * Calculate the end-time of the desired timeout.
277 	 */
278 	gettimeofday(tv, NULL);
279 	tv->tv_sec += (long) (msec / 1000) +
280 		(((msec % 1000) * 1000) + tv->tv_usec) / 1000000;
281 	tv->tv_usec = (tv->tv_usec + (msec % 1000) * 1000) % 1000000;
282 
283 	/*
284 	 * Let sched_wait_conn() do the rest.
285 	 */
286 	return sched_wait_conn(conn, condition | SCHED_WAIT_TIMEOUT);
287 }
288 
289 
290 /* ----------
291  * sched_msleep
292  *
293  * Use the schedulers event loop to sleep for msec milliseconds.
294  * ----------
295  */
296 int
sched_msleep(SlonNode * node,int msec)297 sched_msleep(SlonNode * node, int msec)
298 {
299 	SlonConn   *conn;
300 	char		dummyconn_name[64];
301 	int			rc;
302 
303 	if (node)
304 	{
305 		snprintf(dummyconn_name, 64, "msleep_node_%d", node->no_id);
306 		conn = slon_make_dummyconn(dummyconn_name);
307 	}
308 	else
309 		conn = slon_make_dummyconn("msleep_local");
310 
311 	rc = sched_wait_time(conn, 0, msec);
312 	slon_free_dummyconn(conn);
313 
314 	return rc;
315 }
316 
317 
318 /* ----------
319  * sched_get_status
320  *
321  * Return the current scheduler status in a thread safe fashion
322  * ----------
323  */
324 int
sched_get_status(void)325 sched_get_status(void)
326 {
327 	ScheduleStatus status;
328 
329 	pthread_mutex_lock(&sched_master_lock);
330 	status = sched_status;
331 	pthread_mutex_unlock(&sched_master_lock);
332 	return status;
333 }
334 
335 
336 /* ----------
337  * sched_wakeup_node
338  *
339  * Wakeup the threads (listen and worker) of one or all remote nodes to cause
340  * them rechecking the current runtime status or adjust their configuration
341  * to changes.
342  * ----------
343  */
344 int
sched_wakeup_node(int no_id)345 sched_wakeup_node(int no_id)
346 {
347 	SlonConn   *conn;
348 	int			num_wakeup = 0;
349 
350 	pthread_mutex_lock(&sched_master_lock);
351 
352 	/*
353 	 * Set all waiters that belong to that node to cancel
354 	 */
355 	for (conn = sched_waitqueue_head; conn; conn = conn->next)
356 	{
357 		if (conn->node != NULL)
358 		{
359 			if (no_id < 0 || conn->node->no_id == no_id)
360 			{
361 				conn->condition |= SCHED_WAIT_CANCEL;
362 				num_wakeup++;
363 			}
364 		}
365 	}
366 
367 	/*
368 	 * Give the scheduler thread a heads up if some wait was canceled;
369 	 */
370 	if (num_wakeup > 0)
371 	{
372 		if (pipewrite(sched_wakeuppipe[1], "x", 1) < 0)
373 		{
374 			perror("sched_wait_conn: write()");
375 			slon_restart();
376 		}
377 	}
378 	pthread_mutex_unlock(&sched_master_lock);
379 
380 	remoteWorker_wakeup(no_id);
381 
382 	slon_log(SLON_DEBUG2, "sched_wakeup_node(): no_id=%d "
383 			 "(%d threads + worker signaled)\n", no_id, num_wakeup);
384 
385 	return num_wakeup;
386 }
387 
388 
/* ----------
 * sched_mainloop
 *
 * The thread handling the master scheduling.
 *
 * Runs as the start routine of the scheduler thread. The argument is not
 * used. The function does not return; it ends with pthread_exit(NULL).
 *
 * The master lock is held the whole time except while blocked in select(2).
 * Each loop iteration:
 *   1. wakes waiters that were canceled or whose timeout has elapsed and
 *      determines the nearest remaining timeout,
 *   2. calls select(2) on copies of the master fd sets,
 *   3. consumes one byte from the wakeup pipe if it is readable,
 *   4. wakes waiters whose socket became readable/writable.
 * ----------
 */
static void *
sched_mainloop(void *dummy)
{
	fd_set		rfds;
	fd_set		wfds;
	int			rc;
	SlonConn   *conn;
	SlonConn   *next;
	struct timeval min_timeout;
	struct timeval *tv;
	int			i;

	/*
	 * Grab the scheduler master lock. This will wait until the main thread
	 * actually blocks on the master cond.
	 */
	pthread_mutex_lock(&sched_master_lock);

	/*
	 * Initialize the fdsets for select(2)
	 */
	FD_ZERO(&sched_fdset_read);
	FD_ZERO(&sched_fdset_write);

	/*
	 * The read end of the wakeup pipe is always watched so that other
	 * threads can interrupt the select(2) call.
	 */
	sched_add_fdset(sched_wakeuppipe[0], &sched_fdset_read);

	/*
	 * Done with all initialization. Let the main thread go ahead and get
	 * everyone else dancing.
	 */
	pthread_cond_signal(&sched_master_cond);

	/*
	 * And we are now entering the endless loop of scheduling events
	 */
	while (sched_status == SCHED_STATUS_OK)
	{
		struct timeval now;
		struct timeval timeout;

		/*
		 * Check if any of the connections in the wait queue have reached
		 * their timeout. While doing so, we also remember the closest timeout
		 * in the future. tv stays NULL (select blocks indefinitely) when no
		 * queued connection has a pending timeout.
		 */
		tv = NULL;
		gettimeofday(&now, NULL);
		for (conn = sched_waitqueue_head; conn;)
		{
			/* fetch the successor first; conn may be unlinked below */
			next = conn->next;

			if (conn->condition & SCHED_WAIT_CANCEL)
			{
				/*
				 * Some other thread wants this thread to wake up.
				 */
				DLLIST_REMOVE(sched_waitqueue_head,
							  sched_waitqueue_tail, conn);

				if (conn->condition & SCHED_WAIT_SOCK_READ)
					sched_remove_fdset(PQsocket(conn->dbconn),
									   &sched_fdset_read);
				if (conn->condition & SCHED_WAIT_SOCK_WRITE)
					sched_remove_fdset(PQsocket(conn->dbconn),
									   &sched_fdset_write);

				/*
				 * The waiter holds conn_lock until it blocks in
				 * pthread_cond_wait(), so taking the lock here ensures the
				 * signal is not sent before the waiter is waiting.
				 */
				pthread_mutex_lock(&(conn->conn_lock));
				pthread_cond_signal(&(conn->conn_cond));
				pthread_mutex_unlock(&(conn->conn_lock));

				conn = next;
				continue;
			}
			if (conn->condition & SCHED_WAIT_TIMEOUT)
			{
				/*
				 * This connection has a timeout. Calculate the time until
				 * that, normalizing a negative microsecond part.
				 */
				timeout.tv_sec = conn->timeout.tv_sec - now.tv_sec;
				timeout.tv_usec = conn->timeout.tv_usec - now.tv_usec;
				while (timeout.tv_usec < 0)
				{
					timeout.tv_sec--;
					timeout.tv_usec += 1000000;
				}

				/*
				 * Check if the timeout has elapsed
				 */
				if (timeout.tv_sec < 0 ||
					(timeout.tv_sec == 0 && timeout.tv_usec < 20000))
				{
					/*
					 * Remove the connection from the wait queue. We consider
					 * everything closer than 20 msec being elapsed to avoid a
					 * full scheduler round just for one kernel tick.
					 */
					DLLIST_REMOVE(sched_waitqueue_head,
								  sched_waitqueue_tail, conn);

					if (conn->condition & SCHED_WAIT_SOCK_READ)
						sched_remove_fdset(PQsocket(conn->dbconn),
										   &sched_fdset_read);
					if (conn->condition & SCHED_WAIT_SOCK_WRITE)
						sched_remove_fdset(PQsocket(conn->dbconn),
										   &sched_fdset_write);

					pthread_mutex_lock(&(conn->conn_lock));
					pthread_cond_signal(&(conn->conn_cond));
					pthread_mutex_unlock(&(conn->conn_lock));
				}
				else
				{
					/*
					 * Timeout not elapsed. Remember the nearest. The
					 * tv == NULL test comes first, so min_timeout is never
					 * read before it has been assigned.
					 */
					if (tv == NULL ||
						timeout.tv_sec < min_timeout.tv_sec ||
						(timeout.tv_sec == min_timeout.tv_sec &&
						 timeout.tv_usec < min_timeout.tv_usec))
					{
						tv = &min_timeout;
						min_timeout.tv_sec = timeout.tv_sec;
						min_timeout.tv_usec = timeout.tv_usec;
					}
				}
			}
			conn = next;
		}

		/*
		 * Make copies of the file descriptor sets for select(2), which
		 * overwrites the sets it is given.
		 */
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		for (i = 0; i < sched_numfd; i++)
		{
			if (FD_ISSET(i, &sched_fdset_read))
				FD_SET(i, &rfds);
			if (FD_ISSET(i, &sched_fdset_write))
				FD_SET(i, &wfds);
		}

		/*
		 * Do the select(2) while unlocking the master lock, so that other
		 * threads can queue connections and write to the wakeup pipe.
		 */
		pthread_mutex_unlock(&sched_master_lock);
		rc = select(sched_numfd, &rfds, &wfds, NULL, tv);
		pthread_mutex_lock(&sched_master_lock);

		/*
		 * Check for errors. Any select(2) failure ends the scheduler with
		 * SCHED_STATUS_ERROR.
		 */
		if (rc < 0)
		{
			perror("sched_mainloop: select()");
			sched_status = SCHED_STATUS_ERROR;
			break;
		}

		/*
		 * Check the special pipe for a heads up.
		 */
		if (FD_ISSET(sched_wakeuppipe[0], &rfds))
		{
			char		buf[1];

			/* the pipe accounts for one of the ready descriptors */
			rc--;
			if (piperead(sched_wakeuppipe[0], buf, 1) != 1)
			{
				perror("sched_mainloop: read()");
				sched_status = SCHED_STATUS_ERROR;
				break;
			}

			/*
			 * A 'p' byte requests shutdown. Any other byte is only a
			 * wakeup, and the loop condition stays satisfied.
			 */
			if (buf[0] == 'p')
			{
				sched_status = SCHED_STATUS_SHUTDOWN;
			}
		}

		/*
		 * Check all remaining connections if the IO condition the thread is
		 * waiting for has occurred. rc counts the ready descriptors not yet
		 * attributed to a connection; the scan stops when it reaches zero.
		 */
		conn = sched_waitqueue_head;
		while (rc > 0 && conn)
		{
			int			fd_check = PQsocket(conn->dbconn);

			if (conn->condition & SCHED_WAIT_SOCK_READ)
			{


				if (fd_check >= 0 && FD_ISSET(fd_check, &rfds))
				{
					next = conn->next;

					pthread_mutex_lock(&(conn->conn_lock));
					pthread_cond_signal(&(conn->conn_cond));
					pthread_mutex_unlock(&(conn->conn_lock));
					rc--;

					DLLIST_REMOVE(sched_waitqueue_head,
								  sched_waitqueue_tail, conn);

					if (conn->condition & SCHED_WAIT_SOCK_READ)
						sched_remove_fdset(PQsocket(conn->dbconn),
										   &sched_fdset_read);
					if (conn->condition & SCHED_WAIT_SOCK_WRITE)
						sched_remove_fdset(PQsocket(conn->dbconn),
										   &sched_fdset_write);

					conn = next;
					continue;
				}
			}
			if (conn->condition & SCHED_WAIT_SOCK_WRITE)
			{
				if (fd_check >= 0 && FD_ISSET(fd_check, &wfds))
				{
					next = conn->next;

					pthread_mutex_lock(&(conn->conn_lock));
					pthread_cond_signal(&(conn->conn_cond));
					pthread_mutex_unlock(&(conn->conn_lock));
					rc--;

					DLLIST_REMOVE(sched_waitqueue_head,
								  sched_waitqueue_tail, conn);

					if (conn->condition & SCHED_WAIT_SOCK_READ)
						sched_remove_fdset(PQsocket(conn->dbconn),
										   &sched_fdset_read);
					if (conn->condition & SCHED_WAIT_SOCK_WRITE)
						sched_remove_fdset(PQsocket(conn->dbconn),
										   &sched_fdset_write);

					conn = next;
					continue;
				}
			}
			conn = conn->next;
		}
	}

	/*
	 * If we reach here the scheduler runmode has been changed by the main
	 * threads signal handler, or an error was detected above. We currently
	 * hold the master lock. Closing the heads-up pipe is disabled:
	 */

	/*
	 * close(sched_wakeuppipe[0]); sched_wakeuppipe[0] = -1;
	 * close(sched_wakeuppipe[1]); sched_wakeuppipe[1] = -1;
	 */

	/*
	 * Then we cond_signal all connections that are in the queue and empty
	 * the queue. The woken threads read the changed sched_status once they
	 * get the master lock.
	 */
	for (conn = sched_waitqueue_head; conn;)
	{
		next = conn->next;

		pthread_mutex_lock(&(conn->conn_lock));
		pthread_cond_signal(&(conn->conn_cond));
		pthread_mutex_unlock(&(conn->conn_lock));

		DLLIST_REMOVE(sched_waitqueue_head,
					  sched_waitqueue_tail, conn);

		if (conn->condition & SCHED_WAIT_SOCK_READ)
			sched_remove_fdset(PQsocket(conn->dbconn),
							   &sched_fdset_read);
		if (conn->condition & SCHED_WAIT_SOCK_WRITE)
			sched_remove_fdset(PQsocket(conn->dbconn),
							   &sched_fdset_write);

		conn = next;
	}

	/*
	 * Release the master lock and terminate the scheduler thread.
	 */
	pthread_mutex_unlock(&sched_master_lock);
	pthread_exit(NULL);
}
684 
685 
686 /* ----------
687  * sched_add_fdset
688  *
689  * Add a file descriptor to one of the global scheduler sets and adjust
690  * sched_numfd accordingly.
691  * ----------
692  */
693 static void
sched_add_fdset(int fd,fd_set * fds)694 sched_add_fdset(int fd, fd_set *fds)
695 {
696 	if (fd >= 0 && fds != NULL)
697 	{
698 		FD_SET(fd, fds);
699 		if (fd >= sched_numfd)
700 			sched_numfd = fd + 1;
701 	}
702 }
703 
704 
705 /* ----------
706  * sched_add_fdset
707  *
708  * Remove a file descriptor from one of the global scheduler sets and adjust
709  * sched_numfd accordingly.
710  * ----------
711  */
712 static void
sched_remove_fdset(int fd,fd_set * fds)713 sched_remove_fdset(int fd, fd_set *fds)
714 {
715 	if (fd >= 0)
716 	{
717 		FD_CLR(fd, fds);
718 		if (sched_numfd == (fd + 1))
719 		{
720 			while (sched_numfd > 0)
721 			{
722 				if (FD_ISSET(sched_numfd - 1, &sched_fdset_read))
723 					break;
724 				if (FD_ISSET(sched_numfd - 1, &sched_fdset_write))
725 					break;
726 				sched_numfd--;
727 			}
728 		}
729 	}
730 }
731