1 #include "CLucene/_ApiHeader.h"
2 #include "_IndexFileDeleter.h"
3 #include "_IndexFileNameFilter.h"
4 #include "_DocumentsWriter.h"
5 #include "_SegmentHeader.h"
6 #include "CLucene/store/Directory.h"
7 #include "CLucene/LuceneThreads.h"
8 #include <algorithm>
9 #include <assert.h>
10 #include <iostream>
11 
12 CL_NS_USE(store)
13 CL_NS_USE(util)
14 CL_NS_DEF(index)
15 
16 bool IndexFileDeleter::VERBOSE_REF_COUNTS = false;
17 
CommitPoint(IndexFileDeleter * _this,SegmentInfos * segmentInfos)18 IndexFileDeleter::CommitPoint::CommitPoint(IndexFileDeleter* _this, SegmentInfos* segmentInfos){
19   this->_this = _this;
20   this->deleted = false;
21   this->gen = 0;
22 	segmentsFileName = segmentInfos->getCurrentSegmentFileName();
23 	int32_t size = segmentInfos->size();
24 	files.push_back(segmentsFileName);
25 	gen = segmentInfos->getGeneration();
26 	for(int32_t i=0;i<size;i++) {
27 	  SegmentInfo* segmentInfo = segmentInfos->info(i);
28 	  if (segmentInfo->dir == _this->directory) {
29       const vector<string>& ff = segmentInfo->files();
30       files.insert(files.end(),ff.begin(), ff.end());
31 	  }
32 	}
33 
34 }
~CommitPoint()35 IndexFileDeleter::CommitPoint::~CommitPoint(){
36 }
37 
38 /**
39 * Get the segments_N file for this commit point32_t.
40 */
getSegmentsFileName()41 std::string IndexFileDeleter::CommitPoint::getSegmentsFileName() {
42 	return segmentsFileName;
43 }
sort(IndexCommitPoint * elem1,IndexCommitPoint * elem2)44 bool IndexFileDeleter::CommitPoint::sort(IndexCommitPoint* elem1, IndexCommitPoint* elem2){
45   if (((CommitPoint*)elem1)->gen < ((CommitPoint*)elem2)->gen)
46     return true;
47   return false;
48 }
49 
getFileNames()50 const std::vector<std::string>& IndexFileDeleter::CommitPoint::getFileNames() {
51 	return files;
52 }
53 
54 /**
55 * Called only be the deletion policy, to remove this
56 * commit point32_t from the index.
57 */
deleteCommitPoint()58 void IndexFileDeleter::CommitPoint::deleteCommitPoint() {
59 	if (!deleted) {
60 	  deleted = true;
61 	  _this->commitsToDelete.push_back(this);
62 	}
63 }
64 
getClassName()65 const char* IndexFileDeleter::CommitPoint::getClassName(){
66   return "IndexFileDeleter::CommitPoint";
67 }
getObjectName() const68 const char* IndexFileDeleter::CommitPoint::getObjectName() const{
69   return getClassName();
70 }
compareTo(NamedObject * obj)71 int32_t IndexFileDeleter::CommitPoint::compareTo(NamedObject* obj) {
72   if ( obj->getObjectName() != CommitPoint::getClassName() )
73     return -1;
74 
75 	CommitPoint* commit = (CommitPoint*) obj;
76 	if (gen < commit->gen) {
77 	  return -1;
78 	} else if (gen > commit->gen) {
79 	  return 1;
80 	} else {
81 	  return 0;
82 	}
83 }
84 
setInfoStream(std::ostream * infoStream)85 void IndexFileDeleter::setInfoStream(std::ostream* infoStream) {
86 	this->infoStream = infoStream;
87 	if (infoStream != NULL){
88 		string msg = string("setInfoStream deletionPolicy=") + policy->getObjectName();
89 	  message( msg );
90 	}
91 }
92 
message(string message)93 void IndexFileDeleter::message(string message) {
94 	(*infoStream) << string("IFD [") << Misc::toString( _LUCENE_CURRTHREADID ) << string("]: ") << message << string("\n");
95 }
96 
97 
~IndexFileDeleter()98 IndexFileDeleter::~IndexFileDeleter(){
99   _CLDELETE(policy);
100   commitsToDelete.clear();
101   commits.clear();
102   refCounts.clear();
103 }
IndexFileDeleter(Directory * directory,IndexDeletionPolicy * policy,SegmentInfos * segmentInfos,std::ostream * infoStream,DocumentsWriter * docWriter)104 IndexFileDeleter::IndexFileDeleter(Directory* directory, IndexDeletionPolicy* policy,
105   SegmentInfos* segmentInfos, std::ostream* infoStream, DocumentsWriter* docWriter):
106   refCounts( RefCountsType(true,true) ), commits(CommitsType(true))
107 {
108 	this->docWriter = docWriter;
109 	this->infoStream = infoStream;
110 
111 	if (infoStream != NULL)
112 	  message( string("init: current segments file is \"") + segmentInfos->getCurrentSegmentFileName() + "\"; deletionPolicy=" + policy->getObjectName());
113 
114 	this->policy = policy;
115 	this->directory = directory;
116   CommitPoint* currentCommitPoint = NULL;
117 
118 	// First pass: walk the files and initialize our ref
119 	// counts:
120 	int64_t currentGen = segmentInfos->getGeneration();
121 	const IndexFileNameFilter* filter = IndexFileNameFilter::getFilter();
122 
123 	vector<string> files;
124   if ( !directory->list(&files) )
125 	  _CLTHROWA(CL_ERR_IO, (string("cannot read directory ") + directory->toString() + ": list() returned NULL").c_str());
126 
127 
128 	for(size_t i=0;i<files.size();i++) {
129 
130 	  string& fileName = files.at(i);
131 
132     if (filter->accept(NULL, fileName.c_str()) && fileName.compare(IndexFileNames::SEGMENTS_GEN) != 0) {
133 
134 	    // Add this file to refCounts with initial count 0:
135 	    getRefCount(fileName.c_str());
136 
137 	    if ( strncmp(fileName.c_str(), IndexFileNames::SEGMENTS, strlen(IndexFileNames::SEGMENTS)) == 0 ) {
138 
139 	      // This is a commit (segments or segments_N), and
140 	      // it's valid (<= the max gen).  Load it, then
141 	      // incref all files it refers to:
142 	      if (SegmentInfos::generationFromSegmentsFileName(fileName.c_str()) <= currentGen) {
143 	        if (infoStream != NULL) {
144 	          message("init: load commit \"" + fileName + "\"");
145 	        }
146 	        SegmentInfos sis;
147           bool failed = false;
148 	        try {
149 	          sis.read(directory, fileName.c_str());
150 	        } catch (CLuceneError& e) {
151             if ( e.number() != CL_ERR_IO ){
152               throw e;
153             }
154 	          // LUCENE-948: on NFS (and maybe others), if
155 	          // you have writers switching back and forth
156 	          // between machines, it's very likely that the
157 	          // dir listing will be stale and will claim a
158 	          // file segments_X exists when in fact it
159 	          // doesn't.  So, we catch this and handle it
160 	          // as if the file does not exist
161 	          if (infoStream != NULL) {
162 	            message("init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point32_t");
163 	          }
164 	          failed = true;
165 	        }
166 	        if (!failed) {
167 	          CommitPoint* commitPoint = _CLNEW CommitPoint(this,&sis);
168 	          if (sis.getGeneration() == segmentInfos->getGeneration()) {
169 	            currentCommitPoint = commitPoint;
170 	          }
171 	          commits.push_back(commitPoint);
172 	          incRef(&sis, true);
173 	        }
174 	      }
175 	    }
176 	  }
177 	}
178 
179 	if (currentCommitPoint == NULL) {
180 	  // We did not in fact see the segments_N file
181 	  // corresponding to the segmentInfos that was passed
182 	  // in.  Yet, it must exist, because our caller holds
183 	  // the write lock.  This can happen when the directory
184 	  // listing was stale (eg when index accessed via NFS
185 	  // client with stale directory listing cache).  So we
186 	  // try now to explicitly open this commit point32_t:
187 	  SegmentInfos sis;
188 	  try {
189 	    sis.read(directory, segmentInfos->getCurrentSegmentFileName().c_str());
190 	  } catch (CLuceneError& e) {
191       if ( e.number() == CL_ERR_IO ){
192 	      _CLTHROWA(CL_ERR_CorruptIndex, "failed to locate current segments_N file");
193       }
194 	  }
195 	  if (infoStream != NULL)
196 	    message("forced open of current segments file " + segmentInfos->getCurrentSegmentFileName());
197 	  currentCommitPoint = _CLNEW CommitPoint(this,&sis);
198     commits.push_back(currentCommitPoint);
199 	  incRef(&sis, true);
200 	}
201 
202 	// We keep commits list in sorted order (oldest to newest):
203   	std::sort(commits.begin(), commits.end(), CommitPoint::sort);
204 
205 	// Now delete anything with ref count at 0.  These are
206 	// presumably abandoned files eg due to crash of
207 	// IndexWriter.
208   RefCountsType::iterator it = refCounts.begin();
209 	while(it != refCounts.end()) {
210     char* fileName = it->first;
211     RefCount* rc = it->second;
212 	  if (0 == rc->count) {
213 	    if (infoStream != NULL) {
214 	      message( string("init: removing unreferenced file \"") + fileName + "\"");
215 	    }
216 	    deleteFile(fileName);
217 	  }
218     it++;
219 	}
220 
221 	// Finally, give policy a chance to remove things on
222 	// startup:
223 	policy->onInit(commits);
224 
225 	// It's OK for the onInit to remove the current commit
226 	// point; we just have to checkpoint our in-memory
227 	// SegmentInfos to protect those files that it uses:
228 	if (currentCommitPoint->deleted) {
229 	  checkpoint(segmentInfos, false);
230 	}
231 
232 	deleteCommits();
233 }
234 
235 /**
236 * Remove the CommitPoints in the commitsToDelete List by
237 * DecRef'ing all files from each segmentInfos->
238 */
deleteCommits()239 void IndexFileDeleter::deleteCommits() {
240 
241   int32_t size = commitsToDelete.size();
242 
243   if (size > 0) {
244 
245     // First decref all files that had been referred to by
246     // the now-deleted commits:
247     for(int32_t i=0;i<size;i++) {
248       CommitPoint* commit = commitsToDelete[i];
249       if (infoStream != NULL) {
250         message("deleteCommits: now remove commit \"" + commit->getSegmentsFileName() + "\"");
251       }
252       decRef(commit->files);
253     }
254     commitsToDelete.clear();
255 
256     // Now compact commits to remove deleted ones (preserving the sort):
257     size = commits.size();
258     int32_t readFrom = 0;
259     int32_t writeTo = 0;
260     while(readFrom < size) {
261       CommitPoint* commit = (CommitPoint*)commits[readFrom];
262       if (!commit->deleted) {
263         if (writeTo != readFrom) {
264           commits.remove(readFrom,true);
265           commits.remove(writeTo,false);//delete this one...
266           if ( commits.size() == writeTo )
267             commits.push_back(commit);
268           else
269             commits[writeTo] = commit;
270         }
271         writeTo++;
272       }
273       readFrom++;
274     }
275 
276     while(size > writeTo) {
277       commits.remove(size-1);
278       size--;
279     }
280   }
281 }
282 
283 /**
284 * Writer calls this when it has hit an error and had to
285 * roll back, to tell us that there may now be
286 * unreferenced files in the filesystem.  So we re-list
287 * the filesystem and delete such files.  If segmentName
288 * is non-NULL, we will only delete files corresponding to
289 * that segment.
290 */
refresh(const char * segmentName)291 void IndexFileDeleter::refresh(const char* segmentName) {
292   vector<string> files;
293   if ( !directory->list(files) )
294     _CLTHROWA(CL_ERR_IO, (string("cannot read directory ") + directory->toString() + ": list() returned NULL").c_str() );
295   const IndexFileNameFilter* filter = IndexFileNameFilter::getFilter();
296   string segmentPrefix1;
297   string segmentPrefix2;
298   if (segmentName != NULL) {
299     segmentPrefix1 = string(segmentName) + ".";
300     segmentPrefix2 = string(segmentName) + "_";
301   }
302 
303   for(size_t i=0;i<files.size();i++) {
304     string& fileName = files[i];
305     if ( filter->accept(NULL, fileName.c_str()) &&
306         ( (segmentName==NULL || fileName.compare(0,segmentPrefix1.length(),segmentPrefix1) == 0 || fileName.compare(0,segmentPrefix2.length(),segmentPrefix2)==0)
307           && refCounts.find((char*)fileName.c_str())== refCounts.end() && fileName.compare(IndexFileNames::SEGMENTS_GEN)!=0) ){
308 
309       // Unreferenced file, so remove it
310       if (infoStream != NULL) {
311         message( string("refresh [prefix=") + segmentName + "]: removing newly created unreferenced file \"" + fileName + "\"");
312       }
313       deleteFile(fileName.c_str());
314     }
315   }
316 }
317 
refresh()318 void IndexFileDeleter::refresh() {
319   refresh(NULL);
320 }
321 
close()322 void IndexFileDeleter::close() {
323   deletePendingFiles();
324 }
325 
deletePendingFiles()326 void IndexFileDeleter::deletePendingFiles() {
327   if (!deletable.empty()) {
328     vector<string> oldDeletable;
329     oldDeletable.insert(oldDeletable.end(),deletable.begin(),deletable.end());
330     deletable.clear();
331 
332     int32_t size = oldDeletable.size();
333     for(int32_t i=0;i<size;i++) {
334       if (infoStream != NULL)
335         message("delete pending file " + oldDeletable[i]);
336       deleteFile(oldDeletable[i].c_str());
337     }
338   }
339 }
340 
341 /**
342 * For definition of "check point32_t" see IndexWriter comments:
343 * "Clarification: Check Point32_ts (and commits)".
344 *
345 * Writer calls this when it has made a "consistent
346 * change" to the index, meaning new files are written to
347 * the index and the in-memory SegmentInfos have been
348 * modified to point32_t to those files.
349 *
350 * This may or may not be a commit (segments_N may or may
351 * not have been written).
352 *
353 * We simply incref the files referenced by the new
354 * SegmentInfos and decref the files we had previously
355 * seen (if any).
356 *
357 * If this is a commit, we also call the policy to give it
358 * a chance to remove other commits.  If any commits are
359 * removed, we decref their files as well.
360 */
checkpoint(SegmentInfos * segmentInfos,bool isCommit)361 void IndexFileDeleter::checkpoint(SegmentInfos* segmentInfos, bool isCommit) {
362 
363   if (infoStream != NULL) {
364     message(string("now checkpoint \"") + segmentInfos->getCurrentSegmentFileName() + "\" [" +
365       Misc::toString(segmentInfos->size()) + " segments ; isCommit = " + Misc::toString(isCommit) + "]");
366   }
367 
368   // Try again now to delete any previously un-deletable
369   // files (because they were in use, on Windows):
370   deletePendingFiles();
371 
372   // Incref the files:
373   incRef(segmentInfos, isCommit);
374   const vector<string>* docWriterFiles = NULL;
375   if (docWriter != NULL) {
376     docWriterFiles = &docWriter->files();
377     if (!docWriterFiles->empty())
378       incRef(*docWriterFiles);
379     else
380       docWriterFiles = NULL;
381   }
382 
383   if (isCommit) {
384     // Append to our commits list:
385     commits.push_back(_CLNEW CommitPoint(this, segmentInfos));
386 
387     // Tell policy so it can remove commits:
388     policy->onCommit(commits);
389 
390     // Decref files for commits that were deleted by the policy:
391     deleteCommits();
392   }
393 
394   // DecRef old files from the last checkpoint, if any:
395   int32_t size = lastFiles.size();
396   if (size > 0) {
397     for(int32_t i=0;i<size;i++)
398       decRef(lastFiles[i]);
399     lastFiles.clear();
400   }
401 
402   if (!isCommit) {
403     // Save files so we can decr on next checkpoint/commit:
404     size = segmentInfos->size();
405     for(int32_t i=0;i<size;i++) {
406       SegmentInfo* segmentInfo = segmentInfos->info(i);
407       if (segmentInfo->dir == directory) {
408         const vector<string>& files = segmentInfo->files();
409         lastFiles.insert(lastFiles.end(), files.begin(), files.end());
410       }
411     }
412   }
413   if (docWriterFiles != NULL)
414     lastFiles.insert(lastFiles.end(), docWriterFiles->begin(),docWriterFiles->end());
415 }
416 
incRef(SegmentInfos * segmentInfos,bool isCommit)417 void IndexFileDeleter::incRef(SegmentInfos* segmentInfos, bool isCommit) {
418   int32_t size = segmentInfos->size();
419   for(int32_t i=0;i<size;i++) {
420     SegmentInfo* segmentInfo = segmentInfos->info(i);
421     if (segmentInfo->dir == directory) {
422       incRef(segmentInfo->files());
423     }
424   }
425 
426   if (isCommit) {
427     // Since this is a commit point32_t, also incref its
428     // segments_N file:
429     getRefCount(segmentInfos->getCurrentSegmentFileName().c_str())->IncRef();
430   }
431 }
432 
incRef(const vector<string> & files)433 void IndexFileDeleter::incRef(const vector<string>& files) {
434   int32_t size = files.size();
435   for(int32_t i=0;i<size;i++) {
436     const string& fileName = files[i];
437     RefCount* rc = getRefCount(fileName.c_str());
438     if (infoStream != NULL && VERBOSE_REF_COUNTS) {
439       message(string("  IncRef \"") + fileName + "\": pre-incr count is " + Misc::toString((int32_t)rc->count));
440     }
441     rc->IncRef();
442   }
443 }
444 
decRef(const vector<string> & files)445 void IndexFileDeleter::decRef(const vector<string>& files) {
446   int32_t size = files.size();
447   for(int32_t i=0;i<size;i++) {
448     decRef(files[i]);
449   }
450 }
451 
decRef(const string & fileName)452 void IndexFileDeleter::decRef(const string& fileName) {
453   RefCount* rc = getRefCount(fileName.c_str());
454   if (infoStream != NULL && VERBOSE_REF_COUNTS) {
455     message(string("  DecRef \"") + fileName + "\": pre-decr count is " + Misc::toString((int32_t)rc->count));
456   }
457   if (0 == rc->DecRef()) {
458     // This file is no int32_t64_ter referenced by any past
459     // commit point32_ts nor by the in-memory SegmentInfos:
460     deleteFile(fileName.c_str());
461     refCounts.remove((char*)fileName.c_str());
462   }
463 }
464 
decRef(SegmentInfos * segmentInfos)465 void IndexFileDeleter::decRef(SegmentInfos* segmentInfos) {
466   int32_t size = segmentInfos->size();
467   for(int32_t i=0;i<size;i++) {
468     SegmentInfo* segmentInfo = segmentInfos->info(i);
469     if (segmentInfo->dir == directory) {
470       decRef(segmentInfo->files());
471     }
472   }
473 }
474 
getRefCount(const char * fileName)475 IndexFileDeleter::RefCount* IndexFileDeleter::getRefCount(const char* fileName) {
476   RefCount* rc;
477   RefCountsType::iterator itr = refCounts.find((char*)fileName);
478   if (itr == refCounts.end()) {
479     rc = _CLNEW RefCount();
480     refCounts.put( STRDUP_AtoA(fileName), rc);
481   } else {
482     rc = itr->second;
483   }
484   return rc;
485 }
486 
deleteFiles(vector<string> & files)487 void IndexFileDeleter::deleteFiles(vector<string>& files) {
488   int32_t size = files.size();
489   for(int32_t i=0;i<size;i++)
490     deleteFile(files[i].c_str());
491 }
492 
493 /** Delets the specified files, but only if they are new
494 *  (have not yet been incref'd). */
deleteNewFiles(const std::vector<std::string> & files)495 void IndexFileDeleter::deleteNewFiles(const std::vector<std::string>& files) {
496 	int32_t size = files.size();
497 	for(int32_t i=0;i<size;i++)
498 	  if (refCounts.find((char*)files[i].c_str()) == refCounts.end())
499 	    deleteFile(files[i].c_str());
500 }
501 
deleteFile(const char * fileName)502 void IndexFileDeleter::deleteFile(const char* fileName)
503 {
504 	try {
505 	  if (infoStream != NULL) {
506 	    message(string("delete \"") + fileName + "\"");
507 	  }
508 	  directory->deleteFile(fileName);
509 	} catch (CLuceneError& e) {       // if delete fails
510     if ( e.number() != CL_ERR_IO ){
511       throw e;
512     }
513 	  if (directory->fileExists(fileName)) {
514 
515 	    // Some operating systems (e.g. Windows) don't
516 	    // permit a file to be deleted while it is opened
517 	    // for read (e.g. by another process or thread). So
518 	    // we assume that when a delete fails it is because
519 	    // the file is open in another process, and queue
520 	    // the file for subsequent deletion.
521 
522 	    if (infoStream != NULL) {
523 	      message(string("IndexFileDeleter: unable to remove file \"") + fileName + "\": " + e.what() + "; Will re-try later.");
524 	    }
525 	    deletable.push_back(fileName);                  // add to deletable
526 	  }
527 	}
528 }
529 
530 CL_NS_END
531