1 /*
2 * tlibthrd.cxx
3 *
4 * Routines for pre-emptive threading system
5 *
6 * Portable Windows Library
7 *
8 * Copyright (c) 1993-1998 Equivalence Pty. Ltd.
9 *
10 * The contents of this file are subject to the Mozilla Public License
11 * Version 1.0 (the "License"); you may not use this file except in
12 * compliance with the License. You may obtain a copy of the License at
13 * http://www.mozilla.org/MPL/
14 *
15 * Software distributed under the License is distributed on an "AS IS"
16 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17 * the License for the specific language governing rights and limitations
18 * under the License.
19 *
20 * The Original Code is Portable Windows Library.
21 *
22 * The Initial Developer of the Original Code is Equivalence Pty. Ltd.
23 *
24 * Portions are Copyright (C) 1993 Free Software Foundation, Inc.
25 * All Rights Reserved.
26 *
27 * Contributor(s): ______________________________________.
28 *
29 * $Revision: 28733 $
30 * $Author: rjongbloed $
31 * $Date: 2013-01-01 20:44:11 -0600 (Tue, 01 Jan 2013) $
32 */
33
34 #include <ptlib/socket.h>
35 #include <sched.h>
36 #include <pthread.h>
37 #include <sys/resource.h>
38
39 #ifdef P_RTEMS
40 #define SUSPEND_SIG SIGALRM
41 #include <sched.h>
42 #else
43 #define SUSPEND_SIG SIGVTALRM
44 #endif
45
46 #ifdef P_MACOSX
47 #include <mach/mach.h>
48 #include <mach/thread_policy.h>
49 #include <sys/param.h>
50 #include <sys/sysctl.h>
51 // going to need the main thread for adjusting relative priority
52 static pthread_t baseThread;
53 #elif defined(P_LINUX)
54 #include <sys/syscall.h>
55 #endif
56
57 #ifdef P_HAS_SEMAPHORES_XPG6
58 #include "semaphore.h"
59 #endif
60
61 int PX_NewHandle(const char *, int);
62
63 #define PPThreadKill(id, sig) PProcess::Current().PThreadKill(id, sig)
64
65
66 #define PAssertPTHREAD(func, args) \
67 { \
68 unsigned threadOpRetry = 0; \
69 while (PAssertThreadOp(func args, threadOpRetry, #func, __FILE__, __LINE__)); \
70 }
71
PAssertThreadOp(int retval,unsigned & retry,const char * funcname,const char * file,unsigned line)72 static PBoolean PAssertThreadOp(int retval,
73 unsigned & retry,
74 const char * funcname,
75 const char * file,
76 unsigned line)
77 {
78 if (retval == 0) {
79 PTRACE_IF(2, retry > 0, "PTLib\t" << funcname << " required " << retry << " retries!");
80 return false;
81 }
82
83 if (errno == EINTR || errno == EAGAIN) {
84 if (++retry < 1000) {
85 #if defined(P_RTEMS)
86 sched_yield();
87 #else
88 usleep(10000); // Basically just swap out thread to try and clear blockage
89 #endif
90 return true; // Return value to try again
91 }
92 // Give up and assert
93 }
94
95 #if P_USE_ASSERTS
96 PAssertFunc(file, line, NULL, psprintf("Function %s failed", funcname));
97 #endif
98 return false;
99 }
100
101
102 #if defined(P_LINUX)
GetSchedParam(PThread::Priority priority,sched_param & param)103 static int GetSchedParam(PThread::Priority priority, sched_param & param)
104 {
105 /*
106 Set realtime scheduling if our effective user id is root (only then is this
107 allowed) AND our priority is Highest.
108 I don't know if other UNIX OSs have SCHED_FIFO and SCHED_RR as well.
109
110 WARNING: a misbehaving thread (one that never blocks) started with Highest
111 priority can hang the entire machine. That is why root permission is
112 neccessary.
113 */
114
115 memset(¶m, 0, sizeof(sched_param));
116
117 switch (priority) {
118 case PThread::HighestPriority :
119 param.sched_priority = sched_get_priority_max(SCHED_RR);
120 break;
121
122 case PThread::HighPriority :
123 param.sched_priority = sched_get_priority_min(SCHED_RR);
124 break;
125
126 #ifdef SCHED_BATCH
127 case PThread::LowestPriority :
128 case PThread::LowPriority :
129 return SCHED_BATCH;
130 #endif
131
132 default : // PThread::NormalPriority :
133 return SCHED_OTHER;
134 }
135
136 if (geteuid() == 0)
137 return SCHED_RR;
138
139 param.sched_priority = 0;
140 PTRACE(2, "PTLib\tNo permission to set priority level " << priority);
141 return SCHED_OTHER;
142 }
143 #endif
144
145
146 PDECLARE_CLASS(PHouseKeepingThread, PThread)
147 public:
148 PHouseKeepingThread()
149 : PThread(1000, NoAutoDeleteThread, HighestPriority, "Housekeeper")
150 { closing = false; Resume(); }
151
152 void Main();
153 void SetClosing() { closing = true; }
154
155 protected:
156 PBoolean closing;
157 };
158
159
160 static pthread_mutex_t MutexInitialiser = PTHREAD_MUTEX_INITIALIZER;
161
162
163 #define new PNEW
164
165
166 void PHouseKeepingThread::Main()
167 {
168 PProcess & process = PProcess::Current();
169
170 while (!closing) {
171 PTimeInterval delay = process.timers.Process();
172 if (delay > 10000)
173 delay = 10000;
174
175 process.breakBlock.Wait(delay);
176
177 process.m_activeThreadMutex.Wait();
178 PBoolean found;
179 do {
180 found = false;
181 for (PProcess::ThreadMap::iterator it = process.m_activeThreads.begin(); it != process.m_activeThreads.end(); ++it) {
182 PThread * thread = it->second;
183 if (thread->IsAutoDelete() && thread->IsTerminated()) {
184 process.m_activeThreads.erase(it);
185
186 // unlock the m_activeThreadMutex to avoid deadlocks:
187 // if somewhere in the destructor a call to PTRACE() is made,
188 // which itself calls PThread::Current(), deadlocks are possible
189 thread->m_threadId = 0;
190 process.m_activeThreadMutex.Signal();
191 delete thread;
192 process.m_activeThreadMutex.Wait();
193
194 found = true;
195 break;
196 }
197 }
198 } while (found);
199 process.m_activeThreadMutex.Signal();
200
201 process.PXCheckSignals();
202 }
203
204 PTRACE(5, "Housekeeping thread ended");
205 }
206
207
208 bool PProcess::SignalTimerChange()
209 {
210 if (!PAssert(IsInitialised(), PLogicError) || m_shuttingDown)
211 return false;
212
213 PWaitAndSignal m(housekeepingMutex);
214 if (housekeepingThread == NULL) {
215 #if PMEMORY_CHECK
216 PBoolean oldIgnoreAllocations = PMemoryHeap::SetIgnoreAllocations(true);
217 #endif
218 housekeepingThread = new PHouseKeepingThread;
219 #if PMEMORY_CHECK
220 PMemoryHeap::SetIgnoreAllocations(oldIgnoreAllocations);
221 #endif
222 }
223
224 breakBlock.Signal();
225 return true;
226 }
227
228
229 void PProcess::Construct()
230 {
231 #ifndef P_RTEMS
232 // get the file descriptor limit
233 struct rlimit rl;
234 PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
235 maxHandles = rl.rlim_cur;
236 PTRACE(4, "PTLib\tMaximum per-process file handles is " << maxHandles);
237 #else
238 maxHandles = 500; // arbitrary value
239 #endif
240
241 // initialise the housekeeping thread
242 housekeepingThread = NULL;
243
244 #ifdef P_MACOSX
245 // records the main thread for priority adjusting
246 baseThread = pthread_self();
247 #endif
248
249 CommonConstruct();
250 }
251
252
253 PBoolean PProcess::SetMaxHandles(int newMax)
254 {
255 #ifndef P_RTEMS
256 // get the current process limit
257 struct rlimit rl;
258 PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
259
260 // set the new current limit
261 rl.rlim_cur = newMax;
262 if (setrlimit(RLIMIT_NOFILE, &rl) == 0) {
263 PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
264 maxHandles = rl.rlim_cur;
265 if (maxHandles == newMax) {
266 PTRACE(2, "PTLib\tNew maximum per-process file handles set to " << maxHandles);
267 return true;
268 }
269 }
270 #endif // !P_RTEMS
271
272 PTRACE(1, "PTLib\tCannot set per-process file handle limit to "
273 << newMax << " (is " << maxHandles << ") - check permissions");
274 return false;
275 }
276
277
278 PProcess::~PProcess()
279 {
280 PreShutdown();
281
282 // Don't wait for housekeeper to stop if Terminate() is called from it.
283 {
284 PWaitAndSignal m(housekeepingMutex);
285 if ((housekeepingThread != NULL) && (PThread::Current() != housekeepingThread)) {
286 housekeepingThread->SetClosing();
287 SignalTimerChange();
288 housekeepingThread->WaitForTermination();
289 delete housekeepingThread;
290 }
291 }
292
293 CommonDestruct();
294
295 PostShutdown();
296 }
297
298 PBoolean PProcess::PThreadKill(pthread_t id, unsigned sig)
299 {
300 PWaitAndSignal m(m_activeThreadMutex);
301
302 if (m_activeThreads.find(id) == m_activeThreads.end())
303 return false;
304
305 return pthread_kill(id, sig) == 0;
306 }
307
308 void PProcess::PXSetThread(pthread_t id, PThread * thread)
309 {
310 PThread * currentThread = NULL;
311
312 m_activeThreadMutex.Wait();
313
314 ThreadMap::iterator it = m_activeThreads.find(id);
315 if (it != m_activeThreads.end() && it->second->IsAutoDelete())
316 currentThread = it->second;
317
318 m_activeThreads[id] = thread;
319
320 m_activeThreadMutex.Signal();
321
322 if (currentThread != NULL)
323 delete currentThread;
324 }
325
326 //////////////////////////////////////////////////////////////////////////////
327
328 //
329 // Called to construct a PThread for either:
330 //
331 // a) The primordial PProcesss thread
332 // b) A non-PTLib thread that needs to use PTLib routines, such as PTRACE
333 //
334 // This is always called in the context of the running thread, so naturally, the thread
335 // is not paused
336 //
337
338 PThread::PThread(bool isProcess)
339 : m_isProcess(isProcess)
340 , m_autoDelete(!isProcess)
341 , m_originalStackSize(0) // 0 indicates external thread
342 , m_threadId(pthread_self())
343 , PX_priority(NormalPriority)
344 #if defined(P_LINUX)
345 , PX_linuxId(syscall(SYS_gettid))
346 #endif
347 , PX_suspendMutex(MutexInitialiser)
348 , PX_suspendCount(0)
349 , PX_firstTimeStart(false)
350 #ifndef P_HAS_SEMAPHORES
351 , PX_waitingSemaphore(NULL)
352 , PX_WaitSemMutex(MutexInitialiser)
353 #endif
354 {
355 #ifdef P_RTEMS
356 PAssertOS(socketpair(AF_INET,SOCK_STREAM,0,unblockPipe) == 0);
357 #else
358 PAssertOS(::pipe(unblockPipe) == 0);
359 #endif
360
361 if (isProcess)
362 return;
363
364 PProcess & process = PProcess::Current();
365 process.PXSetThread(m_threadId, this);
366 process.SignalTimerChange();
367 }
368
369
370 //
371 // Called to construct a PThread for a normal PTLib thread.
372 //
373 // This is always called in the context of some other thread, and
374 // the PThread is always created in the paused state
375 //
376 PThread::PThread(PINDEX stackSize,
377 AutoDeleteFlag deletion,
378 Priority priorityLevel,
379 const PString & name)
380 : m_isProcess(false)
381 , m_autoDelete(deletion == AutoDeleteThread)
382 , m_originalStackSize(stackSize) // 0 indicates PTLib created thread
383 , m_threadName(name)
384 , m_threadId(0) // 0 indicates thread has not started
385 , PX_priority(priorityLevel)
386 #if defined(P_LINUX)
387 , PX_linuxId(0)
388 #endif
389 , PX_suspendMutex(MutexInitialiser)
390 , PX_suspendCount(1)
391 , PX_firstTimeStart(true) // new thread is actually started the first time Resume() is called.
392 #ifndef P_HAS_SEMAPHORES
393 , PX_waitingSemaphore(NULL)
394 , PX_WaitSemMutex(MutexInitialiser)
395 #endif
396 {
397 PAssert(stackSize > 0, PInvalidParameter);
398
399 #ifdef P_RTEMS
400 PAssertOS(socketpair(AF_INET,SOCK_STREAM,0,unblockPipe) == 0);
401 #else
402 PAssertOS(::pipe(unblockPipe) == 0);
403 #endif
404 PX_NewHandle("Thread unblock pipe", PMAX(unblockPipe[0], unblockPipe[1]));
405
406 // If need to be deleted automatically, make sure thread that does it runs.
407 if (m_autoDelete)
408 PProcess::Current().SignalTimerChange();
409
410 PTRACE(5, "PTLib\tCreated thread " << this << ' ' << m_threadName);
411 }
412
413 //
414 // Called to destruct a PThread
415 //
416 // If not called in the context of the thread being destroyed, we need to wait
417 // for that thread to stop before continuing
418 //
419
420 PThread::~PThread()
421 {
422 if (PProcessInstance == NULL) {
423 #if PTRACING
424 PTrace::Cleanup();
425 #endif
426 } else {
427 pthread_t id = m_threadId;
428 PProcess & process = PProcess::Current();
429
430 // need to terminate the thread if it was ever started and it is not us
431 if ((id != 0) && (id != pthread_self()))
432 Terminate();
433
434 // cause the housekeeping thread to be created, if not already running
435 process.SignalTimerChange();
436
437 // last gasp tracing
438 PTRACE(5, "PTLib\tDestroyed thread " << this << ' ' << m_threadName << "(id = " << ::hex << id << ::dec << ")");
439
440
441 // if thread was started, remove it from the active thread list and detach it to release thread resources
442 if (id != 0) {
443 process.m_activeThreadMutex.Wait();
444 if (m_originalStackSize != 0)
445 pthread_detach(id);
446 process.m_activeThreads.erase(id);
447 process.m_activeThreadMutex.Signal();
448 }
449
450 // cause the housekeeping thread to wake up (we know it must be running)
451 process.SignalTimerChange();
452 }
453
454 // close I/O unblock pipes
455 ::close(unblockPipe[0]);
456 ::close(unblockPipe[1]);
457
458 #ifndef P_HAS_SEMAPHORES
459 pthread_mutex_destroy(&PX_WaitSemMutex);
460 #endif
461
462 // If the mutex was not locked, the unlock will fail */
463 pthread_mutex_trylock(&PX_suspendMutex);
464 pthread_mutex_unlock(&PX_suspendMutex);
465 pthread_mutex_destroy(&PX_suspendMutex);
466 }
467
468
469 void * PThread::PX_ThreadStart(void * arg)
470 {
471 PThread * thread = (PThread *)arg;
472 // Added this to guarantee that the thread creation (PThread::Restart)
473 // has completed before we start the thread. Then the m_threadId has
474 // been set.
475 pthread_mutex_lock(&thread->PX_suspendMutex);
476 thread->SetThreadName(thread->GetThreadName());
477 #if defined(P_LINUX)
478 thread->PX_linuxId = syscall(SYS_gettid);
479 thread->PX_startTick = PTimer::Tick();
480 #endif
481 pthread_mutex_unlock(&thread->PX_suspendMutex);
482
483 // make sure the cleanup routine is called when the thread exits
484 //pthread_cleanup_push(&PThread::PX_ThreadEnd, arg);
485
486 #if defined(P_LINUX)
487 PTRACE(5, "PTLib\tStarted thread " << thread << " (" << thread->PX_linuxId << ") " << thread->GetThreadName());
488 #else
489 PTRACE(5, "PTLib\tStarted thread " << thread << ' ' << thread->GetThreadName());
490 #endif
491
492 PProcess::Current().OnThreadStart(*thread);
493
494 // now call the the thread main routine
495 thread->Main();
496
497 // execute the cleanup routine
498 //pthread_cleanup_pop(1);
499 PX_ThreadEnd(arg);
500
501 // clean up tracing
502 #if PTRACING
503 PTrace::Cleanup();
504 #endif
505
506 // Inform the helgrind finite state machine that this thread has finished
507 // Commented out as on some platforms it causes a crash, no idea why!
508 // pthread_exit(0);
509
510 return NULL;
511 }
512
513
514 void PThread::PX_ThreadEnd(void * arg)
515 {
516 PThread * thread = (PThread *)arg;
517 PProcess & process = PProcess::Current();
518
519 #if defined(P_LINUX)
520 thread->PX_endTick = PTimer::Tick();
521 #endif
522
523 process.OnThreadEnded(*thread);
524 }
525
526
527 void PThread::Restart()
528 {
529 if (!IsTerminated())
530 return;
531
532 pthread_attr_t threadAttr;
533 pthread_attr_init(&threadAttr);
534
535 #if defined(P_LINUX)
536
537 // Set a decent (256K) stack size that won't eat all virtual memory
538 pthread_attr_setstacksize(&threadAttr, 16*PTHREAD_STACK_MIN);
539
540 struct sched_param sched_params;
541 PAssertPTHREAD(pthread_attr_setschedpolicy, (&threadAttr, GetSchedParam(PX_priority, sched_params)));
542 PAssertPTHREAD(pthread_attr_setschedparam, (&threadAttr, &sched_params));
543
544 #elif defined(P_RTEMS)
545 pthread_attr_setstacksize(&threadAttr, 2*PTHREAD_MINIMUM_STACK_SIZE);
546 pthread_attr_setinheritsched(&threadAttr, PTHREAD_EXPLICIT_SCHED);
547 pthread_attr_setschedpolicy(&threadAttr, SCHED_OTHER);
548 struct sched_param sched_param;
549 sched_param.sched_priority = 125; /* set medium priority */
550 pthread_attr_setschedparam(&threadAttr, &sched_param);
551 #endif
552
553 PProcess & process = PProcess::Current();
554
555 // lock the thread list
556 process.m_activeThreadMutex.Wait();
557
558 // create the thread
559 PAssertPTHREAD(pthread_create, (&m_threadId, &threadAttr, PX_ThreadStart, this));
560
561 // put the thread into the thread list
562 process.PXSetThread(m_threadId, this);
563
564 // Inside process.m_activeThreadMutex so simple static is OK
565 size_t newHighWaterMark = 0;
566 static size_t highWaterMark = 0;
567 if (process.m_activeThreads.size() > highWaterMark)
568 newHighWaterMark = highWaterMark = process.m_activeThreads.size();
569
570 // unlock the thread list
571 process.m_activeThreadMutex.Signal();
572
573 pthread_attr_destroy(&threadAttr);
574
575 PTRACE_IF(newHighWaterMark%100 == 0 ? 2 : 4, newHighWaterMark > 0,
576 "PTLib\tThread high water mark set: " << newHighWaterMark);
577
578 #ifdef P_MACOSX
579 if (PX_priority == HighestPriority) {
580 PTRACE(1, "set thread to have the highest priority (MACOSX)");
581 SetPriority(HighestPriority);
582 }
583 #endif
584 }
585
586
587 void PX_SuspendSignalHandler(int)
588 {
589 PThread * thread = PThread::Current();
590 if (thread == NULL)
591 return;
592
593 PBoolean notResumed = true;
594 while (notResumed) {
595 BYTE ch;
596 notResumed = ::read(thread->unblockPipe[0], &ch, 1) < 0 && errno == EINTR;
597 #if !( defined(P_NETBSD) && defined(P_NO_CANCEL) )
598 pthread_testcancel();
599 #endif
600 }
601 }
602
603
604 void PThread::Suspend(PBoolean susp)
605 {
606 PAssertPTHREAD(pthread_mutex_lock, (&PX_suspendMutex));
607
608 // Check for start up condition, first time Resume() is called
609 if (PX_firstTimeStart) {
610 if (susp)
611 PX_suspendCount++;
612 else {
613 if (PX_suspendCount > 0)
614 PX_suspendCount--;
615 if (PX_suspendCount == 0) {
616 PX_firstTimeStart = false;
617 Restart();
618 }
619 }
620
621 PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));
622 return;
623 }
624
625 #if defined(P_MACOSX) && (P_MACOSX <= 55)
626 // Suspend - warn the user with an Assertion
627 PAssertAlways("Cannot suspend threads on Mac OS X due to lack of pthread_kill()");
628 #else
629 if (PPThreadKill(m_threadId, 0)) {
630
631 // if suspending, then see if already suspended
632 if (susp) {
633 PX_suspendCount++;
634 if (PX_suspendCount == 1) {
635 if (m_threadId != pthread_self()) {
636 signal(SUSPEND_SIG, PX_SuspendSignalHandler);
637 PPThreadKill(m_threadId, SUSPEND_SIG);
638 }
639 else {
640 PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));
641 PX_SuspendSignalHandler(SUSPEND_SIG);
642 return; // Mutex already unlocked
643 }
644 }
645 }
646
647 // if resuming, then see if to really resume
648 else if (PX_suspendCount > 0) {
649 PX_suspendCount--;
650 if (PX_suspendCount == 0)
651 PXAbortBlock();
652 }
653 }
654
655 PAssertPTHREAD(pthread_mutex_unlock, (&PX_suspendMutex));
656 #endif // P_MACOSX
657 }
658
659
660 void PThread::Resume()
661 {
662 Suspend(false);
663 }
664
665
666 PBoolean PThread::IsSuspended() const
667 {
668 if (PX_firstTimeStart)
669 return true;
670
671 if (IsTerminated())
672 return false;
673
674 PAssertPTHREAD(pthread_mutex_lock, ((pthread_mutex_t *)&PX_suspendMutex));
675 PBoolean suspended = PX_suspendCount != 0;
676 PAssertPTHREAD(pthread_mutex_unlock, ((pthread_mutex_t *)&PX_suspendMutex));
677 return suspended;
678 }
679
680
681 void PThread::SetAutoDelete(AutoDeleteFlag deletion)
682 {
683 PAssert(deletion != AutoDeleteThread || (!m_isProcess && this != &PProcess::Current()), PLogicError);
684 m_autoDelete = deletion == AutoDeleteThread;
685 }
686
687 #ifdef P_MACOSX
688 // obtain thread priority of the main thread
689 static unsigned long
690 GetThreadBasePriority ()
691 {
692 thread_basic_info_data_t threadInfo;
693 policy_info_data_t thePolicyInfo;
694 unsigned int count;
695
696 if (baseThread == 0) {
697 return 0;
698 }
699
700 // get basic info
701 count = THREAD_BASIC_INFO_COUNT;
702 thread_info (pthread_mach_thread_np (baseThread), THREAD_BASIC_INFO,
703 (integer_t*)&threadInfo, &count);
704
705 switch (threadInfo.policy) {
706 case POLICY_TIMESHARE:
707 count = POLICY_TIMESHARE_INFO_COUNT;
708 thread_info(pthread_mach_thread_np (baseThread),
709 THREAD_SCHED_TIMESHARE_INFO,
710 (integer_t*)&(thePolicyInfo.ts), &count);
711 return thePolicyInfo.ts.base_priority;
712
713 case POLICY_FIFO:
714 count = POLICY_FIFO_INFO_COUNT;
715 thread_info(pthread_mach_thread_np (baseThread),
716 THREAD_SCHED_FIFO_INFO,
717 (integer_t*)&(thePolicyInfo.fifo), &count);
718 if (thePolicyInfo.fifo.depressed)
719 return thePolicyInfo.fifo.depress_priority;
720 return thePolicyInfo.fifo.base_priority;
721
722 case POLICY_RR:
723 count = POLICY_RR_INFO_COUNT;
724 thread_info(pthread_mach_thread_np (baseThread),
725 THREAD_SCHED_RR_INFO,
726 (integer_t*)&(thePolicyInfo.rr), &count);
727 if (thePolicyInfo.rr.depressed)
728 return thePolicyInfo.rr.depress_priority;
729 return thePolicyInfo.rr.base_priority;
730 }
731
732 return 0;
733 }
734 #endif
735
736 void PThread::SetPriority(Priority priorityLevel)
737 {
738 PX_priority = priorityLevel;
739
740 if (IsTerminated())
741 return;
742
743 #if defined(P_LINUX)
744 struct sched_param params;
745 PAssertPTHREAD(pthread_setschedparam, (m_threadId, GetSchedParam(priorityLevel, params), ¶ms));
746
747 #elif defined(P_MACOSX)
748 if (priorityLevel == HighestPriority) {
749 /* get fixed priority */
750 {
751 int result;
752
753 thread_extended_policy_data_t theFixedPolicy;
754 thread_precedence_policy_data_t thePrecedencePolicy;
755 long relativePriority;
756
757 theFixedPolicy.timeshare = false; // set to true for a non-fixed thread
758 result = thread_policy_set (pthread_mach_thread_np(m_threadId),
759 THREAD_EXTENDED_POLICY,
760 (thread_policy_t)&theFixedPolicy,
761 THREAD_EXTENDED_POLICY_COUNT);
762 if (result != KERN_SUCCESS) {
763 PTRACE(1, "thread_policy - Couldn't set thread as fixed priority.");
764 }
765
766 // set priority
767
768 // precedency policy's "importance" value is relative to
769 // spawning thread's priority
770
771 relativePriority = 62 - GetThreadBasePriority();
772 PTRACE(3, "relativePriority is " << relativePriority << " base priority is " << GetThreadBasePriority());
773
774 thePrecedencePolicy.importance = relativePriority;
775 result = thread_policy_set (pthread_mach_thread_np(m_threadId),
776 THREAD_PRECEDENCE_POLICY,
777 (thread_policy_t)&thePrecedencePolicy,
778 THREAD_PRECEDENCE_POLICY_COUNT);
779 if (result != KERN_SUCCESS) {
780 PTRACE(1, "thread_policy - Couldn't set thread priority.");
781 }
782 }
783 }
784 #endif
785 }
786
787
788 PThread::Priority PThread::GetPriority() const
789 {
790 #if defined(LINUX)
791 int policy;
792 struct sched_param params;
793
794 PAssertPTHREAD(pthread_getschedparam, (m_threadId, &policy, ¶ms));
795
796 switch (policy)
797 {
798 case SCHED_OTHER:
799 break;
800
801 case SCHED_FIFO:
802 case SCHED_RR:
803 return params.sched_priority > sched_get_priority_min(policy) ? HighestPriority : HighPriority;
804
805 #ifdef SCHED_BATCH
806 case SCHED_BATCH :
807 return LowPriority;
808 #endif
809
810 default:
811 /* Unknown scheduler. We don't know what priority this thread has. */
812 PTRACE(1, "PTLib\tPThread::GetPriority: unknown scheduling policy #" << policy);
813 }
814 #endif
815
816 return NormalPriority; /* as good a guess as any */
817 }
818
819
820 #ifndef P_HAS_SEMAPHORES
821 void PThread::PXSetWaitingSemaphore(PSemaphore * sem)
822 {
823 PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex));
824 PX_waitingSemaphore = sem;
825 PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));
826 }
827 #endif
828
829
830 #ifdef P_GNU_PTH
831 // GNU PTH threads version (used by NetBSD)
832 // Taken from NetBSD pkg patches
833 void PThread::Sleep(const PTimeInterval & timeout)
834 {
835 PTime lastTime;
836 PTime targetTime = PTime() + timeout;
837
838 sched_yield();
839 lastTime = PTime();
840
841 while (lastTime < targetTime) {
842 P_timeval tval = targetTime - lastTime;
843 if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR)
844 break;
845
846 pthread_testcancel();
847
848 lastTime = PTime();
849 }
850 }
851
852 #else
853 // Normal Posix threads version
854 void PThread::Sleep(const PTimeInterval & timeout)
855 {
856 PTime lastTime;
857 PTime targetTime = lastTime + timeout;
858 do {
859 P_timeval tval = targetTime - lastTime;
860 if (select(0, NULL, NULL, NULL, tval) < 0 && errno != EINTR)
861 break;
862
863 #if !( defined(P_NETBSD) && defined(P_NO_CANCEL) )
864 pthread_testcancel();
865 #endif
866
867 lastTime = PTime();
868 } while (lastTime < targetTime);
869 }
870 #endif
871
872 void PThread::Yield()
873 {
874 sched_yield();
875 }
876
877
878 //
879 // Terminate the specified thread
880 //
881 void PThread::Terminate()
882 {
883 // if thread was not created by PTLib, then don't terminate it
884 if (m_originalStackSize <= 0)
885 return;
886
887 // if thread calls Terminate on itself, then do it
888 // don't use PThread::Current, as the thread may already not be in the
889 // active threads list
890 if (m_threadId == pthread_self()) {
891 pthread_exit(0);
892 return; // keeps compiler happy
893 }
894
895 // if the thread is already terminated, then nothing to do
896 if (IsTerminated())
897 return;
898
899 // otherwise force thread to die
900 PTRACE(2, "PTLib\tForcing termination of thread " << (void *)this);
901
902 PXAbortBlock();
903 WaitForTermination(20);
904
905 #if !defined(P_HAS_SEMAPHORES) && !defined(P_HAS_NAMED_SEMAPHORES)
906 PAssertPTHREAD(pthread_mutex_lock, (&PX_WaitSemMutex));
907 if (PX_waitingSemaphore != NULL) {
908 PAssertPTHREAD(pthread_mutex_lock, (&PX_waitingSemaphore->mutex));
909 PX_waitingSemaphore->queuedLocks--;
910 PAssertPTHREAD(pthread_mutex_unlock, (&PX_waitingSemaphore->mutex));
911 PX_waitingSemaphore = NULL;
912 }
913 PAssertPTHREAD(pthread_mutex_unlock, (&PX_WaitSemMutex));
914 #endif
915
916 #if ( defined(P_NETBSD) && defined(P_NO_CANCEL) )
917 PPThreadKill(m_threadId, SIGKILL);
918 #else
919 if (m_threadId) {
920 pthread_cancel(m_threadId);
921 }
922 #endif
923 }
924
925
926 PBoolean PThread::IsTerminated() const
927 {
928 if (m_isProcess)
929 return false; // Process is always still running
930
931 // See if thread is still running
932 pthread_t id = m_threadId;
933 return id == 0 || pthread_kill(id, 0) != 0;
934 }
935
936
937 void PThread::WaitForTermination() const
938 {
939 WaitForTermination(PMaxTimeInterval);
940 }
941
942
943 PBoolean PThread::WaitForTermination(const PTimeInterval & maxWait) const
944 {
945 pthread_t id = m_threadId;
946 if (id == 0 || this == Current()) {
947 PTRACE(2, "WaitForTermination on 0x" << hex << id << dec << " short circuited");
948 return true;
949 }
950
951 PTRACE(6, "WaitForTermination on 0x" << hex << id << dec << " for " << maxWait);
952
953 PXAbortBlock(); // this assist in clean shutdowns on some systems
954
955 PSimpleTimer timeout(maxWait);
956 while (!IsTerminated()) {
957 if (timeout.HasExpired())
958 return false;
959
960 Sleep(10); // sleep for 10ms. This slows down the busy loop removing 100%
961 // CPU usage and also yeilds so other threads can run.
962 }
963
964 PTRACE(6, "WaitForTermination on 0x" << hex << id << dec << " finished");
965 return true;
966 }
967
968
969 #if defined(P_LINUX)
970
971 static inline unsigned long long jiffies_to_msecs(const unsigned long j)
972 {
973 static long sysconf_HZ = sysconf(_SC_CLK_TCK);
974 return (j * 1000LL) / sysconf_HZ;
975 }
976
977
978 static bool LinuxGetTimes(const PString & statFileName,
979 #if PTRACING
980 PString & error,
981 #endif
982 PThread::Times & times)
983 {
984 /* From the man page on the "stat" file
985 Status information about the process. This is used by ps(1). It is defined in /usr/src/linux/fs/proc/array.c.
986 The fields, in order, with their proper scanf(3) format specifiers, are:
987 pid %d The process ID.
988 comm %s The filename of the executable, in parentheses. This is visible
989 whether or not the executable is swapped out.
990 state %c One character from the string "RSDZTW" where R is running, S is
991 sleeping in an interruptible wait, D is waiting in uninterruptible
992 disk sleep, Z is zombie, T is traced or stopped (on a signal), and
993 W is paging.
994 ppid %d The PID of the parent.
995 pgrp %d The process group ID of the process.
996 session %d The session ID of the process.
997 tty_nr %d The tty the process uses.
998 tpgid %d The process group ID of the process which currently owns the tty
999 that the process is connected to.
1000 flags %lu The kernel flags word of the process. For bit meanings, see the
1001 PF_* defines in <linux/sched.h>. Details depend on the kernel
1002 version.
1003 minflt %lu The number of minor faults the process has made which have not
1004 required loading a memory page from disk.
1005 cminflt %lu The number of minor faults that the process's waited-for children
1006 have made.
1007 majflt %lu The number of major faults the process has made which have required
1008 loading a memory page from disk.
1009 cmajflt %lu The number of major faults that the process's waited-for children
1010 have made.
1011 utime %lu The number of jiffies that this process has been scheduled in user
1012 mode.
1013 stime %lu The number of jiffies that this process has been scheduled in kernel
1014 mode.
1015 cutime %ld The number of jiffies that this process's waited-for children have
1016 been scheduled in user mode. (See also times(2).)
1017 cstime %ld The number of jiffies that this process's waited-for children have
1018 been scheduled in kernel mode.
1019 priority %ld The standard nice value, plus fifteen. The value is never negative
1020 in the kernel.
1021 nice %ld The nice value ranges from 19 (nicest) to -19 (not nice to others).
1022 num_threads %ld Number of threads.
1023 itrealvalue %ld The time in jiffies before the next SIGALRM is sent to the process
1024 due to an interval timer.
1025 starttime %lu The time in jiffies the process started after system boot.
1026 vsize %lu Virtual memory size in bytes.
1027 rss %ld Resident Set Size: number of pages the process has in real memory,
1028 minus 3 for administrative purposes. This is just the pages which
1029 count towards text, data, or stack space. This does not include
1030 pages which have not been demand-loaded in, or which are swapped out.
1031 rlim %lu Current limit in bytes on the rss of the process
1032 (usually 4294967295 on i386).
1033 startcode %lu The address above which program text can run.
1034 endcode %lu The address below which program text can run.
1035 startstack %lu The address of the start of the stack.
1036 kstkesp %lu The current value of esp (stack pointer), as found in the kernel
1037 stack page for the process.
1038 kstkeip %lu The current EIP (instruction pointer).
1039 signal %lu The bitmap of pending signals.
1040 blocked %lu The bitmap of blocked signals.
1041 sigignore %lu The bitmap of ignored signals.
1042 sigcatch %lu The bitmap of caught signals.
1043 wchan %lu This is the "channel" in which the process is waiting. It is the
1044 address of a system call, and can be looked up in a namelist if you
1045 need a textual name. (If you have an up-to-date /etc/psdatabase, then
1046 try ps -l to see the WCHAN field in action.)
1047 nswap %lu Number of pages swapped (not maintained).
1048 cnswap %lu Cumulative nswap for child processes (not maintained).
1049 exit_signal %d Signal to be sent to parent when we die.
1050 processor %d CPU number last executed on.
1051 rt_priority %lu (since kernel 2.5.19) Real-time scheduling priority (see sched_setscheduler(2)).
1052 policy %lu (since kernel 2.5.19) Scheduling policy (see sched_setscheduler(2)).
1053 delayacct_blkio_ticks %llu (since Linux 2.6.18) Aggregated block I/O delays, measured in
1054 clock ticks (centiseconds).
1055 */
1056
1057 PTextFile statfile(statFileName, PFile::ReadOnly);
1058 if (!statfile.IsOpen()) {
1059 PTRACE_PARAM(error = "Could not find thread stat file");
1060 return false;
1061 }
1062
1063 char line[1000];
1064 statfile.getline(line, sizeof(line));
1065 if (!statfile.good()) {
1066 PTRACE_PARAM(error = "Could not read thread stat file");
1067 return false;
1068 }
1069
1070 int pid;
1071 char comm[100];
1072 char state;
1073 int ppid, pgrp, session, tty_nr, tpgid;
1074 unsigned long flags, minflt, cminflt, majflt, cmajflt, utime, stime;
1075 long cutime, cstime, priority, nice, num_threads, itrealvalue;
1076 unsigned long starttime, vsize;
1077 long rss;
1078 unsigned long rlim, startcode, endcode, startstack, kstkesp, kstkeip, signal, blocked, sigignore, sigcatch, wchan, nswap, cnswap;
1079 int exit_signal, processor;
1080 unsigned long rt_priority, policy;
1081 unsigned long long delayacct_blkio_ticks;
1082
1083 // 17698 (maxmcu) R 1 17033 8586 34833 17467 4202560 7
1084 // 0 0 0 0 0 0 0 -100 0 16
1085 // 0 55172504 258756608 6741 4294967295 134512640 137352760 3217892976 8185700 15991824
1086 // 0 0 4 201349635 0 0 0 -1 7 99
1087 // 2 0
1088
1089 int count = sscanf(line,
1090 "%d%s %c%d%d%d%d%d%lu%lu"
1091 "%lu%lu%lu%lu%lu%ld%ld%ld%ld%ld"
1092 "%ld%lu%lu%ld%lu%lu%lu%lu%lu%lu"
1093 "%lu%lu%lu%lu%lu%lu%lu%d%d%lu"
1094 "%lu%llu",
1095 &pid, comm, &state, &ppid, &pgrp, &session, &tty_nr, &tpgid, &flags, &minflt,
1096 &cminflt, &majflt, &cmajflt, &utime, &stime, &cutime, &cstime, &priority, &nice, &num_threads,
1097 &itrealvalue, &starttime, &vsize, &rss, &rlim, &startcode, &endcode, &startstack, &kstkesp, &kstkeip,
1098 &signal, &blocked, &sigignore, &sigcatch, &wchan, &nswap, &cnswap, &exit_signal, &processor, &rt_priority,
1099 &policy, &delayacct_blkio_ticks);
1100 if (count != 42) {
1101 PTRACE_PARAM(error = psprintf("Not enough values (%d)\n%s", count, line));
1102 return false;
1103 }
1104
1105 times.m_kernel = jiffies_to_msecs(stime);
1106 times.m_user = jiffies_to_msecs(utime);
1107 return true;
1108 }
1109
1110
1111 bool PThread::GetTimes(Times & times)
1112 {
1113 PStringStream statFileName;
1114 statFileName << "/proc/" << getpid() << "/task/" << PX_linuxId << "/stat";
1115
1116 PTRACE_PARAM(PString error);
1117
1118 for (int retry = 0; retry < 3; ++retry) {
1119 if (LinuxGetTimes(statFileName,
1120 #if PTRACING
1121 error,
1122 #endif
1123 times)) {
1124 if (PX_endTick != 0)
1125 times.m_real = PX_endTick - PX_startTick;
1126 else
1127 times.m_real = PTimer::Tick() - PX_startTick;
1128 return true;
1129 }
1130 }
1131
1132 PTRACE(2, "PTLib\tError reading " << statFileName << ", " << error);
1133 return false;
1134 }
1135 #else
1136 bool PThread::GetTimes(Times & times)
1137 {
1138 return false;
1139 }
1140 #endif
1141
1142
1143 int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
1144 {
1145 PTRACE(7, "PTLib\tPThread::PXBlockOnIO(" << handle << ',' << type << ')');
1146
1147 if ((handle < 0) || (handle >= PProcess::Current().GetMaxHandles())) {
1148 PTRACE(2, "PTLib\tAttempt to use illegal handle in PThread::PXBlockOnIO, handle=" << handle);
1149 errno = EBADF;
1150 return -1;
1151 }
1152
1153 // make sure we flush the buffer before doing a write
1154 P_fd_set read_fds;
1155 P_fd_set write_fds;
1156 P_fd_set exception_fds;
1157
1158 int retval;
1159 do {
1160 switch (type) {
1161 case PChannel::PXReadBlock:
1162 case PChannel::PXAcceptBlock:
1163 read_fds = handle;
1164 write_fds.Zero();
1165 exception_fds.Zero();
1166 break;
1167 case PChannel::PXWriteBlock:
1168 read_fds.Zero();
1169 write_fds = handle;
1170 exception_fds.Zero();
1171 break;
1172 case PChannel::PXConnectBlock:
1173 read_fds.Zero();
1174 write_fds = handle;
1175 exception_fds = handle;
1176 break;
1177 default:
1178 PAssertAlways(PLogicError);
1179 return 0;
1180 }
1181
1182 // include the termination pipe into all blocking I/O functions
1183 read_fds += unblockPipe[0];
1184
1185 P_timeval tval = timeout;
1186 retval = ::select(PMAX(handle, unblockPipe[0])+1,
1187 read_fds, write_fds, exception_fds, tval);
1188 } while (retval < 0 && errno == EINTR);
1189
1190 if ((retval == 1) && read_fds.IsPresent(unblockPipe[0])) {
1191 BYTE ch;
1192 PAssertOS(::read(unblockPipe[0], &ch, 1) != -1);
1193 errno = EINTR;
1194 retval = -1;
1195 PTRACE(6, "PTLib\tUnblocked I/O fd=" << unblockPipe[0]);
1196 }
1197
1198 return retval;
1199 }
1200
1201 void PThread::PXAbortBlock() const
1202 {
1203 static BYTE ch = 0;
1204 PAssertOS(::write(unblockPipe[1], &ch, 1) != -1);
1205 PTRACE(6, "PTLib\tUnblocking I/O fd=" << unblockPipe[0] << " thread=" << GetThreadName());
1206 }
1207
1208
1209 ///////////////////////////////////////////////////////////////////////////////
1210
1211 PSemaphore::PSemaphore(PXClass pxc)
1212 {
1213 pxClass = pxc;
1214
1215 // these should never be used, as this constructor is
1216 // only used for PMutex and PSyncPoint and they have their
1217 // own copy constructors
1218
1219 initialVar = maxCountVar = 0;
1220
1221 if(pxClass == PXSemaphore) {
1222 #if defined(P_HAS_SEMAPHORES)
1223 /* call sem_init, otherwise sem_destroy fails*/
1224 PAssertPTHREAD(sem_init, (&semId, 0, 0));
1225 #elif defined(P_HAS_NAMED_SEMAPHORES)
1226 semId = CreateSem(0);
1227 #else
1228 currentCount = maximumCount = 0;
1229 queuedLocks = 0;
1230 pthread_mutex_init(&mutex, NULL);
1231 pthread_cond_init(&condVar, NULL);
1232 #endif
1233 }
1234 }
1235
1236
1237 PSemaphore::PSemaphore(unsigned initial, unsigned maxCount)
1238 {
1239 pxClass = PXSemaphore;
1240
1241 initialVar = initial;
1242 maxCountVar = maxCount;
1243
1244 #if defined(P_HAS_SEMAPHORES)
1245 PAssertPTHREAD(sem_init, (&semId, 0, initial));
1246 #elif defined(P_HAS_NAMED_SEMAPHORES)
1247 semId = CreateSem(initialVar);
1248 #else
1249 PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
1250 PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
1251
1252 PAssert(maxCount > 0, "Invalid semaphore maximum.");
1253 if (initial > maxCount)
1254 initial = maxCount;
1255
1256 currentCount = initial;
1257 maximumCount = maxCount;
1258 queuedLocks = 0;
1259 #endif
1260 }
1261
1262
1263 PSemaphore::PSemaphore(const PSemaphore & sem)
1264 {
1265 pxClass = sem.GetSemClass();
1266
1267 initialVar = sem.GetInitial();
1268 maxCountVar = sem.GetMaxCount();
1269
1270 if(pxClass == PXSemaphore) {
1271 #if defined(P_HAS_SEMAPHORES)
1272 PAssertPTHREAD(sem_init, (&semId, 0, initialVar));
1273 #elif defined(P_HAS_NAMED_SEMAPHORES)
1274 semId = CreateSem(initialVar);
1275 #else
1276 PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
1277 PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
1278
1279 PAssert(maxCountVar > 0, "Invalid semaphore maximum.");
1280 if (initialVar > maxCountVar)
1281 initialVar = maxCountVar;
1282
1283 currentCount = initialVar;
1284 maximumCount = maxCountVar;
1285 queuedLocks = 0;
1286 #endif
1287 }
1288 }
1289
1290 PSemaphore::~PSemaphore()
1291 {
1292 if(pxClass == PXSemaphore) {
1293 #if defined(P_HAS_SEMAPHORES)
1294 PAssertPTHREAD(sem_destroy, (&semId));
1295 #elif defined(P_HAS_NAMED_SEMAPHORES)
1296 PAssertPTHREAD(sem_close, (semId));
1297 #else
1298 PAssert(queuedLocks == 0, "Semaphore destroyed with queued locks");
1299 PAssertPTHREAD(pthread_mutex_destroy, (&mutex));
1300 PAssertPTHREAD(pthread_cond_destroy, (&condVar));
1301 #endif
1302 }
1303 }
1304
1305 #if defined(P_HAS_NAMED_SEMAPHORES)
1306 sem_t * PSemaphore::CreateSem(unsigned initialValue)
1307 {
1308 sem_t *sem;
1309
1310 // Since sem_open and sem_unlink are two operations, there is a small
1311 // window of opportunity that two simultaneous accesses may return
1312 // the same semaphore. Therefore, the static mutex is used to
1313 // prevent this.
1314 static pthread_mutex_t semCreationMutex = PTHREAD_MUTEX_INITIALIZER;
1315 PAssertPTHREAD(pthread_mutex_lock, (&semCreationMutex));
1316
1317 sem_unlink("/ptlib_sem");
1318 sem = sem_open("/ptlib_sem", (O_CREAT | O_EXCL), 700, initialValue);
1319
1320 PAssertPTHREAD(pthread_mutex_unlock, (&semCreationMutex));
1321
1322 PAssert(sem != SEM_FAILED, "Couldn't create named semaphore");
1323 return sem;
1324 }
1325 #endif
1326
1327 void PSemaphore::Wait()
1328 {
1329 #if defined(P_HAS_SEMAPHORES)
1330 PAssertPTHREAD(sem_wait, (&semId));
1331 #elif defined(P_HAS_NAMED_SEMAPHORES)
1332 PAssertPTHREAD(sem_wait, (semId));
1333 #else
1334 PAssertPTHREAD(pthread_mutex_lock, (&mutex));
1335
1336 queuedLocks++;
1337 PThread::Current()->PXSetWaitingSemaphore(this);
1338
1339 while (currentCount == 0) {
1340 int err = pthread_cond_wait(&condVar, &mutex);
1341 PAssert(err == 0 || err == EINTR, psprintf("wait error = %i", err));
1342 }
1343
1344 PThread::Current()->PXSetWaitingSemaphore(NULL);
1345 queuedLocks--;
1346
1347 currentCount--;
1348
1349 PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
1350 #endif
1351 }
1352
1353
1354 PBoolean PSemaphore::Wait(const PTimeInterval & waitTime)
1355 {
1356 if (waitTime == PMaxTimeInterval) {
1357 Wait();
1358 return true;
1359 }
1360
1361 // create absolute finish time
1362 PTime finishTime;
1363 finishTime += waitTime;
1364
1365 #if defined(P_HAS_SEMAPHORES)
1366 #ifdef P_HAS_SEMAPHORES_XPG6
1367 // use proper timed spinlocks if supported.
1368 // http://www.opengroup.org/onlinepubs/007904975/functions/sem_timedwait.html
1369
1370 struct timespec absTime;
1371 absTime.tv_sec = finishTime.GetTimeInSeconds();
1372 absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
1373
1374 do {
1375 if (sem_timedwait(&semId, &absTime) == 0)
1376 return true;
1377 } while (errno == EINTR);
1378
1379 PAssert(errno == ETIMEDOUT, strerror(errno));
1380 return false;
1381
1382 #else
1383 // loop until timeout, or semaphore becomes available
1384 // don't use a PTimer, as this causes the housekeeping
1385 // thread to get very busy
1386 do {
1387 if (sem_trywait(&semId) == 0)
1388 return true;
1389
1390 #if defined(P_LINUX)
1391 // sched_yield in a tight loop is bad karma
1392 // for the linux scheduler: http://www.ussg.iu.edu/hypermail/linux/kernel/0312.2/1127.html
1393 PThread::Current()->Sleep(10);
1394 #else
1395 PThread::Yield();
1396 #endif
1397 } while (PTime() < finishTime);
1398
1399 return false;
1400
1401 #endif
1402 #elif defined(P_HAS_NAMED_SEMAPHORES)
1403 do {
1404 if(sem_trywait(semId) == 0)
1405 return true;
1406 PThread::Current()->Sleep(10);
1407 } while (PTime() < finishTime);
1408
1409 return false;
1410 #else
1411
1412 struct timespec absTime;
1413 absTime.tv_sec = finishTime.GetTimeInSeconds();
1414 absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
1415
1416 PAssertPTHREAD(pthread_mutex_lock, (&mutex));
1417
1418 PThread * thread = PThread::Current();
1419 thread->PXSetWaitingSemaphore(this);
1420 queuedLocks++;
1421
1422 PBoolean ok = true;
1423 while (currentCount == 0) {
1424 int err = pthread_cond_timedwait(&condVar, &mutex, &absTime);
1425 if (err == ETIMEDOUT) {
1426 ok = false;
1427 break;
1428 }
1429 else
1430 PAssert(err == 0 || err == EINTR, psprintf("timed wait error = %i", err));
1431 }
1432
1433 thread->PXSetWaitingSemaphore(NULL);
1434 queuedLocks--;
1435
1436 if (ok)
1437 currentCount--;
1438
1439 PAssertPTHREAD(pthread_mutex_unlock, ((pthread_mutex_t *)&mutex));
1440
1441 return ok;
1442 #endif
1443 }
1444
1445
1446 void PSemaphore::Signal()
1447 {
1448 #if defined(P_HAS_SEMAPHORES)
1449 PAssertPTHREAD(sem_post, (&semId));
1450 #elif defined(P_HAS_NAMED_SEMAPHORES)
1451 PAssertPTHREAD(sem_post, (semId));
1452 #else
1453 PAssertPTHREAD(pthread_mutex_lock, (&mutex));
1454
1455 if (currentCount < maximumCount)
1456 currentCount++;
1457
1458 if (queuedLocks > 0)
1459 PAssertPTHREAD(pthread_cond_signal, (&condVar));
1460
1461 PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
1462 #endif
1463 }
1464
1465
1466 PBoolean PSemaphore::WillBlock() const
1467 {
1468 #if defined(P_HAS_SEMAPHORES)
1469 if (sem_trywait((sem_t *)&semId) != 0) {
1470 PAssertOS(errno == EAGAIN || errno == EINTR);
1471 return true;
1472 }
1473 PAssertPTHREAD(sem_post, ((sem_t *)&semId));
1474 return false;
1475 #elif defined(P_HAS_NAMED_SEMAPHORES)
1476 if (sem_trywait(semId) != 0) {
1477 PAssertOS(errno == EAGAIN || errno == EINTR);
1478 return true;
1479 }
1480 PAssertPTHREAD(sem_post, (semId));
1481 return false;
1482 #else
1483 return currentCount == 0;
1484 #endif
1485 }
1486
1487
1488 ///////////////////////////////////////////////////////////////////////////////
1489
1490 PTimedMutex::PTimedMutex()
1491 {
1492 Construct();
1493 }
1494
1495 PTimedMutex::PTimedMutex(const PTimedMutex &)
1496 {
1497 Construct();
1498 }
1499
1500
1501 void PTimedMutex::Construct()
1502 {
1503 #if P_HAS_RECURSIVE_MUTEX
1504
1505 pthread_mutexattr_t attr;
1506 PAssertPTHREAD(pthread_mutexattr_init, (&attr));
1507
1508 #if (P_HAS_RECURSIVE_MUTEX == 2)
1509 PAssertPTHREAD(pthread_mutexattr_settype, (&attr, PTHREAD_MUTEX_RECURSIVE));
1510 #else
1511 PAssertPTHREAD(pthread_mutexattr_settype, (&attr, PTHREAD_MUTEX_RECURSIVE_NP));
1512 #endif
1513
1514 PAssertPTHREAD(pthread_mutex_init, (&m_mutex, &attr));
1515 PAssertPTHREAD(pthread_mutexattr_destroy, (&attr));
1516
1517 #else // P_HAS_RECURSIVE_MUTEX
1518
1519 m_lockerId = (pthread_id)-1;
1520 PAssertPTHREAD(pthread_mutex_init, (&m_mutex, NULL));
1521
1522 #endif // P_HAS_RECURSIVE_MUTEX
1523 }
1524
1525
1526 PTimedMutex::~PTimedMutex()
1527 {
1528 int result = pthread_mutex_destroy(&m_mutex);
1529 if (result == EBUSY) {
1530 // In case it is us
1531 while (pthread_mutex_unlock(&m_mutex) == 0)
1532 ;
1533
1534 // Wait a bit for someone else to unlock it
1535 for (PINDEX i = 0; i < 100; ++i) {
1536 if ((result = pthread_mutex_destroy(&m_mutex)) != EBUSY)
1537 break;
1538 usleep(100);
1539 }
1540 }
1541
1542 #ifdef _DEBUG
1543 PAssert(result == 0, "Error destroying mutex");
1544 #endif
1545 }
1546
1547
1548 void PTimedMutex::Wait()
1549 {
1550 #if P_HAS_RECURSIVE_MUTEX
1551
1552 PAssertPTHREAD(pthread_mutex_lock, (&m_mutex));
1553
1554 #else //P_HAS_RECURSIVE_MUTEX
1555
1556 pthread_t currentThreadId = pthread_self();
1557
1558 // if the mutex is already acquired by this thread,
1559 // then just increment the lock count
1560 if (pthread_equal(m_lockerId, currentThreadId)) {
1561 // Note this does not need a lock as it can only be touched by the thread
1562 // which already has the mutex locked.
1563 ++m_lockCount;
1564 return;
1565 }
1566
1567 // acquire the lock for real
1568 PAssertPTHREAD(pthread_mutex_lock, (&m_mutex));
1569
1570 PAssert(m_lockerId == (pthread_t)-1 && m_lockCount.IsZero(),
1571 "PMutex acquired whilst locked by another thread");
1572
1573 // Note this is protected by the mutex itself only the thread with
1574 // the lock can alter it.
1575 m_lockerId = currentThreadId;
1576
1577 #endif // P_HAS_RECURSIVE_MUTEX
1578 }
1579
1580
1581 PBoolean PTimedMutex::Wait(const PTimeInterval & waitTime)
1582 {
1583 // if waiting indefinitely, then do so
1584 if (waitTime == PMaxTimeInterval) {
1585 Wait();
1586 return true;
1587 }
1588
1589 #if !P_HAS_RECURSIVE_MUTEX
1590 pthread_t currentThreadId = pthread_self();
1591
1592 // if we already have the mutex, return immediately
1593 if (pthread_equal(m_lockerId, currentThreadId)) {
1594 // Note this does not need a lock as it can only be touched by the thread
1595 // which already has the mutex locked.
1596 ++m_lockCount;
1597 return true;
1598 }
1599 #endif
1600
1601 // create absolute finish time
1602 PTime finishTime;
1603 finishTime += waitTime;
1604
1605 #if P_PTHREADS_XPG6
1606
1607 struct timespec absTime;
1608 absTime.tv_sec = finishTime.GetTimeInSeconds();
1609 absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
1610
1611 if (pthread_mutex_timedlock(&m_mutex, &absTime) != 0)
1612 return false;
1613
1614 #else // P_PTHREADS_XPG6
1615
1616 while (pthread_mutex_trylock(&m_mutex) != 0) {
1617 if (PTime() >= finishTime)
1618 return false;
1619 usleep(10000);
1620 }
1621
1622 #endif // P_PTHREADS_XPG6
1623
1624 #if !P_HAS_RECURSIVE_MUTEX
1625 PAssert((lockerId == (pthread_t)-1) && m_lockCount.IsZero(),
1626 "PMutex acquired whilst locked by another thread");
1627
1628 // Note this is protected by the mutex itself only the thread with
1629 // the lock can alter it.
1630 m_lockerId = currentThreadId;
1631 #endif
1632
1633 return true;
1634 }
1635
1636
1637 void PTimedMutex::Signal()
1638 {
1639 #if !P_HAS_RECURSIVE_MUTEX
1640 if (!pthread_equal(m_lockerId, pthread_self())) {
1641 PAssertAlways("PMutex signal failed - no matching wait or signal by wrong thread");
1642 return;
1643 }
1644
1645 // if lock was recursively acquired, then decrement the counter
1646 // Note this does not need a separate lock as it can only be touched by the thread
1647 // which already has the mutex locked.
1648 if (!m_lockCount.IsZero()) {
1649 --m_lockCount;
1650 return;
1651 }
1652
1653 // otherwise mark mutex as available
1654 m_lockerId = (pthread_t)-1;
1655
1656 #endif
1657
1658 PAssertPTHREAD(pthread_mutex_unlock, (&m_mutex));
1659 }
1660
1661
1662 PBoolean PTimedMutex::WillBlock() const
1663 {
1664 #if !P_HAS_RECURSIVE_MUTEX
1665 if (pthread_equal(m_lockerId, pthread_self()))
1666 return false;
1667 #endif
1668
1669 if (pthread_mutex_trylock(&m_mutex) != 0)
1670 return true;
1671
1672 PAssertPTHREAD(pthread_mutex_unlock, (&m_mutex));
1673 return false;
1674 }
1675
1676
1677 ///////////////////////////////////////////////////////////////////////////////
1678
1679 PSyncPoint::PSyncPoint()
1680 : PSemaphore(PXSyncPoint)
1681 {
1682 PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
1683 PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
1684 signalled = false;
1685 }
1686
1687 PSyncPoint::PSyncPoint(const PSyncPoint &)
1688 : PSemaphore(PXSyncPoint)
1689 {
1690 PAssertPTHREAD(pthread_mutex_init, (&mutex, NULL));
1691 PAssertPTHREAD(pthread_cond_init, (&condVar, NULL));
1692 signalled = false;
1693 }
1694
1695 PSyncPoint::~PSyncPoint()
1696 {
1697 PAssertPTHREAD(pthread_mutex_destroy, (&mutex));
1698 PAssertPTHREAD(pthread_cond_destroy, (&condVar));
1699 }
1700
1701 void PSyncPoint::Wait()
1702 {
1703 PAssertPTHREAD(pthread_mutex_lock, (&mutex));
1704 while (!signalled)
1705 pthread_cond_wait(&condVar, &mutex);
1706 signalled = false;
1707 PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
1708 }
1709
1710
1711 PBoolean PSyncPoint::Wait(const PTimeInterval & waitTime)
1712 {
1713 PAssertPTHREAD(pthread_mutex_lock, (&mutex));
1714
1715 PTime finishTime;
1716 finishTime += waitTime;
1717 struct timespec absTime;
1718 absTime.tv_sec = finishTime.GetTimeInSeconds();
1719 absTime.tv_nsec = finishTime.GetMicrosecond() * 1000;
1720
1721 int err = 0;
1722 while (!signalled) {
1723 err = pthread_cond_timedwait(&condVar, &mutex, &absTime);
1724 if (err == 0 || err == ETIMEDOUT)
1725 break;
1726
1727 PAssertOS(err == EINTR && errno == EINTR);
1728 }
1729
1730 if (err == 0)
1731 signalled = false;
1732
1733 PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
1734
1735 return err == 0;
1736 }
1737
1738
1739 void PSyncPoint::Signal()
1740 {
1741 PAssertPTHREAD(pthread_mutex_lock, (&mutex));
1742 signalled = true;
1743 PAssertPTHREAD(pthread_cond_signal, (&condVar));
1744 PAssertPTHREAD(pthread_mutex_unlock, (&mutex));
1745 }
1746
1747
1748 PBoolean PSyncPoint::WillBlock() const
1749 {
1750 return !signalled;
1751 }
1752
1753
1754