1 /////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved.
3 // Distributable under the terms of either the Apache License (Version 2.0)
4 // or the GNU Lesser General Public License.
5 /////////////////////////////////////////////////////////////////////////////
6
7 #include "LuceneInc.h"
8 #include "FreqProxTermsWriter.h"
9 #include "FreqProxTermsWriterPerThread.h"
10 #include "FreqProxTermsWriterPerField.h"
11 #include "FreqProxFieldMergeState.h"
12 #include "TermsHashConsumerPerThread.h"
13 #include "TermsHashConsumerPerField.h"
14 #include "TermsHashPerField.h"
15 #include "TermsHashPerThread.h"
16 #include "FormatPostingsDocsConsumer.h"
17 #include "FormatPostingsFieldsConsumer.h"
18 #include "FormatPostingsFieldsWriter.h"
19 #include "FormatPostingsTermsConsumer.h"
20 #include "FormatPostingsPositionsConsumer.h"
21 #include "FieldInfo.h"
22 #include "ByteSliceReader.h"
23 #include "RawPostingList.h"
24 #include "DocumentsWriter.h"
25 #include "UTF8Stream.h"
26 #include "TestPoint.h"
27
28 namespace Lucene {
29
~FreqProxTermsWriter()30 FreqProxTermsWriter::~FreqProxTermsWriter() {
31 }
32
addThread(const TermsHashPerThreadPtr & perThread)33 TermsHashConsumerPerThreadPtr FreqProxTermsWriter::addThread(const TermsHashPerThreadPtr& perThread) {
34 return newLucene<FreqProxTermsWriterPerThread>(perThread);
35 }
36
createPostings(Collection<RawPostingListPtr> postings,int32_t start,int32_t count)37 void FreqProxTermsWriter::createPostings(Collection<RawPostingListPtr> postings, int32_t start, int32_t count) {
38 int32_t end = start + count;
39 for (int32_t i = start; i < end; ++i) {
40 postings[i] = newLucene<FreqProxTermsWriterPostingList>();
41 }
42 }
43
compareText(const wchar_t * text1,int32_t pos1,const wchar_t * text2,int32_t pos2)44 int32_t FreqProxTermsWriter::compareText(const wchar_t* text1, int32_t pos1, const wchar_t* text2, int32_t pos2) {
45 while (true) {
46 wchar_t c1 = text1[pos1++];
47 wchar_t c2 = text2[pos2++];
48 if (c1 != c2) {
49 if (c2 == UTF8Base::UNICODE_TERMINATOR) {
50 return 1;
51 } else if (c1 == UTF8Base::UNICODE_TERMINATOR) {
52 return -1;
53 } else {
54 return (c1 - c2);
55 }
56 } else if (c1 == UTF8Base::UNICODE_TERMINATOR) {
57 return 0;
58 }
59 }
60 }
61
closeDocStore(const SegmentWriteStatePtr & state)62 void FreqProxTermsWriter::closeDocStore(const SegmentWriteStatePtr& state) {
63 }
64
abort()65 void FreqProxTermsWriter::abort() {
66 }
67
flush(MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField threadsAndFields,const SegmentWriteStatePtr & state)68 void FreqProxTermsWriter::flush(MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField threadsAndFields, const SegmentWriteStatePtr& state) {
69 // Gather all FieldData's that have postings, across all ThreadStates
70 Collection<FreqProxTermsWriterPerFieldPtr> allFields(Collection<FreqProxTermsWriterPerFieldPtr>::newInstance());
71
72 for (MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField::iterator entry = threadsAndFields.begin(); entry != threadsAndFields.end(); ++entry) {
73 for (Collection<TermsHashConsumerPerFieldPtr>::iterator perField = entry->second.begin(); perField != entry->second.end(); ++perField) {
74 FreqProxTermsWriterPerFieldPtr freqProxPerField(boost::static_pointer_cast<FreqProxTermsWriterPerField>(*perField));
75 if (TermsHashPerFieldPtr(freqProxPerField->_termsHashPerField)->numPostings > 0) {
76 allFields.add(freqProxPerField);
77 }
78 }
79 }
80
81 // Sort by field name
82 std::sort(allFields.begin(), allFields.end(), luceneCompare<FreqProxTermsWriterPerFieldPtr>());
83
84 int32_t numAllFields = allFields.size();
85
86 FormatPostingsFieldsConsumerPtr consumer(newLucene<FormatPostingsFieldsWriter>(state, fieldInfos));
87
88 // Current writer chain:
89 // FormatPostingsFieldsConsumer
90 // -> IMPL: FormatPostingsFieldsWriter
91 // -> FormatPostingsTermsConsumer
92 // -> IMPL: FormatPostingsTermsWriter
93 // -> FormatPostingsDocConsumer
94 // -> IMPL: FormatPostingsDocWriter
95 // -> FormatPostingsPositionsConsumer
96 // -> IMPL: FormatPostingsPositionsWriter
97
98 int32_t start = 0;
99 while (start < numAllFields) {
100 FieldInfoPtr fieldInfo(allFields[start]->fieldInfo);
101 String fieldName(fieldInfo->name);
102
103 int32_t end = start + 1;
104 while (end < numAllFields && allFields[end]->fieldInfo->name == fieldName) {
105 ++end;
106 }
107
108 Collection<FreqProxTermsWriterPerFieldPtr> fields(Collection<FreqProxTermsWriterPerFieldPtr>::newInstance(end - start));
109 for (int32_t i = start; i < end; ++i) {
110 fields[i - start] = allFields[i];
111
112 // Aggregate the storePayload as seen by the same field across multiple threads
113 if (fields[i - start]->hasPayloads) {
114 fieldInfo->storePayloads = true;
115 }
116 }
117
118 // If this field has postings then add them to the segment
119 appendPostings(fields, consumer);
120
121 for (int32_t i = 0; i < fields.size(); ++i) {
122 TermsHashPerFieldPtr perField(fields[i]->_termsHashPerField);
123 int32_t numPostings = perField->numPostings;
124 perField->reset();
125 perField->shrinkHash(numPostings);
126 fields[i]->reset();
127 }
128
129 start = end;
130 }
131
132 for (MapTermsHashConsumerPerThreadCollectionTermsHashConsumerPerField::iterator entry = threadsAndFields.begin(); entry != threadsAndFields.end(); ++entry) {
133 TermsHashPerThreadPtr(boost::static_pointer_cast<FreqProxTermsWriterPerThread>(entry->first)->_termsHashPerThread)->reset(true);
134 }
135
136 consumer->finish();
137 }
138
appendPostings(Collection<FreqProxTermsWriterPerFieldPtr> fields,const FormatPostingsFieldsConsumerPtr & consumer)139 void FreqProxTermsWriter::appendPostings(Collection<FreqProxTermsWriterPerFieldPtr> fields, const FormatPostingsFieldsConsumerPtr& consumer) {
140 TestScope testScope(L"FreqProxTermsWriter", L"appendPostings");
141 int32_t numFields = fields.size();
142
143 Collection<FreqProxFieldMergeStatePtr> mergeStates(Collection<FreqProxFieldMergeStatePtr>::newInstance(numFields));
144
145 for (int32_t i = 0; i < numFields; ++i) {
146 FreqProxFieldMergeStatePtr fms(newLucene<FreqProxFieldMergeState>(fields[i]));
147 mergeStates[i] = fms;
148
149 BOOST_ASSERT(fms->field->fieldInfo == fields[0]->fieldInfo);
150
151 // Should always be true
152 bool result = fms->nextTerm();
153 BOOST_ASSERT(result);
154 }
155
156 FormatPostingsTermsConsumerPtr termsConsumer(consumer->addField(fields[0]->fieldInfo));
157
158 Collection<FreqProxFieldMergeStatePtr> termStates(Collection<FreqProxFieldMergeStatePtr>::newInstance(numFields));
159
160 bool currentFieldOmitTermFreqAndPositions = fields[0]->fieldInfo->omitTermFreqAndPositions;
161
162 while (numFields > 0) {
163 // Get the next term to merge
164 termStates[0] = mergeStates[0];
165 int32_t numToMerge = 1;
166
167 for (int32_t i = 1; i < numFields; ++i) {
168 CharArray text = mergeStates[i]->text;
169 int32_t textOffset = mergeStates[i]->textOffset;
170 int32_t cmp = compareText(text.get(), textOffset, termStates[0]->text.get(), termStates[0]->textOffset);
171
172 if (cmp < 0) {
173 termStates[0] = mergeStates[i];
174 numToMerge = 1;
175 } else if (cmp == 0) {
176 termStates[numToMerge++] = mergeStates[i];
177 }
178 }
179
180 FormatPostingsDocsConsumerPtr docConsumer(termsConsumer->addTerm(termStates[0]->text, termStates[0]->textOffset));
181
182 // Now termStates has numToMerge FieldMergeStates which all share the same term. Now we must
183 // interleave the docID streams.
184 while (numToMerge > 0) {
185 FreqProxFieldMergeStatePtr minState(termStates[0]);
186 for (int32_t i = 1; i < numToMerge; ++i) {
187 if (termStates[i]->docID < minState->docID) {
188 minState = termStates[i];
189 }
190 }
191
192 int32_t termDocFreq = minState->termFreq;
193
194 FormatPostingsPositionsConsumerPtr posConsumer(docConsumer->addDoc(minState->docID, termDocFreq));
195
196 ByteSliceReaderPtr prox(minState->prox);
197
198 // Carefully copy over the prox + payload info, changing the format to match Lucene's segment format.
199 if (!currentFieldOmitTermFreqAndPositions) {
200 // omitTermFreqAndPositions == false so we do write positions & payload
201 int32_t position = 0;
202 for (int32_t j = 0; j < termDocFreq; ++j) {
203 int32_t code = prox->readVInt();
204 position += (code >> 1);
205
206 int32_t payloadLength;
207 if ((code & 1) != 0) {
208 // This position has a payload
209 payloadLength = prox->readVInt();
210
211 if (!payloadBuffer) {
212 payloadBuffer = ByteArray::newInstance(payloadLength);
213 }
214 if (payloadBuffer.size() < payloadLength) {
215 payloadBuffer.resize(payloadLength);
216 }
217
218 prox->readBytes(payloadBuffer.get(), 0, payloadLength);
219 } else {
220 payloadLength = 0;
221 }
222
223 posConsumer->addPosition(position, payloadBuffer, 0, payloadLength);
224 }
225
226 posConsumer->finish();
227 }
228
229 if (!minState->nextDoc()) {
230 // Remove from termStates
231 int32_t upto = 0;
232 for (int32_t i = 0; i < numToMerge; ++i) {
233 if (termStates[i] != minState) {
234 termStates[upto++] = termStates[i];
235 }
236 }
237 --numToMerge;
238 BOOST_ASSERT(upto == numToMerge);
239
240 // Advance this state to the next term
241
242 if (!minState->nextTerm()) {
243 // OK, no more terms, so remove from mergeStates as well
244 upto = 0;
245 for (int32_t i = 0; i < numFields; ++i) {
246 if (mergeStates[i] != minState) {
247 mergeStates[upto++] = mergeStates[i];
248 }
249 }
250 --numFields;
251 BOOST_ASSERT(upto == numFields);
252 }
253 }
254 }
255
256 docConsumer->finish();
257 }
258
259 termsConsumer->finish();
260 }
261
bytesPerPosting()262 int32_t FreqProxTermsWriter::bytesPerPosting() {
263 return RawPostingList::BYTES_SIZE + 4 * DocumentsWriter::INT_NUM_BYTE;
264 }
265
FreqProxTermsWriterPostingList()266 FreqProxTermsWriterPostingList::FreqProxTermsWriterPostingList() {
267 docFreq = 0;
268 lastDocID = 0;
269 lastDocCode = 0;
270 lastPosition = 0;
271 }
272
~FreqProxTermsWriterPostingList()273 FreqProxTermsWriterPostingList::~FreqProxTermsWriterPostingList() {
274 }
275
276 }
277