1 /*
2  *  Copyright 2005-2014 Fabrice Colin
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 2 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17  */
18 
19 #include <sys/types.h>
20 #include <dirent.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <fcntl.h>
26 #include <string.h>
27 #include <signal.h>
28 #include <time.h>
29 #include <errno.h>
30 #ifdef __OpenBSD__
31 #include <sys/param.h>
32 #include <sys/sysctl.h>
33 #endif
34 #include <exception>
35 #include <iostream>
36 #include <fstream>
37 #include <algorithm>
38 #include <glibmm/convert.h>
39 #include <glibmm/exception.h>
40 #include <glibmm/miscutils.h>
41 #include <glibmm/stringutils.h>
42 
43 #include "config.h"
44 #include "NLS.h"
45 #include "Memory.h"
46 #include "Url.h"
47 #include "WorkerThread.h"
48 
49 using namespace std;
50 using namespace Glib;
51 
52 // A function object to stop threads with for_each()
53 struct StopThreadFunc
54 {
55 public:
operator ()StopThreadFunc56 	void operator()(map<unsigned int, WorkerThread *>::value_type &p)
57 	{
58 		p.second->stop();
59 #ifdef DEBUG
60 		clog << "StopThreadFunc: stopped thread " << p.second->getId() << endl;
61 #endif
62 		Thread::yield();
63 	}
64 };
65 
66 Dispatcher WorkerThread::m_dispatcher;
67 pthread_mutex_t WorkerThread::m_dispatcherMutex = PTHREAD_MUTEX_INITIALIZER;
68 bool WorkerThread::m_immediateFlush = true;
69 
errorToString(int errorNum)70 string WorkerThread::errorToString(int errorNum)
71 {
72 	if (errorNum == 0)
73 	{
74 		return "";
75 	}
76 
77 	if (errorNum < INDEX_ERROR)
78 	{
79 		ustring errorText(Glib::strerror(errorNum));
80 
81 		return errorText.c_str();
82 	}
83 
84 	// Internal error codes
85 	switch (errorNum)
86 	{
87 		case INDEX_ERROR:
88 			return _("Index error");
89 		case INDEXING_FAILED:
90 			return _("Couldn't index document");
91 		case UPDATE_FAILED:
92 			return _("Couldn't update document");
93 		case UNINDEXING_FAILED:
94 			return _("Couldn't unindex document(s)");
95 		case QUERY_FAILED:
96 			return _("Couldn't run query on search engine");
97 		case HISTORY_FAILED:
98 			return _("Couldn't get history for search engine");
99 		case DOWNLOAD_FAILED:
100 			return _("Couldn't retrieve document");
101 		case MONITORING_FAILED:
102 			return _("File monitor error");
103 		case OPENDIR_FAILED:
104 			return _("Couldn't open directory");
105 		case UNKNOWN_INDEX:
106 			return _("Index doesn't exist");
107 		case UNKNOWN_ENGINE:
108 			return  _("Couldn't create search engine");
109 		case UNSUPPORTED_TYPE:
110 			return _("Cannot index document type");
111 		case UNSUPPORTED_PROTOCOL:
112 			return _("No downloader for this protocol");
113 		case ROBOTS_FORBIDDEN:
114 			return _("Robots META tag forbids indexing");
115 		case NO_MONITORING:
116 			return _("No monitoring handler");
117 		default:
118 			break;
119 	}
120 
121 	return _("Unknown error");
122 }
123 
getDispatcher(void)124 Dispatcher &WorkerThread::getDispatcher(void)
125 {
126 	return m_dispatcher;
127 }
128 
immediateFlush(bool doFlush)129 void WorkerThread::immediateFlush(bool doFlush)
130 {
131 	m_immediateFlush = doFlush;
132 }
133 
WorkerThread()134 WorkerThread::WorkerThread() :
135 	m_startTime(time(NULL)),
136 	m_id(ThreadsManager::get_next_id()),
137 	m_background(false),
138 	m_stopped(false),
139 	m_done(false),
140 	m_errorNum(0)
141 {
142 }
143 
~WorkerThread()144 WorkerThread::~WorkerThread()
145 {
146 }
147 
getStartTime(void) const148 time_t WorkerThread::getStartTime(void) const
149 {
150 	return m_startTime;
151 }
152 
setId(unsigned int id)153 void WorkerThread::setId(unsigned int id)
154 {
155 	m_id = id;
156 }
157 
getId(void) const158 unsigned int WorkerThread::getId(void) const
159 {
160 	return m_id;
161 }
162 
inBackground(void)163 void WorkerThread::inBackground(void)
164 {
165 	m_background = true;
166 }
167 
isBackground(void) const168 bool WorkerThread::isBackground(void) const
169 {
170 	return m_background;
171 }
172 
operator <(const WorkerThread & other) const173 bool WorkerThread::operator<(const WorkerThread &other) const
174 {
175 	return m_id < other.m_id;
176 }
177 
start(void)178 Glib::Thread *WorkerThread::start(void)
179 {
180 #ifdef DEBUG
181 	clog << "WorkerThread::start: " << getType() << " " << m_id << endl;
182 #endif
183 	// Create non-joinable threads
184 	return Thread::create(sigc::mem_fun(*this, &WorkerThread::threadHandler), false);
185 }
186 
stop(void)187 void WorkerThread::stop(void)
188 {
189 	m_stopped = m_done = true;
190 }
191 
isStopped(void) const192 bool WorkerThread::isStopped(void) const
193 {
194 	return m_stopped;
195 }
196 
isDone(void) const197 bool WorkerThread::isDone(void) const
198 {
199 	return m_done;
200 }
201 
getErrorNum(void) const202 int WorkerThread::getErrorNum(void) const
203 {
204 	return m_errorNum;
205 }
206 
getStatus(void) const207 string WorkerThread::getStatus(void) const
208 {
209 	string status(errorToString(m_errorNum));
210 
211 	if ((status.empty() == false) &&
212 		(m_errorParam.empty() == false))
213 	{
214 		status += " (";
215 		status += m_errorParam;
216 		status += ")";
217 	}
218 
219 	return status;
220 }
221 
threadHandler(void)222 void WorkerThread::threadHandler(void)
223 {
224 #ifdef DEBUG
225 	clog << "WorkerThread::threadHandler: thread " << m_id << endl;
226 #endif
227 	try
228 	{
229 		doWork();
230 	}
231 	catch (Glib::Exception &ex)
232 	{
233 		clog << "Glib exception in thread " << m_id << ", type " << getType()
234 			<< ":" << ex.what() << endl;
235 		m_errorNum = UNKNOWN_ERROR;
236 	}
237 	catch (std::exception &ex)
238 	{
239 		clog << "STL exception in thread " << m_id << ", type " << getType()
240 			<< ":" << ex.what() << endl;
241 		m_errorNum = UNKNOWN_ERROR;
242 	}
243 	catch (...)
244 	{
245 		clog << "Unknown exception in thread " << m_id << ", type " << getType() << endl;
246 		m_errorNum = UNKNOWN_ERROR;
247 	}
248 
249 	emitSignal();
250 }
251 
emitSignal(void)252 void WorkerThread::emitSignal(void)
253 {
254 	m_done = true;
255 	if (pthread_mutex_lock(&m_dispatcherMutex) == 0)
256 	{
257 #ifdef DEBUG
258 		clog << "WorkerThread::emitSignal: signaling end of thread " << m_id << endl;
259 #endif
260 		m_dispatcher();
261 
262 		pthread_mutex_unlock(&m_dispatcherMutex);
263 	}
264 }
265 
266 unsigned int ThreadsManager::m_nextThreadId = 1;
267 
ThreadsManager(const string & defaultIndexLocation,unsigned int maxThreadsTime)268 ThreadsManager::ThreadsManager(const string &defaultIndexLocation,
269 	unsigned int maxThreadsTime) :
270 	m_mustQuit(false),
271 	m_defaultIndexLocation(defaultIndexLocation),
272 	m_maxIndexThreads(1),
273 	m_backgroundThreadsCount(0),
274 	m_foregroundThreadsMaxTime(maxThreadsTime),
275 	m_numCPUs(1)
276 {
277 	pthread_rwlock_init(&m_threadsLock, NULL);
278 	pthread_rwlock_init(&m_listsLock, NULL);
279 
280 	// Override the number of indexing threads ?
281 	char *pEnvVar = getenv("PINOT_MAXIMUM_INDEX_THREADS");
282 	if ((pEnvVar != NULL) &&
283 		(strlen(pEnvVar) > 0))
284 	{
285 		int threadsNum = atoi(pEnvVar);
286 
287 		if (threadsNum > 0)
288 		{
289 			m_maxIndexThreads = (unsigned int)threadsNum;
290 		}
291 	}
292 
293 #ifdef __OpenBSD__
294 	int mib[2], ncpus;
295 
296 	mib[0] = CTL_HW;
297 	mib[1] = HW_NCPU;
298 	size_t len = sizeof(ncpus);
299 	if (sysctl(mib, 2, &ncpus, &len, NULL, 0) > 0)
300 	{
301 		m_numCPUs = ncpus;
302 	}
303 #else
304 #ifdef HAVE_SYSCONF
305 	m_numCPUs = sysconf(_SC_NPROCESSORS_ONLN);
306 #endif
307 #endif
308 }
309 
~ThreadsManager()310 ThreadsManager::~ThreadsManager()
311 {
312 	stop_threads();
313 	// Destroy the read/write locks
314 	pthread_rwlock_destroy(&m_listsLock);
315 	pthread_rwlock_destroy(&m_threadsLock);
316 }
317 
read_lock_threads(void)318 bool ThreadsManager::read_lock_threads(void)
319 {
320 	if (pthread_rwlock_rdlock(&m_threadsLock) == 0)
321 	{
322 		return true;
323 	}
324 
325 	return false;
326 }
327 
write_lock_threads(void)328 bool ThreadsManager::write_lock_threads(void)
329 {
330 	if (pthread_rwlock_wrlock(&m_threadsLock) == 0)
331 	{
332 		return true;
333 	}
334 
335 	return false;
336 }
337 
unlock_threads(void)338 void ThreadsManager::unlock_threads(void)
339 {
340 	pthread_rwlock_unlock(&m_threadsLock);
341 }
342 
read_lock_lists(void)343 bool ThreadsManager::read_lock_lists(void)
344 {
345 	if (pthread_rwlock_rdlock(&m_listsLock) == 0)
346 	{
347 		return true;
348 	}
349 
350 	return false;
351 }
352 
write_lock_lists(void)353 bool ThreadsManager::write_lock_lists(void)
354 {
355 	if (pthread_rwlock_wrlock(&m_listsLock) == 0)
356 	{
357 		return true;
358 	}
359 
360 	return false;
361 }
362 
unlock_lists(void)363 void ThreadsManager::unlock_lists(void)
364 {
365 	pthread_rwlock_unlock(&m_listsLock);
366 }
367 
get_thread(void)368 WorkerThread *ThreadsManager::get_thread(void)
369 {
370 	time_t timeNow = time(NULL);
371 	WorkerThread *pWorkerThread = NULL;
372 
373 	// Get the first thread that's finished
374 	if (write_lock_threads() == true)
375 	{
376 		for (map<unsigned int, WorkerThread *>::iterator threadIter = m_threads.begin();
377 			threadIter != m_threads.end(); ++threadIter)
378 		{
379 			unsigned int threadId = threadIter->first;
380 
381 			if (threadIter->second->isDone() == false)
382 			{
383 #ifdef DEBUG
384 				clog << "ThreadsManager::get_thread: thread "
385 					<< threadId << " is not done" << endl;
386 #endif
387 
388 				// Foreground threads ought not to run very long
389 				if ((threadIter->second->isBackground() == false) &&
390 					(threadIter->second->getStartTime() + m_foregroundThreadsMaxTime < timeNow))
391 				{
392 					// This thread has been running for too long !
393 					threadIter->second->stop();
394 
395 					clog << "Stopped long-running thread " << threadId << endl;
396 				}
397 			}
398 			else
399 			{
400 				// This one will do...
401 				pWorkerThread = threadIter->second;
402 				// Remove it
403 				m_threads.erase(threadIter);
404 #ifdef DEBUG
405 				clog << "ThreadsManager::get_thread: thread " << threadId
406 					<< " is done, " << m_threads.size() << " left" << endl;
407 #endif
408 				break;
409 			}
410 		}
411 
412 		unlock_threads();
413 	}
414 
415 	if (pWorkerThread == NULL)
416 	{
417 		return NULL;
418 	}
419 
420 	if (pWorkerThread->isBackground() == true)
421 	{
422 #ifdef DEBUG
423 		clog << "ThreadsManager::get_thread: thread " << pWorkerThread->getId()
424 			<< " was running in the background" << endl;
425 #endif
426 		--m_backgroundThreadsCount;
427 	}
428 
429 	return pWorkerThread;
430 }
431 
get_next_id(void)432 unsigned int ThreadsManager::get_next_id(void)
433 {
434 	unsigned int nextThreadId = ++m_nextThreadId;
435 
436 	// Reclaim memory on a regular basis
437 	if (nextThreadId % 100 == 0)
438 	{
439 		int inUse = Memory::getUsage();
440 		Memory::reclaim();
441 	}
442 
443 	return nextThreadId;
444 }
445 
start_thread(WorkerThread * pWorkerThread,bool inBackground)446 bool ThreadsManager::start_thread(WorkerThread *pWorkerThread, bool inBackground)
447 {
448 	bool createdThread = false;
449 
450 	if (pWorkerThread == NULL)
451 	{
452 		return false;
453 	}
454 
455 	if (inBackground == true)
456 	{
457 #ifdef DEBUG
458 		clog << "ThreadsManager::start_thread: thread " << pWorkerThread->getId()
459 			<< " will run in the background" << endl;
460 #endif
461 		pWorkerThread->inBackground();
462 		++m_backgroundThreadsCount;
463 	}
464 #ifdef DEBUG
465 	else clog << "ThreadsManager::start_thread: thread " << pWorkerThread->getId()
466 			<< " will run in the foreground" << endl;
467 #endif
468 
469 	// Insert
470 	pair<map<unsigned int, WorkerThread *>::iterator, bool> threadPair;
471 	if (write_lock_threads() == true)
472 	{
473 		threadPair = m_threads.insert(pair<unsigned int, WorkerThread *>(pWorkerThread->getId(), pWorkerThread));
474 		if (threadPair.second == false)
475 		{
476 			delete pWorkerThread;
477 			pWorkerThread = NULL;
478 		}
479 
480 		unlock_threads();
481 	}
482 
483 	// Start the thread
484 	if (pWorkerThread != NULL)
485 	{
486 		Thread *pThread = pWorkerThread->start();
487 		if (pThread != NULL)
488 		{
489 			createdThread = true;
490 		}
491 		else
492 		{
493 			// Erase
494 			if (write_lock_threads() == true)
495 			{
496 				m_threads.erase(threadPair.first);
497 
498 				unlock_threads();
499 			}
500 			delete pWorkerThread;
501 		}
502 	}
503 
504 	return createdThread;
505 }
506 
get_threads_count(void)507 unsigned int ThreadsManager::get_threads_count(void)
508 {
509 	int count = 0;
510 
511 	if (read_lock_threads() == true)
512 	{
513 		count = m_threads.size() - m_backgroundThreadsCount;
514 
515 		unlock_threads();
516 	}
517 #ifdef DEBUG
518 	clog << "ThreadsManager::get_threads_count: " << count << "/"
519 		<< m_backgroundThreadsCount << " threads left" << endl;
520 #endif
521 
522 	// A negative count would mean that a background thread
523 	// exited without signaling
524 	return (unsigned int)max(count , 0);
525 }
526 
stop_threads(void)527 void ThreadsManager::stop_threads(void)
528 {
529 	if (m_threads.empty() == false)
530 	{
531 		if (write_lock_threads() == true)
532 		{
533 			// Stop threads
534 			for_each(m_threads.begin(), m_threads.end(), StopThreadFunc());
535 
536 			unlock_threads();
537 		}
538 	}
539 }
540 
connect(void)541 void ThreadsManager::connect(void)
542 {
543 	// The previous manager may have been signalled by our threads
544 	WorkerThread *pThread = get_thread();
545 	while (pThread != NULL)
546 	{
547 		m_onThreadEndSignal(pThread);
548 
549 		// Next
550 		pThread = get_thread();
551 	}
552 #ifdef DEBUG
553 	clog << "ThreadsManager::connect: connecting" << endl;
554 #endif
555 
556 	// Connect the dispatcher
557 	m_threadsEndConnection = WorkerThread::getDispatcher().connect(
558 		sigc::mem_fun(*this, &ThreadsManager::on_thread_signal));
559 #ifdef DEBUG
560 	clog << "ThreadsManager::connect: connected" << endl;
561 #endif
562 }
563 
disconnect(void)564 void ThreadsManager::disconnect(void)
565 {
566 	m_threadsEndConnection.block();
567 	m_threadsEndConnection.disconnect();
568 #ifdef DEBUG
569 	clog << "ThreadsManager::disconnect: disconnected" << endl;
570 #endif
571 }
572 
on_thread_signal()573 void ThreadsManager::on_thread_signal()
574 {
575 	WorkerThread *pThread = get_thread();
576 	if (pThread == NULL)
577 	{
578 #ifdef DEBUG
579 		clog << "ThreadsManager::on_thread_signal: foreign thread" << endl;
580 #endif
581 		return;
582 	}
583 	m_onThreadEndSignal(pThread);
584 }
585 
mustQuit(bool quit)586 bool ThreadsManager::mustQuit(bool quit)
587 {
588 	if (quit == true)
589 	{
590 		m_mustQuit = true;
591 		stop_threads();
592 	}
593 
594 	return m_mustQuit;
595 }
596 
MonitorThread(MonitorInterface * pMonitor,MonitorHandler * pHandler)597 MonitorThread::MonitorThread(MonitorInterface *pMonitor, MonitorHandler *pHandler) :
598 	WorkerThread(),
599 	m_ctrlReadPipe(-1),
600 	m_ctrlWritePipe(-1),
601 	m_pMonitor(pMonitor),
602 	m_pHandler(pHandler)
603 {
604 	int pipeFds[2];
605 
606 #ifdef HAVE_PIPE
607 	if (pipe(pipeFds) == 0)
608 	{
609 		// This pipe will allow to stop select()
610 		m_ctrlReadPipe = pipeFds[0];
611 		m_ctrlWritePipe = pipeFds[1];
612 	}
613 #endif
614 }
615 
~MonitorThread()616 MonitorThread::~MonitorThread()
617 {
618 	if (m_ctrlReadPipe >= 0)
619 	{
620 		close(m_ctrlReadPipe);
621 	}
622 	if (m_ctrlWritePipe >= 0)
623 	{
624 		close(m_ctrlWritePipe);
625 	}
626 }
627 
getType(void) const628 string MonitorThread::getType(void) const
629 {
630 	return "MonitorThread";
631 }
632 
stop(void)633 void MonitorThread::stop(void)
634 {
635 	WorkerThread::stop();
636 	if (m_ctrlWritePipe >= 0)
637 	{
638 		write(m_ctrlWritePipe, "X", 1);
639 	}
640 }
641 
isFileBlacklisted(const string & location)642 bool MonitorThread::isFileBlacklisted(const string &location)
643 {
644 	return false;
645 }
646 
fileModified(const string & location)647 void MonitorThread::fileModified(const string &location)
648 {
649 	// Pass this event directly to the handler
650 	m_pHandler->fileModified(location);
651 }
652 
processEvents(void)653 void MonitorThread::processEvents(void)
654 {
655 	queue<MonitorEvent> events;
656 
657 #ifdef DEBUG
658 	clog << "MonitorThread::processEvents: checking for events" << endl;
659 #endif
660 	if ((m_pMonitor == NULL) ||
661 		(m_pMonitor->retrievePendingEvents(events) == false))
662 	{
663 		return;
664 	}
665 #ifdef DEBUG
666 	clog << "MonitorThread::processEvents: retrieved " << events.size() << " events" << endl;
667 #endif
668 
669 	while ((events.empty() == false) &&
670 		(m_done == false))
671 	{
672 		MonitorEvent &event = events.front();
673 
674 		if ((event.m_location.empty() == true) ||
675 			(event.m_type == MonitorEvent::UNKNOWN))
676 		{
677 			// Next
678 			events.pop();
679 			continue;
680 		}
681 #ifdef DEBUG
682 		clog << "MonitorThread::processEvents: event " << event.m_type << " on "
683 			<< event.m_location << " " << event.m_isDirectory << endl;
684 #endif
685 
686 		// Skip dotfiles and blacklisted files
687 		Url urlObj("file://" + event.m_location);
688 		if ((urlObj.getFile()[0] == '.') ||
689 			(isFileBlacklisted(event.m_location) == true))
690 		{
691 			// Next
692 			events.pop();
693 			continue;
694 		}
695 
696 		// What's the event code ?
697 		if (event.m_type == MonitorEvent::EXISTS)
698 		{
699 			if (event.m_isDirectory == false)
700 			{
701 				m_pHandler->fileExists(event.m_location);
702 			}
703 		}
704 		else if (event.m_type == MonitorEvent::CREATED)
705 		{
706 			if (event.m_isDirectory == false)
707 			{
708 				m_pHandler->fileCreated(event.m_location);
709 			}
710 			else
711 			{
712 				m_pHandler->directoryCreated(event.m_location);
713 			}
714 		}
715 		else if (event.m_type == MonitorEvent::WRITE_CLOSED)
716 		{
717 			if (event.m_isDirectory == false)
718 			{
719 				fileModified(event.m_location);
720 			}
721 		}
722 		else if (event.m_type == MonitorEvent::MOVED)
723 		{
724 			if (event.m_isDirectory == false)
725 			{
726 				m_pHandler->fileMoved(event.m_location, event.m_previousLocation);
727 			}
728 			else
729 			{
730 				// We should receive this only if the destination directory is monitored too
731 				m_pHandler->directoryMoved(event.m_location, event.m_previousLocation);
732 			}
733 		}
734 		else if (event.m_type == MonitorEvent::DELETED)
735 		{
736 			if (event.m_isDirectory == false)
737 			{
738 				m_pHandler->fileDeleted(event.m_location);
739 			}
740 			else
741 			{
742 				// The monitor should have stopped monitoring this
743 				// In practice, events for the files in this directory will already have been received
744 				m_pHandler->directoryDeleted(event.m_location);
745 			}
746 		}
747 
748 		// Next
749 		events.pop();
750 	}
751 }
752 
doWork(void)753 void MonitorThread::doWork(void)
754 {
755 	if ((m_pHandler == NULL) ||
756 		(m_pMonitor == NULL))
757 	{
758 		m_errorNum = NO_MONITORING;
759 		return;
760 	}
761 
762 	// Initialize the handler
763 	m_pHandler->initialize();
764 
765 	// Get the list of files to monitor
766 	const set<string> &fileNames = m_pHandler->getFileNames();
767 	for (set<string>::const_iterator fileIter = fileNames.begin();
768 		fileIter != fileNames.end(); ++fileIter)
769 	{
770 		m_pMonitor->addLocation(*fileIter, false);
771 	}
772 	// Directories, if any, are set elsewhere
773 	// In the case of OnDiskHandler, they are set by DirectoryScannerThread
774 
775 	// There might already be events that need processing
776 	processEvents();
777 
778 	// Wait for something to happen
779 	while (m_done == false)
780 	{
781 		struct timeval selectTimeout;
782 		fd_set listenSet;
783 
784 		selectTimeout.tv_sec = 60;
785 		selectTimeout.tv_usec = 0;
786 
787 		FD_ZERO(&listenSet);
788 		if (m_ctrlReadPipe >= 0)
789 		{
790 			FD_SET(m_ctrlReadPipe, &listenSet);
791 		}
792 
793 		// The file descriptor may change over time
794 		int monitorFd = m_pMonitor->getFileDescriptor();
795 		FD_SET(monitorFd, &listenSet);
796 		if (monitorFd < 0)
797 		{
798 			m_errorNum = MONITORING_FAILED;
799 			return;
800 		}
801 
802 		int fdCount = select(max(monitorFd, m_ctrlReadPipe) + 1, &listenSet, NULL, NULL, &selectTimeout);
803 		if ((fdCount < 0) &&
804 			(errno != EINTR))
805 		{
806 #ifdef DEBUG
807 			clog << "MonitorThread::doWork: select() failed" << endl;
808 #endif
809 			break;
810 		}
811 		else if (FD_ISSET(monitorFd, &listenSet))
812 		{
813 			processEvents();
814 		}
815 	}
816 }
817 
818