1 //-< SYNC.CPP >------------------------------------------------------*--------*
2 // FastDB Version 1.0 (c) 1999 GARRET * ? *
3 // (Main Memory Database Management System) * /\| *
4 // * / \ *
5 // Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
6 // Last update: 10-Dec-98 K.A. Knizhnik * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Intertask synchronization primitives
9 //-------------------------------------------------------------------*--------*
10
11 #define INSIDE_FASTDB
12
13 #include "stdtp.h"
14 #include "sync.h"
15 #ifndef _WINCE
16 #include <sys/stat.h>
17 #endif
18
19 #ifdef VXWORKS
20 #include "fastdbShim.h"
21 #endif
22
23 BEGIN_FASTDB_NAMESPACE
24
25 #ifndef _WIN32
26
27 // Unix specific
28
29 #define ACCESS_PERMISSION_MASK 0666
30
getCurrentTimeMsec()31 unsigned dbSystem::getCurrentTimeMsec()
32 {
33 struct timeval tv;
34 gettimeofday(&tv, NULL);
35 return tv.tv_sec*1000 + tv.tv_usec / 1000;
36 }
37
38
39 #if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
40 END_FASTDB_NAMESPACE
41 #include <errno.h>
42 BEGIN_FASTDB_NAMESPACE
43
44 #define PRINT_ERROR(func) perror(func)
45
46 char const* keyFileDir = "/tmp/";
47
48 #ifndef USE_POSIX_SEMAPHORES
49 END_FASTDB_NAMESPACE
50 #include <signal.h>
51 BEGIN_FASTDB_NAMESPACE
// Empty SIGALRM handler. Installing a handler (instead of the default action)
// lets a blocking semop() in wait_semaphore() return with EINTR when the
// interval timer expires.
static void alarm_handler(int)
{
}

// Installs alarm_handler for SIGALRM once, when the static instance below is
// constructed.
class moduleInitializer {
  public:
    moduleInitializer()
    {
        // Static storage: all fields other than sa_handler start out zeroed.
        static struct sigaction alarmAction;
        alarmAction.sa_handler = alarm_handler;
        ::sigaction(SIGALRM, &alarmAction, NULL);
    }
};

static moduleInitializer initializer; // install SIGALRM handler
64 #endif // USE_POSIX_SEMAPHORES
65
66 #endif // use SysV primitives
67
68
69
70 #if !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP || !defined(USE_POSIX_SEMAPHORES)
71
72 #if defined(USE_STD_FTOK)
// XOR-folds all characters of the NUL-terminated string `s` into one byte.
// The result is in the range 1..255: a fold of 0 is mapped to 1.
int hashFunction(char const* s)
{
    int folded = 0;
    for (char const* p = s; *p != '\0'; ++p) {
        folded ^= *p;
    }
    folded &= 0xFF;
    return folded != 0 ? folded : 1;
}
85
getKeyFromFile(char const * file)86 int getKeyFromFile(char const* file)
87 {
88 return ftok(file, hashFunction(file));
89 }
90 #else
// Derives a SysV IPC key from the device and inode numbers of `path`:
// the low 7 bits of st_dev go into bits 24..30, XORed with the low 31 bits of
// st_ino. Returns -1 if the file cannot be stat()ed.
int getKeyFromFile(char const* path)
{
    struct stat info;
#ifdef VXWORKS
    // No file system is available here, so successive calls just hand out
    // consecutive numbers in place of real device/inode values.
    static int uni = 1;
    info.st_dev = uni;
    info.st_ino = uni++;
#else
    if (::stat(path, &info) < 0) {
        return (key_t)-1;
    }
#endif
    return (key_t)(((info.st_dev & 0x7f) << 24) ^ (info.st_ino & 0x7fffffff));
}
106 #endif
107
108
109 #if !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
open(char const * name,size_t size)110 bool dbSharedMemory::open(char const* name, size_t size)
111 {
112 char* fileName = (char*)name;
113 #ifndef VXWORKS
114 //Why this portion is commented is we don't have file system now
115 //It is used to generate keys. Once our file system is up then we can use this
116 if (strchr(name, '/') == NULL) {
117 fileName = new char[strlen(name)+strlen(keyFileDir)+1];
118 sprintf(fileName, "%s%s", keyFileDir, name);
119 }
120 int fd = ::open(fileName, O_RDWR|O_CREAT, ACCESS_PERMISSION_MASK);
121 if (fd < 0) {
122 if (fileName != name) {
123 delete[] fileName;
124 }
125 return false;
126 }
127 ::close(fd);
128 #endif // ndef VXWORKS
129 int key = getKeyFromFile(fileName);
130 if (fileName != name) {
131 delete[] fileName;
132 }
133 if (key < 0) {
134 return false;
135 }
136 shm = shmget(key, DOALIGN(size, 4096), IPC_CREAT|ACCESS_PERMISSION_MASK);
137 if (shm < 0) {
138 return false;
139 }
140 ptr = (char*)shmat(shm, NULL, 0);
141 return (ptr != (char*)-1);
142 }
143
close()144 void dbSharedMemory::close()
145 {
146 shmdt((char*)ptr);
147 }
148
erase()149 void dbSharedMemory::erase()
150 {
151 close();
152 shmctl(shm, IPC_RMID, NULL);
153 }
154 #endif // use SysV shmat
155 #endif
156
157 ////////////////////////////////////////////////////////////////////////
158 // If we are to use the local implementation of dbSemaphore and dbEvent
159 // (which currently uses SysV based semaphores)
160 #ifndef USE_POSIX_SEMAPHORES
sem_init(int & sem,char const * name,unsigned init_value)161 int sem_init(int& sem, char const* name, unsigned init_value)
162 {
163 key_t key = IPC_PRIVATE;
164 int semid;
165 struct sembuf sops[3];
166 sops[0].sem_num = 1;
167 sops[0].sem_op = 0; /* check if semaphore was already initialized */
168 sops[0].sem_flg = IPC_NOWAIT;
169 sops[1].sem_num = 1;
170 sops[1].sem_op = 1; /* mark semaphore as initialized */
171 sops[1].sem_flg = 0;
172 sops[2].sem_num = 0;
173 sops[2].sem_op = init_value;
174 sops[2].sem_flg = 0;
175 if (name != NULL) {
176 int fd;
177 char* path = (char*)name;
178 if (strchr(name, '/') == NULL) {
179 path = new char[strlen(name)+strlen(keyFileDir)+1];
180 sprintf(path, "%s%s", keyFileDir, name);
181 }
182 fd = open(path, O_WRONLY|O_CREAT, ACCESS_PERMISSION_MASK);
183 if (fd < 0) {
184 if (path != name) {
185 delete[] path;
186 }
187 PRINT_ERROR("open");
188 return -1;
189 }
190 close(fd);
191 key = getKeyFromFile(path);
192 if (path != name) {
193 delete[] path;
194 }
195 if (key < 0) {
196 PRINT_ERROR("getKeyFromFile");
197 return -1;
198 }
199 }
200 semid = semget(key, 2, IPC_CREAT|ACCESS_PERMISSION_MASK);
201 if (semid < 0) {
202 PRINT_ERROR("semget");
203 return -1;
204 }
205 if (semop(semid, sops, itemsof(sops)) != 0 && errno != EAGAIN) {
206 PRINT_ERROR("semop");
207 return -1;
208 }
209 sem = semid;
210 return 0;
211 }
212
213 enum wait_status { wait_ok, wait_timeout_expired, wait_error };
214
wait_semaphore(int & sem,unsigned msec,struct sembuf * sops,int n_sops)215 static wait_status wait_semaphore(int& sem, unsigned msec,
216 struct sembuf* sops, int n_sops)
217 {
218 if (msec != INFINITE) {
219 struct timeval start;
220 struct timeval stop;
221 gettimeofday(&start, NULL);
222 unsigned long usec = start.tv_usec + msec % 1000 * 1000;
223 stop.tv_usec = usec % 1000000;
224 stop.tv_sec = start.tv_sec + msec / 1000 + usec / 1000000;
225
226 while (true) {
227 struct itimerval it;
228 it.it_interval.tv_sec = 0;
229 it.it_interval.tv_usec = 0;
230 it.it_value.tv_sec = stop.tv_sec - start.tv_sec;
231 it.it_value.tv_usec = stop.tv_usec - start.tv_usec;
232 if (stop.tv_usec < start.tv_usec) {
233 it.it_value.tv_usec += 1000000;
234 it.it_value.tv_sec -= 1;
235 }
236 if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
237 return wait_error;
238 }
239 if (semop(sem, sops, n_sops) == 0) {
240 break;
241 }
242 if (errno != EINTR) {
243 return wait_error;
244 }
245 gettimeofday(&start, NULL);
246 if (stop.tv_sec < start.tv_sec ||
247 (stop.tv_sec == start.tv_sec && stop.tv_usec < start.tv_sec))
248 {
249 return wait_timeout_expired;
250 }
251 }
252 } else {
253 while (semop(sem, sops, n_sops) < 0) {
254 if (errno != EINTR) {
255 return wait_error;
256 }
257 }
258 }
259 return wait_ok;
260 }
261
262
wait(unsigned msec)263 bool dbSemaphore::wait(unsigned msec)
264 {
265 static struct sembuf sops[] = {{0, -1, 0}};
266 wait_status ws = wait_semaphore(s, msec, sops, itemsof(sops));
267 assert(ws != wait_error);
268 return ws == wait_ok;
269 }
270
signal(unsigned inc)271 void dbSemaphore::signal(unsigned inc)
272 {
273 if (inc != 0) {
274 struct sembuf sops[1];
275 sops[0].sem_num = 0;
276 sops[0].sem_op = inc;
277 sops[0].sem_flg = 0;
278 int rc = semop(s, sops, 1);
279 assert(rc == 0);
280 }
281 }
282
283
watch()284 bool dbWatchDog::watch()
285 {
286 static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
287 int rc;
288 while ((rc = semop(id, sops, 1)) < 0 && errno == EINTR);
289 return rc == 0;
290 }
291
close()292 void dbWatchDog::close()
293 {
294 semctl(id, 0, IPC_RMID, NULL);
295 }
296
open(char const * name)297 bool dbWatchDog::open(char const* name)
298 {
299 return open(name, ACCESS_PERMISSION_MASK);
300 }
301
open(char const * name,int flags)302 bool dbWatchDog::open(char const* name, int flags)
303 {
304 key_t key = IPC_PRIVATE;
305 if (name != NULL) {
306 int fd;
307 char* path = (char*)name;
308 if (strchr(name, '/') == NULL) {
309 path = new char[strlen(name)+strlen(keyFileDir)+1];
310 sprintf(path, "%s%s", keyFileDir, name);
311 }
312 fd = ::open(path, O_WRONLY|O_CREAT, ACCESS_PERMISSION_MASK);
313 if (fd < 0) {
314 if (path != name) {
315 delete[] path;
316 }
317 PRINT_ERROR("open");
318 return -1;
319 }
320 ::close(fd);
321 key = getKeyFromFile(path);
322 if (path != name) {
323 delete[] path;
324 }
325 if (key < 0) {
326 PRINT_ERROR("getKeyFromFile");
327 return -1;
328 }
329 }
330 return (id = semget(key, 1, flags)) >= 0;
331 }
332
create(char const * name)333 bool dbWatchDog::create(char const* name) {
334 static struct sembuf sops[] = {{0, 1, 0}, {0, -1, SEM_UNDO}};
335 if (open(name, IPC_CREAT|ACCESS_PERMISSION_MASK)) {
336 return semop(id, sops, 2) == 0;
337 }
338 return false;
339 }
340
341
342
// Argument type for semctl(). The GNU library (unless it defines
// _SEM_SEMUN_UNDEFINED) and FreeBSD already declare union semun in
// <sys/sem.h>; on every other platform it has to be declared here.
#if !((defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)) || defined(__FreeBSD__))
union semun {
    int val;
    struct semid_ds* buf;
    unsigned short* array;
};
#endif

// Shared scratch argument for the semctl() calls in this file.
static union semun u;
353
354
reset()355 void dbSemaphore::reset()
356 {
357 u.val = 0;
358 int rc = semctl(s, 0, SETVAL, u);
359 assert(rc >= 0);
360 }
361
open(char const * name,unsigned init_value)362 bool dbSemaphore::open(char const* name, unsigned init_value)
363 {
364 return sem_init(s, name, init_value) == 0;
365 }
366
close()367 void dbSemaphore::close() {}
368
erase()369 void dbSemaphore::erase() {
370 semctl(s, 0, IPC_RMID, &u);
371 }
372
wait(unsigned msec)373 bool dbEvent::wait(unsigned msec)
374 {
375 static struct sembuf sops[] = {{0, -1, 0}, {0, 1, 0}};
376 wait_status ws = wait_semaphore(e, msec, sops, itemsof(sops));
377 assert(ws != wait_error);
378 return ws == wait_ok;
379 }
380
signal()381 void dbEvent::signal()
382 {
383 static struct sembuf sops[] = {{0, 0, IPC_NOWAIT}, {0, 1, 0}};
384 int rc = semop(e, sops, itemsof(sops));
385 assert(rc == 0 || errno == EAGAIN);
386 }
387
reset()388 void dbEvent::reset()
389 {
390 static struct sembuf sops[] = {{0, -1, IPC_NOWAIT}};
391 int rc = semop(e, sops, itemsof(sops));
392 assert(rc == 0 || errno == EAGAIN);
393 }
394
open(char const * name,bool signaled)395 bool dbEvent::open(char const* name, bool signaled)
396 {
397 // XXX: sem_init is POSIX, the rest of these calls are SysV.
398 return sem_init(e, name, signaled) == 0;
399 }
400
close()401 void dbEvent::close() {}
402
erase()403 void dbEvent::erase() {
404 semctl(e, 0, IPC_RMID, &u);
405 }
406
407 #if defined(USE_INTERNAL_CS_IMPL)
408
409 ////////////////////////////////////////////////////////////////////
410 // dbGLobalCriticalSection internal implementation
411
412 #if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__)) && !defined(RECOVERABLE_CRITICAL_SECTION)
413
enter()414 void dbGlobalCriticalSection::enter()
415 {
416 int inc = -1;
417 __asm__ __volatile__(
418 "lock; xadd %0,%1"
419 :"=d" (inc), "=m" (*count)
420 :"d" (inc), "m" (*count));
421 if (inc != 1) {
422 static struct sembuf sops[] = {{0, -1, 0}};
423 int rc;
424 while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
425 assert(rc == 0);
426 }
427 #if GLOBAL_CS_DEBUG
428 owner = pthread_self();
429 #endif
430 }
431
leave()432 void dbGlobalCriticalSection::leave()
433 {
434 int inc = 1;
435 #if GLOBAL_CS_DEBUG
436 owner = 0;
437 #endif
438 __asm__ __volatile__(
439 "lock; xadd %0,%1"
440 :"=d" (inc), "=m" (*count)
441 :"d" (inc), "m" (*count));
442 if (inc != 0) {
443 /* some other processes waiting to enter critical section */
444 static struct sembuf sops[] = {{0, 1, 0}};
445 int rc = semop(semid, sops, 1);
446 assert(rc == 0);
447 }
448 }
449
create(char const * name,sharedsem_t * count)450 bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
451 {
452 this->count = count;
453 *count = 1;
454 return sem_init(semid, name, 0) == 0;
455 }
456
open(char const * name,sharedsem_t * count)457 bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
458 {
459 this->count = count;
460 return sem_init(semid, name, 0) == 0;
461 }
462
463 #elif !defined(RECOVERABLE_CRITICAL_SECTION) && defined(__GNUC__) && defined (__GNUC_MINOR__) && ((4 < __GNUC__) || (4 == __GNUC__ && 1 <= __GNUC_MINOR__)) && !defined(__arm__)
464
enter()465 void dbGlobalCriticalSection::enter()
466 {
467 if (__sync_add_and_fetch(count, 1) != 1) {
468 static struct sembuf sops[] = {{0, -1, 0}};
469 int rc;
470 while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
471 assert(rc == 0);
472 }
473 #if GLOBAL_CS_DEBUG
474 owner = pthread_self();
475 #endif
476 }
477
leave()478 void dbGlobalCriticalSection::leave()
479 {
480 #if GLOBAL_CS_DEBUG
481 owner = 0;
482 #endif
483 if (__sync_add_and_fetch(count, -1) != 0) {
484 /* some other processes waiting to enter critical section */
485 static struct sembuf sops[] = {{0, 1, 0}};
486 int rc = semop(semid, sops, 1);
487 assert(rc == 0);
488 }
489 }
490
create(char const * name,sharedsem_t * count)491 bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
492 {
493 this->count = count;
494 *count = 0;
495 return sem_init(semid, name, 0) == 0;
496 }
497
open(char const * name,sharedsem_t * count)498 bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
499 {
500 this->count = count;
501 return sem_init(semid, name, 0) == 0;
502 }
503
504 #else // defined(__GNUC__) && defined(i386)
505 // "lowest" case, use a SysV semaphore for complete portability
enter()506 void dbGlobalCriticalSection::enter()
507 {
508 static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
509 int rc;
510 while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
511 assert(rc == 0);
512 #if GLOBAL_CS_DEBUG
513 owner = pthread_self();
514 #endif
515 }
516
leave()517 void dbGlobalCriticalSection::leave()
518 {
519 #if GLOBAL_CS_DEBUG
520 owner = 0;
521 #endif
522 static struct sembuf sops[] = {{0, 1, SEM_UNDO}};
523 int rc = semop(semid, sops, 1);
524 assert(rc == 0);
525 }
526
open(char const * name,sharedsem_t *)527 bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
528 {
529 // XXX: sem_init is Posix, the rest of these calls are SysV.
530 return sem_init(semid, name, 1) == 0;
531 }
532
create(char const * name,sharedsem_t *)533 bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
534 {
535 return sem_init(semid, name, 1) == 0;
536 }
537
538 #endif // defined(__GNUC__) && defined(i386)
539
erase()540 void dbGlobalCriticalSection::erase()
541 {
542 semctl(semid, 0, IPC_RMID, &u);
543 }
544
545 #endif // USE_INTERNAL_CS_IMPL
546
547 dbInitializationMutex::initializationStatus
initialize(char const * name)548 dbInitializationMutex::initialize(char const* name)
549 {
550 struct sembuf sops[4];
551 char* path = (char*)name;
552 if (strchr(name, '/') == NULL) {
553 path = new char[strlen(name)+strlen(keyFileDir)+1];
554 sprintf(path, "%s%s", keyFileDir, name);
555 }
556 int fd = open(path, O_WRONLY|O_CREAT, ACCESS_PERMISSION_MASK);
557 if (fd < 0) {
558 if (path != name) {
559 delete[] path;
560 }
561 PRINT_ERROR("open");
562 return InitializationError;
563 }
564 ::close(fd);
565 int key = getKeyFromFile(path);
566 if (path != name) {
567 delete[] path;
568 }
569 if (key < 0) {
570 PRINT_ERROR("getKeyFromFile");
571 return InitializationError;
572 }
573 while (true) {
574 semid = semget(key, 3, IPC_CREAT|ACCESS_PERMISSION_MASK);
575 if (semid < 0) {
576 PRINT_ERROR("semget");
577 return InitializationError;
578 }
579 // Semaphore 0 - number of active processes
580 // Semaphore 1 - intialization in progress (1 while initialization, 0 after it)
581 // Semaphore 2 - semaphore was destroyed
582
583 sops[0].sem_num = 0;
584 sops[0].sem_op = 0; /* check if semaphore was already initialized */
585 sops[0].sem_flg = IPC_NOWAIT;
586 sops[1].sem_num = 0;
587 sops[1].sem_op = 1; /* increment number of active processes */
588 sops[1].sem_flg = SEM_UNDO;
589 sops[2].sem_num = 1;
590 sops[2].sem_op = 1; /* initialization in process */
591 sops[2].sem_flg = SEM_UNDO;
592 sops[3].sem_num = 2;
593 sops[3].sem_op = 0; /* check if semaphore was destroyed */
594 sops[3].sem_flg = IPC_NOWAIT;
595 if (semop(semid, sops, 4) < 0) {
596 if (errno == EAGAIN) {
597 sops[0].sem_num = 0;
598 sops[0].sem_op = -1; /* check if semaphore was already initialized */
599 sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
600 sops[1].sem_num = 1;
601 sops[1].sem_op = 0; /* wait until inialization completed */
602 sops[1].sem_flg = 0;
603 sops[2].sem_num = 0;
604 sops[2].sem_op = 2; /* increment number of active processes */
605 sops[2].sem_flg = SEM_UNDO;
606 sops[3].sem_num = 2;
607 sops[3].sem_op = 0; /* check if semaphore was destroyed */
608 sops[3].sem_flg = IPC_NOWAIT;
609 if (semop(semid, sops, 4) == 0) {
610 return AlreadyInitialized;
611 }
612 if (errno == EAGAIN) {
613 sleep(1);
614 continue;
615 }
616 }
617 if (errno == EIDRM) {
618 continue;
619 }
620 PRINT_ERROR("semop");
621 return InitializationError;
622 } else {
623 return NotYetInitialized;
624 }
625 }
626 }
627
done()628 void dbInitializationMutex::done()
629 {
630 struct sembuf sops[1];
631 sops[0].sem_num = 1;
632 sops[0].sem_op = -1; /* initialization done */
633 sops[0].sem_flg = SEM_UNDO;
634 int rc = semop(semid, sops, 1);
635 assert(rc == 0);
636 }
637
close()638 bool dbInitializationMutex::close()
639 {
640 int rc;
641 struct sembuf sops[3];
642 while (true) {
643 sops[0].sem_num = 0;
644 sops[0].sem_op = -1; /* decrement process couter */
645 sops[0].sem_flg = SEM_UNDO;
646 sops[1].sem_num = 0;
647 sops[1].sem_op = 0; /* check if there are no more active processes */
648 sops[1].sem_flg = IPC_NOWAIT;
649 sops[2].sem_num = 2;
650 sops[2].sem_op = 1; /* mark as destructed */
651 sops[2].sem_flg = SEM_UNDO;
652 if ((rc = semop(semid, sops, 3)) == 0) {
653 return true;
654 } else {
655 assert(errno == EAGAIN);
656 }
657 sops[0].sem_num = 0;
658 sops[0].sem_op = -2; /* decrement process couter and check for non-zero */
659 sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
660 sops[1].sem_num = 0;
661 sops[1].sem_op = 1;
662 sops[1].sem_flg = SEM_UNDO;
663 if ((rc = semop(semid, sops, 2)) == 0) {
664 return false;
665 } else {
666 assert(errno == EAGAIN);
667 }
668 }
669 }
670
erase()671 void dbInitializationMutex::erase()
672 {
673 semctl(semid, 0, IPC_RMID, &u);
674 }
675 #endif // !USE_POSIX_SEMAPHORES
676
677
678
679 // Thread stuff
680 #ifndef NO_PTHREADS
681
682 #if defined(_SC_NPROCESSORS_ONLN)
numberOfProcessors()683 int dbThread::numberOfProcessors() {
684 return sysconf(_SC_NPROCESSORS_ONLN);
685 }
686 #elif defined(__linux__)
687 END_FASTDB_NAMESPACE
688 #include <linux/smp.h>
689 BEGIN_FASTDB_NAMESPACE
numberOfProcessors()690 int dbThread::numberOfProcessors() { return smp_num_cpus; }
691 #elif defined(__FreeBSD__) || defined(__bsdi__) || defined(__OpenBSD__) || defined(__NetBSD__)
692 #if defined(__bsdi__) || defined(__OpenBSD__)
693 END_FASTDB_NAMESPACE
694 #include <sys/param.h>
695 BEGIN_FASTDB_NAMESPACE
696 #endif
697 END_FASTDB_NAMESPACE
698 #include <sys/sysctl.h>
699 BEGIN_FASTDB_NAMESPACE
numberOfProcessors()700 int dbThread::numberOfProcessors() {
701 int mib[2],ncpus=0;
702 size_t len=sizeof(ncpus);
703 mib[0]= CTL_HW;
704 mib[1]= HW_NCPU;
705 sysctl(mib,2,&ncpus,&len,NULL,0);
706 return ncpus;
707 }
708 #else
numberOfProcessors()709 int dbThread::numberOfProcessors() { return 1; }
710 #endif
711 #endif // NO_PTHREADS
712
713
714 #else // _WIN32
715
716 // Win32 specific code
717 unsigned dbSystem::getCurrentTimeMsec()
718 {
719 return GetTickCount();
720 }
721
722 #ifdef SET_NULL_DACL
723 dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
724 #endif
725
726 #endif // __WIN32
727
728 //////////////////////////////////////////////////////////////
729 // Common W32 and Unix platform code follows
730
pooledThreadFunc(void * arg)731 void thread_proc dbPooledThread::pooledThreadFunc(void* arg)
732 {
733 ((dbPooledThread*)arg)->run();
734 }
735
dbPooledThread(dbThreadPool * threadPool)736 dbPooledThread::dbPooledThread(dbThreadPool* threadPool)
737 {
738 pool = threadPool;
739 startSem.open();
740 readySem.open();
741 next = NULL;
742 running = true;
743 thread.create(&pooledThreadFunc, this);
744 }
745
~dbPooledThread()746 dbPooledThread::~dbPooledThread()
747 {
748 startSem.close();
749 readySem.close();
750 }
751
stop()752 void dbPooledThread::stop()
753 {
754 running = false;
755 startSem.signal();
756 readySem.wait(pool->mutex);
757 }
758
run()759 void dbPooledThread::run()
760 {
761 dbCriticalSection cs(pool->mutex);
762 while (true) {
763 startSem.wait(pool->mutex);
764 if (!running) {
765 break;
766 }
767 (*f)(arg);
768 readySem.signal();
769 }
770 readySem.signal();
771 }
772
join(dbPooledThread * thr)773 void dbThreadPool::join(dbPooledThread* thr)
774 {
775 dbCriticalSection cs(mutex);
776 thr->readySem.wait(mutex);
777 thr->next = freeThreads;
778 freeThreads = thr;
779 }
780
781
create(dbThread::thread_proc_t f,void * arg)782 dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg)
783 {
784 dbCriticalSection cs(mutex);
785 dbPooledThread* t = freeThreads;
786 if (t == NULL) {
787 t = freeThreads = new dbPooledThread(this);
788 }
789 freeThreads = t->next;
790 t->f = f;
791 t->arg = arg;
792 t->startSem.signal();
793 return t;
794 }
795
796
dbThreadPool()797 dbThreadPool::dbThreadPool()
798 {
799 freeThreads = NULL;
800 }
801
~dbThreadPool()802 dbThreadPool::~dbThreadPool()
803 {
804 dbCriticalSection cs(mutex);
805 dbPooledThread *t, *next;
806 for (t = freeThreads; t != NULL; t = next) {
807 next = t->next;
808 t->stop();
809 delete t;
810 }
811 }
812
813 END_FASTDB_NAMESPACE
814
815