1 /* ----------------------------------------------------------------------
2 * scheduler.c
3 *
4 * Event scheduling subsystem for slon.
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 * ----------------------------------------------------------------------
11 */
12
13
14 #include <pthread.h>
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <string.h>
20 #include <errno.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23
24 #ifndef WIN32
25 #include <unistd.h>
26 #include <sys/time.h>
27 #endif
28
29 #include "slon.h"
30
31
32 /*
33 * If PF_LOCAL is not defined, use the old BSD name PF_UNIX
34 */
35 #ifndef PF_LOCAL
36 #define PF_LOCAL PF_UNIX
37 #endif
38
39 /* ----------
40 * Static data
41 * ----------
42 */
43 static ScheduleStatus sched_status = SCHED_STATUS_OK;
44
45 static int sched_numfd = 0;
46 static fd_set sched_fdset_read;
47 static fd_set sched_fdset_write;
48 static SlonConn *sched_waitqueue_head = NULL;
49 static SlonConn *sched_waitqueue_tail = NULL;
50
51 static pthread_t sched_main_thread;
52 static pthread_t sched_scheduler_thread;
53
54 static pthread_mutex_t sched_master_lock;
55 static pthread_cond_t sched_master_cond;
56
57
58 /* ----------
59 * Local functions
60 * ----------
61 */
62 static void *sched_mainloop(void *);
63 static void sched_add_fdset(int fd, fd_set *fds);
64 static void sched_remove_fdset(int fd, fd_set *fds);
65
66 /* static void sched_shutdown(); */
67
68
69 /* ----------
70 * sched_start_mainloop
71 *
72 * Called from SlonMain() before starting up any worker thread.
73 *
74 * This will spawn the event scheduling thread that does the central select(2)
75 * system call.
76 * ----------
77 */
78 int
sched_start_mainloop(void)79 sched_start_mainloop(void)
80 {
81 sched_status = SCHED_STATUS_OK;
82 sched_waitqueue_head = NULL;
83 sched_waitqueue_tail = NULL;
84 sched_numfd = 0;
85 FD_ZERO(&sched_fdset_read);
86 FD_ZERO(&sched_fdset_write);
87
88 /*
89 * Remember the main threads identifier
90 */
91 sched_main_thread = pthread_self();
92
93 /*
94 * Initialize the master lock and condition variables
95 */
96 if (pthread_mutex_init(&sched_master_lock, NULL) < 0)
97 {
98 slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_init() - %s\n",
99 strerror(errno));
100 return -1;
101 }
102 if (pthread_cond_init(&sched_master_cond, NULL) < 0)
103 {
104 slon_log(SLON_FATAL, "sched_start_mainloop: pthread_cond_init() - %s\n",
105 strerror(errno));
106 return -1;
107 }
108
109 /*
110 * Grab the scheduler master lock
111 */
112 if (pthread_mutex_lock(&sched_master_lock) < 0)
113 {
114 slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_lock() - %s\n",
115 strerror(errno));
116 return -1;
117 }
118
119 /*
120 * Start the scheduler thread
121 */
122 if (pthread_create(&sched_scheduler_thread, NULL, sched_mainloop, NULL) < 0)
123 {
124 slon_log(SLON_FATAL, "sched_start_mainloop: pthread_create() - %s\n",
125 strerror(errno));
126 return -1;
127 }
128
129 /*
130 * When the scheduler is ready, he'll signal the scheduler cond
131 */
132 if (pthread_cond_wait(&sched_master_cond, &sched_master_lock) < 0)
133 {
134 slon_log(SLON_FATAL, "sched_start_mainloop: pthread_cond_wait() - %s\n",
135 strerror(errno));
136 return -1;
137 }
138
139 /*
140 * Release the scheduler lock
141 */
142 if (pthread_mutex_unlock(&sched_master_lock) < 0)
143 {
144 slon_log(SLON_FATAL, "sched_start_mainloop: pthread_mutex_unlock() - %s\n",
145 strerror(errno));
146 return -1;
147 }
148
149 /*
150 * Check for errors
151 */
152 if (sched_status != SCHED_STATUS_OK)
153 return -1;
154
155 /*
156 * Scheduler started successfully
157 */
158 return 0;
159 }
160
161
162 /* ----------
163 * sched_wait_mainloop
164 *
165 * Called from main() after all working threads according to the initial
166 * configuration are started. Will wait until the scheduler mainloop
167 * terminates.
168 * ----------
169 */
170 int
sched_wait_mainloop(void)171 sched_wait_mainloop(void)
172 {
173 /*
174 * Wait for the scheduler to finish.
175 */
176 if (pthread_join(sched_scheduler_thread, NULL) < 0)
177 {
178 perror("sched_wait_mainloop: pthread_join()");
179 return -1;
180 }
181 return 0;
182 }
183
184
185 /* ----------
186 * sched_wait_conn
187 *
188 * Assumes that the thread holds the lock on conn->conn_lock.
189 *
190 * Adds the connection to the central wait queue and wakes up the scheduler
191 * thread to reloop onto the select(2) call.
192 * ----------
193 */
194 int
sched_wait_conn(SlonConn * conn,int condition)195 sched_wait_conn(SlonConn * conn, int condition)
196 {
197 ScheduleStatus rc;
198 int fds;
199
200 /*
201 * Grab the master lock and check that we're in normal runmode
202 */
203 pthread_mutex_lock(&sched_master_lock);
204 if (sched_status != SCHED_STATUS_OK)
205 {
206 pthread_mutex_unlock(&sched_master_lock);
207 return -1;
208 }
209
210 /*
211 * Remember the event we're waiting for and add the database connection to
212 * the fdset(s)
213 */
214 conn->condition = condition;
215 if (condition & SCHED_WAIT_SOCK_READ)
216 {
217 fds = PQsocket(conn->dbconn);
218 sched_add_fdset(fds, &sched_fdset_read);
219 }
220 if (condition & SCHED_WAIT_SOCK_WRITE)
221 {
222 fds = PQsocket(conn->dbconn);
223 sched_add_fdset(fds, &sched_fdset_write);
224 }
225
226 /*
227 * Add the connection to the wait queue
228 */
229 DLLIST_ADD_HEAD(sched_waitqueue_head, sched_waitqueue_tail, conn);
230
231 /*
232 * Give the scheduler thread a heads up, release the master lock and wait
233 * for it to tell us that the event we're waiting for happened.
234 */
235 if (pipewrite(sched_wakeuppipe[1], "x", 1) < 0)
236 {
237 perror("sched_wait_conn: write()");
238 exit(-1);
239 }
240 pthread_mutex_unlock(&sched_master_lock);
241 pthread_cond_wait(&(conn->conn_cond), &(conn->conn_lock));
242
243 /*
244 * Determine the return code
245 */
246 pthread_mutex_lock(&sched_master_lock);
247 if ((rc = sched_status) == SCHED_STATUS_OK)
248 {
249 if (conn->condition & SCHED_WAIT_CANCEL)
250 {
251 conn->condition &= ~(SCHED_WAIT_CANCEL);
252 rc = SCHED_STATUS_CANCEL;
253 }
254 }
255 pthread_mutex_unlock(&sched_master_lock);
256
257 return rc;
258 }
259
260
261 /* ----------
262 * sched_wait_time
263 *
264 * Assumes that the thread holds the lock on conn->conn_lock.
265 *
266 * Like sched_wait_conn() but with a timeout. Can be called without any
267 * read/write condition to wait for to resemble a pure timeout mechanism.
268 * ----------
269 */
270 int
sched_wait_time(SlonConn * conn,int condition,int msec)271 sched_wait_time(SlonConn * conn, int condition, int msec)
272 {
273 struct timeval *tv = &(conn->timeout);
274
275 /*
276 * Calculate the end-time of the desired timeout.
277 */
278 gettimeofday(tv, NULL);
279 tv->tv_sec += (long) (msec / 1000) +
280 (((msec % 1000) * 1000) + tv->tv_usec) / 1000000;
281 tv->tv_usec = (tv->tv_usec + (msec % 1000) * 1000) % 1000000;
282
283 /*
284 * Let sched_wait_conn() do the rest.
285 */
286 return sched_wait_conn(conn, condition | SCHED_WAIT_TIMEOUT);
287 }
288
289
290 /* ----------
291 * sched_msleep
292 *
293 * Use the schedulers event loop to sleep for msec milliseconds.
294 * ----------
295 */
296 int
sched_msleep(SlonNode * node,int msec)297 sched_msleep(SlonNode * node, int msec)
298 {
299 SlonConn *conn;
300 char dummyconn_name[64];
301 int rc;
302
303 if (node)
304 {
305 snprintf(dummyconn_name, 64, "msleep_node_%d", node->no_id);
306 conn = slon_make_dummyconn(dummyconn_name);
307 }
308 else
309 conn = slon_make_dummyconn("msleep_local");
310
311 rc = sched_wait_time(conn, 0, msec);
312 slon_free_dummyconn(conn);
313
314 return rc;
315 }
316
317
318 /* ----------
319 * sched_get_status
320 *
321 * Return the current scheduler status in a thread safe fashion
322 * ----------
323 */
324 int
sched_get_status(void)325 sched_get_status(void)
326 {
327 ScheduleStatus status;
328
329 pthread_mutex_lock(&sched_master_lock);
330 status = sched_status;
331 pthread_mutex_unlock(&sched_master_lock);
332 return status;
333 }
334
335
336 /* ----------
337 * sched_wakeup_node
338 *
339 * Wakeup the threads (listen and worker) of one or all remote nodes to cause
340 * them rechecking the current runtime status or adjust their configuration
341 * to changes.
342 * ----------
343 */
344 int
sched_wakeup_node(int no_id)345 sched_wakeup_node(int no_id)
346 {
347 SlonConn *conn;
348 int num_wakeup = 0;
349
350 pthread_mutex_lock(&sched_master_lock);
351
352 /*
353 * Set all waiters that belong to that node to cancel
354 */
355 for (conn = sched_waitqueue_head; conn; conn = conn->next)
356 {
357 if (conn->node != NULL)
358 {
359 if (no_id < 0 || conn->node->no_id == no_id)
360 {
361 conn->condition |= SCHED_WAIT_CANCEL;
362 num_wakeup++;
363 }
364 }
365 }
366
367 /*
368 * Give the scheduler thread a heads up if some wait was canceled;
369 */
370 if (num_wakeup > 0)
371 {
372 if (pipewrite(sched_wakeuppipe[1], "x", 1) < 0)
373 {
374 perror("sched_wait_conn: write()");
375 slon_restart();
376 }
377 }
378 pthread_mutex_unlock(&sched_master_lock);
379
380 remoteWorker_wakeup(no_id);
381
382 slon_log(SLON_DEBUG2, "sched_wakeup_node(): no_id=%d "
383 "(%d threads + worker signaled)\n", no_id, num_wakeup);
384
385 return num_wakeup;
386 }
387
388
389 /* ----------
390 * sched_mainloop
391 *
392 * The thread handling the master scheduling.
393 * ----------
394 */
395 static void *
sched_mainloop(void * dummy)396 sched_mainloop(void *dummy)
397 {
398 fd_set rfds;
399 fd_set wfds;
400 int rc;
401 SlonConn *conn;
402 SlonConn *next;
403 struct timeval min_timeout;
404 struct timeval *tv;
405 int i;
406
407 /*
408 * Grab the scheduler master lock. This will wait until the main thread
409 * acutally blocks on the master cond.
410 */
411 pthread_mutex_lock(&sched_master_lock);
412
413 /*
414 * Initialize the fdsets for select(2)
415 */
416 FD_ZERO(&sched_fdset_read);
417 FD_ZERO(&sched_fdset_write);
418
419 sched_add_fdset(sched_wakeuppipe[0], &sched_fdset_read);
420
421 /*
422 * Done with all initialization. Let the main thread go ahead and get
423 * everyone else dancing.
424 */
425 pthread_cond_signal(&sched_master_cond);
426
427 /*
428 * And we are now entering the endless loop of scheduling events
429 */
430 while (sched_status == SCHED_STATUS_OK)
431 {
432 struct timeval now;
433 struct timeval timeout;
434
435 /*
436 * Check if any of the connections in the wait queue have reached
437 * their timeout. While doing so, we also remember the closest timeout
438 * in the future.
439 */
440 tv = NULL;
441 gettimeofday(&now, NULL);
442 for (conn = sched_waitqueue_head; conn;)
443 {
444 next = conn->next;
445
446 if (conn->condition & SCHED_WAIT_CANCEL)
447 {
448 /*
449 * Some other thread wants this thread to wake up.
450 */
451 DLLIST_REMOVE(sched_waitqueue_head,
452 sched_waitqueue_tail, conn);
453
454 if (conn->condition & SCHED_WAIT_SOCK_READ)
455 sched_remove_fdset(PQsocket(conn->dbconn),
456 &sched_fdset_read);
457 if (conn->condition & SCHED_WAIT_SOCK_WRITE)
458 sched_remove_fdset(PQsocket(conn->dbconn),
459 &sched_fdset_write);
460
461 pthread_mutex_lock(&(conn->conn_lock));
462 pthread_cond_signal(&(conn->conn_cond));
463 pthread_mutex_unlock(&(conn->conn_lock));
464
465 conn = next;
466 continue;
467 }
468 if (conn->condition & SCHED_WAIT_TIMEOUT)
469 {
470 /*
471 * This connection has a timeout. Calculate the time until
472 * that.
473 */
474 timeout.tv_sec = conn->timeout.tv_sec - now.tv_sec;
475 timeout.tv_usec = conn->timeout.tv_usec - now.tv_usec;
476 while (timeout.tv_usec < 0)
477 {
478 timeout.tv_sec--;
479 timeout.tv_usec += 1000000;
480 }
481
482 /*
483 * Check if the timeout has elapsed
484 */
485 if (timeout.tv_sec < 0 ||
486 (timeout.tv_sec == 0 && timeout.tv_usec < 20000))
487 {
488 /*
489 * Remove the connection from the wait queue. We consider
490 * everything closer than 20 msec being elapsed to avoid a
491 * full scheduler round just for one kernel tick.
492 */
493 DLLIST_REMOVE(sched_waitqueue_head,
494 sched_waitqueue_tail, conn);
495
496 if (conn->condition & SCHED_WAIT_SOCK_READ)
497 sched_remove_fdset(PQsocket(conn->dbconn),
498 &sched_fdset_read);
499 if (conn->condition & SCHED_WAIT_SOCK_WRITE)
500 sched_remove_fdset(PQsocket(conn->dbconn),
501 &sched_fdset_write);
502
503 pthread_mutex_lock(&(conn->conn_lock));
504 pthread_cond_signal(&(conn->conn_cond));
505 pthread_mutex_unlock(&(conn->conn_lock));
506 }
507 else
508 {
509 /*
510 * Timeout not elapsed. Remember the nearest.
511 */
512 if (tv == NULL ||
513 timeout.tv_sec < min_timeout.tv_sec ||
514 (timeout.tv_sec == min_timeout.tv_sec &&
515 timeout.tv_usec < min_timeout.tv_usec))
516 {
517 tv = &min_timeout;
518 min_timeout.tv_sec = timeout.tv_sec;
519 min_timeout.tv_usec = timeout.tv_usec;
520 }
521 }
522 }
523 conn = next;
524 }
525
526 /*
527 * Make copies of the file descriptor sets for select(2)
528 */
529 FD_ZERO(&rfds);
530 FD_ZERO(&wfds);
531 for (i = 0; i < sched_numfd; i++)
532 {
533 if (FD_ISSET(i, &sched_fdset_read))
534 FD_SET(i, &rfds);
535 if (FD_ISSET(i, &sched_fdset_write))
536 FD_SET(i, &wfds);
537 }
538
539 /*
540 * Do the select(2) while unlocking the master lock.
541 */
542 pthread_mutex_unlock(&sched_master_lock);
543 rc = select(sched_numfd, &rfds, &wfds, NULL, tv);
544 pthread_mutex_lock(&sched_master_lock);
545
546 /*
547 * Check for errors
548 */
549 if (rc < 0)
550 {
551 perror("sched_mainloop: select()");
552 sched_status = SCHED_STATUS_ERROR;
553 break;
554 }
555
556 /*
557 * Check the special pipe for a heads up.
558 */
559 if (FD_ISSET(sched_wakeuppipe[0], &rfds))
560 {
561 char buf[1];
562
563 rc--;
564 if (piperead(sched_wakeuppipe[0], buf, 1) != 1)
565 {
566 perror("sched_mainloop: read()");
567 sched_status = SCHED_STATUS_ERROR;
568 break;
569 }
570
571 if (buf[0] == 'p')
572 {
573 sched_status = SCHED_STATUS_SHUTDOWN;
574 }
575 }
576
577 /*
578 * Check all remaining connections if the IO condition the thread is
579 * waiting for has occured.
580 */
581 conn = sched_waitqueue_head;
582 while (rc > 0 && conn)
583 {
584 int fd_check = PQsocket(conn->dbconn);
585
586 if (conn->condition & SCHED_WAIT_SOCK_READ)
587 {
588
589
590 if (fd_check >= 0 && FD_ISSET(fd_check, &rfds))
591 {
592 next = conn->next;
593
594 pthread_mutex_lock(&(conn->conn_lock));
595 pthread_cond_signal(&(conn->conn_cond));
596 pthread_mutex_unlock(&(conn->conn_lock));
597 rc--;
598
599 DLLIST_REMOVE(sched_waitqueue_head,
600 sched_waitqueue_tail, conn);
601
602 if (conn->condition & SCHED_WAIT_SOCK_READ)
603 sched_remove_fdset(PQsocket(conn->dbconn),
604 &sched_fdset_read);
605 if (conn->condition & SCHED_WAIT_SOCK_WRITE)
606 sched_remove_fdset(PQsocket(conn->dbconn),
607 &sched_fdset_write);
608
609 conn = next;
610 continue;
611 }
612 }
613 if (conn->condition & SCHED_WAIT_SOCK_WRITE)
614 {
615 if (fd_check >= 0 && FD_ISSET(fd_check, &wfds))
616 {
617 next = conn->next;
618
619 pthread_mutex_lock(&(conn->conn_lock));
620 pthread_cond_signal(&(conn->conn_cond));
621 pthread_mutex_unlock(&(conn->conn_lock));
622 rc--;
623
624 DLLIST_REMOVE(sched_waitqueue_head,
625 sched_waitqueue_tail, conn);
626
627 if (conn->condition & SCHED_WAIT_SOCK_READ)
628 sched_remove_fdset(PQsocket(conn->dbconn),
629 &sched_fdset_read);
630 if (conn->condition & SCHED_WAIT_SOCK_WRITE)
631 sched_remove_fdset(PQsocket(conn->dbconn),
632 &sched_fdset_write);
633
634 conn = next;
635 continue;
636 }
637 }
638 conn = conn->next;
639 }
640 }
641
642 /*
643 * If we reach here the scheduler runmode has been changed by by the main
644 * threads signal handler. We currently hold the master lock. First we
645 * close the scheduler heads-up socket pair so nobody will think we're
646 * listening any longer.
647 */
648
649 /*
650 * close(sched_wakeuppipe[0]); sched_wakeuppipe[0] = -1;
651 * close(sched_wakeuppipe[1]); sched_wakeuppipe[1] = -1;
652 */
653
654 /*
655 * Then we cond_signal all connections that are in the queue.
656 */
657 for (conn = sched_waitqueue_head; conn;)
658 {
659 next = conn->next;
660
661 pthread_mutex_lock(&(conn->conn_lock));
662 pthread_cond_signal(&(conn->conn_cond));
663 pthread_mutex_unlock(&(conn->conn_lock));
664
665 DLLIST_REMOVE(sched_waitqueue_head,
666 sched_waitqueue_tail, conn);
667
668 if (conn->condition & SCHED_WAIT_SOCK_READ)
669 sched_remove_fdset(PQsocket(conn->dbconn),
670 &sched_fdset_read);
671 if (conn->condition & SCHED_WAIT_SOCK_WRITE)
672 sched_remove_fdset(PQsocket(conn->dbconn),
673 &sched_fdset_write);
674
675 conn = next;
676 }
677
678 /*
679 * Release the master lock and terminate the scheduler thread.
680 */
681 pthread_mutex_unlock(&sched_master_lock);
682 pthread_exit(NULL);
683 }
684
685
686 /* ----------
687 * sched_add_fdset
688 *
689 * Add a file descriptor to one of the global scheduler sets and adjust
690 * sched_numfd accordingly.
691 * ----------
692 */
693 static void
sched_add_fdset(int fd,fd_set * fds)694 sched_add_fdset(int fd, fd_set *fds)
695 {
696 if (fd >= 0 && fds != NULL)
697 {
698 FD_SET(fd, fds);
699 if (fd >= sched_numfd)
700 sched_numfd = fd + 1;
701 }
702 }
703
704
705 /* ----------
706 * sched_add_fdset
707 *
708 * Remove a file descriptor from one of the global scheduler sets and adjust
709 * sched_numfd accordingly.
710 * ----------
711 */
712 static void
sched_remove_fdset(int fd,fd_set * fds)713 sched_remove_fdset(int fd, fd_set *fds)
714 {
715 if (fd >= 0)
716 {
717 FD_CLR(fd, fds);
718 if (sched_numfd == (fd + 1))
719 {
720 while (sched_numfd > 0)
721 {
722 if (FD_ISSET(sched_numfd - 1, &sched_fdset_read))
723 break;
724 if (FD_ISSET(sched_numfd - 1, &sched_fdset_write))
725 break;
726 sched_numfd--;
727 }
728 }
729 }
730 }
731