1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include <boost/algorithm/string.hpp>
9 #include "IndexFileDeleter.h"
10 #include "IndexFileNameFilter.h"
11 #include "IndexFileNames.h"
12 #include "IndexDeletionPolicy.h"
13 #include "SegmentInfos.h"
14 #include "SegmentInfo.h"
15 #include "Directory.h"
16 #include "DocumentsWriter.h"
17 #include "InfoStream.h"
18 #include "DateTools.h"
19 #include "LuceneThread.h"
20 #include "MiscUtils.h"
21 #include "StringUtils.h"
22
23 namespace Lucene {
24
/// Debug switch: when set to true and an infoStream is attached, every incRef/decRef of a
/// file writes a diagnostic line showing that file's reference count before the change.
bool IndexFileDeleter::VERBOSE_REF_COUNTS = false;
27
IndexFileDeleter(const DirectoryPtr & directory,const IndexDeletionPolicyPtr & policy,const SegmentInfosPtr & segmentInfos,const InfoStreamPtr & infoStream,const DocumentsWriterPtr & docWriter,HashSet<String> synced)28 IndexFileDeleter::IndexFileDeleter(const DirectoryPtr& directory, const IndexDeletionPolicyPtr& policy, const SegmentInfosPtr& segmentInfos, const InfoStreamPtr& infoStream, const DocumentsWriterPtr& docWriter, HashSet<String> synced) {
29 this->lastFiles = Collection< HashSet<String> >::newInstance();
30 this->commits = Collection<IndexCommitPtr>::newInstance();
31 this->commitsToDelete = Collection<CommitPointPtr>::newInstance();
32 this->refCounts = MapStringRefCount::newInstance();
33 this->docWriter = docWriter;
34 this->infoStream = infoStream;
35 this->synced = synced;
36
37 if (infoStream) {
38 message(L"init: current segments file is \"" + segmentInfos->getCurrentSegmentFileName());
39 }
40
41 this->policy = policy;
42 this->directory = directory;
43
44 // First pass: walk the files and initialize our ref counts
45 int64_t currentGen = segmentInfos->getGeneration();
46 IndexFileNameFilterPtr filter(IndexFileNameFilter::getFilter());
47
48 HashSet<String> files(directory->listAll());
49 CommitPointPtr currentCommitPoint;
50
51 for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
52 if (filter->accept(L"", *fileName) && *fileName != IndexFileNames::SEGMENTS_GEN()) {
53 // Add this file to refCounts with initial count 0
54 getRefCount(*fileName);
55
56 if (boost::starts_with(*fileName, IndexFileNames::SEGMENTS())) {
57 // This is a commit (segments or segments_N), and it's valid (<= the max gen).
58 // Load it, then incref all files it refers to
59 if (infoStream) {
60 message(L"init: load commit \"" + *fileName + L"\"");
61 }
62 SegmentInfosPtr sis(newLucene<SegmentInfos>());
63 try {
64 sis->read(directory, *fileName);
65 } catch (IOException& e) {
66 if (SegmentInfos::generationFromSegmentsFileName(*fileName) <= currentGen) {
67 boost::throw_exception(e);
68 } else {
69 // Most likely we are opening an index that has an aborted "future" commit,
70 // so suppress exc in this case
71 sis.reset();
72 }
73 } catch (...) {
74 if (infoStream) {
75 message(L"init: hit exception when loading commit \"" + *fileName + L"\"; skipping this commit point");
76 }
77 sis.reset();
78 }
79 if (sis) {
80 CommitPointPtr commitPoint(newLucene<CommitPoint>(commitsToDelete, directory, sis));
81 if (sis->getGeneration() == segmentInfos->getGeneration()) {
82 currentCommitPoint = commitPoint;
83 }
84 commits.add(commitPoint);
85 incRef(sis, true);
86
87 if (!lastSegmentInfos || sis->getGeneration() > lastSegmentInfos->getGeneration()) {
88 lastSegmentInfos = sis;
89 }
90 }
91 }
92 }
93 }
94
95 if (!currentCommitPoint) {
96 // We did not in fact see the segments_N file corresponding to the segmentInfos that was passed
97 // in. Yet, it must exist, because our caller holds the write lock. This can happen when the
98 // directory listing was stale (eg when index accessed via NFS client with stale directory listing
99 // cache). So we try now to explicitly open this commit point.
100 SegmentInfosPtr sis(newLucene<SegmentInfos>());
101 try {
102 sis->read(directory, segmentInfos->getCurrentSegmentFileName());
103 } catch (LuceneException&) {
104 boost::throw_exception(CorruptIndexException(L"failed to locate current segments_N file"));
105 }
106 if (infoStream) {
107 message(L"forced open of current segments file " + segmentInfos->getCurrentSegmentFileName());
108 }
109 currentCommitPoint = newLucene<CommitPoint>(commitsToDelete, directory, sis);
110 commits.add(currentCommitPoint);
111 incRef(sis, true);
112 }
113
114 // We keep commits list in sorted order (oldest to newest)
115 std::sort(commits.begin(), commits.end(), luceneCompare<IndexCommitPtr>());
116
117 // Now delete anything with ref count at 0. These are presumably abandoned files eg due to crash of IndexWriter.
118 for (MapStringRefCount::iterator entry = refCounts.begin(); entry != refCounts.end(); ++entry) {
119 if (entry->second->count == 0) {
120 if (infoStream) {
121 message(L"init: removing unreferenced file \"" + entry->first + L"\"");
122 }
123 deleteFile(entry->first);
124 }
125 }
126
127 // Finally, give policy a chance to remove things on startup
128 policy->onInit(commits);
129
130 // Always protect the incoming segmentInfos since sometime it may not be the most recent commit
131 checkpoint(segmentInfos, false);
132
133 startingCommitDeleted = currentCommitPoint->isDeleted();
134
135 deleteCommits();
136 }
137
~IndexFileDeleter()138 IndexFileDeleter::~IndexFileDeleter() {
139 }
140
setInfoStream(const InfoStreamPtr & infoStream)141 void IndexFileDeleter::setInfoStream(const InfoStreamPtr& infoStream) {
142 this->infoStream = infoStream;
143 }
144
message(const String & message)145 void IndexFileDeleter::message(const String& message) {
146 if (infoStream) {
147 *infoStream << L"IFD [" << DateTools::timeToString(MiscUtils::currentTimeMillis(), DateTools::RESOLUTION_SECOND);
148 *infoStream << L"; " << StringUtils::toString(LuceneThread::currentId()) << L"]: " << message << L"\n";
149 }
150 }
151
getLastSegmentInfos()152 SegmentInfosPtr IndexFileDeleter::getLastSegmentInfos() {
153 return lastSegmentInfos;
154 }
155
deleteCommits()156 void IndexFileDeleter::deleteCommits() {
157 if (!commitsToDelete.empty()) {
158 // First decref all files that had been referred to by the now-deleted commits
159 for (Collection<CommitPointPtr>::iterator commit = commitsToDelete.begin(); commit != commitsToDelete.end(); ++commit) {
160 if (infoStream) {
161 message(L"deleteCommits: now decRef commit \"" + (*commit)->getSegmentsFileName() + L"\"");
162 }
163 for (HashSet<String>::iterator file = (*commit)->files.begin(); file != (*commit)->files.end(); ++file) {
164 decRef(*file);
165 }
166 }
167 commitsToDelete.clear();
168
169 // Now compact commits to remove deleted ones (preserving the sort)
170 int32_t size = commits.size();
171 int32_t readFrom = 0;
172 int32_t writeTo = 0;
173 while (readFrom < size) {
174 CommitPointPtr commit(boost::dynamic_pointer_cast<CommitPoint>(commits[readFrom]));
175 if (!commit->deleted) {
176 if (writeTo != readFrom) {
177 commits[writeTo] = commits[readFrom];
178 }
179 ++writeTo;
180 }
181 ++readFrom;
182 }
183
184 while (size > writeTo) {
185 commits.removeLast();
186 --size;
187 }
188 }
189 }
190
refresh(const String & segmentName)191 void IndexFileDeleter::refresh(const String& segmentName) {
192 HashSet<String> files(directory->listAll());
193 IndexFileNameFilterPtr filter(IndexFileNameFilter::getFilter());
194 String segmentPrefix1(segmentName + L".");
195 String segmentPrefix2(segmentName + L"_");
196
197 for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
198 if (filter->accept(L"", *fileName) &&
199 (segmentName.empty() || boost::starts_with(*fileName, segmentPrefix1) || boost::starts_with(*fileName, segmentPrefix2)) &&
200 !refCounts.contains(*fileName) && *fileName != IndexFileNames::SEGMENTS_GEN()) {
201 // Unreferenced file, so remove it
202 if (infoStream) {
203 message(L"refresh [prefix=" + segmentName + L"]: removing newly created unreferenced file \"" + *fileName + L"\"");
204 }
205 deleteFile(*fileName);
206 }
207 }
208 }
209
refresh()210 void IndexFileDeleter::refresh() {
211 refresh(L"");
212 }
213
close()214 void IndexFileDeleter::close() {
215 // DecRef old files from the last checkpoint, if any
216 for (Collection< HashSet<String> >::iterator file = lastFiles.begin(); file != lastFiles.end(); ++file) {
217 decRef(*file);
218 }
219 lastFiles.clear();
220 deletePendingFiles();
221 }
222
deletePendingFiles()223 void IndexFileDeleter::deletePendingFiles() {
224 if (deletable) {
225 HashSet<String> oldDeletable(deletable);
226 deletable.reset();
227 for (HashSet<String>::iterator fileName = oldDeletable.begin(); fileName != oldDeletable.end(); ++fileName) {
228 if (infoStream) {
229 message(L"delete pending file " + *fileName);
230 }
231 deleteFile(*fileName);
232 }
233 }
234 }
235
checkpoint(const SegmentInfosPtr & segmentInfos,bool isCommit)236 void IndexFileDeleter::checkpoint(const SegmentInfosPtr& segmentInfos, bool isCommit) {
237 if (infoStream) {
238 message(L"now checkpoint \"" + segmentInfos->getCurrentSegmentFileName() + L"\" [" + StringUtils::toString(segmentInfos->size()) + L" segments; isCommit = " + StringUtils::toString(isCommit) + L"]");
239 }
240
241 // Try again now to delete any previously un-deletable files (because they were in use, on Windows)
242 deletePendingFiles();
243
244 // Incref the files
245 incRef(segmentInfos, isCommit);
246
247 if (isCommit) {
248 // Append to our commits list
249 commits.add(newLucene<CommitPoint>(commitsToDelete, directory, segmentInfos));
250
251 // Tell policy so it can remove commits
252 policy->onCommit(commits);
253
254 // Decref files for commits that were deleted by the policy
255 deleteCommits();
256 } else {
257 HashSet<String> docWriterFiles;
258 if (docWriter) {
259 docWriterFiles = docWriter->openFiles();
260 if (docWriterFiles) {
261 // We must incRef these files before decRef'ing last files to make sure we
262 // don't accidentally delete them
263 incRef(docWriterFiles);
264 }
265 }
266
267 // DecRef old files from the last checkpoint, if any
268 for (Collection< HashSet<String> >::iterator file = lastFiles.begin(); file != lastFiles.end(); ++file) {
269 decRef(*file);
270 }
271 lastFiles.clear();
272
273 // Save files so we can decr on next checkpoint/commit
274 lastFiles.add(segmentInfos->files(directory, false));
275
276 if (docWriterFiles) {
277 lastFiles.add(docWriterFiles);
278 }
279 }
280 }
281
incRef(const SegmentInfosPtr & segmentInfos,bool isCommit)282 void IndexFileDeleter::incRef(const SegmentInfosPtr& segmentInfos, bool isCommit) {
283 // If this is a commit point, also incRef the segments_N file
284 HashSet<String> files(segmentInfos->files(directory, isCommit));
285 for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
286 incRef(*fileName);
287 }
288 }
289
incRef(HashSet<String> files)290 void IndexFileDeleter::incRef(HashSet<String> files) {
291 for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
292 incRef(*file);
293 }
294 }
295
incRef(const String & fileName)296 void IndexFileDeleter::incRef(const String& fileName) {
297 RefCountPtr rc(getRefCount(fileName));
298 if (infoStream && VERBOSE_REF_COUNTS) {
299 message(L" IncRef \"" + fileName + L"\": pre-incr count is " + StringUtils::toString(rc->count));
300 }
301 rc->IncRef();
302 }
303
decRef(HashSet<String> files)304 void IndexFileDeleter::decRef(HashSet<String> files) {
305 for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
306 decRef(*file);
307 }
308 }
309
decRef(const String & fileName)310 void IndexFileDeleter::decRef(const String& fileName) {
311 RefCountPtr rc(getRefCount(fileName));
312 if (infoStream && VERBOSE_REF_COUNTS) {
313 message(L" DecRef \"" + fileName + L"\": pre-decr count is " + StringUtils::toString(rc->count));
314 }
315 if (rc->DecRef() == 0) {
316 // This file is no longer referenced by any past commit points nor by the in-memory SegmentInfos
317 deleteFile(fileName);
318 refCounts.remove(fileName);
319
320 if (synced) {
321 SyncLock syncLock(&synced);
322 synced.remove(fileName);
323 }
324 }
325 }
326
decRef(const SegmentInfosPtr & segmentInfos)327 void IndexFileDeleter::decRef(const SegmentInfosPtr& segmentInfos) {
328 decRef(segmentInfos->files(directory, false));
329 }
330
exists(const String & fileName)331 bool IndexFileDeleter::exists(const String& fileName) {
332 return refCounts.contains(fileName) ? getRefCount(fileName)->count > 0 : false;
333 }
334
getRefCount(const String & fileName)335 RefCountPtr IndexFileDeleter::getRefCount(const String& fileName) {
336 RefCountPtr rc;
337 MapStringRefCount::iterator ref = refCounts.find(fileName);
338 if (ref == refCounts.end()) {
339 rc = newLucene<RefCount>(fileName);
340 refCounts.put(fileName, rc);
341 } else {
342 rc = ref->second;
343 }
344 return rc;
345 }
346
deleteFiles(HashSet<String> files)347 void IndexFileDeleter::deleteFiles(HashSet<String> files) {
348 for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
349 deleteFile(*file);
350 }
351 }
352
deleteNewFiles(HashSet<String> files)353 void IndexFileDeleter::deleteNewFiles(HashSet<String> files) {
354 for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
355 if (!refCounts.contains(*fileName)) {
356 if (infoStream) {
357 message(L"delete new file \"" + *fileName + L"\"");
358 }
359 deleteFile(*fileName);
360 }
361 }
362 }
363
deleteFile(const String & fileName)364 void IndexFileDeleter::deleteFile(const String& fileName) {
365 try {
366 if (infoStream) {
367 message(L"delete \"" + fileName + L"\"");
368 }
369 directory->deleteFile(fileName);
370 } catch (IOException& e) { // if delete fails
371 if (directory->fileExists(fileName)) { // if delete fails
372 // Some operating systems (eg. Windows) don't permit a file to be deleted while it is opened
373 // for read (eg. by another process or thread). So we assume that when a delete fails it is
374 // because the file is open in another process, and queue the file for subsequent deletion.
375 if (infoStream) {
376 message(L"IndexFileDeleter: unable to remove file \"" + fileName + L"\": " + e.getError() + L"; Will re-try later.");
377 }
378 if (!deletable) {
379 deletable = HashSet<String>::newInstance();
380 }
381 deletable.add(fileName); // add to deletable
382 }
383 }
384 }
385
RefCount(const String & fileName)386 RefCount::RefCount(const String& fileName) {
387 initDone = false;
388 count = 0;
389 this->fileName = fileName;
390 }
391
~RefCount()392 RefCount::~RefCount() {
393 }
394
IncRef()395 int32_t RefCount::IncRef() {
396 if (!initDone) {
397 initDone = true;
398 } else {
399 BOOST_ASSERT(count > 0);
400 }
401 return ++count;
402 }
403
DecRef()404 int32_t RefCount::DecRef() {
405 BOOST_ASSERT(count > 0);
406 return --count;
407 }
408
CommitPoint(Collection<CommitPointPtr> commitsToDelete,const DirectoryPtr & directory,const SegmentInfosPtr & segmentInfos)409 CommitPoint::CommitPoint(Collection<CommitPointPtr> commitsToDelete, const DirectoryPtr& directory, const SegmentInfosPtr& segmentInfos) {
410 deleted = false;
411
412 this->directory = directory;
413 this->commitsToDelete = commitsToDelete;
414 userData = segmentInfos->getUserData();
415 segmentsFileName = segmentInfos->getCurrentSegmentFileName();
416 version = segmentInfos->getVersion();
417 generation = segmentInfos->getGeneration();
418 HashSet<String> files(segmentInfos->files(directory, true));
419 this->files = HashSet<String>::newInstance(files.begin(), files.end());
420 gen = segmentInfos->getGeneration();
421 _isOptimized = (segmentInfos->size() == 1 && !segmentInfos->info(0)->hasDeletions());
422
423 BOOST_ASSERT(!segmentInfos->hasExternalSegments(directory));
424 }
425
~CommitPoint()426 CommitPoint::~CommitPoint() {
427 }
428
toString()429 String CommitPoint::toString() {
430 return L"IndexFileDeleter::CommitPoint(" + segmentsFileName + L")";
431 }
432
isOptimized()433 bool CommitPoint::isOptimized() {
434 return _isOptimized;
435 }
436
getSegmentsFileName()437 String CommitPoint::getSegmentsFileName() {
438 return segmentsFileName;
439 }
440
getFileNames()441 HashSet<String> CommitPoint::getFileNames() {
442 return files;
443 }
444
getDirectory()445 DirectoryPtr CommitPoint::getDirectory() {
446 return directory;
447 }
448
getVersion()449 int64_t CommitPoint::getVersion() {
450 return version;
451 }
452
getGeneration()453 int64_t CommitPoint::getGeneration() {
454 return generation;
455 }
456
getUserData()457 MapStringString CommitPoint::getUserData() {
458 return userData;
459 }
460
deleteCommit()461 void CommitPoint::deleteCommit() {
462 if (!deleted) {
463 deleted = true;
464 commitsToDelete.add(shared_from_this());
465 }
466 }
467
isDeleted()468 bool CommitPoint::isDeleted() {
469 return deleted;
470 }
471
compareTo(const LuceneObjectPtr & other)472 int32_t CommitPoint::compareTo(const LuceneObjectPtr& other) {
473 CommitPointPtr otherCommit(boost::static_pointer_cast<CommitPoint>(other));
474 if (gen < otherCommit->gen) {
475 return -1;
476 }
477 if (gen > otherCommit->gen) {
478 return 1;
479 }
480 return 0;
481 }
482
483 }
484