1 //-< SYNC.CPP >------------------------------------------------------*--------*
2 // FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
3 // (Main Memory Database Management System)                          *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
6 //                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Intertask synchonization primitives
9 //-------------------------------------------------------------------*--------*
10 
11 #define INSIDE_FASTDB
12 
13 #include "stdtp.h"
14 #include "sync.h"
15 #ifndef _WINCE
16 #include <sys/stat.h>
17 #endif
18 
19 #ifdef VXWORKS
20 #include "fastdbShim.h"
21 #endif
22 
23 BEGIN_FASTDB_NAMESPACE
24 
25 #ifndef _WIN32
26 
27 // Unix specific
28 
29 #define ACCESS_PERMISSION_MASK 0666
30 
getCurrentTimeMsec()31 unsigned dbSystem::getCurrentTimeMsec()
32 {
33     struct timeval tv;
34     gettimeofday(&tv, NULL);
35     return tv.tv_sec*1000 + tv.tv_usec / 1000;
36 }
37 
38 
39 #if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
40 END_FASTDB_NAMESPACE
41 #include <errno.h>
42 BEGIN_FASTDB_NAMESPACE
43 
44 #define PRINT_ERROR(func)  perror(func)
45 
46 char const*  keyFileDir = "/tmp/";
47 
48 #ifndef USE_POSIX_SEMAPHORES
49 END_FASTDB_NAMESPACE
50 #include <signal.h>
51 BEGIN_FASTDB_NAMESPACE
alarm_handler(int)52 static void alarm_handler(int){}
53 
54 class moduleInitializer {
55   public:
moduleInitializer()56     moduleInitializer() {
57         static struct sigaction sigact;
58         sigact.sa_handler = alarm_handler;
59         ::sigaction(SIGALRM, &sigact, NULL);
60     }
61 };
62 
63 static moduleInitializer initializer; // install SIGLARM handler
64 #endif // USE_POSIX_SEMAPHORES
65 
66 #endif // use SysV primitives
67 
68 
69 
70 #if !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP || !defined(USE_POSIX_SEMAPHORES)
71 
72 #if defined(USE_STD_FTOK)
hashFunction(char const * s)73 int hashFunction(char const* s)
74 {
75     int ch, h = 0;
76     while ((ch = *s++) != '\0') {
77         h ^= ch;
78     }
79     h &= 0xFF;
80     if (h == 0) {
81         h = 1;
82     }
83     return h;
84 }
85 
getKeyFromFile(char const * file)86 int getKeyFromFile(char const* file)
87 {
88     return ftok(file, hashFunction(file));
89 }
90 #else
getKeyFromFile(char const * path)91 int getKeyFromFile(char const* path)
92 {
93     struct stat st;
94 #ifdef VXWORKS
95     // we don't have file system now. so we generate a random key at this place
96     static int uni = 1;
97     st.st_dev = uni;
98     st.st_ino = uni++;
99 #else
100     if (::stat(path, &st) < 0) {
101         return (key_t)-1;
102     }
103 #endif
104     return (key_t)(((st.st_dev & 0x7f) << 24) ^ (st.st_ino & 0x7fffffff));
105 }
106 #endif
107 
108 
109 #if !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
open(char const * name,size_t size)110 bool dbSharedMemory::open(char const* name, size_t size)
111 {
112     char* fileName = (char*)name;
113 #ifndef VXWORKS
114     //Why this portion is commented is we don't have file system now
115     //It is used to generate keys. Once our file system is up then we can use this
116     if (strchr(name, '/') == NULL) {
117         fileName = new char[strlen(name)+strlen(keyFileDir)+1];
118         sprintf(fileName, "%s%s", keyFileDir, name);
119     }
120     int fd = ::open(fileName, O_RDWR|O_CREAT, ACCESS_PERMISSION_MASK);
121     if (fd < 0) {
122         if (fileName != name) {
123             delete[] fileName;
124         }
125         return false;
126     }
127     ::close(fd);
128 #endif // ndef VXWORKS
129     int key = getKeyFromFile(fileName);
130     if (fileName != name) {
131         delete[] fileName;
132     }
133     if (key < 0) {
134         return false;
135     }
136     shm = shmget(key, DOALIGN(size, 4096), IPC_CREAT|ACCESS_PERMISSION_MASK);
137     if (shm < 0) {
138         return false;
139     }
140     ptr = (char*)shmat(shm, NULL, 0);
141     return (ptr != (char*)-1);
142 }
143 
close()144 void dbSharedMemory::close()
145 {
146     shmdt((char*)ptr);
147 }
148 
erase()149 void dbSharedMemory::erase()
150 {
151     close();
152     shmctl(shm, IPC_RMID, NULL);
153 }
154 #endif // use SysV shmat
155 #endif
156 
157 ////////////////////////////////////////////////////////////////////////
158 // If we are to use the local implementation of dbSemaphore and dbEvent
159 //    (which currently uses SysV based semaphores)
160 #ifndef  USE_POSIX_SEMAPHORES
sem_init(int & sem,char const * name,unsigned init_value)161 int sem_init(int& sem, char const* name, unsigned init_value)
162 {
163     key_t key = IPC_PRIVATE;
164     int semid;
165     struct sembuf sops[3];
166     sops[0].sem_num = 1;
167     sops[0].sem_op  = 0; /* check if semaphore was already initialized */
168     sops[0].sem_flg = IPC_NOWAIT;
169     sops[1].sem_num = 1;
170     sops[1].sem_op  = 1; /* mark semaphore as initialized */
171     sops[1].sem_flg = 0;
172     sops[2].sem_num = 0;
173     sops[2].sem_op  = init_value;
174     sops[2].sem_flg = 0;
175     if (name != NULL) {
176         int fd;
177         char* path = (char*)name;
178         if (strchr(name, '/') == NULL) {
179             path = new char[strlen(name)+strlen(keyFileDir)+1];
180             sprintf(path, "%s%s", keyFileDir, name);
181         }
182         fd = open(path, O_WRONLY|O_CREAT, ACCESS_PERMISSION_MASK);
183         if (fd < 0) {
184             if (path != name) {
185                 delete[] path;
186             }
187             PRINT_ERROR("open");
188             return -1;
189         }
190         close(fd);
191         key = getKeyFromFile(path);
192         if (path != name) {
193             delete[] path;
194         }
195         if (key < 0) {
196             PRINT_ERROR("getKeyFromFile");
197             return -1;
198         }
199     }
200     semid = semget(key, 2, IPC_CREAT|ACCESS_PERMISSION_MASK);
201     if (semid < 0) {
202         PRINT_ERROR("semget");
203         return -1;
204     }
205     if (semop(semid, sops, itemsof(sops)) != 0 && errno != EAGAIN) {
206         PRINT_ERROR("semop");
207         return -1;
208     }
209     sem = semid;
210     return 0;
211 }
212 
213 enum wait_status { wait_ok, wait_timeout_expired, wait_error };
214 
wait_semaphore(int & sem,unsigned msec,struct sembuf * sops,int n_sops)215 static wait_status wait_semaphore(int& sem, unsigned msec,
216                                   struct sembuf* sops, int n_sops)
217 {
218     if (msec != INFINITE) {
219         struct timeval start;
220         struct timeval stop;
221         gettimeofday(&start, NULL);
222         unsigned long usec = start.tv_usec + msec % 1000 * 1000;
223         stop.tv_usec = usec % 1000000;
224         stop.tv_sec = start.tv_sec + msec / 1000 + usec / 1000000;
225 
226         while (true) {
227             struct itimerval it;
228             it.it_interval.tv_sec = 0;
229             it.it_interval.tv_usec = 0;
230             it.it_value.tv_sec = stop.tv_sec - start.tv_sec;
231             it.it_value.tv_usec = stop.tv_usec - start.tv_usec;
232             if (stop.tv_usec < start.tv_usec) {
233                 it.it_value.tv_usec += 1000000;
234                 it.it_value.tv_sec -= 1;
235             }
236             if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
237                 return wait_error;
238             }
239             if (semop(sem, sops, n_sops) == 0) {
240                 break;
241             }
242             if (errno != EINTR) {
243                 return wait_error;
244             }
245             gettimeofday(&start, NULL);
246             if (stop.tv_sec < start.tv_sec ||
247                (stop.tv_sec == start.tv_sec && stop.tv_usec < start.tv_sec))
248             {
249                 return wait_timeout_expired;
250             }
251         }
252     } else {
253         while (semop(sem, sops, n_sops) < 0) {
254             if (errno != EINTR) {
255                 return wait_error;
256             }
257         }
258     }
259     return wait_ok;
260 }
261 
262 
wait(unsigned msec)263 bool dbSemaphore::wait(unsigned msec)
264 {
265     static struct sembuf sops[] = {{0, -1, 0}};
266     wait_status ws = wait_semaphore(s, msec, sops, itemsof(sops));
267     assert(ws != wait_error);
268     return ws == wait_ok;
269 }
270 
signal(unsigned inc)271 void dbSemaphore::signal(unsigned inc)
272 {
273     if (inc != 0) {
274         struct sembuf sops[1];
275         sops[0].sem_num = 0;
276         sops[0].sem_op  = inc;
277         sops[0].sem_flg = 0;
278         int rc = semop(s, sops, 1);
279         assert(rc == 0);
280     }
281 }
282 
283 
watch()284 bool dbWatchDog::watch()
285 {
286     static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
287     int rc;
288     while ((rc = semop(id, sops, 1)) < 0 && errno == EINTR);
289     return rc == 0;
290 }
291 
close()292 void dbWatchDog::close()
293 {
294     semctl(id, 0, IPC_RMID, NULL);
295 }
296 
open(char const * name)297 bool dbWatchDog::open(char const* name)
298 {
299     return open(name, ACCESS_PERMISSION_MASK);
300 }
301 
open(char const * name,int flags)302 bool dbWatchDog::open(char const* name, int flags)
303 {
304     key_t key = IPC_PRIVATE;
305     if (name != NULL) {
306         int fd;
307         char* path = (char*)name;
308         if (strchr(name, '/') == NULL) {
309             path = new char[strlen(name)+strlen(keyFileDir)+1];
310             sprintf(path, "%s%s", keyFileDir, name);
311         }
312         fd = ::open(path, O_WRONLY|O_CREAT, ACCESS_PERMISSION_MASK);
313         if (fd < 0) {
314             if (path != name) {
315                 delete[] path;
316             }
317             PRINT_ERROR("open");
318             return -1;
319         }
320         ::close(fd);
321         key = getKeyFromFile(path);
322         if (path != name) {
323             delete[] path;
324         }
325         if (key < 0) {
326             PRINT_ERROR("getKeyFromFile");
327             return -1;
328         }
329     }
330     return (id = semget(key, 1, flags)) >= 0;
331 }
332 
create(char const * name)333 bool dbWatchDog::create(char const* name) {
334     static struct sembuf sops[] = {{0, 1, 0}, {0, -1, SEM_UNDO}};
335     if (open(name, IPC_CREAT|ACCESS_PERMISSION_MASK)) {
336         return semop(id, sops, 2) == 0;
337     }
338     return false;
339 }
340 
341 
342 
343 #if (defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)) || defined(__FreeBSD__)
344 /* union semun is defined by including <sys/sem.h> */
345 #else
346 union semun {
347     int val;
348     struct semid_ds* buf;
349     unsigned short* array;
350 };
351 #endif
352 static union semun u;
353 
354 
reset()355 void dbSemaphore::reset()
356 {
357     u.val = 0;
358     int rc = semctl(s, 0, SETVAL, u);
359     assert(rc >= 0);
360 }
361 
open(char const * name,unsigned init_value)362 bool dbSemaphore::open(char const* name, unsigned init_value)
363 {
364     return sem_init(s, name, init_value) == 0;
365 }
366 
close()367 void dbSemaphore::close() {}
368 
erase()369 void dbSemaphore::erase() {
370     semctl(s, 0, IPC_RMID, &u);
371 }
372 
wait(unsigned msec)373 bool dbEvent::wait(unsigned msec)
374 {
375     static struct sembuf sops[] = {{0, -1, 0}, {0, 1, 0}};
376     wait_status ws = wait_semaphore(e, msec, sops, itemsof(sops));
377     assert(ws != wait_error);
378     return ws == wait_ok;
379 }
380 
signal()381 void dbEvent::signal()
382 {
383     static struct sembuf sops[] = {{0, 0, IPC_NOWAIT}, {0, 1, 0}};
384     int rc = semop(e, sops, itemsof(sops));
385     assert(rc == 0 || errno == EAGAIN);
386 }
387 
reset()388 void dbEvent::reset()
389 {
390     static struct sembuf sops[] = {{0, -1, IPC_NOWAIT}};
391     int rc = semop(e, sops, itemsof(sops));
392     assert(rc == 0 || errno == EAGAIN);
393 }
394 
open(char const * name,bool signaled)395 bool dbEvent::open(char const* name, bool signaled)
396 {
397 // XXX: sem_init is POSIX, the rest of these calls are SysV.
398     return sem_init(e, name, signaled) == 0;
399 }
400 
close()401 void dbEvent::close() {}
402 
erase()403 void dbEvent::erase() {
404     semctl(e, 0, IPC_RMID, &u);
405 }
406 
407 #if defined(USE_INTERNAL_CS_IMPL)
408 
409 ////////////////////////////////////////////////////////////////////
410 // dbGLobalCriticalSection internal implementation
411 
412 #if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__)) && !defined(RECOVERABLE_CRITICAL_SECTION)
413 
enter()414 void dbGlobalCriticalSection::enter()
415 {
416     int inc = -1;
417     __asm__ __volatile__(
418                         "lock; xadd %0,%1"
419                         :"=d" (inc), "=m" (*count)
420                         :"d" (inc), "m" (*count));
421     if (inc != 1) {
422         static struct sembuf sops[] = {{0, -1, 0}};
423         int rc;
424         while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
425         assert(rc == 0);
426     }
427 #if GLOBAL_CS_DEBUG
428     owner = pthread_self();
429 #endif
430 }
431 
leave()432 void dbGlobalCriticalSection::leave()
433 {
434     int inc = 1;
435 #if GLOBAL_CS_DEBUG
436     owner = 0;
437 #endif
438     __asm__ __volatile__(
439                         "lock; xadd %0,%1"
440                         :"=d" (inc), "=m" (*count)
441                         :"d" (inc), "m" (*count));
442     if (inc != 0) {
443         /* some other processes waiting to enter critical section */
444         static struct sembuf sops[] = {{0, 1, 0}};
445         int rc = semop(semid, sops, 1);
446         assert(rc == 0);
447     }
448 }
449 
create(char const * name,sharedsem_t * count)450 bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
451 {
452     this->count = count;
453     *count = 1;
454     return sem_init(semid, name, 0) == 0;
455 }
456 
open(char const * name,sharedsem_t * count)457 bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
458 {
459     this->count = count;
460     return sem_init(semid, name, 0) == 0;
461 }
462 
463 #elif !defined(RECOVERABLE_CRITICAL_SECTION) && defined(__GNUC__) && defined (__GNUC_MINOR__) && ((4 < __GNUC__) || (4 == __GNUC__ && 1 <= __GNUC_MINOR__)) && !defined(__arm__)
464 
enter()465 void dbGlobalCriticalSection::enter()
466 {
467     if (__sync_add_and_fetch(count, 1) != 1) {
468         static struct sembuf sops[] = {{0, -1, 0}};
469         int rc;
470         while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
471         assert(rc == 0);
472     }
473 #if GLOBAL_CS_DEBUG
474     owner = pthread_self();
475 #endif
476 }
477 
leave()478 void dbGlobalCriticalSection::leave()
479 {
480 #if GLOBAL_CS_DEBUG
481     owner = 0;
482 #endif
483     if (__sync_add_and_fetch(count, -1) != 0) {
484         /* some other processes waiting to enter critical section */
485         static struct sembuf sops[] = {{0, 1, 0}};
486         int rc = semop(semid, sops, 1);
487         assert(rc == 0);
488     }
489 }
490 
create(char const * name,sharedsem_t * count)491 bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
492 {
493     this->count = count;
494     *count = 0;
495     return sem_init(semid, name, 0) == 0;
496 }
497 
open(char const * name,sharedsem_t * count)498 bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
499 {
500     this->count = count;
501     return sem_init(semid, name, 0) == 0;
502 }
503 
504 #else // defined(__GNUC__) && defined(i386)
505 // "lowest" case, use a SysV semaphore for complete portability
enter()506 void dbGlobalCriticalSection::enter()
507 {
508     static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
509     int rc;
510     while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
511     assert(rc == 0);
512 #if GLOBAL_CS_DEBUG
513     owner = pthread_self();
514 #endif
515 }
516 
leave()517 void dbGlobalCriticalSection::leave()
518 {
519 #if GLOBAL_CS_DEBUG
520     owner = 0;
521 #endif
522     static struct sembuf sops[] = {{0, 1, SEM_UNDO}};
523     int rc = semop(semid, sops, 1);
524     assert(rc == 0);
525 }
526 
open(char const * name,sharedsem_t *)527 bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
528 {
529 // XXX: sem_init is Posix, the rest of these calls are SysV.
530     return sem_init(semid, name, 1) == 0;
531 }
532 
create(char const * name,sharedsem_t *)533 bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
534 {
535     return sem_init(semid, name, 1) == 0;
536 }
537 
538 #endif // defined(__GNUC__) && defined(i386)
539 
erase()540 void dbGlobalCriticalSection::erase()
541 {
542     semctl(semid, 0, IPC_RMID, &u);
543 }
544 
545 #endif // USE_INTERNAL_CS_IMPL
546 
547 dbInitializationMutex::initializationStatus
initialize(char const * name)548 dbInitializationMutex::initialize(char const* name)
549 {
550     struct sembuf sops[4];
551     char* path = (char*)name;
552     if (strchr(name, '/') == NULL) {
553         path = new char[strlen(name)+strlen(keyFileDir)+1];
554         sprintf(path, "%s%s", keyFileDir, name);
555     }
556     int fd = open(path, O_WRONLY|O_CREAT, ACCESS_PERMISSION_MASK);
557     if (fd < 0) {
558         if (path != name) {
559             delete[] path;
560         }
561         PRINT_ERROR("open");
562         return InitializationError;
563     }
564     ::close(fd);
565     int key = getKeyFromFile(path);
566     if (path != name) {
567         delete[] path;
568     }
569     if (key < 0) {
570         PRINT_ERROR("getKeyFromFile");
571         return InitializationError;
572     }
573     while (true) {
574         semid = semget(key, 3, IPC_CREAT|ACCESS_PERMISSION_MASK);
575         if (semid < 0) {
576             PRINT_ERROR("semget");
577             return InitializationError;
578         }
579         // Semaphore 0 - number of active processes
580         // Semaphore 1 - intialization in progress (1 while initialization, 0 after it)
581         // Semaphore 2 - semaphore was destroyed
582 
583         sops[0].sem_num = 0;
584         sops[0].sem_op  = 0; /* check if semaphore was already initialized */
585         sops[0].sem_flg = IPC_NOWAIT;
586         sops[1].sem_num = 0;
587         sops[1].sem_op  = 1; /* increment number of active processes */
588         sops[1].sem_flg = SEM_UNDO;
589         sops[2].sem_num = 1;
590         sops[2].sem_op  = 1; /* initialization in process */
591         sops[2].sem_flg = SEM_UNDO;
592         sops[3].sem_num = 2;
593         sops[3].sem_op  = 0; /* check if semaphore was destroyed */
594         sops[3].sem_flg = IPC_NOWAIT;
595         if (semop(semid, sops, 4) < 0) {
596             if (errno == EAGAIN) {
597                 sops[0].sem_num = 0;
598                 sops[0].sem_op  = -1; /* check if semaphore was already initialized */
599                 sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
600                 sops[1].sem_num = 1;
601                 sops[1].sem_op  = 0; /* wait until inialization completed */
602                 sops[1].sem_flg = 0;
603                 sops[2].sem_num = 0;
604                 sops[2].sem_op  = 2; /* increment number of active processes */
605                 sops[2].sem_flg = SEM_UNDO;
606                 sops[3].sem_num = 2;
607                 sops[3].sem_op  = 0; /* check if semaphore was destroyed */
608                 sops[3].sem_flg = IPC_NOWAIT;
609                 if (semop(semid, sops, 4) == 0) {
610                     return AlreadyInitialized;
611                 }
612                 if (errno == EAGAIN) {
613                     sleep(1);
614                     continue;
615                 }
616             }
617             if (errno == EIDRM) {
618                 continue;
619             }
620             PRINT_ERROR("semop");
621             return InitializationError;
622         } else {
623             return NotYetInitialized;
624         }
625     }
626 }
627 
done()628 void dbInitializationMutex::done()
629 {
630     struct sembuf sops[1];
631     sops[0].sem_num = 1;
632     sops[0].sem_op  = -1; /* initialization done */
633     sops[0].sem_flg = SEM_UNDO;
634     int rc = semop(semid, sops, 1);
635     assert(rc == 0);
636 }
637 
close()638 bool dbInitializationMutex::close()
639 {
640     int rc;
641     struct sembuf sops[3];
642     while (true) {
643         sops[0].sem_num = 0;
644         sops[0].sem_op  = -1; /* decrement process couter */
645         sops[0].sem_flg = SEM_UNDO;
646         sops[1].sem_num = 0;
647         sops[1].sem_op  = 0;  /* check if there are no more active processes */
648         sops[1].sem_flg = IPC_NOWAIT;
649         sops[2].sem_num = 2;
650         sops[2].sem_op  = 1;  /* mark as destructed */
651         sops[2].sem_flg = SEM_UNDO;
652         if ((rc = semop(semid, sops, 3)) == 0) {
653             return true;
654         } else {
655             assert(errno == EAGAIN);
656         }
657         sops[0].sem_num = 0;
658         sops[0].sem_op  = -2; /* decrement process couter and check for non-zero */
659         sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
660         sops[1].sem_num = 0;
661         sops[1].sem_op  = 1;
662         sops[1].sem_flg = SEM_UNDO;
663         if ((rc = semop(semid, sops, 2)) == 0) {
664             return false;
665         } else {
666             assert(errno == EAGAIN);
667         }
668     }
669 }
670 
erase()671 void dbInitializationMutex::erase()
672 {
673     semctl(semid, 0, IPC_RMID, &u);
674 }
675 #endif // !USE_POSIX_SEMAPHORES
676 
677 
678 
679 //  Thread stuff
680 #ifndef NO_PTHREADS
681 
682 #if defined(_SC_NPROCESSORS_ONLN)
numberOfProcessors()683 int dbThread::numberOfProcessors() {
684     return sysconf(_SC_NPROCESSORS_ONLN);
685 }
686 #elif defined(__linux__)
687 END_FASTDB_NAMESPACE
688 #include <linux/smp.h>
689 BEGIN_FASTDB_NAMESPACE
numberOfProcessors()690 int dbThread::numberOfProcessors() { return smp_num_cpus; }
691 #elif defined(__FreeBSD__) || defined(__bsdi__) || defined(__OpenBSD__) || defined(__NetBSD__)
692 #if defined(__bsdi__) || defined(__OpenBSD__)
693 END_FASTDB_NAMESPACE
694 #include <sys/param.h>
695 BEGIN_FASTDB_NAMESPACE
696 #endif
697 END_FASTDB_NAMESPACE
698 #include <sys/sysctl.h>
699 BEGIN_FASTDB_NAMESPACE
numberOfProcessors()700 int dbThread::numberOfProcessors() {
701     int mib[2],ncpus=0;
702     size_t len=sizeof(ncpus);
703     mib[0]= CTL_HW;
704     mib[1]= HW_NCPU;
705     sysctl(mib,2,&ncpus,&len,NULL,0);
706     return ncpus;
707 }
708 #else
numberOfProcessors()709 int dbThread::numberOfProcessors() { return 1; }
710 #endif
711 #endif // NO_PTHREADS
712 
713 
714 #else // _WIN32
715 
716 // Win32 specific code
717 unsigned dbSystem::getCurrentTimeMsec()
718 {
719     return GetTickCount();
720 }
721 
722 #ifdef SET_NULL_DACL
723 dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
724 #endif
725 
726 #endif // __WIN32
727 
728 //////////////////////////////////////////////////////////////
729 // Common W32 and Unix platform code follows
730 
pooledThreadFunc(void * arg)731 void thread_proc dbPooledThread::pooledThreadFunc(void* arg)
732 {
733     ((dbPooledThread*)arg)->run();
734 }
735 
dbPooledThread(dbThreadPool * threadPool)736 dbPooledThread::dbPooledThread(dbThreadPool* threadPool)
737 {
738     pool = threadPool;
739     startSem.open();
740     readySem.open();
741     next = NULL;
742     running = true;
743     thread.create(&pooledThreadFunc, this);
744 }
745 
~dbPooledThread()746 dbPooledThread::~dbPooledThread()
747 {
748     startSem.close();
749     readySem.close();
750 }
751 
stop()752 void dbPooledThread::stop()
753 {
754     running = false;
755     startSem.signal();
756     readySem.wait(pool->mutex);
757 }
758 
run()759 void dbPooledThread::run()
760 {
761     dbCriticalSection cs(pool->mutex);
762     while (true) {
763         startSem.wait(pool->mutex);
764         if (!running) {
765             break;
766         }
767         (*f)(arg);
768         readySem.signal();
769     }
770     readySem.signal();
771 }
772 
join(dbPooledThread * thr)773 void dbThreadPool::join(dbPooledThread* thr)
774 {
775     dbCriticalSection cs(mutex);
776     thr->readySem.wait(mutex);
777     thr->next = freeThreads;
778     freeThreads = thr;
779 }
780 
781 
create(dbThread::thread_proc_t f,void * arg)782 dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg)
783 {
784     dbCriticalSection cs(mutex);
785     dbPooledThread* t = freeThreads;
786     if (t == NULL) {
787         t = freeThreads = new dbPooledThread(this);
788     }
789     freeThreads = t->next;
790     t->f = f;
791     t->arg = arg;
792     t->startSem.signal();
793     return t;
794 }
795 
796 
dbThreadPool()797 dbThreadPool::dbThreadPool()
798 {
799     freeThreads = NULL;
800 }
801 
~dbThreadPool()802 dbThreadPool::~dbThreadPool()
803 {
804     dbCriticalSection cs(mutex);
805     dbPooledThread *t, *next;
806     for (t = freeThreads; t != NULL; t = next) {
807         next = t->next;
808         t->stop();
809         delete t;
810     }
811 }
812 
813 END_FASTDB_NAMESPACE
814 
815