1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6 
7 #include "LuceneInc.h"
8 #include <boost/algorithm/string.hpp>
9 #include "IndexFileDeleter.h"
10 #include "IndexFileNameFilter.h"
11 #include "IndexFileNames.h"
12 #include "IndexDeletionPolicy.h"
13 #include "SegmentInfos.h"
14 #include "SegmentInfo.h"
15 #include "Directory.h"
16 #include "DocumentsWriter.h"
17 #include "InfoStream.h"
18 #include "DateTools.h"
19 #include "LuceneThread.h"
20 #include "MiscUtils.h"
21 #include "StringUtils.h"
22 
23 namespace Lucene {
24 
25 /// Change to true to see details of reference counts when infoStream != null
26 bool IndexFileDeleter::VERBOSE_REF_COUNTS = false;
27 
IndexFileDeleter(const DirectoryPtr & directory,const IndexDeletionPolicyPtr & policy,const SegmentInfosPtr & segmentInfos,const InfoStreamPtr & infoStream,const DocumentsWriterPtr & docWriter,HashSet<String> synced)28 IndexFileDeleter::IndexFileDeleter(const DirectoryPtr& directory, const IndexDeletionPolicyPtr& policy, const SegmentInfosPtr& segmentInfos, const InfoStreamPtr& infoStream, const DocumentsWriterPtr& docWriter, HashSet<String> synced) {
29     this->lastFiles = Collection< HashSet<String> >::newInstance();
30     this->commits = Collection<IndexCommitPtr>::newInstance();
31     this->commitsToDelete = Collection<CommitPointPtr>::newInstance();
32     this->refCounts = MapStringRefCount::newInstance();
33     this->docWriter = docWriter;
34     this->infoStream = infoStream;
35     this->synced = synced;
36 
37     if (infoStream) {
38         message(L"init: current segments file is \"" + segmentInfos->getCurrentSegmentFileName());
39     }
40 
41     this->policy = policy;
42     this->directory = directory;
43 
44     // First pass: walk the files and initialize our ref counts
45     int64_t currentGen = segmentInfos->getGeneration();
46     IndexFileNameFilterPtr filter(IndexFileNameFilter::getFilter());
47 
48     HashSet<String> files(directory->listAll());
49     CommitPointPtr currentCommitPoint;
50 
51     for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
52         if (filter->accept(L"", *fileName) && *fileName != IndexFileNames::SEGMENTS_GEN()) {
53             // Add this file to refCounts with initial count 0
54             getRefCount(*fileName);
55 
56             if (boost::starts_with(*fileName, IndexFileNames::SEGMENTS())) {
57                 // This is a commit (segments or segments_N), and it's valid (<= the max gen).
58                 // Load it, then incref all files it refers to
59                 if (infoStream) {
60                     message(L"init: load commit \"" + *fileName + L"\"");
61                 }
62                 SegmentInfosPtr sis(newLucene<SegmentInfos>());
63                 try {
64                     sis->read(directory, *fileName);
65                 } catch (IOException& e) {
66                     if (SegmentInfos::generationFromSegmentsFileName(*fileName) <= currentGen) {
67                         boost::throw_exception(e);
68                     } else {
69                         // Most likely we are opening an index that has an aborted "future" commit,
70                         // so suppress exc in this case
71                         sis.reset();
72                     }
73                 } catch (...) {
74                     if (infoStream) {
75                         message(L"init: hit exception when loading commit \"" + *fileName + L"\"; skipping this commit point");
76                     }
77                     sis.reset();
78                 }
79                 if (sis) {
80                     CommitPointPtr commitPoint(newLucene<CommitPoint>(commitsToDelete, directory, sis));
81                     if (sis->getGeneration() == segmentInfos->getGeneration()) {
82                         currentCommitPoint = commitPoint;
83                     }
84                     commits.add(commitPoint);
85                     incRef(sis, true);
86 
87                     if (!lastSegmentInfos || sis->getGeneration() > lastSegmentInfos->getGeneration()) {
88                         lastSegmentInfos = sis;
89                     }
90                 }
91             }
92         }
93     }
94 
95     if (!currentCommitPoint) {
96         // We did not in fact see the segments_N file corresponding to the segmentInfos that was passed
97         // in.  Yet, it must exist, because our caller holds the write lock.  This can happen when the
98         // directory listing was stale (eg when index accessed via NFS client with stale directory listing
99         // cache).  So we try now to explicitly open this commit point.
100         SegmentInfosPtr sis(newLucene<SegmentInfos>());
101         try {
102             sis->read(directory, segmentInfos->getCurrentSegmentFileName());
103         } catch (LuceneException&) {
104             boost::throw_exception(CorruptIndexException(L"failed to locate current segments_N file"));
105         }
106         if (infoStream) {
107             message(L"forced open of current segments file " + segmentInfos->getCurrentSegmentFileName());
108         }
109         currentCommitPoint = newLucene<CommitPoint>(commitsToDelete, directory, sis);
110         commits.add(currentCommitPoint);
111         incRef(sis, true);
112     }
113 
114     // We keep commits list in sorted order (oldest to newest)
115     std::sort(commits.begin(), commits.end(), luceneCompare<IndexCommitPtr>());
116 
117     // Now delete anything with ref count at 0.  These are presumably abandoned files eg due to crash of IndexWriter.
118     for (MapStringRefCount::iterator entry = refCounts.begin(); entry != refCounts.end(); ++entry) {
119         if (entry->second->count == 0) {
120             if (infoStream) {
121                 message(L"init: removing unreferenced file \"" + entry->first + L"\"");
122             }
123             deleteFile(entry->first);
124         }
125     }
126 
127     // Finally, give policy a chance to remove things on startup
128     policy->onInit(commits);
129 
130     // Always protect the incoming segmentInfos since sometime it may not be the most recent commit
131     checkpoint(segmentInfos, false);
132 
133     startingCommitDeleted = currentCommitPoint->isDeleted();
134 
135     deleteCommits();
136 }
137 
~IndexFileDeleter()138 IndexFileDeleter::~IndexFileDeleter() {
139 }
140 
setInfoStream(const InfoStreamPtr & infoStream)141 void IndexFileDeleter::setInfoStream(const InfoStreamPtr& infoStream) {
142     this->infoStream = infoStream;
143 }
144 
message(const String & message)145 void IndexFileDeleter::message(const String& message) {
146     if (infoStream) {
147         *infoStream << L"IFD [" << DateTools::timeToString(MiscUtils::currentTimeMillis(), DateTools::RESOLUTION_SECOND);
148         *infoStream << L"; " << StringUtils::toString(LuceneThread::currentId()) << L"]: " << message << L"\n";
149     }
150 }
151 
getLastSegmentInfos()152 SegmentInfosPtr IndexFileDeleter::getLastSegmentInfos() {
153     return lastSegmentInfos;
154 }
155 
deleteCommits()156 void IndexFileDeleter::deleteCommits() {
157     if (!commitsToDelete.empty()) {
158         // First decref all files that had been referred to by the now-deleted commits
159         for (Collection<CommitPointPtr>::iterator commit = commitsToDelete.begin(); commit != commitsToDelete.end(); ++commit) {
160             if (infoStream) {
161                 message(L"deleteCommits: now decRef commit \"" + (*commit)->getSegmentsFileName() + L"\"");
162             }
163             for (HashSet<String>::iterator file = (*commit)->files.begin(); file != (*commit)->files.end(); ++file) {
164                 decRef(*file);
165             }
166         }
167         commitsToDelete.clear();
168 
169         // Now compact commits to remove deleted ones (preserving the sort)
170         int32_t size = commits.size();
171         int32_t readFrom = 0;
172         int32_t writeTo = 0;
173         while (readFrom < size) {
174             CommitPointPtr commit(boost::dynamic_pointer_cast<CommitPoint>(commits[readFrom]));
175             if (!commit->deleted) {
176                 if (writeTo != readFrom) {
177                     commits[writeTo] = commits[readFrom];
178                 }
179                 ++writeTo;
180             }
181             ++readFrom;
182         }
183 
184         while (size > writeTo) {
185             commits.removeLast();
186             --size;
187         }
188     }
189 }
190 
refresh(const String & segmentName)191 void IndexFileDeleter::refresh(const String& segmentName) {
192     HashSet<String> files(directory->listAll());
193     IndexFileNameFilterPtr filter(IndexFileNameFilter::getFilter());
194     String segmentPrefix1(segmentName + L".");
195     String segmentPrefix2(segmentName + L"_");
196 
197     for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
198         if (filter->accept(L"", *fileName) &&
199                 (segmentName.empty() || boost::starts_with(*fileName, segmentPrefix1) || boost::starts_with(*fileName, segmentPrefix2)) &&
200                 !refCounts.contains(*fileName) && *fileName != IndexFileNames::SEGMENTS_GEN()) {
201             // Unreferenced file, so remove it
202             if (infoStream) {
203                 message(L"refresh [prefix=" + segmentName + L"]: removing newly created unreferenced file \"" + *fileName + L"\"");
204             }
205             deleteFile(*fileName);
206         }
207     }
208 }
209 
refresh()210 void IndexFileDeleter::refresh() {
211     refresh(L"");
212 }
213 
close()214 void IndexFileDeleter::close() {
215     // DecRef old files from the last checkpoint, if any
216     for (Collection< HashSet<String> >::iterator file = lastFiles.begin(); file != lastFiles.end(); ++file) {
217         decRef(*file);
218     }
219     lastFiles.clear();
220     deletePendingFiles();
221 }
222 
deletePendingFiles()223 void IndexFileDeleter::deletePendingFiles() {
224     if (deletable) {
225         HashSet<String> oldDeletable(deletable);
226         deletable.reset();
227         for (HashSet<String>::iterator fileName = oldDeletable.begin(); fileName != oldDeletable.end(); ++fileName) {
228             if (infoStream) {
229                 message(L"delete pending file " + *fileName);
230             }
231             deleteFile(*fileName);
232         }
233     }
234 }
235 
checkpoint(const SegmentInfosPtr & segmentInfos,bool isCommit)236 void IndexFileDeleter::checkpoint(const SegmentInfosPtr& segmentInfos, bool isCommit) {
237     if (infoStream) {
238         message(L"now checkpoint \"" + segmentInfos->getCurrentSegmentFileName() + L"\" [" + StringUtils::toString(segmentInfos->size()) + L" segments; isCommit = " + StringUtils::toString(isCommit) + L"]");
239     }
240 
241     // Try again now to delete any previously un-deletable files (because they were in use, on Windows)
242     deletePendingFiles();
243 
244     // Incref the files
245     incRef(segmentInfos, isCommit);
246 
247     if (isCommit) {
248         // Append to our commits list
249         commits.add(newLucene<CommitPoint>(commitsToDelete, directory, segmentInfos));
250 
251         // Tell policy so it can remove commits
252         policy->onCommit(commits);
253 
254         // Decref files for commits that were deleted by the policy
255         deleteCommits();
256     } else {
257         HashSet<String> docWriterFiles;
258         if (docWriter) {
259             docWriterFiles = docWriter->openFiles();
260             if (docWriterFiles) {
261                 // We must incRef these files before decRef'ing last files to make sure we
262                 // don't accidentally delete them
263                 incRef(docWriterFiles);
264             }
265         }
266 
267         // DecRef old files from the last checkpoint, if any
268         for (Collection< HashSet<String> >::iterator file = lastFiles.begin(); file != lastFiles.end(); ++file) {
269             decRef(*file);
270         }
271         lastFiles.clear();
272 
273         // Save files so we can decr on next checkpoint/commit
274         lastFiles.add(segmentInfos->files(directory, false));
275 
276         if (docWriterFiles) {
277             lastFiles.add(docWriterFiles);
278         }
279     }
280 }
281 
incRef(const SegmentInfosPtr & segmentInfos,bool isCommit)282 void IndexFileDeleter::incRef(const SegmentInfosPtr& segmentInfos, bool isCommit) {
283     // If this is a commit point, also incRef the segments_N file
284     HashSet<String> files(segmentInfos->files(directory, isCommit));
285     for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
286         incRef(*fileName);
287     }
288 }
289 
incRef(HashSet<String> files)290 void IndexFileDeleter::incRef(HashSet<String> files) {
291     for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
292         incRef(*file);
293     }
294 }
295 
incRef(const String & fileName)296 void IndexFileDeleter::incRef(const String& fileName) {
297     RefCountPtr rc(getRefCount(fileName));
298     if (infoStream && VERBOSE_REF_COUNTS) {
299         message(L"  IncRef \"" + fileName + L"\": pre-incr count is " + StringUtils::toString(rc->count));
300     }
301     rc->IncRef();
302 }
303 
decRef(HashSet<String> files)304 void IndexFileDeleter::decRef(HashSet<String> files) {
305     for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
306         decRef(*file);
307     }
308 }
309 
decRef(const String & fileName)310 void IndexFileDeleter::decRef(const String& fileName) {
311     RefCountPtr rc(getRefCount(fileName));
312     if (infoStream && VERBOSE_REF_COUNTS) {
313         message(L"  DecRef \"" + fileName + L"\": pre-decr count is " + StringUtils::toString(rc->count));
314     }
315     if (rc->DecRef() == 0) {
316         // This file is no longer referenced by any past commit points nor by the in-memory SegmentInfos
317         deleteFile(fileName);
318         refCounts.remove(fileName);
319 
320         if (synced) {
321             SyncLock syncLock(&synced);
322             synced.remove(fileName);
323         }
324     }
325 }
326 
decRef(const SegmentInfosPtr & segmentInfos)327 void IndexFileDeleter::decRef(const SegmentInfosPtr& segmentInfos) {
328     decRef(segmentInfos->files(directory, false));
329 }
330 
exists(const String & fileName)331 bool IndexFileDeleter::exists(const String& fileName) {
332     return refCounts.contains(fileName) ? getRefCount(fileName)->count > 0 : false;
333 }
334 
getRefCount(const String & fileName)335 RefCountPtr IndexFileDeleter::getRefCount(const String& fileName) {
336     RefCountPtr rc;
337     MapStringRefCount::iterator ref = refCounts.find(fileName);
338     if (ref == refCounts.end()) {
339         rc = newLucene<RefCount>(fileName);
340         refCounts.put(fileName, rc);
341     } else {
342         rc = ref->second;
343     }
344     return rc;
345 }
346 
deleteFiles(HashSet<String> files)347 void IndexFileDeleter::deleteFiles(HashSet<String> files) {
348     for (HashSet<String>::iterator file = files.begin(); file != files.end(); ++file) {
349         deleteFile(*file);
350     }
351 }
352 
deleteNewFiles(HashSet<String> files)353 void IndexFileDeleter::deleteNewFiles(HashSet<String> files) {
354     for (HashSet<String>::iterator fileName = files.begin(); fileName != files.end(); ++fileName) {
355         if (!refCounts.contains(*fileName)) {
356             if (infoStream) {
357                 message(L"delete new file \"" + *fileName + L"\"");
358             }
359             deleteFile(*fileName);
360         }
361     }
362 }
363 
deleteFile(const String & fileName)364 void IndexFileDeleter::deleteFile(const String& fileName) {
365     try {
366         if (infoStream) {
367             message(L"delete \"" + fileName + L"\"");
368         }
369         directory->deleteFile(fileName);
370     } catch (IOException& e) { // if delete fails
371         if (directory->fileExists(fileName)) { // if delete fails
372             // Some operating systems (eg. Windows) don't permit a file to be deleted while it is opened
373             // for read (eg. by another process or thread). So we assume that when a delete fails it is
374             // because the file is open in another process, and queue the file for subsequent deletion.
375             if (infoStream) {
376                 message(L"IndexFileDeleter: unable to remove file \"" + fileName + L"\": " + e.getError() + L"; Will re-try later.");
377             }
378             if (!deletable) {
379                 deletable = HashSet<String>::newInstance();
380             }
381             deletable.add(fileName); // add to deletable
382         }
383     }
384 }
385 
RefCount(const String & fileName)386 RefCount::RefCount(const String& fileName) {
387     initDone = false;
388     count = 0;
389     this->fileName = fileName;
390 }
391 
~RefCount()392 RefCount::~RefCount() {
393 }
394 
IncRef()395 int32_t RefCount::IncRef() {
396     if (!initDone) {
397         initDone = true;
398     } else {
399         BOOST_ASSERT(count > 0);
400     }
401     return ++count;
402 }
403 
DecRef()404 int32_t RefCount::DecRef() {
405     BOOST_ASSERT(count > 0);
406     return --count;
407 }
408 
CommitPoint(Collection<CommitPointPtr> commitsToDelete,const DirectoryPtr & directory,const SegmentInfosPtr & segmentInfos)409 CommitPoint::CommitPoint(Collection<CommitPointPtr> commitsToDelete, const DirectoryPtr& directory, const SegmentInfosPtr& segmentInfos) {
410     deleted = false;
411 
412     this->directory = directory;
413     this->commitsToDelete = commitsToDelete;
414     userData = segmentInfos->getUserData();
415     segmentsFileName = segmentInfos->getCurrentSegmentFileName();
416     version = segmentInfos->getVersion();
417     generation = segmentInfos->getGeneration();
418     HashSet<String> files(segmentInfos->files(directory, true));
419     this->files = HashSet<String>::newInstance(files.begin(), files.end());
420     gen = segmentInfos->getGeneration();
421     _isOptimized = (segmentInfos->size() == 1 && !segmentInfos->info(0)->hasDeletions());
422 
423     BOOST_ASSERT(!segmentInfos->hasExternalSegments(directory));
424 }
425 
~CommitPoint()426 CommitPoint::~CommitPoint() {
427 }
428 
toString()429 String CommitPoint::toString() {
430     return L"IndexFileDeleter::CommitPoint(" + segmentsFileName + L")";
431 }
432 
isOptimized()433 bool CommitPoint::isOptimized() {
434     return _isOptimized;
435 }
436 
getSegmentsFileName()437 String CommitPoint::getSegmentsFileName() {
438     return segmentsFileName;
439 }
440 
getFileNames()441 HashSet<String> CommitPoint::getFileNames() {
442     return files;
443 }
444 
getDirectory()445 DirectoryPtr CommitPoint::getDirectory() {
446     return directory;
447 }
448 
getVersion()449 int64_t CommitPoint::getVersion() {
450     return version;
451 }
452 
getGeneration()453 int64_t CommitPoint::getGeneration() {
454     return generation;
455 }
456 
getUserData()457 MapStringString CommitPoint::getUserData() {
458     return userData;
459 }
460 
deleteCommit()461 void CommitPoint::deleteCommit() {
462     if (!deleted) {
463         deleted = true;
464         commitsToDelete.add(shared_from_this());
465     }
466 }
467 
isDeleted()468 bool CommitPoint::isDeleted() {
469     return deleted;
470 }
471 
compareTo(const LuceneObjectPtr & other)472 int32_t CommitPoint::compareTo(const LuceneObjectPtr& other) {
473     CommitPointPtr otherCommit(boost::static_pointer_cast<CommitPoint>(other));
474     if (gen < otherCommit->gen) {
475         return -1;
476     }
477     if (gen > otherCommit->gen) {
478         return 1;
479     }
480     return 0;
481 }
482 
483 }
484