/////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2009-2014 Alan Wright. All rights reserved.
// Distributable under the terms of either the Apache License (Version 2.0)
// or the GNU Lesser General Public License.
/////////////////////////////////////////////////////////////////////////////

#ifndef DOCUMENTSWRITER_H
#define DOCUMENTSWRITER_H

#include "ByteBlockPool.h"
#include "RAMFile.h"

namespace Lucene {

/// This class accepts multiple added documents and directly writes a single segment file.  It does this more
/// efficiently than creating a single segment per document (with DocumentWriter) and doing standard merges on
/// those segments.
///
/// Each added document is passed to the {@link DocConsumer}, which in turn processes the document and interacts
/// with other consumers in the indexing chain.  Certain consumers, like {@link StoredFieldsWriter} and {@link
/// TermVectorsTermsWriter}, digest a document and immediately write bytes to the "doc store" files (i.e.,
/// they do not consume RAM per document, except while they are processing the document).
///
/// Other consumers, e.g. {@link FreqProxTermsWriter} and {@link NormsWriter}, buffer bytes in RAM and flush only
/// when a new segment is produced.
///
/// Once we have used our allowed RAM buffer, or the number of added docs is large enough (in the case we are
/// flushing by doc count instead of RAM usage), we create a real segment and flush it to the Directory.
///
/// Threads:
/// Multiple threads are allowed into addDocument at once.  There is an initial synchronized call to
/// getThreadState which allocates a ThreadState for this thread.  The same thread will get the same ThreadState
/// over time (thread affinity), so that if there are consistent patterns (for example, each thread is indexing a
/// different content source) then we make better use of RAM.  Then processDocument is called on that ThreadState
/// without synchronization (most of the "heavy lifting" is in this call).  Finally the synchronized
/// "finishDocument" is called to flush changes to the directory.
///
/// When flush is called by IndexWriter we forcefully idle all threads and flush only once they are all idle.
/// This means you can call flush with a given thread even while other threads are actively adding/deleting
/// documents.
///
/// Exceptions:
/// Because this class directly updates in-memory posting lists, and flushes stored fields and term vectors
/// directly to files in the directory, there are certain limited times when an exception can corrupt this state.
/// For example, a disk-full error while flushing stored fields leaves that file in a corrupt state.  Or, an
/// std::bad_alloc exception while appending to the in-memory posting lists can corrupt that posting list.
/// We call such exceptions "aborting exceptions".  In these cases we must call abort() to discard all docs added
/// since the last flush.
///
/// All other exceptions ("non-aborting exceptions") can still partially update the index structures.  These
/// updates are consistent, but they represent only a part of the document seen up until the exception was hit.
/// When this happens, we immediately mark the document as deleted so that the document is always atomically
/// ("all or none") added to the index.
class LPPAPI DocumentsWriter : public LuceneObject {
public:
    DocumentsWriter(const DirectoryPtr& directory, const IndexWriterPtr& writer, const IndexingChainPtr& indexingChain);
    virtual ~DocumentsWriter();

    LUCENE_CLASS(DocumentsWriter);

protected:
    String docStoreSegment; // Current doc-store segment we are writing
    int32_t docStoreOffset; // Current starting doc-store offset of current segment

    int32_t nextDocID; // Next docID to be added
    int32_t numDocsInRAM; // # docs buffered in RAM

    /// Max # ThreadState instances; if there are more threads than this they share ThreadStates
    static const int32_t MAX_THREAD_STATE;
    Collection<DocumentsWriterThreadStatePtr> threadStates;
    MapThreadDocumentsWriterThreadState threadBindings;

    int32_t pauseThreads; // Non-zero when we need all threads to pause (eg to flush)
    bool aborting; // True if an abort is pending

    DocFieldProcessorPtr docFieldProcessor;

    /// Deletes done after the last flush; these are discarded on abort
    BufferedDeletesPtr deletesInRAM;

    /// Deletes done before the last flush; these are still kept on abort
    BufferedDeletesPtr deletesFlushed;

    /// The max number of delete terms that can be buffered before they must be flushed to disk.
    int32_t maxBufferedDeleteTerms;

    /// How much RAM we can use before flushing.  This is 0 if we are flushing by doc count instead.
    int64_t ramBufferSize;
    int64_t waitQueuePauseBytes;
    int64_t waitQueueResumeBytes;

    /// If we've allocated 5% over our RAM budget, we then free down to 95%
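    /// For example (illustrative arithmetic only): with a 16 MB ramBufferSize, freeTrigger
    /// sits at roughly 16.8 MB (105%) and freeLevel at roughly 15.2 MB (95%).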
    int64_t freeTrigger;
    int64_t freeLevel;

    /// Flush at this number of docs.  If ramBufferSize is non-zero we will flush by RAM usage instead.
    int32_t maxBufferedDocs;

    /// How many docs already flushed to index
    int32_t flushedDocCount;

    bool closed;

    /// List of files that were written before last abort()
    HashSet<String> _abortedFiles;
    SegmentWriteStatePtr flushState;

    Collection<IntArray> freeIntBlocks;
    Collection<CharArray> freeCharBlocks;

public:
    /// Coarse estimates used to measure RAM usage of buffered deletes
    static const int32_t OBJECT_HEADER_BYTES;
    static const int32_t POINTER_NUM_BYTE;
    static const int32_t INT_NUM_BYTE;
    static const int32_t CHAR_NUM_BYTE;

    /// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER).  Entry is object
    /// with Term key, BufferedDeletes.Num val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).  Term is
    /// object with String field and String text (OBJ_HEADER + 2*POINTER).  We don't count Term's field since
    /// it's interned.  Term's text is String (OBJ_HEADER + 4*INT + POINTER + OBJ_HEADER + string.length*CHAR).
    /// BufferedDeletes.num is OBJ_HEADER + INT.
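    ///
    /// Worked example (purely illustrative, assuming OBJ_HEADER=8, POINTER=4, INT=4, CHAR=2):
    /// the fixed cost is 8*POINTER + 5*OBJ_HEADER + 6*INT = 32 + 40 + 24 = 96 bytes per buffered
    /// delete term, plus string.length*CHAR for the term text.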
    static const int32_t BYTES_PER_DEL_TERM;

    /// Rough logic: del docIDs are List<Integer>.  Say list allocates ~2X size (2*POINTER).  Integer is
    /// OBJ_HEADER + int
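    /// (under the same illustrative sizes: 2*POINTER + OBJ_HEADER + INT = 8 + 8 + 4 = 20 bytes per deleted docID).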
    static const int32_t BYTES_PER_DEL_DOCID;

    /// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER).  Entry is object
    /// with Query key, Integer val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).  Query we often undercount
    /// (say 24 bytes).  Integer is OBJ_HEADER + INT.
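    /// Under the same illustrative sizes: 5*POINTER + 2*OBJ_HEADER + 2*INT + 24 = 20 + 16 + 8 + 24 = 68 bytes per query.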
    static const int32_t BYTES_PER_DEL_QUERY;

    /// Initial chunk size of the shared byte[] blocks used to store postings data
    static const int32_t BYTE_BLOCK_SHIFT;
    static const int32_t BYTE_BLOCK_SIZE;
    static const int32_t BYTE_BLOCK_MASK;
    static const int32_t BYTE_BLOCK_NOT_MASK;
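
    /// These block constants are tied together by shift/mask arithmetic.  A sketch of the
    /// conventional derivation (the concrete values live in the .cpp; the traditional Lucene
    /// shift of 15, i.e. 32768-byte blocks, is an assumption here, not read from this file):
    ///
    ///   BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;  // bytes per shared block
    ///   BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;    // offset of an address within its block
    ///   BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;   // block-aligned part of an address
    ///
    /// The char[] and int[] pools below follow the same SHIFT/SIZE/MASK pattern.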

    /// Initial chunk size of the shared char[] blocks used to store term text
    static const int32_t CHAR_BLOCK_SHIFT;
    static const int32_t CHAR_BLOCK_SIZE;
    static const int32_t CHAR_BLOCK_MASK;

    static const int32_t MAX_TERM_LENGTH;
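
    /// MAX_TERM_LENGTH is conventionally CHAR_BLOCK_SIZE - 1, since a term's text must fit
    /// within a single shared char[] block (again a sketch of the usual Lucene derivation;
    /// the actual definition lives in the .cpp):
    ///
    ///   MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1;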

    /// Initial chunk size of the shared int[] blocks used to store postings data
    static const int32_t INT_BLOCK_SHIFT;
    static const int32_t INT_BLOCK_SIZE;
    static const int32_t INT_BLOCK_MASK;

    static const int32_t PER_DOC_BLOCK_SIZE;

INTERNAL:
    IndexWriterWeakPtr _writer;
    DirectoryPtr directory;
    IndexingChainPtr indexingChain;
    String segment; // Current segment we are working on

    int32_t numDocsInStore; // # docs written to doc stores

    bool flushPending; // True when a thread has decided to flush
    bool bufferIsFull; // True when it's time to write segment

    InfoStreamPtr infoStream;
    int32_t maxFieldLength;
    SimilarityPtr similarity;

    DocConsumerPtr consumer;

    HashSet<String> _openFiles;
    HashSet<String> _closedFiles;

    WaitQueuePtr waitQueue;
    SkipDocWriterPtr skipDocWriter;

    ByteBlockAllocatorPtr byteBlockAllocator;
    ByteBlockAllocatorPtr perDocAllocator;

    int64_t numBytesAlloc;
    int64_t numBytesUsed;

    // used only by assert
    TermPtr lastDeleteTerm;

public:
    virtual void initialize();

    /// Create and return a new PerDocBuffer.
    PerDocBufferPtr newPerDocBuffer();

    static IndexingChainPtr getDefaultIndexingChain();

    void updateFlushedDocCount(int32_t n);
    int32_t getFlushedDocCount();
    void setFlushedDocCount(int32_t n);

    /// Returns true if any of the fields in the current buffered docs have omitTermFreqAndPositions==false
    bool hasProx();

    /// If non-null, various details of indexing are printed here.
    void setInfoStream(const InfoStreamPtr& infoStream);

    void setMaxFieldLength(int32_t maxFieldLength);
    void setSimilarity(const SimilarityPtr& similarity);

    /// Set how much RAM we can use before flushing.
    void setRAMBufferSizeMB(double mb);
    double getRAMBufferSizeMB();

    /// Set max buffered docs, which means we will flush by doc count instead of by RAM usage.
    void setMaxBufferedDocs(int32_t count);
    int32_t getMaxBufferedDocs();

    /// Get current segment name we are writing.
    String getSegment();

    /// Returns how many docs are currently buffered in RAM.
    int32_t getNumDocsInRAM();

    /// Returns the current doc store segment we are writing to.
    String getDocStoreSegment();

    /// Returns the doc offset into the shared doc store for the current buffered docs.
    int32_t getDocStoreOffset();

    /// Closes the currently open doc stores and returns the doc store segment name.  This returns null if there
    /// are no buffered documents.
    String closeDocStore();

    HashSet<String> abortedFiles();

    void message(const String& message);

    /// Returns Collection of files in use by this instance, including any flushed segments.
    HashSet<String> openFiles();
    HashSet<String> closedFiles();

    void addOpenFile(const String& name);
    void removeOpenFile(const String& name);

    void setAborting();

    /// Called if we hit an exception at a bad time (when updating the index files) and must discard all
    /// currently buffered docs.  This resets our state, discarding any docs added since last flush.
    void abort();

    /// Returns true if an abort is in progress
    bool pauseAllThreads();
    void resumeAllThreads();

    bool anyChanges();

    void initFlushState(bool onlyDocStore);

    /// Flush all pending docs to a new segment
    int32_t flush(bool _closeDocStore);

    HashSet<String> getFlushedFiles();

    /// Build compound file for the segment we just flushed
    void createCompoundFile(const String& segment);

    /// Sets flushPending if it is not already set, and returns whether it was set.  This is used by IndexWriter
    /// to trigger a single flush even when multiple threads are trying to do so.
    bool setFlushPending();
    void clearFlushPending();

    void pushDeletes();

    void close();

    void initSegmentName(bool onlyDocStore);

    /// Returns a free (idle) ThreadState that may be used for indexing this one document.  This call also
    /// pauses if a flush is pending.  If delTerm is non-null then we buffer this deleted term after the
    /// thread state has been acquired.
    DocumentsWriterThreadStatePtr getThreadState(const DocumentPtr& doc, const TermPtr& delTerm);

    /// Returns true if the caller (IndexWriter) should now flush.
    bool addDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer);

    /// Atomically buffers a delete of all documents containing the given term, then adds the new
    /// document.  Like addDocument, returns true if the caller (IndexWriter) should now flush.
    bool updateDocument(const TermPtr& t, const DocumentPtr& doc, const AnalyzerPtr& analyzer);
    bool updateDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer, const TermPtr& delTerm);

    int32_t getNumBufferedDeleteTerms(); // for testing
    MapTermNum getBufferedDeleteTerms(); // for testing

    /// Called whenever a merge has completed and the merged segments had deletions
    void remapDeletes(const SegmentInfosPtr& infos, Collection< Collection<int32_t> > docMaps, Collection<int32_t> delCounts, const OneMergePtr& merge, int32_t mergeDocCount);

    bool bufferDeleteTerms(Collection<TermPtr> terms);
    bool bufferDeleteTerm(const TermPtr& term);
    bool bufferDeleteQueries(Collection<QueryPtr> queries);
    bool bufferDeleteQuery(const QueryPtr& query);
    bool deletesFull();
    bool doApplyDeletes();

    void setMaxBufferedDeleteTerms(int32_t maxBufferedDeleteTerms);
    int32_t getMaxBufferedDeleteTerms();

    bool hasDeletes();
    bool applyDeletes(const SegmentInfosPtr& infos);
    bool doBalanceRAM();

    void waitForWaitQueue();

    int64_t getRAMUsed();

    IntArray getIntBlock(bool trackAllocations);
    void bytesAllocated(int64_t numBytes);
    void bytesUsed(int64_t numBytes);
    void recycleIntBlocks(Collection<IntArray> blocks, int32_t start, int32_t end);

    CharArray getCharBlock();
    void recycleCharBlocks(Collection<CharArray> blocks, int32_t numBlocks);

    String toMB(int64_t v);

    /// We have four pools of RAM: Postings, byte blocks (holds freq/prox posting data), char blocks (holds
    /// characters in the term) and per-doc buffers (stored fields/term vectors).  Different docs require
    /// varying amounts of storage from these four pools.
    ///
    /// For example, docs with many unique single-occurrence short terms will use up the Postings
    /// RAM and hardly any of the others.  Whereas docs with very large terms will use a lot of char block
    /// RAM and relatively less of the others.  This method just frees allocations from the pools once we
    /// are over-budget, which balances the pools to match the current docs.
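    ///
    /// A rough sketch of the strategy (illustrative pseudocode, not the exact loop in the .cpp):
    ///
    ///   while (numBytesAlloc > freeLevel) {
    ///       // drop one free block from whichever pool (byte / char / int / per-doc)
    ///       // currently holds the most recyclable memory, then re-check the budget
    ///   }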
    void balanceRAM();

protected:
    /// Reset after a flush
    void doAfterFlush();

    bool allThreadsIdle();

    void waitReady(const DocumentsWriterThreadStatePtr& state);

    bool timeToFlushDeletes();

    // used only by assert
    bool checkDeleteTerm(const TermPtr& term);

    bool applyDeletes(const IndexReaderPtr& reader, int32_t docIDStart);
    void addDeleteTerm(const TermPtr& term, int32_t docCount);

    /// Buffer a specific docID for deletion.  Currently only used when we hit an exception while adding a document
    void addDeleteDocID(int32_t docID);
    void addDeleteQuery(const QueryPtr& query, int32_t docID);

    /// Does the synchronized work to finish/flush the inverted document.
    void finishDocument(const DocumentsWriterThreadStatePtr& perThread, const DocWriterPtr& docWriter);

    friend class WaitQueue;
};

class DocState : public LuceneObject {
public:
    DocState();
    virtual ~DocState();

    LUCENE_CLASS(DocState);

public:
    DocumentsWriterWeakPtr _docWriter;
    AnalyzerPtr analyzer;
    int32_t maxFieldLength;
    InfoStreamPtr infoStream;
    SimilarityPtr similarity;
    int32_t docID;
    DocumentPtr doc;
    String maxTermPrefix;

public:
    /// Only called by asserts
    virtual bool testPoint(const String& name);

    void clear();
};

/// RAMFile buffer for DocWriters.
class PerDocBuffer : public RAMFile {
public:
    PerDocBuffer(const DocumentsWriterPtr& docWriter);
    virtual ~PerDocBuffer();

    LUCENE_CLASS(PerDocBuffer);

protected:
    DocumentsWriterWeakPtr _docWriter;

public:
    /// Recycle the bytes used.
    void recycle();

protected:
    /// Allocate bytes used from shared pool.
    virtual ByteArray newBuffer(int32_t size);
};

/// Consumer returns this on each doc.  This holds any state that must be flushed in docID order
/// (under synchronization).  We gather these and flush them in order.
class DocWriter : public LuceneObject {
public:
    DocWriter();
    virtual ~DocWriter();

    LUCENE_CLASS(DocWriter);

public:
    DocWriterPtr next;
    int32_t docID;

public:
    virtual void finish() = 0;
    virtual void abort() = 0;
    virtual int64_t sizeInBytes() = 0;

    virtual void setNext(const DocWriterPtr& next);
};

/// The IndexingChain must define the {@link #getChain(DocumentsWriter)} method which returns the DocConsumer
/// that the DocumentsWriter calls to process the documents.
class IndexingChain : public LuceneObject {
public:
    virtual ~IndexingChain();

    LUCENE_CLASS(IndexingChain);

public:
    virtual DocConsumerPtr getChain(const DocumentsWriterPtr& documentsWriter) = 0;
};
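
/// A custom chain only needs to hand back the head DocConsumer.  A minimal sketch, where
/// MyConsumer stands in for a hypothetical DocConsumer subclass (not part of this codebase):
///
///   class MyIndexingChain : public IndexingChain {
///   public:
///       virtual DocConsumerPtr getChain(const DocumentsWriterPtr& documentsWriter) {
///           return newLucene<MyConsumer>(documentsWriter);
///       }
///   };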

/// This is the current indexing chain:
/// DocConsumer / DocConsumerPerThread
///   --> code: DocFieldProcessor / DocFieldProcessorPerThread
///     --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
///       --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
///         --> code: DocInverter / DocInverterPerThread / DocInverterPerField
///          --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
///            --> code: TermsHash / TermsHashPerThread / TermsHashPerField
///              --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
///                --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
///                --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
///          --> InvertedDocEndConsumer / InvertedDocEndConsumerPerThread / InvertedDocEndConsumerPerField
///            --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
///        --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
class DefaultIndexingChain : public IndexingChain {
public:
    virtual ~DefaultIndexingChain();

    LUCENE_CLASS(DefaultIndexingChain);

public:
    virtual DocConsumerPtr getChain(const DocumentsWriterPtr& documentsWriter);
};

class SkipDocWriter : public DocWriter {
public:
    virtual ~SkipDocWriter();

    LUCENE_CLASS(SkipDocWriter);

public:
    virtual void finish();
    virtual void abort();
    virtual int64_t sizeInBytes();
};

class WaitQueue : public LuceneObject {
public:
    WaitQueue(const DocumentsWriterPtr& docWriter);
    virtual ~WaitQueue();

    LUCENE_CLASS(WaitQueue);

protected:
    DocumentsWriterWeakPtr _docWriter;

public:
    Collection<DocWriterPtr> waiting;
    int32_t nextWriteDocID;
    int32_t nextWriteLoc;
    int32_t numWaiting;
    int64_t waitingBytes;

public:
    void reset();
    bool doResume();
    bool doPause();
    void abort();
    bool add(const DocWriterPtr& doc);

protected:
    void writeDocument(const DocWriterPtr& doc);
};

class ByteBlockAllocator : public ByteBlockPoolAllocatorBase {
public:
    ByteBlockAllocator(const DocumentsWriterPtr& docWriter, int32_t blockSize);
    virtual ~ByteBlockAllocator();

    LUCENE_CLASS(ByteBlockAllocator);

protected:
    DocumentsWriterWeakPtr _docWriter;

public:
    int32_t blockSize;
    Collection<ByteArray> freeByteBlocks;

public:
    /// Allocate another byte[] from the shared pool
    virtual ByteArray getByteBlock(bool trackAllocations);

    /// Return byte[]'s to the pool
    virtual void recycleByteBlocks(Collection<ByteArray> blocks, int32_t start, int32_t end);
    virtual void recycleByteBlocks(Collection<ByteArray> blocks);
};

}

#endif