1 /*
2 * Copyright 2005-2014 Fabrice Colin
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 */
18
19 #include <sys/types.h>
20 #include <dirent.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <fcntl.h>
26 #include <string.h>
27 #include <signal.h>
28 #include <time.h>
29 #include <errno.h>
30 #ifdef __OpenBSD__
31 #include <sys/param.h>
32 #include <sys/sysctl.h>
33 #endif
34 #include <exception>
35 #include <iostream>
36 #include <fstream>
37 #include <algorithm>
38 #include <glibmm/convert.h>
39 #include <glibmm/exception.h>
40 #include <glibmm/miscutils.h>
41 #include <glibmm/stringutils.h>
42
43 #include "config.h"
44 #include "NLS.h"
45 #include "Memory.h"
46 #include "Url.h"
47 #include "WorkerThread.h"
48
49 using namespace std;
50 using namespace Glib;
51
52 // A function object to stop threads with for_each()
53 struct StopThreadFunc
54 {
55 public:
operator ()StopThreadFunc56 void operator()(map<unsigned int, WorkerThread *>::value_type &p)
57 {
58 p.second->stop();
59 #ifdef DEBUG
60 clog << "StopThreadFunc: stopped thread " << p.second->getId() << endl;
61 #endif
62 Thread::yield();
63 }
64 };
65
// Dispatcher shared by all workers; emitSignal() fires it to signal the end of a thread
Dispatcher WorkerThread::m_dispatcher;
// Held by emitSignal() around its call to m_dispatcher
pthread_mutex_t WorkerThread::m_dispatcherMutex = PTHREAD_MUTEX_INITIALIZER;
// Class-wide flag, true by default; changed through immediateFlush()
bool WorkerThread::m_immediateFlush = true;
69
errorToString(int errorNum)70 string WorkerThread::errorToString(int errorNum)
71 {
72 if (errorNum == 0)
73 {
74 return "";
75 }
76
77 if (errorNum < INDEX_ERROR)
78 {
79 ustring errorText(Glib::strerror(errorNum));
80
81 return errorText.c_str();
82 }
83
84 // Internal error codes
85 switch (errorNum)
86 {
87 case INDEX_ERROR:
88 return _("Index error");
89 case INDEXING_FAILED:
90 return _("Couldn't index document");
91 case UPDATE_FAILED:
92 return _("Couldn't update document");
93 case UNINDEXING_FAILED:
94 return _("Couldn't unindex document(s)");
95 case QUERY_FAILED:
96 return _("Couldn't run query on search engine");
97 case HISTORY_FAILED:
98 return _("Couldn't get history for search engine");
99 case DOWNLOAD_FAILED:
100 return _("Couldn't retrieve document");
101 case MONITORING_FAILED:
102 return _("File monitor error");
103 case OPENDIR_FAILED:
104 return _("Couldn't open directory");
105 case UNKNOWN_INDEX:
106 return _("Index doesn't exist");
107 case UNKNOWN_ENGINE:
108 return _("Couldn't create search engine");
109 case UNSUPPORTED_TYPE:
110 return _("Cannot index document type");
111 case UNSUPPORTED_PROTOCOL:
112 return _("No downloader for this protocol");
113 case ROBOTS_FORBIDDEN:
114 return _("Robots META tag forbids indexing");
115 case NO_MONITORING:
116 return _("No monitoring handler");
117 default:
118 break;
119 }
120
121 return _("Unknown error");
122 }
123
getDispatcher(void)124 Dispatcher &WorkerThread::getDispatcher(void)
125 {
126 return m_dispatcher;
127 }
128
immediateFlush(bool doFlush)129 void WorkerThread::immediateFlush(bool doFlush)
130 {
131 m_immediateFlush = doFlush;
132 }
133
WorkerThread()134 WorkerThread::WorkerThread() :
135 m_startTime(time(NULL)),
136 m_id(ThreadsManager::get_next_id()),
137 m_background(false),
138 m_stopped(false),
139 m_done(false),
140 m_errorNum(0)
141 {
142 }
143
~WorkerThread()144 WorkerThread::~WorkerThread()
145 {
146 }
147
getStartTime(void) const148 time_t WorkerThread::getStartTime(void) const
149 {
150 return m_startTime;
151 }
152
setId(unsigned int id)153 void WorkerThread::setId(unsigned int id)
154 {
155 m_id = id;
156 }
157
getId(void) const158 unsigned int WorkerThread::getId(void) const
159 {
160 return m_id;
161 }
162
inBackground(void)163 void WorkerThread::inBackground(void)
164 {
165 m_background = true;
166 }
167
isBackground(void) const168 bool WorkerThread::isBackground(void) const
169 {
170 return m_background;
171 }
172
operator <(const WorkerThread & other) const173 bool WorkerThread::operator<(const WorkerThread &other) const
174 {
175 return m_id < other.m_id;
176 }
177
start(void)178 Glib::Thread *WorkerThread::start(void)
179 {
180 #ifdef DEBUG
181 clog << "WorkerThread::start: " << getType() << " " << m_id << endl;
182 #endif
183 // Create non-joinable threads
184 return Thread::create(sigc::mem_fun(*this, &WorkerThread::threadHandler), false);
185 }
186
stop(void)187 void WorkerThread::stop(void)
188 {
189 m_stopped = m_done = true;
190 }
191
isStopped(void) const192 bool WorkerThread::isStopped(void) const
193 {
194 return m_stopped;
195 }
196
isDone(void) const197 bool WorkerThread::isDone(void) const
198 {
199 return m_done;
200 }
201
getErrorNum(void) const202 int WorkerThread::getErrorNum(void) const
203 {
204 return m_errorNum;
205 }
206
getStatus(void) const207 string WorkerThread::getStatus(void) const
208 {
209 string status(errorToString(m_errorNum));
210
211 if ((status.empty() == false) &&
212 (m_errorParam.empty() == false))
213 {
214 status += " (";
215 status += m_errorParam;
216 status += ")";
217 }
218
219 return status;
220 }
221
threadHandler(void)222 void WorkerThread::threadHandler(void)
223 {
224 #ifdef DEBUG
225 clog << "WorkerThread::threadHandler: thread " << m_id << endl;
226 #endif
227 try
228 {
229 doWork();
230 }
231 catch (Glib::Exception &ex)
232 {
233 clog << "Glib exception in thread " << m_id << ", type " << getType()
234 << ":" << ex.what() << endl;
235 m_errorNum = UNKNOWN_ERROR;
236 }
237 catch (std::exception &ex)
238 {
239 clog << "STL exception in thread " << m_id << ", type " << getType()
240 << ":" << ex.what() << endl;
241 m_errorNum = UNKNOWN_ERROR;
242 }
243 catch (...)
244 {
245 clog << "Unknown exception in thread " << m_id << ", type " << getType() << endl;
246 m_errorNum = UNKNOWN_ERROR;
247 }
248
249 emitSignal();
250 }
251
emitSignal(void)252 void WorkerThread::emitSignal(void)
253 {
254 m_done = true;
255 if (pthread_mutex_lock(&m_dispatcherMutex) == 0)
256 {
257 #ifdef DEBUG
258 clog << "WorkerThread::emitSignal: signaling end of thread " << m_id << endl;
259 #endif
260 m_dispatcher();
261
262 pthread_mutex_unlock(&m_dispatcherMutex);
263 }
264 }
265
// Counter behind get_next_id(), which pre-increments it before returning it
unsigned int ThreadsManager::m_nextThreadId = 1;
267
ThreadsManager(const string & defaultIndexLocation,unsigned int maxThreadsTime)268 ThreadsManager::ThreadsManager(const string &defaultIndexLocation,
269 unsigned int maxThreadsTime) :
270 m_mustQuit(false),
271 m_defaultIndexLocation(defaultIndexLocation),
272 m_maxIndexThreads(1),
273 m_backgroundThreadsCount(0),
274 m_foregroundThreadsMaxTime(maxThreadsTime),
275 m_numCPUs(1)
276 {
277 pthread_rwlock_init(&m_threadsLock, NULL);
278 pthread_rwlock_init(&m_listsLock, NULL);
279
280 // Override the number of indexing threads ?
281 char *pEnvVar = getenv("PINOT_MAXIMUM_INDEX_THREADS");
282 if ((pEnvVar != NULL) &&
283 (strlen(pEnvVar) > 0))
284 {
285 int threadsNum = atoi(pEnvVar);
286
287 if (threadsNum > 0)
288 {
289 m_maxIndexThreads = (unsigned int)threadsNum;
290 }
291 }
292
293 #ifdef __OpenBSD__
294 int mib[2], ncpus;
295
296 mib[0] = CTL_HW;
297 mib[1] = HW_NCPU;
298 size_t len = sizeof(ncpus);
299 if (sysctl(mib, 2, &ncpus, &len, NULL, 0) > 0)
300 {
301 m_numCPUs = ncpus;
302 }
303 #else
304 #ifdef HAVE_SYSCONF
305 m_numCPUs = sysconf(_SC_NPROCESSORS_ONLN);
306 #endif
307 #endif
308 }
309
~ThreadsManager()310 ThreadsManager::~ThreadsManager()
311 {
312 stop_threads();
313 // Destroy the read/write locks
314 pthread_rwlock_destroy(&m_listsLock);
315 pthread_rwlock_destroy(&m_threadsLock);
316 }
317
read_lock_threads(void)318 bool ThreadsManager::read_lock_threads(void)
319 {
320 if (pthread_rwlock_rdlock(&m_threadsLock) == 0)
321 {
322 return true;
323 }
324
325 return false;
326 }
327
write_lock_threads(void)328 bool ThreadsManager::write_lock_threads(void)
329 {
330 if (pthread_rwlock_wrlock(&m_threadsLock) == 0)
331 {
332 return true;
333 }
334
335 return false;
336 }
337
unlock_threads(void)338 void ThreadsManager::unlock_threads(void)
339 {
340 pthread_rwlock_unlock(&m_threadsLock);
341 }
342
read_lock_lists(void)343 bool ThreadsManager::read_lock_lists(void)
344 {
345 if (pthread_rwlock_rdlock(&m_listsLock) == 0)
346 {
347 return true;
348 }
349
350 return false;
351 }
352
write_lock_lists(void)353 bool ThreadsManager::write_lock_lists(void)
354 {
355 if (pthread_rwlock_wrlock(&m_listsLock) == 0)
356 {
357 return true;
358 }
359
360 return false;
361 }
362
unlock_lists(void)363 void ThreadsManager::unlock_lists(void)
364 {
365 pthread_rwlock_unlock(&m_listsLock);
366 }
367
get_thread(void)368 WorkerThread *ThreadsManager::get_thread(void)
369 {
370 time_t timeNow = time(NULL);
371 WorkerThread *pWorkerThread = NULL;
372
373 // Get the first thread that's finished
374 if (write_lock_threads() == true)
375 {
376 for (map<unsigned int, WorkerThread *>::iterator threadIter = m_threads.begin();
377 threadIter != m_threads.end(); ++threadIter)
378 {
379 unsigned int threadId = threadIter->first;
380
381 if (threadIter->second->isDone() == false)
382 {
383 #ifdef DEBUG
384 clog << "ThreadsManager::get_thread: thread "
385 << threadId << " is not done" << endl;
386 #endif
387
388 // Foreground threads ought not to run very long
389 if ((threadIter->second->isBackground() == false) &&
390 (threadIter->second->getStartTime() + m_foregroundThreadsMaxTime < timeNow))
391 {
392 // This thread has been running for too long !
393 threadIter->second->stop();
394
395 clog << "Stopped long-running thread " << threadId << endl;
396 }
397 }
398 else
399 {
400 // This one will do...
401 pWorkerThread = threadIter->second;
402 // Remove it
403 m_threads.erase(threadIter);
404 #ifdef DEBUG
405 clog << "ThreadsManager::get_thread: thread " << threadId
406 << " is done, " << m_threads.size() << " left" << endl;
407 #endif
408 break;
409 }
410 }
411
412 unlock_threads();
413 }
414
415 if (pWorkerThread == NULL)
416 {
417 return NULL;
418 }
419
420 if (pWorkerThread->isBackground() == true)
421 {
422 #ifdef DEBUG
423 clog << "ThreadsManager::get_thread: thread " << pWorkerThread->getId()
424 << " was running in the background" << endl;
425 #endif
426 --m_backgroundThreadsCount;
427 }
428
429 return pWorkerThread;
430 }
431
get_next_id(void)432 unsigned int ThreadsManager::get_next_id(void)
433 {
434 unsigned int nextThreadId = ++m_nextThreadId;
435
436 // Reclaim memory on a regular basis
437 if (nextThreadId % 100 == 0)
438 {
439 int inUse = Memory::getUsage();
440 Memory::reclaim();
441 }
442
443 return nextThreadId;
444 }
445
start_thread(WorkerThread * pWorkerThread,bool inBackground)446 bool ThreadsManager::start_thread(WorkerThread *pWorkerThread, bool inBackground)
447 {
448 bool createdThread = false;
449
450 if (pWorkerThread == NULL)
451 {
452 return false;
453 }
454
455 if (inBackground == true)
456 {
457 #ifdef DEBUG
458 clog << "ThreadsManager::start_thread: thread " << pWorkerThread->getId()
459 << " will run in the background" << endl;
460 #endif
461 pWorkerThread->inBackground();
462 ++m_backgroundThreadsCount;
463 }
464 #ifdef DEBUG
465 else clog << "ThreadsManager::start_thread: thread " << pWorkerThread->getId()
466 << " will run in the foreground" << endl;
467 #endif
468
469 // Insert
470 pair<map<unsigned int, WorkerThread *>::iterator, bool> threadPair;
471 if (write_lock_threads() == true)
472 {
473 threadPair = m_threads.insert(pair<unsigned int, WorkerThread *>(pWorkerThread->getId(), pWorkerThread));
474 if (threadPair.second == false)
475 {
476 delete pWorkerThread;
477 pWorkerThread = NULL;
478 }
479
480 unlock_threads();
481 }
482
483 // Start the thread
484 if (pWorkerThread != NULL)
485 {
486 Thread *pThread = pWorkerThread->start();
487 if (pThread != NULL)
488 {
489 createdThread = true;
490 }
491 else
492 {
493 // Erase
494 if (write_lock_threads() == true)
495 {
496 m_threads.erase(threadPair.first);
497
498 unlock_threads();
499 }
500 delete pWorkerThread;
501 }
502 }
503
504 return createdThread;
505 }
506
get_threads_count(void)507 unsigned int ThreadsManager::get_threads_count(void)
508 {
509 int count = 0;
510
511 if (read_lock_threads() == true)
512 {
513 count = m_threads.size() - m_backgroundThreadsCount;
514
515 unlock_threads();
516 }
517 #ifdef DEBUG
518 clog << "ThreadsManager::get_threads_count: " << count << "/"
519 << m_backgroundThreadsCount << " threads left" << endl;
520 #endif
521
522 // A negative count would mean that a background thread
523 // exited without signaling
524 return (unsigned int)max(count , 0);
525 }
526
stop_threads(void)527 void ThreadsManager::stop_threads(void)
528 {
529 if (m_threads.empty() == false)
530 {
531 if (write_lock_threads() == true)
532 {
533 // Stop threads
534 for_each(m_threads.begin(), m_threads.end(), StopThreadFunc());
535
536 unlock_threads();
537 }
538 }
539 }
540
connect(void)541 void ThreadsManager::connect(void)
542 {
543 // The previous manager may have been signalled by our threads
544 WorkerThread *pThread = get_thread();
545 while (pThread != NULL)
546 {
547 m_onThreadEndSignal(pThread);
548
549 // Next
550 pThread = get_thread();
551 }
552 #ifdef DEBUG
553 clog << "ThreadsManager::connect: connecting" << endl;
554 #endif
555
556 // Connect the dispatcher
557 m_threadsEndConnection = WorkerThread::getDispatcher().connect(
558 sigc::mem_fun(*this, &ThreadsManager::on_thread_signal));
559 #ifdef DEBUG
560 clog << "ThreadsManager::connect: connected" << endl;
561 #endif
562 }
563
disconnect(void)564 void ThreadsManager::disconnect(void)
565 {
566 m_threadsEndConnection.block();
567 m_threadsEndConnection.disconnect();
568 #ifdef DEBUG
569 clog << "ThreadsManager::disconnect: disconnected" << endl;
570 #endif
571 }
572
on_thread_signal()573 void ThreadsManager::on_thread_signal()
574 {
575 WorkerThread *pThread = get_thread();
576 if (pThread == NULL)
577 {
578 #ifdef DEBUG
579 clog << "ThreadsManager::on_thread_signal: foreign thread" << endl;
580 #endif
581 return;
582 }
583 m_onThreadEndSignal(pThread);
584 }
585
mustQuit(bool quit)586 bool ThreadsManager::mustQuit(bool quit)
587 {
588 if (quit == true)
589 {
590 m_mustQuit = true;
591 stop_threads();
592 }
593
594 return m_mustQuit;
595 }
596
MonitorThread(MonitorInterface * pMonitor,MonitorHandler * pHandler)597 MonitorThread::MonitorThread(MonitorInterface *pMonitor, MonitorHandler *pHandler) :
598 WorkerThread(),
599 m_ctrlReadPipe(-1),
600 m_ctrlWritePipe(-1),
601 m_pMonitor(pMonitor),
602 m_pHandler(pHandler)
603 {
604 int pipeFds[2];
605
606 #ifdef HAVE_PIPE
607 if (pipe(pipeFds) == 0)
608 {
609 // This pipe will allow to stop select()
610 m_ctrlReadPipe = pipeFds[0];
611 m_ctrlWritePipe = pipeFds[1];
612 }
613 #endif
614 }
615
~MonitorThread()616 MonitorThread::~MonitorThread()
617 {
618 if (m_ctrlReadPipe >= 0)
619 {
620 close(m_ctrlReadPipe);
621 }
622 if (m_ctrlWritePipe >= 0)
623 {
624 close(m_ctrlWritePipe);
625 }
626 }
627
getType(void) const628 string MonitorThread::getType(void) const
629 {
630 return "MonitorThread";
631 }
632
stop(void)633 void MonitorThread::stop(void)
634 {
635 WorkerThread::stop();
636 if (m_ctrlWritePipe >= 0)
637 {
638 write(m_ctrlWritePipe, "X", 1);
639 }
640 }
641
isFileBlacklisted(const string & location)642 bool MonitorThread::isFileBlacklisted(const string &location)
643 {
644 return false;
645 }
646
fileModified(const string & location)647 void MonitorThread::fileModified(const string &location)
648 {
649 // Pass this event directly to the handler
650 m_pHandler->fileModified(location);
651 }
652
processEvents(void)653 void MonitorThread::processEvents(void)
654 {
655 queue<MonitorEvent> events;
656
657 #ifdef DEBUG
658 clog << "MonitorThread::processEvents: checking for events" << endl;
659 #endif
660 if ((m_pMonitor == NULL) ||
661 (m_pMonitor->retrievePendingEvents(events) == false))
662 {
663 return;
664 }
665 #ifdef DEBUG
666 clog << "MonitorThread::processEvents: retrieved " << events.size() << " events" << endl;
667 #endif
668
669 while ((events.empty() == false) &&
670 (m_done == false))
671 {
672 MonitorEvent &event = events.front();
673
674 if ((event.m_location.empty() == true) ||
675 (event.m_type == MonitorEvent::UNKNOWN))
676 {
677 // Next
678 events.pop();
679 continue;
680 }
681 #ifdef DEBUG
682 clog << "MonitorThread::processEvents: event " << event.m_type << " on "
683 << event.m_location << " " << event.m_isDirectory << endl;
684 #endif
685
686 // Skip dotfiles and blacklisted files
687 Url urlObj("file://" + event.m_location);
688 if ((urlObj.getFile()[0] == '.') ||
689 (isFileBlacklisted(event.m_location) == true))
690 {
691 // Next
692 events.pop();
693 continue;
694 }
695
696 // What's the event code ?
697 if (event.m_type == MonitorEvent::EXISTS)
698 {
699 if (event.m_isDirectory == false)
700 {
701 m_pHandler->fileExists(event.m_location);
702 }
703 }
704 else if (event.m_type == MonitorEvent::CREATED)
705 {
706 if (event.m_isDirectory == false)
707 {
708 m_pHandler->fileCreated(event.m_location);
709 }
710 else
711 {
712 m_pHandler->directoryCreated(event.m_location);
713 }
714 }
715 else if (event.m_type == MonitorEvent::WRITE_CLOSED)
716 {
717 if (event.m_isDirectory == false)
718 {
719 fileModified(event.m_location);
720 }
721 }
722 else if (event.m_type == MonitorEvent::MOVED)
723 {
724 if (event.m_isDirectory == false)
725 {
726 m_pHandler->fileMoved(event.m_location, event.m_previousLocation);
727 }
728 else
729 {
730 // We should receive this only if the destination directory is monitored too
731 m_pHandler->directoryMoved(event.m_location, event.m_previousLocation);
732 }
733 }
734 else if (event.m_type == MonitorEvent::DELETED)
735 {
736 if (event.m_isDirectory == false)
737 {
738 m_pHandler->fileDeleted(event.m_location);
739 }
740 else
741 {
742 // The monitor should have stopped monitoring this
743 // In practice, events for the files in this directory will already have been received
744 m_pHandler->directoryDeleted(event.m_location);
745 }
746 }
747
748 // Next
749 events.pop();
750 }
751 }
752
doWork(void)753 void MonitorThread::doWork(void)
754 {
755 if ((m_pHandler == NULL) ||
756 (m_pMonitor == NULL))
757 {
758 m_errorNum = NO_MONITORING;
759 return;
760 }
761
762 // Initialize the handler
763 m_pHandler->initialize();
764
765 // Get the list of files to monitor
766 const set<string> &fileNames = m_pHandler->getFileNames();
767 for (set<string>::const_iterator fileIter = fileNames.begin();
768 fileIter != fileNames.end(); ++fileIter)
769 {
770 m_pMonitor->addLocation(*fileIter, false);
771 }
772 // Directories, if any, are set elsewhere
773 // In the case of OnDiskHandler, they are set by DirectoryScannerThread
774
775 // There might already be events that need processing
776 processEvents();
777
778 // Wait for something to happen
779 while (m_done == false)
780 {
781 struct timeval selectTimeout;
782 fd_set listenSet;
783
784 selectTimeout.tv_sec = 60;
785 selectTimeout.tv_usec = 0;
786
787 FD_ZERO(&listenSet);
788 if (m_ctrlReadPipe >= 0)
789 {
790 FD_SET(m_ctrlReadPipe, &listenSet);
791 }
792
793 // The file descriptor may change over time
794 int monitorFd = m_pMonitor->getFileDescriptor();
795 FD_SET(monitorFd, &listenSet);
796 if (monitorFd < 0)
797 {
798 m_errorNum = MONITORING_FAILED;
799 return;
800 }
801
802 int fdCount = select(max(monitorFd, m_ctrlReadPipe) + 1, &listenSet, NULL, NULL, &selectTimeout);
803 if ((fdCount < 0) &&
804 (errno != EINTR))
805 {
806 #ifdef DEBUG
807 clog << "MonitorThread::doWork: select() failed" << endl;
808 #endif
809 break;
810 }
811 else if (FD_ISSET(monitorFd, &listenSet))
812 {
813 processEvents();
814 }
815 }
816 }
817
818