1 /*******************************************************************************
2  * thrill/data/file.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_FILE_HEADER
13 #define THRILL_DATA_FILE_HEADER
14 
15 #include <thrill/common/function_traits.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/data/block.hpp>
18 #include <thrill/data/block_reader.hpp>
19 #include <thrill/data/block_sink.hpp>
20 #include <thrill/data/block_writer.hpp>
21 #include <thrill/data/dyn_block_reader.hpp>
22 
23 #include <tlx/die.hpp>
24 
25 #include <cassert>
26 #include <deque>
27 #include <functional>
28 #include <limits>
29 #include <string>
30 #include <vector>
31 
32 namespace thrill {
33 namespace data {
34 
35 //! \addtogroup data_layer
36 //! \{
37 
38 class FileBlockSink;
39 class KeepFileBlockSource;
40 class ConsumeFileBlockSource;
41 
42 /*!
43  * A File is an ordered sequence of Block objects for storing items. By using
44  * the Block indirection, the File can be composed using existing Block objects
45  * (via reference counting), but only contain a subset of the items in those
46  * Blocks. This may be used for Zip() and Repartition().
47  *
48  * A File can be written using a BlockWriter instance, which is delivered by
49  * GetWriter(). Thereafter it can be read (multiple times) using a BlockReader,
50  * delivered by GetReader().
51  *
52  * Using a prefixsum over the number of items in a Block, one can seek to the
53  * block contained any item offset in log_2(Blocks) time, though seeking within
54  * the Block goes sequentially.
55  */
56 class File : public BlockSink, public tlx::ReferenceCounter
57 {
58 public:
59     using Writer = BlockWriter<FileBlockSink>;
60     using Reader = DynBlockReader;
61     using KeepReader = BlockReader<KeepFileBlockSource>;
62     using ConsumeReader = BlockReader<ConsumeFileBlockSource>;
63 
64     //! external static variable containing the default number of bytes to
65     //! prefetch in File readers
66     static size_t default_prefetch_size_;
67 
68     //! Constructor from BlockPool
69     File(BlockPool& block_pool, size_t local_worker_id, size_t dia_id);
70 
71     //! non-copyable: delete copy-constructor
72     File(const File&) = delete;
73     //! non-copyable: delete assignment operator
74     File& operator = (const File&) = delete;
75     //! move-constructor: default
76     File(File&&) = default;
77     //! move-assignment operator: default
78     File& operator = (File&&) = default;
79 
80     //! Return a copy of the File (explicit copy-constructor)
81     File Copy() const;
82 
83     //! \name Methods of a BlockSink
84     //! \{
85 
86     //! Append a block to this file, the block must contain given number of
87     //! items after the offset first.
AppendBlock(const Block & b)88     void AppendBlock(const Block& b) {
89         if (b.size() == 0) return;
90         num_items_sum_.push_back(num_items() + b.num_items());
91         size_bytes_ += b.size();
92         stats_bytes_ += b.size();
93         stats_items_ += b.num_items();
94         blocks_.push_back(b);
95     }
96 
97     //! Append a block to this file, the block must contain given number of
98     //! items after the offset first.
AppendBlock(Block && b)99     void AppendBlock(Block&& b) {
100         if (b.size() == 0) return;
101         num_items_sum_.push_back(num_items() + b.num_items());
102         size_bytes_ += b.size();
103         stats_bytes_ += b.size();
104         stats_items_ += b.num_items();
105         blocks_.emplace_back(std::move(b));
106     }
107 
108     //! Append a block to this file, the block must contain given number of
109     //! items after the offset first.
AppendBlock(const Block & b,bool)110     void AppendBlock(const Block& b, bool /* is_last_block */) final {
111         return AppendBlock(b);
112     }
113 
114     //! Append a block to this file, the block must contain given number of
115     //! items after the offset first.
AppendBlock(Block && b,bool)116     void AppendBlock(Block&& b, bool /* is_last_block */) final {
117         return AppendBlock(std::move(b));
118     }
119 
120     void Close() final;
121 
122     //! write out stats
123     ~File();
124 
125     //! Free all Blocks in the File and deallocate vectors
126     void Clear();
127 
128     //! boolean flag whether to check if AllocateByteBlock can fail in any
129     //! subclass (if false: accelerate BlockWriter to not be able to cope with
130     //! nullptr).
131     static constexpr bool allocate_can_fail_ = false;
132 
133     //! \}
134 
135     //! \name Writers and Readers
136     //! \{
137 
138     //! Get BlockWriter.
139     Writer GetWriter(size_t block_size = default_block_size);
140 
141     /*!
142      * Get BlockReader or a consuming BlockReader for beginning of File
143      *
144      * \attention If consume is true, the reader consumes the File's contents
145      * UNCONDITIONALLY, the File will always be emptied whether all items were
146      * read via the Reader or not.
147      */
148     Reader GetReader(
149         bool consume, size_t prefetch_size = File::default_prefetch_size_);
150 
151     //! Get BlockReader for beginning of File
152     KeepReader GetKeepReader(
153         size_t prefetch_size = File::default_prefetch_size_) const;
154 
155     /*!
156      * Get consuming BlockReader for beginning of File
157      *
158      * \attention The reader consumes the File's contents UNCONDITIONALLY, the
159      * File will always be emptied whether all items were read via the Reader or
160      * not.
161      */
162     ConsumeReader GetConsumeReader(
163         size_t prefetch_size = File::default_prefetch_size_);
164 
165     //! Get BlockReader seeked to the corresponding item index
166     template <typename ItemType>
167     KeepReader GetReaderAt(
168         size_t index, size_t prefetch = default_prefetch_size_) const;
169 
170     //! Read complete File into a std::string, obviously, this should only be
171     //! used for debugging!
172     std::string ReadComplete() const;
173 
174     //! \}
175 
176     //! Return the number of blocks
num_blocks() const177     size_t num_blocks() const { return blocks_.size(); }
178 
179     //! Return the number of items in the file
num_items() const180     size_t num_items() const {
181         return num_items_sum_.size() ? num_items_sum_.back() : 0;
182     }
183 
184     //! Returns true if the File is empty.
empty() const185     bool empty() const { return blocks_.empty(); }
186 
187     //! Return the number of bytes of user data in this file.
size_bytes() const188     size_t size_bytes() const { return size_bytes_; }
189 
190     //! Return reference to a block
block(size_t i) const191     const Block& block(size_t i) const {
192         assert(i < blocks_.size());
193         return blocks_[i];
194     }
195 
196     //! Returns constant reference to all Blocks in the File.
blocks() const197     const std::deque<Block>& blocks() const { return blocks_; }
198 
199     //! Return number of items starting in block i
ItemsStartIn(size_t i) const200     size_t ItemsStartIn(size_t i) const {
201         assert(i < blocks_.size());
202         return num_items_sum_[i] - (i == 0 ? 0 : num_items_sum_[i - 1]);
203     }
204 
205     //! Get item at the corresponding position. Do not use this
206     //! method for reading multiple successive items.
207     template <typename ItemType>
208     ItemType GetItemAt(size_t index) const;
209 
210     /*!
211      * Get index of the given item, or the next greater item, in this file. The
212      * file has to be ordered according to the given compare function. The tie
213      * value can be used to make a decision in case of many successive equal
214      * elements.  The tie is compared with the local rank of the element.
215      *
216      * WARNING: This method uses GetItemAt combined with a binary search and is
217      * therefore not efficient. The method should be reimplemented in near
218      * future.
219      */
220     template <typename ItemType, typename CompareFunction = std::less<ItemType> >
221     size_t GetIndexOf(const ItemType& item, size_t tie,
222                       size_t left, size_t right,
223                       const CompareFunction& func = CompareFunction()) const;
224 
225     /*!
226      * Get index of the given item, or the next greater item, in this file. The
227      * file has to be ordered according to the given compare function. The tie
228      * value can be used to make a decision in case of many successive equal
229      * elements.  The tie is compared with the local rank of the element.
230      *
231      * WARNING: This method uses GetItemAt combined with a binary search and is
232      * therefore not efficient. The method should be reimplemented in near
233      * future.
234      */
235     template <typename ItemType, typename CompareFunction = std::less<ItemType> >
GetIndexOf(const ItemType & item,size_t tie,const CompareFunction & less=CompareFunction ()) const236     size_t GetIndexOf(const ItemType& item, size_t tie,
237                       const CompareFunction& less = CompareFunction()) const {
238         // start binary search with range [0,num_items)
239         return GetIndexOf(item, tie, 0, num_items(), less);
240     }
241 
242     //! Seek in File: return a Block range containing items begin, end of
243     //! given type.
244     template <typename ItemType>
245     std::vector<Block> GetItemRange(size_t begin, size_t end) const;
246 
247     //! Output the Block objects contained in this File.
248     friend std::ostream& operator << (std::ostream& os, const File& f);
249 
250     //! change dia_id after construction (needed because it may be unknown at
251     //! construction)
set_dia_id(size_t dia_id)252     void set_dia_id(size_t dia_id) { dia_id_ = dia_id; }
253 
254 private:
255     //! unique file id
256     size_t id_;
257 
258     //! optionally associated DIANode id
259     size_t dia_id_;
260 
261     //! container holding Blocks and thus shared pointers to all byte blocks.
262     std::deque<Block> blocks_;
263 
264     //! inclusive prefixsum of number of elements of blocks, hence
265     //! num_items_sum_[i] is the number of items starting in all blocks
266     //! preceding and including the i-th block.
267     std::deque<size_t> num_items_sum_;
268 
269     //! Total size of this file in bytes. Sum of all block sizes.
270     size_t size_bytes_ = 0;
271 
272     //! Total number of bytes stored in the File by a Writer: for stats, never
273     //! decreases.
274     size_t stats_bytes_ = 0;
275 
276     //! Total number of items stored in the File by a Writer: for stats, never
277     //! decreases.
278     size_t stats_items_ = 0;
279 
280     //! for access to blocks_ and num_items_sum_
281     friend class KeepFileBlockSource;
282     friend class ConsumeFileBlockSource;
283 };
284 
285 using FilePtr = tlx::CountingPtr<File>;
286 
287 /*!
288  * BlockSink which interfaces to a File
289  */
290 class FileBlockSink final : public BlockSink
291 {
292     static constexpr bool debug = false;
293 
294 public:
FileBlockSink()295     FileBlockSink()
296         : BlockSink(nullptr, -1), file_(nullptr)
297     { }
298 
FileBlockSink(tlx::CountingPtrNoDelete<File> file)299     explicit FileBlockSink(tlx::CountingPtrNoDelete<File> file)
300         : BlockSink(file->block_pool(), file->local_worker_id()),
301           file_(std::move(file)) {
302         LOG << "FileBlockSink() new for " << file_.get();
303     }
304 
305     //! default copy-constructor
306     FileBlockSink(const FileBlockSink&) = default;
307     //! default assignment operator
308     FileBlockSink& operator = (const FileBlockSink&) = default;
309 
~FileBlockSink()310     ~FileBlockSink() {
311         LOG << "~FileBlockSink() for " << file_.get();
312     }
313 
314     //! \name Methods of a BlockSink
315     //! \{
316 
317     //! Append a block to this file, the block must contain given number of
318     //! items after the offset first.
AppendBlock(const Block & b,bool is_last_block)319     void AppendBlock(const Block& b, bool is_last_block) final {
320         assert(file_);
321         return file_->AppendBlock(b, is_last_block);
322     }
323 
324     //! Append a block to this file, the block must contain given number of
325     //! items after the offset first.
AppendBlock(Block && b,bool is_last_block)326     void AppendBlock(Block&& b, bool is_last_block) final {
327         assert(file_);
328         return file_->AppendBlock(std::move(b), is_last_block);
329     }
330 
Close()331     void Close() final {
332         if (file_) {
333             file_->Close();
334             file_.reset();
335         }
336     }
337 
338     //! \}
339 
340 private:
341     tlx::CountingPtrNoDelete<File> file_;
342 };
343 
344 /*!
345  * A BlockSource to read Blocks from a File. The KeepFileBlockSource mainly
346  * contains an index to the current block, which is incremented when the
347  * NextBlock() must be delivered.
348  */
349 class KeepFileBlockSource
350 {
351 public:
352     //! Start reading a File
353     KeepFileBlockSource(
354         const File& file, size_t local_worker_id,
355         size_t prefetch_size = File::default_prefetch_size_,
356         size_t first_block = 0, size_t first_item = keep_first_item);
357 
358     //! Perform prefetch
359     void Prefetch(size_t prefetch_size);
360 
361     //! Advance to next block of file, delivers current_ and end_ for
362     //! BlockReader
363     PinnedBlock NextBlock();
364 
365     //! Get next block unpinned, used by GetItemBatch to read Blocks without
366     //! pinning them
367     Block NextBlockUnpinned();
368 
369     //! Acquire Pin for Block returned from NextBlockUnpinned
370     PinnedBlock AcquirePin(const Block& block);
371 
372 protected:
373     //! Determine current unpinned Block to deliver via NextBlock() or
374     //! NextBlockUnpinned().
375     Block MakeNextBlock();
376 
377 private:
378     //! sentinel value for not changing the first_item item
379     static constexpr size_t keep_first_item = size_t(-1);
380 
381     //! file to read blocks from
382     const File& file_;
383 
384     //! local worker id reading the File
385     size_t local_worker_id_;
386 
387     //! number of bytes of prefetch for reader
388     size_t prefetch_size_;
389 
390     //! current prefetch operations
391     std::deque<PinRequestPtr> fetching_blocks_;
392 
393     //! current number of bytes in prefetch
394     size_t fetching_bytes_;
395 
396     //! number of the first block
397     size_t first_block_;
398 
399     //! index of current block.
400     size_t current_block_;
401 
402     //! offset of first item in first block read
403     size_t first_item_;
404 };
405 
406 /*!
407  * A BlockSource to read and simultaneously consume Blocks from a File. The
408  * ConsumeFileBlockSource always returns the first block of the File and removes
409  * it, hence, consuming Blocks from the File.
410  *
411  * \attention The reader consumes the File's contents UNCONDITIONALLY, the File
412  * will always be emptied whether all items were read via the Reader or not.
413  */
414 class ConsumeFileBlockSource
415 {
416 public:
417     //! Start reading a File. Creates a source for the given file and set the
418     //! number of blocks that should be prefetched. 0 means that no blocks are
419     //! prefetched.
420     ConsumeFileBlockSource(
421         File* file, size_t local_worker_id,
422         size_t prefetch_size = File::default_prefetch_size_);
423 
424     //! non-copyable: delete copy-constructor
425     ConsumeFileBlockSource(const ConsumeFileBlockSource&) = delete;
426     //! non-copyable: delete assignment operator
427     ConsumeFileBlockSource& operator = (const ConsumeFileBlockSource&) = delete;
428     //! move-constructor: default
429     ConsumeFileBlockSource(ConsumeFileBlockSource&& s);
430 
431     //! Perform prefetch
432     void Prefetch(size_t prefetch_size);
433 
434     //! Get the next block of file.
435     PinnedBlock NextBlock();
436 
437     //! Get next block unpinned, used by GetItemBatch to read Blocks without
438     //! pinning them
439     Block NextBlockUnpinned();
440 
441     //! Acquire Pin for Block returned from NextBlockUnpinned
442     PinnedBlock AcquirePin(const Block& block);
443 
444     //! Consume unread blocks and reset File to zero items.
445     ~ConsumeFileBlockSource();
446 
447 private:
448     //! file to consume blocks from (ptr to make moving easier)
449     File* file_;
450 
451     //! local worker id reading the File
452     size_t local_worker_id_;
453 
454     //! number of bytes of prefetch for reader
455     size_t prefetch_size_;
456 
457     //! current prefetch operations
458     std::deque<PinRequestPtr> fetching_blocks_;
459 
460     //! current number of bytes in prefetch
461     size_t fetching_bytes_;
462 };
463 
464 //! Get BlockReader seeked to the corresponding item index
465 template <typename ItemType>
466 typename File::KeepReader
GetReaderAt(size_t index,size_t prefetch_size) const467 File::GetReaderAt(size_t index, size_t prefetch_size) const {
468     static constexpr bool debug = false;
469 
470     // perform binary search for item block with largest exclusive size
471     // prefixsum less or equal to index.
472     auto it =
473         std::lower_bound(num_items_sum_.begin(), num_items_sum_.end(), index);
474 
475     if (it == num_items_sum_.end())
476         die("Access beyond end of File?");
477 
478     size_t begin_block = it - num_items_sum_.begin();
479 
480     sLOG << "File::GetReaderAt()"
481          << "item" << index << "in block" << begin_block
482          << "psum" << num_items_sum_[begin_block]
483          << "first_item" << blocks_[begin_block].first_item_absolute();
484 
485     // start Reader at given first valid item in located block
486     KeepReader fr(
487         KeepFileBlockSource(*this, local_worker_id_, prefetch_size,
488                             begin_block,
489                             blocks_[begin_block].first_item_absolute()));
490 
491     // skip over extra items in beginning of block
492     size_t items_before = it == num_items_sum_.begin() ? 0 : *(it - 1);
493 
494     sLOG << "File::GetReaderAt()"
495          << "items_before" << items_before << "index" << index
496          << "delta" << (index - items_before);
497     assert(items_before <= index);
498 
499     // use fixed_size information to accelerate jump.
500     if (Serialization<KeepReader, ItemType>::is_fixed_size)
501     {
502         // fetch a Block to get typecode_verify flag
503         fr.HasNext();
504 
505         const size_t skip_items = index - items_before;
506         const size_t bytes_per_item =
507             (fr.typecode_verify() ? sizeof(size_t) : 0)
508             + Serialization<KeepReader, ItemType>::fixed_size;
509 
510         fr.Skip(skip_items, skip_items * bytes_per_item);
511     }
512     else
513     {
514         for (size_t i = items_before; i < index; ++i) {
515             if (!fr.HasNext())
516                 die("Underflow in GetItemRange()");
517             fr.template Next<ItemType>();
518         }
519     }
520 
521     sLOG << "File::GetReaderAt()"
522          << "after seek at" << fr.CopyBlock();
523 
524     return fr;
525 }
526 
527 template <typename ItemType>
GetItemAt(size_t index) const528 ItemType File::GetItemAt(size_t index) const {
529     KeepReader reader = this->GetReaderAt<ItemType>(index, /* prefetch */ 0);
530     return reader.Next<ItemType>();
531 }
532 
533 template <typename ItemType, typename CompareFunction>
GetIndexOf(const ItemType & item,size_t tie,size_t left,size_t right,const CompareFunction & less) const534 size_t File::GetIndexOf(
535     const ItemType& item, size_t tie, size_t left, size_t right,
536     const CompareFunction& less) const {
537 
538     static constexpr bool debug = false;
539 
540     static_assert(
541         std::is_convertible<
542             bool, typename common::FunctionTraits<CompareFunction>::result_type
543             >::value,
544         "Comperator must return boolean.");
545 
546     LOG << "File::GetIndexOf() looking for item " << item << " tie " << tie
547         << " in range [" << left << "," << right << ") ="
548         << " size " << right - left;
549 
550     assert(left <= right);
551     assert(left <= num_items());
552     assert(right <= num_items());
553 
554     // Use a binary search to find the item.
555     while (left < right) {
556         size_t mid = (right + left) >> 1;
557         LOG << "left: " << left << "right: " << right << "mid: " << mid;
558         ItemType cur = GetItemAt<ItemType>(mid);
559         LOG << "Item at mid: " << cur;
560         if (less(item, cur) ||
561             (!less(item, cur) && !less(cur, item) && tie <= mid)) {
562             right = mid;
563         }
564         else {
565             left = mid + 1;
566         }
567     }
568 
569     LOG << "found insert position at: " << left;
570     return left;
571 }
572 
573 //! Seek in File: return a Block range containing items begin, end of
574 //! given type.
575 template <typename ItemType>
GetItemRange(size_t begin,size_t end) const576 std::vector<Block> File::GetItemRange(size_t begin, size_t end) const {
577     assert(begin <= end);
578     // deliver array of remaining blocks
579     return GetReaderAt<ItemType>(begin)
580            .template GetItemBatch<ItemType>(end - begin);
581 }
582 
583 //! Take a vector of Readers and prefetch equally from them
584 template <typename Reader>
StartPrefetch(std::vector<Reader> & readers,size_t prefetch_size)585 void StartPrefetch(std::vector<Reader>& readers, size_t prefetch_size) {
586     for (size_t p = default_block_size; p < prefetch_size;
587          p += default_block_size)
588     {
589         for (Reader& r : readers)
590             r.source().Prefetch(p);
591     }
592     for (Reader& r : readers)
593         r.source().Prefetch(prefetch_size);
594 }
595 
596 //! \}
597 
598 } // namespace data
599 } // namespace thrill
600 
601 #endif // !THRILL_DATA_FILE_HEADER
602 
603 /******************************************************************************/
604