1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6 
7 #include "LuceneInc.h"
8 #include "ConcurrentMergeScheduler.h"
9 #include "_ConcurrentMergeScheduler.h"
10 #include "IndexWriter.h"
11 #include "TestPoint.h"
12 #include "StringUtils.h"
13 
14 namespace Lucene {
15 
16 Collection<ConcurrentMergeSchedulerPtr> ConcurrentMergeScheduler::allInstances;
17 bool ConcurrentMergeScheduler::anyExceptions = false;
18 
ConcurrentMergeScheduler()19 ConcurrentMergeScheduler::ConcurrentMergeScheduler() {
20     mergeThreadPriority = -1;
21     mergeThreads = SetMergeThread::newInstance();
22     maxThreadCount = 1;
23     suppressExceptions = false;
24     closed = false;
25 }
26 
~ConcurrentMergeScheduler()27 ConcurrentMergeScheduler::~ConcurrentMergeScheduler() {
28 }
29 
initialize()30 void ConcurrentMergeScheduler::initialize() {
31     // Only for testing
32     if (allInstances) {
33         addMyself();
34     }
35 }
36 
setMaxThreadCount(int32_t count)37 void ConcurrentMergeScheduler::setMaxThreadCount(int32_t count) {
38     if (count < 1) {
39         boost::throw_exception(IllegalArgumentException(L"count should be at least 1"));
40     }
41     maxThreadCount = count;
42 }
43 
getMaxThreadCount()44 int32_t ConcurrentMergeScheduler::getMaxThreadCount() {
45     return maxThreadCount;
46 }
47 
getMergeThreadPriority()48 int32_t ConcurrentMergeScheduler::getMergeThreadPriority() {
49     SyncLock syncLock(this);
50     initMergeThreadPriority();
51     return mergeThreadPriority;
52 }
53 
setMergeThreadPriority(int32_t pri)54 void ConcurrentMergeScheduler::setMergeThreadPriority(int32_t pri) {
55     SyncLock syncLock(this);
56     if (pri > LuceneThread::MAX_THREAD_PRIORITY || pri < LuceneThread::MIN_THREAD_PRIORITY) {
57         boost::throw_exception(IllegalArgumentException(L"priority must be in range " + StringUtils::toString(LuceneThread::MIN_THREAD_PRIORITY) +
58                                L" .. " + StringUtils::toString(LuceneThread::MAX_THREAD_PRIORITY) + L" inclusive"));
59     }
60     mergeThreadPriority = pri;
61 
62     for (SetMergeThread::iterator merge = mergeThreads.begin(); merge != mergeThreads.end(); ++merge) {
63         (*merge)->setThreadPriority(pri);
64     }
65 }
66 
verbose()67 bool ConcurrentMergeScheduler::verbose() {
68     return (!_writer.expired() && IndexWriterPtr(_writer)->verbose());
69 }
70 
message(const String & message)71 void ConcurrentMergeScheduler::message(const String& message) {
72     if (verbose() && !_writer.expired()) {
73         IndexWriterPtr(_writer)->message(L"CMS: " + message);
74     }
75 }
76 
initMergeThreadPriority()77 void ConcurrentMergeScheduler::initMergeThreadPriority() {
78     SyncLock syncLock(this);
79     if (mergeThreadPriority == -1) {
80         // Default to slightly higher priority than our calling thread
81         mergeThreadPriority = std::min(LuceneThread::NORM_THREAD_PRIORITY + 1, LuceneThread::MAX_THREAD_PRIORITY);
82     }
83 }
84 
close()85 void ConcurrentMergeScheduler::close() {
86     sync();
87     closed = true;
88 }
89 
sync()90 void ConcurrentMergeScheduler::sync() {
91     SyncLock syncLock(this);
92     while (mergeThreadCount() > 0) {
93         message(L"now wait for threads; currently " + StringUtils::toString(mergeThreads.size()) + L" still running");
94         wait(1000);
95     }
96     mergeThreads.clear();
97 }
98 
mergeThreadCount()99 int32_t ConcurrentMergeScheduler::mergeThreadCount() {
100     SyncLock syncLock(this);
101     int32_t count = 0;
102     for (SetMergeThread::iterator merge = mergeThreads.begin(); merge != mergeThreads.end(); ++merge) {
103         if ((*merge)->isAlive()) {
104             ++count;
105         }
106     }
107     return count;
108 }
109 
merge(const IndexWriterPtr & writer)110 void ConcurrentMergeScheduler::merge(const IndexWriterPtr& writer) {
111     BOOST_ASSERT(!writer->holdsLock());
112 
113     this->_writer = writer;
114 
115     initMergeThreadPriority();
116 
117     dir = writer->getDirectory();
118 
119     // First, quickly run through the newly proposed merges and add any orthogonal merges (ie a merge not
120     // involving segments already pending to be merged) to the queue.  If we are way behind on merging,
121     // many of these newly proposed merges will likely already be registered.
122     message(L"now merge");
123     message(L"  index: " + writer->segString());
124 
125     // Iterate, pulling from the IndexWriter's queue of pending merges, until it's empty
126     while (true) {
127         OneMergePtr merge(writer->getNextMerge());
128         if (!merge) {
129             message(L"  no more merges pending; now return");
130             return;
131         }
132 
133         // We do this with the primary thread to keep deterministic assignment of segment names
134         writer->mergeInit(merge);
135 
136         bool success = false;
137         LuceneException finally;
138         try {
139             SyncLock syncLock(this);
140             MergeThreadPtr merger;
141             while (mergeThreadCount() >= maxThreadCount) {
142                 message(L"    too many merge threads running; stalling...");
143                 wait(1000);
144             }
145 
146             message(L"  consider merge " + merge->segString(dir));
147 
148             BOOST_ASSERT(mergeThreadCount() < maxThreadCount);
149 
150             // OK to spawn a new merge thread to handle this merge
151             merger = getMergeThread(writer, merge);
152             mergeThreads.add(merger);
153             message(L"    launch new thread");
154 
155             merger->start();
156             success = true;
157         } catch (LuceneException& e) {
158             finally = e;
159         }
160         if (!success) {
161             writer->mergeFinish(merge);
162         }
163         finally.throwException();
164     }
165 }
166 
doMerge(const OneMergePtr & merge)167 void ConcurrentMergeScheduler::doMerge(const OneMergePtr& merge) {
168     TestScope testScope(L"ConcurrentMergeScheduler", L"doMerge");
169     IndexWriterPtr(_writer)->merge(merge);
170 }
171 
getMergeThread(const IndexWriterPtr & writer,const OneMergePtr & merge)172 MergeThreadPtr ConcurrentMergeScheduler::getMergeThread(const IndexWriterPtr& writer, const OneMergePtr& merge) {
173     SyncLock syncLock(this);
174     MergeThreadPtr thread(newLucene<MergeThread>(shared_from_this(), writer, merge));
175     thread->setThreadPriority(mergeThreadPriority);
176     return thread;
177 }
178 
handleMergeException(const LuceneException & exc)179 void ConcurrentMergeScheduler::handleMergeException(const LuceneException& exc) {
180     // When an exception is hit during merge, IndexWriter removes any partial files and then
181     // allows another merge to run.  If whatever caused the error is not transient then the
182     // exception will keep happening, so, we sleep here to avoid saturating CPU in such cases
183     LuceneThread::threadSleep(250); // pause 250 msec
184     boost::throw_exception(MergeException());
185 }
186 
anyUnhandledExceptions()187 bool ConcurrentMergeScheduler::anyUnhandledExceptions() {
188     if (!allInstances) {
189         boost::throw_exception(RuntimeException(L"setTestMode() was not called"));
190     }
191     SyncLock instancesLock(&allInstances);
192     for (Collection<ConcurrentMergeSchedulerPtr>::iterator instance = allInstances.begin(); instance != allInstances.end(); ++instance) {
193         (*instance)->sync();
194     }
195     bool v = anyExceptions;
196     anyExceptions = false;
197     return v;
198 }
199 
clearUnhandledExceptions()200 void ConcurrentMergeScheduler::clearUnhandledExceptions() {
201     SyncLock instancesLock(&allInstances);
202     anyExceptions = false;
203 }
204 
addMyself()205 void ConcurrentMergeScheduler::addMyself() {
206     SyncLock instancesLock(&allInstances);
207     int32_t size = allInstances.size();
208     int32_t upto = 0;
209     for (int32_t i = 0; i < size; ++i) {
210         ConcurrentMergeSchedulerPtr other(allInstances[i]);
211         if (!(other->closed && other->mergeThreadCount() == 0)) {
212             // Keep this one for now: it still has threads or may spawn new threads
213             allInstances[upto++] = other;
214         }
215 
216         allInstances.remove(allInstances.begin() + upto, allInstances.end());
217         allInstances.add(shared_from_this());
218     }
219 }
220 
setSuppressExceptions()221 void ConcurrentMergeScheduler::setSuppressExceptions() {
222     suppressExceptions = true;
223 }
224 
clearSuppressExceptions()225 void ConcurrentMergeScheduler::clearSuppressExceptions() {
226     suppressExceptions = false;
227 }
228 
setTestMode()229 void ConcurrentMergeScheduler::setTestMode() {
230     allInstances = Collection<ConcurrentMergeSchedulerPtr>::newInstance();
231 }
232 
MergeThread(const ConcurrentMergeSchedulerPtr & merger,const IndexWriterPtr & writer,const OneMergePtr & startMerge)233 MergeThread::MergeThread(const ConcurrentMergeSchedulerPtr& merger, const IndexWriterPtr& writer, const OneMergePtr& startMerge) {
234     this->_merger = merger;
235     this->_writer = writer;
236     this->startMerge = startMerge;
237 }
238 
~MergeThread()239 MergeThread::~MergeThread() {
240 }
241 
setRunningMerge(const OneMergePtr & merge)242 void MergeThread::setRunningMerge(const OneMergePtr& merge) {
243     ConcurrentMergeSchedulerPtr merger(_merger);
244     SyncLock syncLock(merger);
245     runningMerge = merge;
246 }
247 
getRunningMerge()248 OneMergePtr MergeThread::getRunningMerge() {
249     ConcurrentMergeSchedulerPtr merger(_merger);
250     SyncLock syncLock(merger);
251     return runningMerge;
252 }
253 
setThreadPriority(int32_t pri)254 void MergeThread::setThreadPriority(int32_t pri) {
255     try {
256         setPriority(pri);
257     } catch (...) {
258     }
259 }
260 
run()261 void MergeThread::run() {
262     // First time through the while loop we do the merge that we were started with
263     OneMergePtr merge(this->startMerge);
264     ConcurrentMergeSchedulerPtr merger(_merger);
265 
266     LuceneException finally;
267     try {
268         merger->message(L"  merge thread: start");
269         IndexWriterPtr writer(_writer);
270 
271         while (true) {
272             setRunningMerge(merge);
273             merger->doMerge(merge);
274 
275             // Subsequent times through the loop we do any new merge that writer says is necessary
276             merge = writer->getNextMerge();
277             if (merge) {
278                 writer->mergeInit(merge);
279                 merger->message(L"  merge thread: do another merge " + merge->segString(merger->dir));
280             } else {
281                 break;
282             }
283         }
284 
285         merger->message(L"  merge thread: done");
286     } catch (MergeAbortedException&) {
287         // Ignore the exception if it was due to abort
288     } catch (LuceneException& e) {
289         if (!merger->suppressExceptions) {
290             // suppressExceptions is normally only set during testing.
291             merger->anyExceptions = true;
292             merger->handleMergeException(e);
293         } else {
294             finally = e;
295         }
296     }
297 
298     {
299         SyncLock syncLock(merger);
300         merger->notifyAll();
301 
302         bool removed = merger->mergeThreads.remove(shared_from_this());
303         BOOST_ASSERT(removed);
304     }
305     finally.throwException();
306 }
307 
308 }
309