1 ///////////////////////////////////////////////////////////////////////////// 2 // Copyright (c) 2009-2014 Alan Wright. All rights reserved. 3 // Distributable under the terms of either the Apache License (Version 2.0) 4 // or the GNU Lesser General Public License. 5 ///////////////////////////////////////////////////////////////////////////// 6 7 #ifndef DOCUMENTSWRITER_H 8 #define DOCUMENTSWRITER_H 9 10 #include "ByteBlockPool.h" 11 #include "RAMFile.h" 12 13 namespace Lucene { 14 15 /// This class accepts multiple added documents and directly writes a single segment file. It does this more 16 /// efficiently than creating a single segment per document (with DocumentWriter) and doing standard merges on 17 /// those segments. 18 /// 19 /// Each added document is passed to the {@link DocConsumer}, which in turn processes the document and interacts 20 /// with other consumers in the indexing chain. Certain consumers, like {@link StoredFieldsWriter} and {@link 21 /// TermVectorsTermsWriter}, digest a document and immediately write bytes to the "doc store" files (ie, 22 /// they do not consume RAM per document, except while they are processing the document). 23 /// 24 /// Other consumers, eg {@link FreqProxTermsWriter} and {@link NormsWriter}, buffer bytes in RAM and flush only 25 /// when a new segment is produced. 26 /// 27 /// Once we have used our allowed RAM buffer, or the number of added docs is large enough (in the case we are 28 /// flushing by doc count instead of RAM usage), we create a real segment and flush it to the Directory. 29 /// 30 /// Threads: 31 /// Multiple threads are allowed into addDocument at once. There is an initial synchronized call to 32 /// getThreadState which allocates a ThreadState for this thread. The same thread will get the same ThreadState 33 /// over time (thread affinity) so that if there are consistent patterns (for example each thread is indexing a 34 /// different content source) then we make better use of RAM. Then processDocument is called on that ThreadState 35 /// without synchronization (most of the "heavy lifting" is in this call). Finally the synchronized 36 /// "finishDocument" is called to flush changes to the directory. 37 /// 38 /// When flush is called by IndexWriter we forcefully idle all threads and flush only once they are all idle. 39 /// This means you can call flush with a given thread even while other threads are actively adding/deleting 40 /// documents. 41 /// 42 /// Exceptions: 43 /// Because this class directly updates in-memory posting lists, and flushes stored fields and term vectors 44 /// directly to files in the directory, there are certain limited times when an exception can corrupt this state. 45 /// For example, a disk full while flushing stored fields leaves this file in a corrupt state. Or, an 46 /// std::bad_alloc exception while appending to the in-memory posting lists can corrupt that posting list. 47 /// We call such exceptions "aborting exceptions". In these cases we must call abort() to discard all docs added 48 /// since the last flush. 49 /// 50 /// All other exceptions ("non-aborting exceptions") can still partially update the index structures. These 51 /// updates are consistent, but, they represent only a part of the document seen up until the exception was hit. 52 /// When this happens, we immediately mark the document as deleted so that the document is always atomically 53 /// ("all or none") added to the index. 54 class LPPAPI DocumentsWriter : public LuceneObject { 55 public: 56 DocumentsWriter(const DirectoryPtr& directory, const IndexWriterPtr& writer, const IndexingChainPtr& indexingChain); 57 virtual ~DocumentsWriter(); 58 59 LUCENE_CLASS(DocumentsWriter); 60 61 protected: 62 String docStoreSegment; // Current doc-store segment we are writing 63 int32_t docStoreOffset; // Current starting doc-store offset of current segment 64 65 int32_t nextDocID; // Next docID to be added 66 int32_t numDocsInRAM; // # docs buffered in RAM 67 68 /// Max # ThreadState instances; if there are more threads than this they share ThreadStates 69 static const int32_t MAX_THREAD_STATE; 70 Collection<DocumentsWriterThreadStatePtr> threadStates; 71 MapThreadDocumentsWriterThreadState threadBindings; 72 73 int32_t pauseThreads; // Non-zero when we need all threads to pause (eg to flush) 74 bool aborting; // True if an abort is pending 75 76 DocFieldProcessorPtr docFieldProcessor; 77 78 /// Deletes done after the last flush; these are discarded on abort 79 BufferedDeletesPtr deletesInRAM; 80 81 /// Deletes done before the last flush; these are still kept on abort 82 BufferedDeletesPtr deletesFlushed; 83 84 /// The max number of delete terms that can be buffered before they must be flushed to disk. 85 int32_t maxBufferedDeleteTerms; 86 87 /// How much RAM we can use before flushing. This is 0 if we are flushing by doc count instead. 88 int64_t ramBufferSize; 89 int64_t waitQueuePauseBytes; 90 int64_t waitQueueResumeBytes; 91 92 /// If we've allocated 5% over our RAM budget, we then free down to 95% 93 int64_t freeTrigger; 94 int64_t freeLevel; 95 96 /// Flush @ this number of docs. If ramBufferSize is non-zero we will flush by RAM usage instead. 97 int32_t maxBufferedDocs; 98 99 /// How many docs already flushed to index 100 int32_t flushedDocCount; 101 102 bool closed; 103 104 /// List of files that were written before last abort() 105 HashSet<String> _abortedFiles; 106 SegmentWriteStatePtr flushState; 107 108 Collection<IntArray> freeIntBlocks; 109 Collection<CharArray> freeCharBlocks; 110 111 public: 112 /// Coarse estimates used to measure RAM usage of buffered deletes 113 static const int32_t OBJECT_HEADER_BYTES; 114 static const int32_t POINTER_NUM_BYTE; 115 static const int32_t INT_NUM_BYTE; 116 static const int32_t CHAR_NUM_BYTE; 117 118 /// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER). Entry is object 119 /// with Term key, BufferedDeletes.Num val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). Term is 120 /// object with String field and String text (OBJ_HEADER + 2*POINTER). We don't count Term's field since 121 /// it's interned. Term's text is String (OBJ_HEADER + 4*INT + POINTER + OBJ_HEADER + string.length*CHAR). 122 /// BufferedDeletes.num is OBJ_HEADER + INT. 123 static const int32_t BYTES_PER_DEL_TERM; 124 125 /// Rough logic: del docIDs are List<Integer>. Say list allocates ~2X size (2*POINTER). Integer is 126 /// OBJ_HEADER + int 127 static const int32_t BYTES_PER_DEL_DOCID; 128 129 /// Rough logic: HashMap has an array[Entry] with varying load factor (say 2 * POINTER). Entry is object 130 /// with Query key, Integer val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). Query we often undercount 131 /// (say 24 bytes). Integer is OBJ_HEADER + INT. 132 static const int32_t BYTES_PER_DEL_QUERY; 133 134 /// Initial chunks size of the shared byte[] blocks used to store postings data 135 static const int32_t BYTE_BLOCK_SHIFT; 136 static const int32_t BYTE_BLOCK_SIZE; 137 static const int32_t BYTE_BLOCK_MASK; 138 static const int32_t BYTE_BLOCK_NOT_MASK; 139 140 /// Initial chunk size of the shared char[] blocks used to store term text 141 static const int32_t CHAR_BLOCK_SHIFT; 142 static const int32_t CHAR_BLOCK_SIZE; 143 static const int32_t CHAR_BLOCK_MASK; 144 145 static const int32_t MAX_TERM_LENGTH; 146 147 /// Initial chunks size of the shared int[] blocks used to store postings data 148 static const int32_t INT_BLOCK_SHIFT; 149 static const int32_t INT_BLOCK_SIZE; 150 static const int32_t INT_BLOCK_MASK; 151 152 static const int32_t PER_DOC_BLOCK_SIZE; 153 154 INTERNAL: 155 IndexWriterWeakPtr _writer; 156 DirectoryPtr directory; 157 IndexingChainPtr indexingChain; 158 String segment; // Current segment we are working on 159 160 int32_t numDocsInStore; // # docs written to doc stores 161 162 bool flushPending; // True when a thread has decided to flush 163 bool bufferIsFull; // True when it's time to write segment 164 165 InfoStreamPtr infoStream; 166 int32_t maxFieldLength; 167 SimilarityPtr similarity; 168 169 DocConsumerPtr consumer; 170 171 HashSet<String> _openFiles; 172 HashSet<String> _closedFiles; 173 174 WaitQueuePtr waitQueue; 175 SkipDocWriterPtr skipDocWriter; 176 177 ByteBlockAllocatorPtr byteBlockAllocator; 178 ByteBlockAllocatorPtr perDocAllocator; 179 180 int64_t numBytesAlloc; 181 int64_t numBytesUsed; 182 183 // used only by assert 184 TermPtr lastDeleteTerm; 185 186 public: 187 virtual void initialize(); 188 189 /// Create and return a new DocWriterBuffer. 190 PerDocBufferPtr newPerDocBuffer(); 191 192 static IndexingChainPtr getDefaultIndexingChain(); 193 194 void updateFlushedDocCount(int32_t n); 195 int32_t getFlushedDocCount(); 196 void setFlushedDocCount(int32_t n); 197 198 /// Returns true if any of the fields in the current buffered docs have omitTermFreqAndPositions==false 199 bool hasProx(); 200 201 /// If non-null, various details of indexing are printed here. 202 void setInfoStream(const InfoStreamPtr& infoStream); 203 204 void setMaxFieldLength(int32_t maxFieldLength); 205 void setSimilarity(const SimilarityPtr& similarity); 206 207 /// Set how much RAM we can use before flushing. 208 void setRAMBufferSizeMB(double mb); 209 double getRAMBufferSizeMB(); 210 211 /// Set max buffered docs, which means we will flush by doc count instead of by RAM usage. 212 void setMaxBufferedDocs(int32_t count); 213 int32_t getMaxBufferedDocs(); 214 215 /// Get current segment name we are writing. 216 String getSegment(); 217 218 /// Returns how many docs are currently buffered in RAM. 219 int32_t getNumDocsInRAM(); 220 221 /// Returns the current doc store segment we are writing to. 222 String getDocStoreSegment(); 223 224 /// Returns the doc offset into the shared doc store for the current buffered docs. 225 int32_t getDocStoreOffset(); 226 227 /// Closes the current open doc stores an returns the doc store segment name. This returns null if there 228 /// are no buffered documents. 229 String closeDocStore(); 230 231 HashSet<String> abortedFiles(); 232 233 void message(const String& message); 234 235 /// Returns Collection of files in use by this instance, including any flushed segments. 236 HashSet<String> openFiles(); 237 HashSet<String> closedFiles(); 238 239 void addOpenFile(const String& name); 240 void removeOpenFile(const String& name); 241 242 void setAborting(); 243 244 /// Called if we hit an exception at a bad time (when updating the index files) and must discard all 245 /// currently buffered docs. This resets our state, discarding any docs added since last flush. 246 void abort(); 247 248 /// Returns true if an abort is in progress 249 bool pauseAllThreads(); 250 void resumeAllThreads(); 251 252 bool anyChanges(); 253 254 void initFlushState(bool onlyDocStore); 255 256 /// Flush all pending docs to a new segment 257 int32_t flush(bool _closeDocStore); 258 259 HashSet<String> getFlushedFiles(); 260 261 /// Build compound file for the segment we just flushed 262 void createCompoundFile(const String& segment); 263 264 /// Set flushPending if it is not already set and returns whether it was set. This is used by IndexWriter 265 /// to trigger a single flush even when multiple threads are trying to do so. 266 bool setFlushPending(); 267 void clearFlushPending(); 268 269 void pushDeletes(); 270 271 void close(); 272 273 void initSegmentName(bool onlyDocStore); 274 275 /// Returns a free (idle) ThreadState that may be used for indexing this one document. This call also 276 /// pauses if a flush is pending. If delTerm is non-null then we buffer this deleted term after the 277 /// thread state has been acquired. 278 DocumentsWriterThreadStatePtr getThreadState(const DocumentPtr& doc, const TermPtr& delTerm); 279 280 /// Returns true if the caller (IndexWriter) should now flush. 281 bool addDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer); 282 283 bool updateDocument(const TermPtr& t, const DocumentPtr& doc, const AnalyzerPtr& analyzer); 284 bool updateDocument(const DocumentPtr& doc, const AnalyzerPtr& analyzer, const TermPtr& delTerm); 285 286 int32_t getNumBufferedDeleteTerms(); // for testing 287 MapTermNum getBufferedDeleteTerms(); // for testing 288 289 /// Called whenever a merge has completed and the merged segments had deletions 290 void remapDeletes(const SegmentInfosPtr& infos, Collection< Collection<int32_t> > docMaps, Collection<int32_t> delCounts, const OneMergePtr& merge, int32_t mergeDocCount); 291 292 bool bufferDeleteTerms(Collection<TermPtr> terms); 293 bool bufferDeleteTerm(const TermPtr& term); 294 bool bufferDeleteQueries(Collection<QueryPtr> queries); 295 bool bufferDeleteQuery(const QueryPtr& query); 296 bool deletesFull(); 297 bool doApplyDeletes(); 298 299 void setMaxBufferedDeleteTerms(int32_t maxBufferedDeleteTerms); 300 int32_t getMaxBufferedDeleteTerms(); 301 302 bool hasDeletes(); 303 bool applyDeletes(const SegmentInfosPtr& infos); 304 bool doBalanceRAM(); 305 306 void waitForWaitQueue(); 307 308 int64_t getRAMUsed(); 309 310 IntArray getIntBlock(bool trackAllocations); 311 void bytesAllocated(int64_t numBytes); 312 void bytesUsed(int64_t numBytes); 313 void recycleIntBlocks(Collection<IntArray> blocks, int32_t start, int32_t end); 314 315 CharArray getCharBlock(); 316 void recycleCharBlocks(Collection<CharArray> blocks, int32_t numBlocks); 317 318 String toMB(int64_t v); 319 320 /// We have four pools of RAM: Postings, byte blocks (holds freq/prox posting data), char blocks (holds 321 /// characters in the term) and per-doc buffers (stored fields/term vectors). Different docs require 322 /// varying amount of storage from these four classes. 323 /// 324 /// For example, docs with many unique single-occurrence short terms will use up the Postings 325 /// RAM and hardly any of the other two. Whereas docs with very large terms will use alot of char blocks 326 /// RAM and relatively less of the other two. This method just frees allocations from the pools once we 327 /// are over-budget, which balances the pools to match the current docs. 328 void balanceRAM(); 329 330 protected: 331 /// Reset after a flush 332 void doAfterFlush(); 333 334 bool allThreadsIdle(); 335 336 void waitReady(const DocumentsWriterThreadStatePtr& state); 337 338 bool timeToFlushDeletes(); 339 340 // used only by assert 341 bool checkDeleteTerm(const TermPtr& term); 342 343 bool applyDeletes(const IndexReaderPtr& reader, int32_t docIDStart); 344 void addDeleteTerm(const TermPtr& term, int32_t docCount); 345 346 /// Buffer a specific docID for deletion. Currently only used when we hit a exception when adding a document 347 void addDeleteDocID(int32_t docID); 348 void addDeleteQuery(const QueryPtr& query, int32_t docID); 349 350 /// Does the synchronized work to finish/flush the inverted document. 351 void finishDocument(const DocumentsWriterThreadStatePtr& perThread, const DocWriterPtr& docWriter); 352 353 friend class WaitQueue; 354 }; 355 356 class DocState : public LuceneObject { 357 public: 358 DocState(); 359 virtual ~DocState(); 360 361 LUCENE_CLASS(DocState); 362 363 public: 364 DocumentsWriterWeakPtr _docWriter; 365 AnalyzerPtr analyzer; 366 int32_t maxFieldLength; 367 InfoStreamPtr infoStream; 368 SimilarityPtr similarity; 369 int32_t docID; 370 DocumentPtr doc; 371 String maxTermPrefix; 372 373 public: 374 /// Only called by asserts 375 virtual bool testPoint(const String& name); 376 377 void clear(); 378 }; 379 380 /// RAMFile buffer for DocWriters. 381 class PerDocBuffer : public RAMFile { 382 public: 383 PerDocBuffer(const DocumentsWriterPtr& docWriter); 384 virtual ~PerDocBuffer(); 385 386 LUCENE_CLASS(PerDocBuffer); 387 388 protected: 389 DocumentsWriterWeakPtr _docWriter; 390 391 public: 392 /// Recycle the bytes used. 393 void recycle(); 394 395 protected: 396 /// Allocate bytes used from shared pool. 397 virtual ByteArray newBuffer(int32_t size); 398 }; 399 400 /// Consumer returns this on each doc. This holds any state that must be flushed synchronized 401 /// "in docID order". We gather these and flush them in order. 402 class DocWriter : public LuceneObject { 403 public: 404 DocWriter(); 405 virtual ~DocWriter(); 406 407 LUCENE_CLASS(DocWriter); 408 409 public: 410 DocWriterPtr next; 411 int32_t docID; 412 413 public: 414 virtual void finish() = 0; 415 virtual void abort() = 0; 416 virtual int64_t sizeInBytes() = 0; 417 418 virtual void setNext(const DocWriterPtr& next); 419 }; 420 421 /// The IndexingChain must define the {@link #getChain(DocumentsWriter)} method which returns the DocConsumer 422 /// that the DocumentsWriter calls to process the documents. 423 class IndexingChain : public LuceneObject { 424 public: 425 virtual ~IndexingChain(); 426 427 LUCENE_CLASS(IndexingChain); 428 429 public: 430 virtual DocConsumerPtr getChain(const DocumentsWriterPtr& documentsWriter) = 0; 431 }; 432 433 /// This is the current indexing chain: 434 /// DocConsumer / DocConsumerPerThread 435 /// --> code: DocFieldProcessor / DocFieldProcessorPerThread 436 /// --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField 437 /// --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField 438 /// --> code: DocInverter / DocInverterPerThread / DocInverterPerField 439 /// --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField 440 /// --> code: TermsHash / TermsHashPerThread / TermsHashPerField 441 /// --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField 442 /// --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField 443 /// --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField 444 /// --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField 445 /// --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField 446 /// --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField 447 class DefaultIndexingChain : public IndexingChain { 448 public: 449 virtual ~DefaultIndexingChain(); 450 451 LUCENE_CLASS(DefaultIndexingChain); 452 453 public: 454 virtual DocConsumerPtr getChain(const DocumentsWriterPtr& documentsWriter); 455 }; 456 457 class SkipDocWriter : public DocWriter { 458 public: 459 virtual ~SkipDocWriter(); 460 461 LUCENE_CLASS(SkipDocWriter); 462 463 public: 464 virtual void finish(); 465 virtual void abort(); 466 virtual int64_t sizeInBytes(); 467 }; 468 469 class WaitQueue : public LuceneObject { 470 public: 471 WaitQueue(const DocumentsWriterPtr& docWriter); 472 virtual ~WaitQueue(); 473 474 LUCENE_CLASS(WaitQueue); 475 476 protected: 477 DocumentsWriterWeakPtr _docWriter; 478 479 public: 480 Collection<DocWriterPtr> waiting; 481 int32_t nextWriteDocID; 482 int32_t nextWriteLoc; 483 int32_t numWaiting; 484 int64_t waitingBytes; 485 486 public: 487 void reset(); 488 bool doResume(); 489 bool doPause(); 490 void abort(); 491 bool add(const DocWriterPtr& doc); 492 493 protected: 494 void writeDocument(const DocWriterPtr& doc); 495 }; 496 497 class ByteBlockAllocator : public ByteBlockPoolAllocatorBase { 498 public: 499 ByteBlockAllocator(const DocumentsWriterPtr& docWriter, int32_t blockSize); 500 virtual ~ByteBlockAllocator(); 501 502 LUCENE_CLASS(ByteBlockAllocator); 503 504 protected: 505 DocumentsWriterWeakPtr _docWriter; 506 507 public: 508 int32_t blockSize; 509 Collection<ByteArray> freeByteBlocks; 510 511 public: 512 /// Allocate another byte[] from the shared pool 513 virtual ByteArray getByteBlock(bool trackAllocations); 514 515 /// Return byte[]'s to the pool 516 virtual void recycleByteBlocks(Collection<ByteArray> blocks, int32_t start, int32_t end); 517 virtual void recycleByteBlocks(Collection<ByteArray> blocks); 518 }; 519 520 } 521 522 #endif 523