1 #include "CLucene/_ApiHeader.h"
2 #include "_IndexFileDeleter.h"
3 #include "_IndexFileNameFilter.h"
4 #include "_DocumentsWriter.h"
5 #include "_SegmentHeader.h"
6 #include "CLucene/store/Directory.h"
7 #include "CLucene/LuceneThreads.h"
8 #include <algorithm>
9 #include <assert.h>
10 #include <iostream>
11
12 CL_NS_USE(store)
13 CL_NS_USE(util)
14 CL_NS_DEF(index)
15
16 bool IndexFileDeleter::VERBOSE_REF_COUNTS = false;
17
CommitPoint(IndexFileDeleter * _this,SegmentInfos * segmentInfos)18 IndexFileDeleter::CommitPoint::CommitPoint(IndexFileDeleter* _this, SegmentInfos* segmentInfos){
19 this->_this = _this;
20 this->deleted = false;
21 this->gen = 0;
22 segmentsFileName = segmentInfos->getCurrentSegmentFileName();
23 int32_t size = segmentInfos->size();
24 files.push_back(segmentsFileName);
25 gen = segmentInfos->getGeneration();
26 for(int32_t i=0;i<size;i++) {
27 SegmentInfo* segmentInfo = segmentInfos->info(i);
28 if (segmentInfo->dir == _this->directory) {
29 const vector<string>& ff = segmentInfo->files();
30 files.insert(files.end(),ff.begin(), ff.end());
31 }
32 }
33
34 }
~CommitPoint()35 IndexFileDeleter::CommitPoint::~CommitPoint(){
36 }
37
38 /**
39 * Get the segments_N file for this commit point32_t.
40 */
getSegmentsFileName()41 std::string IndexFileDeleter::CommitPoint::getSegmentsFileName() {
42 return segmentsFileName;
43 }
sort(IndexCommitPoint * elem1,IndexCommitPoint * elem2)44 bool IndexFileDeleter::CommitPoint::sort(IndexCommitPoint* elem1, IndexCommitPoint* elem2){
45 if (((CommitPoint*)elem1)->gen < ((CommitPoint*)elem2)->gen)
46 return true;
47 return false;
48 }
49
getFileNames()50 const std::vector<std::string>& IndexFileDeleter::CommitPoint::getFileNames() {
51 return files;
52 }
53
54 /**
55 * Called only be the deletion policy, to remove this
56 * commit point32_t from the index.
57 */
deleteCommitPoint()58 void IndexFileDeleter::CommitPoint::deleteCommitPoint() {
59 if (!deleted) {
60 deleted = true;
61 _this->commitsToDelete.push_back(this);
62 }
63 }
64
getClassName()65 const char* IndexFileDeleter::CommitPoint::getClassName(){
66 return "IndexFileDeleter::CommitPoint";
67 }
getObjectName() const68 const char* IndexFileDeleter::CommitPoint::getObjectName() const{
69 return getClassName();
70 }
compareTo(NamedObject * obj)71 int32_t IndexFileDeleter::CommitPoint::compareTo(NamedObject* obj) {
72 if ( obj->getObjectName() != CommitPoint::getClassName() )
73 return -1;
74
75 CommitPoint* commit = (CommitPoint*) obj;
76 if (gen < commit->gen) {
77 return -1;
78 } else if (gen > commit->gen) {
79 return 1;
80 } else {
81 return 0;
82 }
83 }
84
setInfoStream(std::ostream * infoStream)85 void IndexFileDeleter::setInfoStream(std::ostream* infoStream) {
86 this->infoStream = infoStream;
87 if (infoStream != NULL){
88 string msg = string("setInfoStream deletionPolicy=") + policy->getObjectName();
89 message( msg );
90 }
91 }
92
message(string message)93 void IndexFileDeleter::message(string message) {
94 (*infoStream) << string("IFD [") << Misc::toString( _LUCENE_CURRTHREADID ) << string("]: ") << message << string("\n");
95 }
96
97
~IndexFileDeleter()98 IndexFileDeleter::~IndexFileDeleter(){
99 _CLDELETE(policy);
100 commitsToDelete.clear();
101 commits.clear();
102 refCounts.clear();
103 }
IndexFileDeleter(Directory * directory,IndexDeletionPolicy * policy,SegmentInfos * segmentInfos,std::ostream * infoStream,DocumentsWriter * docWriter)104 IndexFileDeleter::IndexFileDeleter(Directory* directory, IndexDeletionPolicy* policy,
105 SegmentInfos* segmentInfos, std::ostream* infoStream, DocumentsWriter* docWriter):
106 refCounts( RefCountsType(true,true) ), commits(CommitsType(true))
107 {
108 this->docWriter = docWriter;
109 this->infoStream = infoStream;
110
111 if (infoStream != NULL)
112 message( string("init: current segments file is \"") + segmentInfos->getCurrentSegmentFileName() + "\"; deletionPolicy=" + policy->getObjectName());
113
114 this->policy = policy;
115 this->directory = directory;
116 CommitPoint* currentCommitPoint = NULL;
117
118 // First pass: walk the files and initialize our ref
119 // counts:
120 int64_t currentGen = segmentInfos->getGeneration();
121 const IndexFileNameFilter* filter = IndexFileNameFilter::getFilter();
122
123 vector<string> files;
124 if ( !directory->list(&files) )
125 _CLTHROWA(CL_ERR_IO, (string("cannot read directory ") + directory->toString() + ": list() returned NULL").c_str());
126
127
128 for(size_t i=0;i<files.size();i++) {
129
130 string& fileName = files.at(i);
131
132 if (filter->accept(NULL, fileName.c_str()) && fileName.compare(IndexFileNames::SEGMENTS_GEN) != 0) {
133
134 // Add this file to refCounts with initial count 0:
135 getRefCount(fileName.c_str());
136
137 if ( strncmp(fileName.c_str(), IndexFileNames::SEGMENTS, strlen(IndexFileNames::SEGMENTS)) == 0 ) {
138
139 // This is a commit (segments or segments_N), and
140 // it's valid (<= the max gen). Load it, then
141 // incref all files it refers to:
142 if (SegmentInfos::generationFromSegmentsFileName(fileName.c_str()) <= currentGen) {
143 if (infoStream != NULL) {
144 message("init: load commit \"" + fileName + "\"");
145 }
146 SegmentInfos sis;
147 bool failed = false;
148 try {
149 sis.read(directory, fileName.c_str());
150 } catch (CLuceneError& e) {
151 if ( e.number() != CL_ERR_IO ){
152 throw e;
153 }
154 // LUCENE-948: on NFS (and maybe others), if
155 // you have writers switching back and forth
156 // between machines, it's very likely that the
157 // dir listing will be stale and will claim a
158 // file segments_X exists when in fact it
159 // doesn't. So, we catch this and handle it
160 // as if the file does not exist
161 if (infoStream != NULL) {
162 message("init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point32_t");
163 }
164 failed = true;
165 }
166 if (!failed) {
167 CommitPoint* commitPoint = _CLNEW CommitPoint(this,&sis);
168 if (sis.getGeneration() == segmentInfos->getGeneration()) {
169 currentCommitPoint = commitPoint;
170 }
171 commits.push_back(commitPoint);
172 incRef(&sis, true);
173 }
174 }
175 }
176 }
177 }
178
179 if (currentCommitPoint == NULL) {
180 // We did not in fact see the segments_N file
181 // corresponding to the segmentInfos that was passed
182 // in. Yet, it must exist, because our caller holds
183 // the write lock. This can happen when the directory
184 // listing was stale (eg when index accessed via NFS
185 // client with stale directory listing cache). So we
186 // try now to explicitly open this commit point32_t:
187 SegmentInfos sis;
188 try {
189 sis.read(directory, segmentInfos->getCurrentSegmentFileName().c_str());
190 } catch (CLuceneError& e) {
191 if ( e.number() == CL_ERR_IO ){
192 _CLTHROWA(CL_ERR_CorruptIndex, "failed to locate current segments_N file");
193 }
194 }
195 if (infoStream != NULL)
196 message("forced open of current segments file " + segmentInfos->getCurrentSegmentFileName());
197 currentCommitPoint = _CLNEW CommitPoint(this,&sis);
198 commits.push_back(currentCommitPoint);
199 incRef(&sis, true);
200 }
201
202 // We keep commits list in sorted order (oldest to newest):
203 std::sort(commits.begin(), commits.end(), CommitPoint::sort);
204
205 // Now delete anything with ref count at 0. These are
206 // presumably abandoned files eg due to crash of
207 // IndexWriter.
208 RefCountsType::iterator it = refCounts.begin();
209 while(it != refCounts.end()) {
210 char* fileName = it->first;
211 RefCount* rc = it->second;
212 if (0 == rc->count) {
213 if (infoStream != NULL) {
214 message( string("init: removing unreferenced file \"") + fileName + "\"");
215 }
216 deleteFile(fileName);
217 }
218 it++;
219 }
220
221 // Finally, give policy a chance to remove things on
222 // startup:
223 policy->onInit(commits);
224
225 // It's OK for the onInit to remove the current commit
226 // point; we just have to checkpoint our in-memory
227 // SegmentInfos to protect those files that it uses:
228 if (currentCommitPoint->deleted) {
229 checkpoint(segmentInfos, false);
230 }
231
232 deleteCommits();
233 }
234
235 /**
236 * Remove the CommitPoints in the commitsToDelete List by
237 * DecRef'ing all files from each segmentInfos->
238 */
deleteCommits()239 void IndexFileDeleter::deleteCommits() {
240
241 int32_t size = commitsToDelete.size();
242
243 if (size > 0) {
244
245 // First decref all files that had been referred to by
246 // the now-deleted commits:
247 for(int32_t i=0;i<size;i++) {
248 CommitPoint* commit = commitsToDelete[i];
249 if (infoStream != NULL) {
250 message("deleteCommits: now remove commit \"" + commit->getSegmentsFileName() + "\"");
251 }
252 decRef(commit->files);
253 }
254 commitsToDelete.clear();
255
256 // Now compact commits to remove deleted ones (preserving the sort):
257 size = commits.size();
258 int32_t readFrom = 0;
259 int32_t writeTo = 0;
260 while(readFrom < size) {
261 CommitPoint* commit = (CommitPoint*)commits[readFrom];
262 if (!commit->deleted) {
263 if (writeTo != readFrom) {
264 commits.remove(readFrom,true);
265 commits.remove(writeTo,false);//delete this one...
266 if ( commits.size() == writeTo )
267 commits.push_back(commit);
268 else
269 commits[writeTo] = commit;
270 }
271 writeTo++;
272 }
273 readFrom++;
274 }
275
276 while(size > writeTo) {
277 commits.remove(size-1);
278 size--;
279 }
280 }
281 }
282
283 /**
284 * Writer calls this when it has hit an error and had to
285 * roll back, to tell us that there may now be
286 * unreferenced files in the filesystem. So we re-list
287 * the filesystem and delete such files. If segmentName
288 * is non-NULL, we will only delete files corresponding to
289 * that segment.
290 */
refresh(const char * segmentName)291 void IndexFileDeleter::refresh(const char* segmentName) {
292 vector<string> files;
293 if ( !directory->list(files) )
294 _CLTHROWA(CL_ERR_IO, (string("cannot read directory ") + directory->toString() + ": list() returned NULL").c_str() );
295 const IndexFileNameFilter* filter = IndexFileNameFilter::getFilter();
296 string segmentPrefix1;
297 string segmentPrefix2;
298 if (segmentName != NULL) {
299 segmentPrefix1 = string(segmentName) + ".";
300 segmentPrefix2 = string(segmentName) + "_";
301 }
302
303 for(size_t i=0;i<files.size();i++) {
304 string& fileName = files[i];
305 if ( filter->accept(NULL, fileName.c_str()) &&
306 ( (segmentName==NULL || fileName.compare(0,segmentPrefix1.length(),segmentPrefix1) == 0 || fileName.compare(0,segmentPrefix2.length(),segmentPrefix2)==0)
307 && refCounts.find((char*)fileName.c_str())== refCounts.end() && fileName.compare(IndexFileNames::SEGMENTS_GEN)!=0) ){
308
309 // Unreferenced file, so remove it
310 if (infoStream != NULL) {
311 message( string("refresh [prefix=") + segmentName + "]: removing newly created unreferenced file \"" + fileName + "\"");
312 }
313 deleteFile(fileName.c_str());
314 }
315 }
316 }
317
refresh()318 void IndexFileDeleter::refresh() {
319 refresh(NULL);
320 }
321
close()322 void IndexFileDeleter::close() {
323 deletePendingFiles();
324 }
325
deletePendingFiles()326 void IndexFileDeleter::deletePendingFiles() {
327 if (!deletable.empty()) {
328 vector<string> oldDeletable;
329 oldDeletable.insert(oldDeletable.end(),deletable.begin(),deletable.end());
330 deletable.clear();
331
332 int32_t size = oldDeletable.size();
333 for(int32_t i=0;i<size;i++) {
334 if (infoStream != NULL)
335 message("delete pending file " + oldDeletable[i]);
336 deleteFile(oldDeletable[i].c_str());
337 }
338 }
339 }
340
341 /**
342 * For definition of "check point32_t" see IndexWriter comments:
343 * "Clarification: Check Point32_ts (and commits)".
344 *
345 * Writer calls this when it has made a "consistent
346 * change" to the index, meaning new files are written to
347 * the index and the in-memory SegmentInfos have been
348 * modified to point32_t to those files.
349 *
350 * This may or may not be a commit (segments_N may or may
351 * not have been written).
352 *
353 * We simply incref the files referenced by the new
354 * SegmentInfos and decref the files we had previously
355 * seen (if any).
356 *
357 * If this is a commit, we also call the policy to give it
358 * a chance to remove other commits. If any commits are
359 * removed, we decref their files as well.
360 */
checkpoint(SegmentInfos * segmentInfos,bool isCommit)361 void IndexFileDeleter::checkpoint(SegmentInfos* segmentInfos, bool isCommit) {
362
363 if (infoStream != NULL) {
364 message(string("now checkpoint \"") + segmentInfos->getCurrentSegmentFileName() + "\" [" +
365 Misc::toString(segmentInfos->size()) + " segments ; isCommit = " + Misc::toString(isCommit) + "]");
366 }
367
368 // Try again now to delete any previously un-deletable
369 // files (because they were in use, on Windows):
370 deletePendingFiles();
371
372 // Incref the files:
373 incRef(segmentInfos, isCommit);
374 const vector<string>* docWriterFiles = NULL;
375 if (docWriter != NULL) {
376 docWriterFiles = &docWriter->files();
377 if (!docWriterFiles->empty())
378 incRef(*docWriterFiles);
379 else
380 docWriterFiles = NULL;
381 }
382
383 if (isCommit) {
384 // Append to our commits list:
385 commits.push_back(_CLNEW CommitPoint(this, segmentInfos));
386
387 // Tell policy so it can remove commits:
388 policy->onCommit(commits);
389
390 // Decref files for commits that were deleted by the policy:
391 deleteCommits();
392 }
393
394 // DecRef old files from the last checkpoint, if any:
395 int32_t size = lastFiles.size();
396 if (size > 0) {
397 for(int32_t i=0;i<size;i++)
398 decRef(lastFiles[i]);
399 lastFiles.clear();
400 }
401
402 if (!isCommit) {
403 // Save files so we can decr on next checkpoint/commit:
404 size = segmentInfos->size();
405 for(int32_t i=0;i<size;i++) {
406 SegmentInfo* segmentInfo = segmentInfos->info(i);
407 if (segmentInfo->dir == directory) {
408 const vector<string>& files = segmentInfo->files();
409 lastFiles.insert(lastFiles.end(), files.begin(), files.end());
410 }
411 }
412 }
413 if (docWriterFiles != NULL)
414 lastFiles.insert(lastFiles.end(), docWriterFiles->begin(),docWriterFiles->end());
415 }
416
incRef(SegmentInfos * segmentInfos,bool isCommit)417 void IndexFileDeleter::incRef(SegmentInfos* segmentInfos, bool isCommit) {
418 int32_t size = segmentInfos->size();
419 for(int32_t i=0;i<size;i++) {
420 SegmentInfo* segmentInfo = segmentInfos->info(i);
421 if (segmentInfo->dir == directory) {
422 incRef(segmentInfo->files());
423 }
424 }
425
426 if (isCommit) {
427 // Since this is a commit point32_t, also incref its
428 // segments_N file:
429 getRefCount(segmentInfos->getCurrentSegmentFileName().c_str())->IncRef();
430 }
431 }
432
incRef(const vector<string> & files)433 void IndexFileDeleter::incRef(const vector<string>& files) {
434 int32_t size = files.size();
435 for(int32_t i=0;i<size;i++) {
436 const string& fileName = files[i];
437 RefCount* rc = getRefCount(fileName.c_str());
438 if (infoStream != NULL && VERBOSE_REF_COUNTS) {
439 message(string(" IncRef \"") + fileName + "\": pre-incr count is " + Misc::toString((int32_t)rc->count));
440 }
441 rc->IncRef();
442 }
443 }
444
decRef(const vector<string> & files)445 void IndexFileDeleter::decRef(const vector<string>& files) {
446 int32_t size = files.size();
447 for(int32_t i=0;i<size;i++) {
448 decRef(files[i]);
449 }
450 }
451
decRef(const string & fileName)452 void IndexFileDeleter::decRef(const string& fileName) {
453 RefCount* rc = getRefCount(fileName.c_str());
454 if (infoStream != NULL && VERBOSE_REF_COUNTS) {
455 message(string(" DecRef \"") + fileName + "\": pre-decr count is " + Misc::toString((int32_t)rc->count));
456 }
457 if (0 == rc->DecRef()) {
458 // This file is no int32_t64_ter referenced by any past
459 // commit point32_ts nor by the in-memory SegmentInfos:
460 deleteFile(fileName.c_str());
461 refCounts.remove((char*)fileName.c_str());
462 }
463 }
464
decRef(SegmentInfos * segmentInfos)465 void IndexFileDeleter::decRef(SegmentInfos* segmentInfos) {
466 int32_t size = segmentInfos->size();
467 for(int32_t i=0;i<size;i++) {
468 SegmentInfo* segmentInfo = segmentInfos->info(i);
469 if (segmentInfo->dir == directory) {
470 decRef(segmentInfo->files());
471 }
472 }
473 }
474
getRefCount(const char * fileName)475 IndexFileDeleter::RefCount* IndexFileDeleter::getRefCount(const char* fileName) {
476 RefCount* rc;
477 RefCountsType::iterator itr = refCounts.find((char*)fileName);
478 if (itr == refCounts.end()) {
479 rc = _CLNEW RefCount();
480 refCounts.put( STRDUP_AtoA(fileName), rc);
481 } else {
482 rc = itr->second;
483 }
484 return rc;
485 }
486
deleteFiles(vector<string> & files)487 void IndexFileDeleter::deleteFiles(vector<string>& files) {
488 int32_t size = files.size();
489 for(int32_t i=0;i<size;i++)
490 deleteFile(files[i].c_str());
491 }
492
493 /** Delets the specified files, but only if they are new
494 * (have not yet been incref'd). */
deleteNewFiles(const std::vector<std::string> & files)495 void IndexFileDeleter::deleteNewFiles(const std::vector<std::string>& files) {
496 int32_t size = files.size();
497 for(int32_t i=0;i<size;i++)
498 if (refCounts.find((char*)files[i].c_str()) == refCounts.end())
499 deleteFile(files[i].c_str());
500 }
501
deleteFile(const char * fileName)502 void IndexFileDeleter::deleteFile(const char* fileName)
503 {
504 try {
505 if (infoStream != NULL) {
506 message(string("delete \"") + fileName + "\"");
507 }
508 directory->deleteFile(fileName);
509 } catch (CLuceneError& e) { // if delete fails
510 if ( e.number() != CL_ERR_IO ){
511 throw e;
512 }
513 if (directory->fileExists(fileName)) {
514
515 // Some operating systems (e.g. Windows) don't
516 // permit a file to be deleted while it is opened
517 // for read (e.g. by another process or thread). So
518 // we assume that when a delete fails it is because
519 // the file is open in another process, and queue
520 // the file for subsequent deletion.
521
522 if (infoStream != NULL) {
523 message(string("IndexFileDeleter: unable to remove file \"") + fileName + "\": " + e.what() + "; Will re-try later.");
524 }
525 deletable.push_back(fileName); // add to deletable
526 }
527 }
528 }
529
530 CL_NS_END
531