/////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2009-2014 Alan Wright. All rights reserved.
// Distributable under the terms of either the Apache License (Version 2.0)
// or the GNU Lesser General Public License.
/////////////////////////////////////////////////////////////////////////////

#include "LuceneInc.h"
#include "TermsHash.h"
#include "DocumentsWriter.h"
#include "TermsHashConsumer.h"
#include "TermsHashPerThread.h"
#include "TermsHashPerField.h"
#include "TermsHashConsumerPerThread.h"
#include "DocInverterPerThread.h"
#include "TermsHashConsumerPerField.h"
#include "IndexWriter.h"
#include "MiscUtils.h"

namespace Lucene {

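// TermsHash maintains a shared pool of recycled RawPostingList objects (postingsFreeList)
// on behalf of its TermsHashConsumer and, when trackAllocations is true, reports memory use
// back to the DocumentsWriter. A secondary TermsHash can be chained via nextTermsHash so that
// a second consumer processes the same inverted fields.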
TermsHash::TermsHash(const DocumentsWriterPtr& docWriter, bool trackAllocations, const TermsHashConsumerPtr& consumer, const TermsHashPtr& nextTermsHash) {
    this->postingsFreeCount = 0;
    this->postingsAllocCount = 0;
    this->trackAllocations = false;
    this->postingsFreeList = Collection<RawPostingListPtr>::newInstance(1);

    this->_docWriter = docWriter;
    this->consumer = consumer;
    this->nextTermsHash = nextTermsHash;
    this->trackAllocations = trackAllocations;

    bytesPerPosting = consumer->bytesPerPosting() + 4 * DocumentsWriter::POINTER_NUM_BYTE;
    postingsFreeChunk = (int32_t)((double)DocumentsWriter::BYTE_BLOCK_SIZE / (double)bytesPerPosting);
}

TermsHash::~TermsHash() {
}

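// Creates the per-thread state for this TermsHash, passing along the chained nextTermsHash.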
InvertedDocConsumerPerThreadPtr TermsHash::addThread(const DocInverterPerThreadPtr& docInverterPerThread) {
    return newLucene<TermsHashPerThread>(docInverterPerThread, shared_from_this(), nextTermsHash, TermsHashPerThreadPtr());
}

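// Overload used when a primary per-thread state already exists; it is handed through to the
// new TermsHashPerThread.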
TermsHashPerThreadPtr TermsHash::addThread(const DocInverterPerThreadPtr& docInverterPerThread, const TermsHashPerThreadPtr& primaryPerThread) {
    return newLucene<TermsHashPerThread>(docInverterPerThread, shared_from_this(), nextTermsHash, primaryPerThread);
}

void TermsHash::setFieldInfos(const FieldInfosPtr& fieldInfos) {
    this->fieldInfos = fieldInfos;
    consumer->setFieldInfos(fieldInfos);
}

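// Forwards the abort to the consumer and, if present, to the chained TermsHash.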
void TermsHash::abort() {
    consumer->abort();
    if (nextTermsHash) {
        nextTermsHash->abort();
    }
}

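// Called after a flush to shrink postingsFreeList back to a single entry, crediting the freed
// bytes back to the DocumentsWriter when allocations are being tracked.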
void TermsHash::shrinkFreePostings(MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField threadsAndFields, const SegmentWriteStatePtr& state) {
    BOOST_ASSERT(postingsFreeCount == postingsAllocCount);

    int32_t newSize = 1;
    if (newSize != postingsFreeList.size()) {
        if (postingsFreeCount > newSize) {
            if (trackAllocations) {
                DocumentsWriterPtr(_docWriter)->bytesAllocated(-(postingsFreeCount - newSize) * bytesPerPosting);
            }
            postingsFreeCount = newSize;
            postingsAllocCount = newSize;
        }
        postingsFreeList.resize(newSize);
    }
}

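// Forwards closeDocStore to the consumer and to the chained TermsHash, if any.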
void TermsHash::closeDocStore(const SegmentWriteStatePtr& state) {
    SyncLock syncLock(this);
    consumer->closeDocStore(state);
    if (nextTermsHash) {
        nextTermsHash->closeDocStore(state);
    }
}

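// Flushes the consumer (and the chained TermsHash) to the new segment. The incoming map of
// per-thread inverted-doc fields is regrouped into the consumer-specific and next-chain maps
// that each flush expects, after which the free postings list is shrunk.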
void TermsHash::flush(MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField threadsAndFields, const SegmentWriteStatePtr& state) {
    SyncLock syncLock(this);
    MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField childThreadsAndFields(MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField::newInstance());
    MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField nextThreadsAndFields;
    if (nextTermsHash) {
        nextThreadsAndFields = MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField::newInstance();
    }

    for (MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField::iterator entry = threadsAndFields.begin(); entry != threadsAndFields.end(); ++entry) {
        Collection<TermsHashConsumerPerFieldPtr> childFields(Collection<TermsHashConsumerPerFieldPtr>::newInstance());
        Collection<InvertedDocConsumerPerFieldPtr> nextChildFields;
        if (nextTermsHash) {
            nextChildFields = Collection<InvertedDocConsumerPerFieldPtr>::newInstance();
        }

        for (Collection<InvertedDocConsumerPerFieldPtr>::iterator perField = entry->second.begin(); perField != entry->second.end(); ++perField) {
            childFields.add(boost::static_pointer_cast<TermsHashPerField>(*perField)->consumer);
            if (nextTermsHash) {
                nextChildFields.add(boost::static_pointer_cast<TermsHashPerField>(*perField)->nextPerField);
            }
        }

        childThreadsAndFields.put(boost::static_pointer_cast<TermsHashPerThread>(entry->first)->consumer, childFields);
        if (nextTermsHash) {
            nextThreadsAndFields.put(boost::static_pointer_cast<TermsHashPerThread>(entry->first)->nextPerThread, nextChildFields);
        }
    }

    consumer->flush(childThreadsAndFields, state);

    shrinkFreePostings(threadsAndFields, state);

    if (nextTermsHash) {
        nextTermsHash->flush(nextThreadsAndFields, state);
    }
}

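// Releases up to one chunk of recycled postings back to the DocumentsWriter's byte accounting.
// Returns true if anything was freed here or further down the chain; always returns false when
// allocations are not being tracked.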
bool TermsHash::freeRAM() {
    if (!trackAllocations) {
        return false;
    }

    bool any = false;
    int64_t bytesFreed = 0;
    {
        SyncLock syncLock(this);
        int32_t numToFree = postingsFreeCount >= postingsFreeChunk ? postingsFreeChunk : postingsFreeCount;
        any = (numToFree > 0);
        if (any) {
            MiscUtils::arrayFill(postingsFreeList.begin(), postingsFreeCount - numToFree, postingsFreeCount, RawPostingListPtr());
            postingsFreeCount -= numToFree;
            postingsAllocCount -= numToFree;
            bytesFreed = -numToFree * bytesPerPosting;
            any = true;
        }
    }

    if (any) {
        DocumentsWriterPtr(_docWriter)->bytesAllocated(bytesFreed);
    }

    if (nextTermsHash && nextTermsHash->freeRAM()) {
        any = true;
    }

    return any;
}

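// Returns postings from a per-thread state to the shared free list so they can be reused.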
void TermsHash::recyclePostings(Collection<RawPostingListPtr> postings, int32_t numPostings) {
    SyncLock syncLock(this);
    BOOST_ASSERT(postings.size() >= numPostings);

    // Move all Postings from this ThreadState back to our free list.  We pre-allocated this array while we
    // were creating Postings to make sure it's large enough
    BOOST_ASSERT(postingsFreeCount + numPostings <= postingsFreeList.size());
    MiscUtils::arrayCopy(postings.begin(), 0, postingsFreeList.begin(), postingsFreeCount, numPostings);
    postingsFreeCount += numPostings;
}

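// Fills the given array with postings, drawing recycled entries from postingsFreeList first and
// asking the consumer to allocate any remainder. Newly allocated postings are counted against the
// DocumentsWriter's memory accounting when trackAllocations is set.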
void TermsHash::getPostings(Collection<RawPostingListPtr> postings) {
    SyncLock syncLock(this);
    DocumentsWriterPtr docWriter(_docWriter);
    IndexWriterPtr writer(docWriter->_writer);

    BOOST_ASSERT(writer->testPoint(L"TermsHash.getPostings start"));

    BOOST_ASSERT(postingsFreeCount <= postingsFreeList.size());
    BOOST_ASSERT(postingsFreeCount <= postingsAllocCount);

    int32_t numToCopy = postingsFreeCount < postings.size() ? postingsFreeCount : postings.size();
    int32_t start = postingsFreeCount - numToCopy;
    BOOST_ASSERT(start >= 0);
    BOOST_ASSERT(start + numToCopy <= postingsFreeList.size());
    BOOST_ASSERT(numToCopy <= postings.size());
    MiscUtils::arrayCopy(postingsFreeList.begin(), start, postings.begin(), 0, numToCopy);

    // Directly allocate the remainder if any
    if (numToCopy != postings.size()) {
        int32_t extra = postings.size() - numToCopy;
        int32_t newPostingsAllocCount = postingsAllocCount + extra;

        consumer->createPostings(postings, numToCopy, extra);
        BOOST_ASSERT(writer->testPoint(L"TermsHash.getPostings after create"));
        postingsAllocCount += extra;

        if (trackAllocations) {
            docWriter->bytesAllocated(extra * bytesPerPosting);
        }

        if (newPostingsAllocCount > postingsFreeList.size()) {
            // Pre-allocate the postingsFreeList so it's large enough to hold all postings we've given out
            postingsFreeList = Collection<RawPostingListPtr>::newInstance(MiscUtils::getNextSize(newPostingsAllocCount));
        }
    }

    postingsFreeCount -= numToCopy;

    if (trackAllocations) {
        docWriter->bytesUsed(postings.size() * bytesPerPosting);
    }
}

}