1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "ConcurrentMergeScheduler.h"
9 #include "_ConcurrentMergeScheduler.h"
10 #include "IndexWriter.h"
11 #include "TestPoint.h"
12 #include "StringUtils.h"
13
14 namespace Lucene {
15
16 Collection<ConcurrentMergeSchedulerPtr> ConcurrentMergeScheduler::allInstances;
17 bool ConcurrentMergeScheduler::anyExceptions = false;
18
ConcurrentMergeScheduler()19 ConcurrentMergeScheduler::ConcurrentMergeScheduler() {
20 mergeThreadPriority = -1;
21 mergeThreads = SetMergeThread::newInstance();
22 maxThreadCount = 1;
23 suppressExceptions = false;
24 closed = false;
25 }
26
~ConcurrentMergeScheduler()27 ConcurrentMergeScheduler::~ConcurrentMergeScheduler() {
28 }
29
initialize()30 void ConcurrentMergeScheduler::initialize() {
31 // Only for testing
32 if (allInstances) {
33 addMyself();
34 }
35 }
36
setMaxThreadCount(int32_t count)37 void ConcurrentMergeScheduler::setMaxThreadCount(int32_t count) {
38 if (count < 1) {
39 boost::throw_exception(IllegalArgumentException(L"count should be at least 1"));
40 }
41 maxThreadCount = count;
42 }
43
getMaxThreadCount()44 int32_t ConcurrentMergeScheduler::getMaxThreadCount() {
45 return maxThreadCount;
46 }
47
getMergeThreadPriority()48 int32_t ConcurrentMergeScheduler::getMergeThreadPriority() {
49 SyncLock syncLock(this);
50 initMergeThreadPriority();
51 return mergeThreadPriority;
52 }
53
setMergeThreadPriority(int32_t pri)54 void ConcurrentMergeScheduler::setMergeThreadPriority(int32_t pri) {
55 SyncLock syncLock(this);
56 if (pri > LuceneThread::MAX_THREAD_PRIORITY || pri < LuceneThread::MIN_THREAD_PRIORITY) {
57 boost::throw_exception(IllegalArgumentException(L"priority must be in range " + StringUtils::toString(LuceneThread::MIN_THREAD_PRIORITY) +
58 L" .. " + StringUtils::toString(LuceneThread::MAX_THREAD_PRIORITY) + L" inclusive"));
59 }
60 mergeThreadPriority = pri;
61
62 for (SetMergeThread::iterator merge = mergeThreads.begin(); merge != mergeThreads.end(); ++merge) {
63 (*merge)->setThreadPriority(pri);
64 }
65 }
66
verbose()67 bool ConcurrentMergeScheduler::verbose() {
68 return (!_writer.expired() && IndexWriterPtr(_writer)->verbose());
69 }
70
message(const String & message)71 void ConcurrentMergeScheduler::message(const String& message) {
72 if (verbose() && !_writer.expired()) {
73 IndexWriterPtr(_writer)->message(L"CMS: " + message);
74 }
75 }
76
initMergeThreadPriority()77 void ConcurrentMergeScheduler::initMergeThreadPriority() {
78 SyncLock syncLock(this);
79 if (mergeThreadPriority == -1) {
80 // Default to slightly higher priority than our calling thread
81 mergeThreadPriority = std::min(LuceneThread::NORM_THREAD_PRIORITY + 1, LuceneThread::MAX_THREAD_PRIORITY);
82 }
83 }
84
close()85 void ConcurrentMergeScheduler::close() {
86 sync();
87 closed = true;
88 }
89
sync()90 void ConcurrentMergeScheduler::sync() {
91 SyncLock syncLock(this);
92 while (mergeThreadCount() > 0) {
93 message(L"now wait for threads; currently " + StringUtils::toString(mergeThreads.size()) + L" still running");
94 wait(1000);
95 }
96 mergeThreads.clear();
97 }
98
mergeThreadCount()99 int32_t ConcurrentMergeScheduler::mergeThreadCount() {
100 SyncLock syncLock(this);
101 int32_t count = 0;
102 for (SetMergeThread::iterator merge = mergeThreads.begin(); merge != mergeThreads.end(); ++merge) {
103 if ((*merge)->isAlive()) {
104 ++count;
105 }
106 }
107 return count;
108 }
109
merge(const IndexWriterPtr & writer)110 void ConcurrentMergeScheduler::merge(const IndexWriterPtr& writer) {
111 BOOST_ASSERT(!writer->holdsLock());
112
113 this->_writer = writer;
114
115 initMergeThreadPriority();
116
117 dir = writer->getDirectory();
118
119 // First, quickly run through the newly proposed merges and add any orthogonal merges (ie a merge not
120 // involving segments already pending to be merged) to the queue. If we are way behind on merging,
121 // many of these newly proposed merges will likely already be registered.
122 message(L"now merge");
123 message(L" index: " + writer->segString());
124
125 // Iterate, pulling from the IndexWriter's queue of pending merges, until it's empty
126 while (true) {
127 OneMergePtr merge(writer->getNextMerge());
128 if (!merge) {
129 message(L" no more merges pending; now return");
130 return;
131 }
132
133 // We do this with the primary thread to keep deterministic assignment of segment names
134 writer->mergeInit(merge);
135
136 bool success = false;
137 LuceneException finally;
138 try {
139 SyncLock syncLock(this);
140 MergeThreadPtr merger;
141 while (mergeThreadCount() >= maxThreadCount) {
142 message(L" too many merge threads running; stalling...");
143 wait(1000);
144 }
145
146 message(L" consider merge " + merge->segString(dir));
147
148 BOOST_ASSERT(mergeThreadCount() < maxThreadCount);
149
150 // OK to spawn a new merge thread to handle this merge
151 merger = getMergeThread(writer, merge);
152 mergeThreads.add(merger);
153 message(L" launch new thread");
154
155 merger->start();
156 success = true;
157 } catch (LuceneException& e) {
158 finally = e;
159 }
160 if (!success) {
161 writer->mergeFinish(merge);
162 }
163 finally.throwException();
164 }
165 }
166
doMerge(const OneMergePtr & merge)167 void ConcurrentMergeScheduler::doMerge(const OneMergePtr& merge) {
168 TestScope testScope(L"ConcurrentMergeScheduler", L"doMerge");
169 IndexWriterPtr(_writer)->merge(merge);
170 }
171
getMergeThread(const IndexWriterPtr & writer,const OneMergePtr & merge)172 MergeThreadPtr ConcurrentMergeScheduler::getMergeThread(const IndexWriterPtr& writer, const OneMergePtr& merge) {
173 SyncLock syncLock(this);
174 MergeThreadPtr thread(newLucene<MergeThread>(shared_from_this(), writer, merge));
175 thread->setThreadPriority(mergeThreadPriority);
176 return thread;
177 }
178
handleMergeException(const LuceneException & exc)179 void ConcurrentMergeScheduler::handleMergeException(const LuceneException& exc) {
180 // When an exception is hit during merge, IndexWriter removes any partial files and then
181 // allows another merge to run. If whatever caused the error is not transient then the
182 // exception will keep happening, so, we sleep here to avoid saturating CPU in such cases
183 LuceneThread::threadSleep(250); // pause 250 msec
184 boost::throw_exception(MergeException());
185 }
186
anyUnhandledExceptions()187 bool ConcurrentMergeScheduler::anyUnhandledExceptions() {
188 if (!allInstances) {
189 boost::throw_exception(RuntimeException(L"setTestMode() was not called"));
190 }
191 SyncLock instancesLock(&allInstances);
192 for (Collection<ConcurrentMergeSchedulerPtr>::iterator instance = allInstances.begin(); instance != allInstances.end(); ++instance) {
193 (*instance)->sync();
194 }
195 bool v = anyExceptions;
196 anyExceptions = false;
197 return v;
198 }
199
clearUnhandledExceptions()200 void ConcurrentMergeScheduler::clearUnhandledExceptions() {
201 SyncLock instancesLock(&allInstances);
202 anyExceptions = false;
203 }
204
addMyself()205 void ConcurrentMergeScheduler::addMyself() {
206 SyncLock instancesLock(&allInstances);
207 int32_t size = allInstances.size();
208 int32_t upto = 0;
209 for (int32_t i = 0; i < size; ++i) {
210 ConcurrentMergeSchedulerPtr other(allInstances[i]);
211 if (!(other->closed && other->mergeThreadCount() == 0)) {
212 // Keep this one for now: it still has threads or may spawn new threads
213 allInstances[upto++] = other;
214 }
215
216 allInstances.remove(allInstances.begin() + upto, allInstances.end());
217 allInstances.add(shared_from_this());
218 }
219 }
220
setSuppressExceptions()221 void ConcurrentMergeScheduler::setSuppressExceptions() {
222 suppressExceptions = true;
223 }
224
clearSuppressExceptions()225 void ConcurrentMergeScheduler::clearSuppressExceptions() {
226 suppressExceptions = false;
227 }
228
setTestMode()229 void ConcurrentMergeScheduler::setTestMode() {
230 allInstances = Collection<ConcurrentMergeSchedulerPtr>::newInstance();
231 }
232
MergeThread(const ConcurrentMergeSchedulerPtr & merger,const IndexWriterPtr & writer,const OneMergePtr & startMerge)233 MergeThread::MergeThread(const ConcurrentMergeSchedulerPtr& merger, const IndexWriterPtr& writer, const OneMergePtr& startMerge) {
234 this->_merger = merger;
235 this->_writer = writer;
236 this->startMerge = startMerge;
237 }
238
~MergeThread()239 MergeThread::~MergeThread() {
240 }
241
setRunningMerge(const OneMergePtr & merge)242 void MergeThread::setRunningMerge(const OneMergePtr& merge) {
243 ConcurrentMergeSchedulerPtr merger(_merger);
244 SyncLock syncLock(merger);
245 runningMerge = merge;
246 }
247
getRunningMerge()248 OneMergePtr MergeThread::getRunningMerge() {
249 ConcurrentMergeSchedulerPtr merger(_merger);
250 SyncLock syncLock(merger);
251 return runningMerge;
252 }
253
setThreadPriority(int32_t pri)254 void MergeThread::setThreadPriority(int32_t pri) {
255 try {
256 setPriority(pri);
257 } catch (...) {
258 }
259 }
260
run()261 void MergeThread::run() {
262 // First time through the while loop we do the merge that we were started with
263 OneMergePtr merge(this->startMerge);
264 ConcurrentMergeSchedulerPtr merger(_merger);
265
266 LuceneException finally;
267 try {
268 merger->message(L" merge thread: start");
269 IndexWriterPtr writer(_writer);
270
271 while (true) {
272 setRunningMerge(merge);
273 merger->doMerge(merge);
274
275 // Subsequent times through the loop we do any new merge that writer says is necessary
276 merge = writer->getNextMerge();
277 if (merge) {
278 writer->mergeInit(merge);
279 merger->message(L" merge thread: do another merge " + merge->segString(merger->dir));
280 } else {
281 break;
282 }
283 }
284
285 merger->message(L" merge thread: done");
286 } catch (MergeAbortedException&) {
287 // Ignore the exception if it was due to abort
288 } catch (LuceneException& e) {
289 if (!merger->suppressExceptions) {
290 // suppressExceptions is normally only set during testing.
291 merger->anyExceptions = true;
292 merger->handleMergeException(e);
293 } else {
294 finally = e;
295 }
296 }
297
298 {
299 SyncLock syncLock(merger);
300 merger->notifyAll();
301
302 bool removed = merger->mergeThreads.remove(shared_from_this());
303 BOOST_ASSERT(removed);
304 }
305 finally.throwException();
306 }
307
308 }
309