1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "TermsHash.h"
9 #include "DocumentsWriter.h"
10 #include "TermsHashConsumer.h"
11 #include "TermsHashPerThread.h"
12 #include "TermsHashPerField.h"
13 #include "TermsHashConsumerPerThread.h"
14 #include "DocInverterPerThread.h"
15 #include "TermsHashConsumerPerField.h"
16 #include "IndexWriter.h"
17 #include "MiscUtils.h"
18
19 namespace Lucene {
20
TermsHash(const DocumentsWriterPtr & docWriter,bool trackAllocations,const TermsHashConsumerPtr & consumer,const TermsHashPtr & nextTermsHash)21 TermsHash::TermsHash(const DocumentsWriterPtr& docWriter, bool trackAllocations, const TermsHashConsumerPtr& consumer, const TermsHashPtr& nextTermsHash) {
22 this->postingsFreeCount = 0;
23 this->postingsAllocCount = 0;
24 this->trackAllocations = false;
25 this->postingsFreeList = Collection<RawPostingListPtr>::newInstance(1);
26
27 this->_docWriter = docWriter;
28 this->consumer = consumer;
29 this->nextTermsHash = nextTermsHash;
30 this->trackAllocations = trackAllocations;
31
32 bytesPerPosting = consumer->bytesPerPosting() + 4 * DocumentsWriter::POINTER_NUM_BYTE;
33 postingsFreeChunk = (int32_t)((double)DocumentsWriter::BYTE_BLOCK_SIZE / (double)bytesPerPosting);
34 }
35
// Nothing to release explicitly; all members are smart pointers / collections
// that clean themselves up.
TermsHash::~TermsHash() {
}
38
addThread(const DocInverterPerThreadPtr & docInverterPerThread)39 InvertedDocConsumerPerThreadPtr TermsHash::addThread(const DocInverterPerThreadPtr& docInverterPerThread) {
40 return newLucene<TermsHashPerThread>(docInverterPerThread, shared_from_this(), nextTermsHash, TermsHashPerThreadPtr());
41 }
42
TermsHashPerThreadPtr TermsHash::addThread(const DocInverterPerThreadPtr& docInverterPerThread, const TermsHashPerThreadPtr& primaryPerThread) {
    // Secondary chain: link the new per-thread state back to the given primary.
    TermsHashPerThreadPtr secondary(newLucene<TermsHashPerThread>(docInverterPerThread, shared_from_this(), nextTermsHash, primaryPerThread));
    return secondary;
}
46
setFieldInfos(const FieldInfosPtr & fieldInfos)47 void TermsHash::setFieldInfos(const FieldInfosPtr& fieldInfos) {
48 this->fieldInfos = fieldInfos;
49 consumer->setFieldInfos(fieldInfos);
50 }
51
abort()52 void TermsHash::abort() {
53 consumer->abort();
54 if (nextTermsHash) {
55 nextTermsHash->abort();
56 }
57 }
58
shrinkFreePostings(MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField threadsAndFields,const SegmentWriteStatePtr & state)59 void TermsHash::shrinkFreePostings(MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField threadsAndFields, const SegmentWriteStatePtr& state) {
60 BOOST_ASSERT(postingsFreeCount == postingsAllocCount);
61
62 int32_t newSize = 1;
63 if (newSize != postingsFreeList.size()) {
64 if (postingsFreeCount > newSize) {
65 if (trackAllocations) {
66 DocumentsWriterPtr(_docWriter)->bytesAllocated(-(postingsFreeCount - newSize) * bytesPerPosting);
67 }
68 postingsFreeCount = newSize;
69 postingsAllocCount = newSize;
70 }
71 postingsFreeList.resize(newSize);
72 }
73 }
74
closeDocStore(const SegmentWriteStatePtr & state)75 void TermsHash::closeDocStore(const SegmentWriteStatePtr& state) {
76 SyncLock syncLock(this);
77 consumer->closeDocStore(state);
78 if (nextTermsHash) {
79 nextTermsHash->closeDocStore(state);
80 }
81 }
82
flush(MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField threadsAndFields,const SegmentWriteStatePtr & state)83 void TermsHash::flush(MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField threadsAndFields, const SegmentWriteStatePtr& state) {
84 SyncLock syncLock(this);
85 MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField childThreadsAndFields(MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField::newInstance());
86 MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField nextThreadsAndFields;
87 if (nextTermsHash) {
88 nextThreadsAndFields = MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField::newInstance();
89 }
90
91 for (MapInvertedDocConsumerPerThreadCollectionInvertedDocConsumerPerField::iterator entry = threadsAndFields.begin(); entry != threadsAndFields.end(); ++entry) {
92 Collection<TermsHashConsumerPerFieldPtr> childFields(Collection<TermsHashConsumerPerFieldPtr>::newInstance());
93 Collection<InvertedDocConsumerPerFieldPtr> nextChildFields;
94 if (nextTermsHash) {
95 nextChildFields = Collection<InvertedDocConsumerPerFieldPtr>::newInstance();
96 }
97
98 for (Collection<InvertedDocConsumerPerFieldPtr>::iterator perField = entry->second.begin(); perField != entry->second.end(); ++perField) {
99 childFields.add(boost::static_pointer_cast<TermsHashPerField>(*perField)->consumer);
100 if (nextTermsHash) {
101 nextChildFields.add(boost::static_pointer_cast<TermsHashPerField>(*perField)->nextPerField);
102 }
103 }
104
105 childThreadsAndFields.put(boost::static_pointer_cast<TermsHashPerThread>(entry->first)->consumer, childFields);
106 if (nextTermsHash) {
107 nextThreadsAndFields.put(boost::static_pointer_cast<TermsHashPerThread>(entry->first)->nextPerThread, nextChildFields);
108 }
109 }
110
111 consumer->flush(childThreadsAndFields, state);
112
113 shrinkFreePostings(threadsAndFields, state);
114
115 if (nextTermsHash) {
116 nextTermsHash->flush(nextThreadsAndFields, state);
117 }
118 }
119
freeRAM()120 bool TermsHash::freeRAM() {
121 if (!trackAllocations) {
122 return false;
123 }
124
125 bool any = false;
126 int64_t bytesFreed = 0;
127 {
128 SyncLock syncLock(this);
129 int32_t numToFree = postingsFreeCount >= postingsFreeChunk ? postingsFreeChunk : postingsFreeCount;
130 any = (numToFree > 0);
131 if (any) {
132 MiscUtils::arrayFill(postingsFreeList.begin(), postingsFreeCount - numToFree, postingsFreeCount, RawPostingListPtr());
133 postingsFreeCount -= numToFree;
134 postingsAllocCount -= numToFree;
135 bytesFreed = -numToFree * bytesPerPosting;
136 any = true;
137 }
138 }
139
140 if (any) {
141 DocumentsWriterPtr(_docWriter)->bytesAllocated(bytesFreed);
142 }
143
144 if (nextTermsHash && nextTermsHash->freeRAM()) {
145 any = true;
146 }
147
148 return any;
149 }
150
recyclePostings(Collection<RawPostingListPtr> postings,int32_t numPostings)151 void TermsHash::recyclePostings(Collection<RawPostingListPtr> postings, int32_t numPostings) {
152 SyncLock syncLock(this);
153 BOOST_ASSERT(postings.size() >= numPostings);
154
155 // Move all Postings from this ThreadState back to our free list. We pre-allocated this array while we
156 // were creating Postings to make sure it's large enough
157 BOOST_ASSERT(postingsFreeCount + numPostings <= postingsFreeList.size());
158 MiscUtils::arrayCopy(postings.begin(), 0, postingsFreeList.begin(), postingsFreeCount, numPostings);
159 postingsFreeCount += numPostings;
160 }
161
// Fills the given array with RawPostingList instances: first by reusing recycled
// postings from the tail of the free list, then by asking the consumer to
// allocate the remainder.  Also keeps the memory accounting and the free list
// capacity (needed later by recyclePostings) up to date.
void TermsHash::getPostings(Collection<RawPostingListPtr> postings) {
    SyncLock syncLock(this);
    DocumentsWriterPtr docWriter(_docWriter);
    IndexWriterPtr writer(docWriter->_writer);

    BOOST_ASSERT(writer->testPoint(L"TermsHash.getPostings start"));

    BOOST_ASSERT(postingsFreeCount <= postingsFreeList.size());
    BOOST_ASSERT(postingsFreeCount <= postingsAllocCount);

    // Satisfy as much of the request as possible from the tail of the free list.
    int32_t numToCopy = postingsFreeCount < postings.size() ? postingsFreeCount : postings.size();
    int32_t start = postingsFreeCount - numToCopy;
    BOOST_ASSERT(start >= 0);
    BOOST_ASSERT(start + numToCopy <= postingsFreeList.size());
    BOOST_ASSERT(numToCopy <= postings.size());
    MiscUtils::arrayCopy(postingsFreeList.begin(), start, postings.begin(), 0, numToCopy);

    // Directly allocate the remainder if any
    if (numToCopy != postings.size()) {
        int32_t extra = postings.size() - numToCopy;
        int32_t newPostingsAllocCount = postingsAllocCount + extra;

        consumer->createPostings(postings, numToCopy, extra);
        BOOST_ASSERT(writer->testPoint(L"TermsHash.getPostings after create"));
        postingsAllocCount += extra;

        if (trackAllocations) {
            // Charge the newly created postings against the writer's RAM budget.
            docWriter->bytesAllocated(extra * bytesPerPosting);
        }

        if (newPostingsAllocCount > postingsFreeList.size()) {
            // Pre-allocate the postingsFreeList so it's large enough to hold all postings we've given out
            // (safe to discard the old list here: this branch implies numToCopy == postingsFreeCount,
            // so every live entry was already copied out above).
            postingsFreeList = Collection<RawPostingListPtr>::newInstance(MiscUtils::getNextSize(newPostingsAllocCount));
        }
    }

    // Deduct the reused entries only now, after the allocation branch read the old count.
    postingsFreeCount -= numToCopy;

    if (trackAllocations) {
        docWriter->bytesUsed(postings.size() * bytesPerPosting);
    }
}
204
205 }
206