1 /*******************************************************************************
2 * thrill/data/file.hpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9 ******************************************************************************/
10
11 #pragma once
12 #ifndef THRILL_DATA_FILE_HEADER
13 #define THRILL_DATA_FILE_HEADER
14
15 #include <thrill/common/function_traits.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/data/block.hpp>
18 #include <thrill/data/block_reader.hpp>
19 #include <thrill/data/block_sink.hpp>
20 #include <thrill/data/block_writer.hpp>
21 #include <thrill/data/dyn_block_reader.hpp>
22
23 #include <tlx/die.hpp>
24
25 #include <cassert>
26 #include <deque>
27 #include <functional>
28 #include <limits>
29 #include <string>
30 #include <vector>
31
32 namespace thrill {
33 namespace data {
34
35 //! \addtogroup data_layer
36 //! \{
37
38 class FileBlockSink;
39 class KeepFileBlockSource;
40 class ConsumeFileBlockSource;
41
42 /*!
43 * A File is an ordered sequence of Block objects for storing items. By using
44 * the Block indirection, the File can be composed using existing Block objects
45 * (via reference counting), but only contain a subset of the items in those
46 * Blocks. This may be used for Zip() and Repartition().
47 *
48 * A File can be written using a BlockWriter instance, which is delivered by
49 * GetWriter(). Thereafter it can be read (multiple times) using a BlockReader,
50 * delivered by GetReader().
51 *
 * Using a prefix sum over the number of items in each Block, one can seek to
 * the Block containing any item offset in log_2(Blocks) time, though seeking
 * within the Block proceeds sequentially.
55 */
56 class File : public BlockSink, public tlx::ReferenceCounter
57 {
58 public:
59 using Writer = BlockWriter<FileBlockSink>;
60 using Reader = DynBlockReader;
61 using KeepReader = BlockReader<KeepFileBlockSource>;
62 using ConsumeReader = BlockReader<ConsumeFileBlockSource>;
63
64 //! external static variable containing the default number of bytes to
65 //! prefetch in File readers
66 static size_t default_prefetch_size_;
67
68 //! Constructor from BlockPool
69 File(BlockPool& block_pool, size_t local_worker_id, size_t dia_id);
70
71 //! non-copyable: delete copy-constructor
72 File(const File&) = delete;
73 //! non-copyable: delete assignment operator
74 File& operator = (const File&) = delete;
75 //! move-constructor: default
76 File(File&&) = default;
77 //! move-assignment operator: default
78 File& operator = (File&&) = default;
79
80 //! Return a copy of the File (explicit copy-constructor)
81 File Copy() const;
82
83 //! \name Methods of a BlockSink
84 //! \{
85
86 //! Append a block to this file, the block must contain given number of
87 //! items after the offset first.
AppendBlock(const Block & b)88 void AppendBlock(const Block& b) {
89 if (b.size() == 0) return;
90 num_items_sum_.push_back(num_items() + b.num_items());
91 size_bytes_ += b.size();
92 stats_bytes_ += b.size();
93 stats_items_ += b.num_items();
94 blocks_.push_back(b);
95 }
96
97 //! Append a block to this file, the block must contain given number of
98 //! items after the offset first.
AppendBlock(Block && b)99 void AppendBlock(Block&& b) {
100 if (b.size() == 0) return;
101 num_items_sum_.push_back(num_items() + b.num_items());
102 size_bytes_ += b.size();
103 stats_bytes_ += b.size();
104 stats_items_ += b.num_items();
105 blocks_.emplace_back(std::move(b));
106 }
107
108 //! Append a block to this file, the block must contain given number of
109 //! items after the offset first.
AppendBlock(const Block & b,bool)110 void AppendBlock(const Block& b, bool /* is_last_block */) final {
111 return AppendBlock(b);
112 }
113
114 //! Append a block to this file, the block must contain given number of
115 //! items after the offset first.
AppendBlock(Block && b,bool)116 void AppendBlock(Block&& b, bool /* is_last_block */) final {
117 return AppendBlock(std::move(b));
118 }
119
120 void Close() final;
121
122 //! write out stats
123 ~File();
124
125 //! Free all Blocks in the File and deallocate vectors
126 void Clear();
127
128 //! boolean flag whether to check if AllocateByteBlock can fail in any
129 //! subclass (if false: accelerate BlockWriter to not be able to cope with
130 //! nullptr).
131 static constexpr bool allocate_can_fail_ = false;
132
133 //! \}
134
135 //! \name Writers and Readers
136 //! \{
137
138 //! Get BlockWriter.
139 Writer GetWriter(size_t block_size = default_block_size);
140
141 /*!
142 * Get BlockReader or a consuming BlockReader for beginning of File
143 *
144 * \attention If consume is true, the reader consumes the File's contents
145 * UNCONDITIONALLY, the File will always be emptied whether all items were
146 * read via the Reader or not.
147 */
148 Reader GetReader(
149 bool consume, size_t prefetch_size = File::default_prefetch_size_);
150
151 //! Get BlockReader for beginning of File
152 KeepReader GetKeepReader(
153 size_t prefetch_size = File::default_prefetch_size_) const;
154
155 /*!
156 * Get consuming BlockReader for beginning of File
157 *
158 * \attention The reader consumes the File's contents UNCONDITIONALLY, the
159 * File will always be emptied whether all items were read via the Reader or
160 * not.
161 */
162 ConsumeReader GetConsumeReader(
163 size_t prefetch_size = File::default_prefetch_size_);
164
165 //! Get BlockReader seeked to the corresponding item index
166 template <typename ItemType>
167 KeepReader GetReaderAt(
168 size_t index, size_t prefetch = default_prefetch_size_) const;
169
170 //! Read complete File into a std::string, obviously, this should only be
171 //! used for debugging!
172 std::string ReadComplete() const;
173
174 //! \}
175
176 //! Return the number of blocks
num_blocks() const177 size_t num_blocks() const { return blocks_.size(); }
178
179 //! Return the number of items in the file
num_items() const180 size_t num_items() const {
181 return num_items_sum_.size() ? num_items_sum_.back() : 0;
182 }
183
184 //! Returns true if the File is empty.
empty() const185 bool empty() const { return blocks_.empty(); }
186
187 //! Return the number of bytes of user data in this file.
size_bytes() const188 size_t size_bytes() const { return size_bytes_; }
189
190 //! Return reference to a block
block(size_t i) const191 const Block& block(size_t i) const {
192 assert(i < blocks_.size());
193 return blocks_[i];
194 }
195
196 //! Returns constant reference to all Blocks in the File.
blocks() const197 const std::deque<Block>& blocks() const { return blocks_; }
198
199 //! Return number of items starting in block i
ItemsStartIn(size_t i) const200 size_t ItemsStartIn(size_t i) const {
201 assert(i < blocks_.size());
202 return num_items_sum_[i] - (i == 0 ? 0 : num_items_sum_[i - 1]);
203 }
204
205 //! Get item at the corresponding position. Do not use this
206 //! method for reading multiple successive items.
207 template <typename ItemType>
208 ItemType GetItemAt(size_t index) const;
209
210 /*!
211 * Get index of the given item, or the next greater item, in this file. The
212 * file has to be ordered according to the given compare function. The tie
213 * value can be used to make a decision in case of many successive equal
214 * elements. The tie is compared with the local rank of the element.
215 *
216 * WARNING: This method uses GetItemAt combined with a binary search and is
217 * therefore not efficient. The method should be reimplemented in near
218 * future.
219 */
220 template <typename ItemType, typename CompareFunction = std::less<ItemType> >
221 size_t GetIndexOf(const ItemType& item, size_t tie,
222 size_t left, size_t right,
223 const CompareFunction& func = CompareFunction()) const;
224
225 /*!
226 * Get index of the given item, or the next greater item, in this file. The
227 * file has to be ordered according to the given compare function. The tie
228 * value can be used to make a decision in case of many successive equal
229 * elements. The tie is compared with the local rank of the element.
230 *
231 * WARNING: This method uses GetItemAt combined with a binary search and is
232 * therefore not efficient. The method should be reimplemented in near
233 * future.
234 */
235 template <typename ItemType, typename CompareFunction = std::less<ItemType> >
GetIndexOf(const ItemType & item,size_t tie,const CompareFunction & less=CompareFunction ()) const236 size_t GetIndexOf(const ItemType& item, size_t tie,
237 const CompareFunction& less = CompareFunction()) const {
238 // start binary search with range [0,num_items)
239 return GetIndexOf(item, tie, 0, num_items(), less);
240 }
241
242 //! Seek in File: return a Block range containing items begin, end of
243 //! given type.
244 template <typename ItemType>
245 std::vector<Block> GetItemRange(size_t begin, size_t end) const;
246
247 //! Output the Block objects contained in this File.
248 friend std::ostream& operator << (std::ostream& os, const File& f);
249
250 //! change dia_id after construction (needed because it may be unknown at
251 //! construction)
set_dia_id(size_t dia_id)252 void set_dia_id(size_t dia_id) { dia_id_ = dia_id; }
253
254 private:
255 //! unique file id
256 size_t id_;
257
258 //! optionally associated DIANode id
259 size_t dia_id_;
260
261 //! container holding Blocks and thus shared pointers to all byte blocks.
262 std::deque<Block> blocks_;
263
264 //! inclusive prefixsum of number of elements of blocks, hence
265 //! num_items_sum_[i] is the number of items starting in all blocks
266 //! preceding and including the i-th block.
267 std::deque<size_t> num_items_sum_;
268
269 //! Total size of this file in bytes. Sum of all block sizes.
270 size_t size_bytes_ = 0;
271
272 //! Total number of bytes stored in the File by a Writer: for stats, never
273 //! decreases.
274 size_t stats_bytes_ = 0;
275
276 //! Total number of items stored in the File by a Writer: for stats, never
277 //! decreases.
278 size_t stats_items_ = 0;
279
280 //! for access to blocks_ and num_items_sum_
281 friend class KeepFileBlockSource;
282 friend class ConsumeFileBlockSource;
283 };
284
285 using FilePtr = tlx::CountingPtr<File>;
286
287 /*!
288 * BlockSink which interfaces to a File
289 */
290 class FileBlockSink final : public BlockSink
291 {
292 static constexpr bool debug = false;
293
294 public:
FileBlockSink()295 FileBlockSink()
296 : BlockSink(nullptr, -1), file_(nullptr)
297 { }
298
FileBlockSink(tlx::CountingPtrNoDelete<File> file)299 explicit FileBlockSink(tlx::CountingPtrNoDelete<File> file)
300 : BlockSink(file->block_pool(), file->local_worker_id()),
301 file_(std::move(file)) {
302 LOG << "FileBlockSink() new for " << file_.get();
303 }
304
305 //! default copy-constructor
306 FileBlockSink(const FileBlockSink&) = default;
307 //! default assignment operator
308 FileBlockSink& operator = (const FileBlockSink&) = default;
309
~FileBlockSink()310 ~FileBlockSink() {
311 LOG << "~FileBlockSink() for " << file_.get();
312 }
313
314 //! \name Methods of a BlockSink
315 //! \{
316
317 //! Append a block to this file, the block must contain given number of
318 //! items after the offset first.
AppendBlock(const Block & b,bool is_last_block)319 void AppendBlock(const Block& b, bool is_last_block) final {
320 assert(file_);
321 return file_->AppendBlock(b, is_last_block);
322 }
323
324 //! Append a block to this file, the block must contain given number of
325 //! items after the offset first.
AppendBlock(Block && b,bool is_last_block)326 void AppendBlock(Block&& b, bool is_last_block) final {
327 assert(file_);
328 return file_->AppendBlock(std::move(b), is_last_block);
329 }
330
Close()331 void Close() final {
332 if (file_) {
333 file_->Close();
334 file_.reset();
335 }
336 }
337
338 //! \}
339
340 private:
341 tlx::CountingPtrNoDelete<File> file_;
342 };
343
344 /*!
345 * A BlockSource to read Blocks from a File. The KeepFileBlockSource mainly
346 * contains an index to the current block, which is incremented when the
347 * NextBlock() must be delivered.
348 */
349 class KeepFileBlockSource
350 {
351 public:
352 //! Start reading a File
353 KeepFileBlockSource(
354 const File& file, size_t local_worker_id,
355 size_t prefetch_size = File::default_prefetch_size_,
356 size_t first_block = 0, size_t first_item = keep_first_item);
357
358 //! Perform prefetch
359 void Prefetch(size_t prefetch_size);
360
361 //! Advance to next block of file, delivers current_ and end_ for
362 //! BlockReader
363 PinnedBlock NextBlock();
364
365 //! Get next block unpinned, used by GetItemBatch to read Blocks without
366 //! pinning them
367 Block NextBlockUnpinned();
368
369 //! Acquire Pin for Block returned from NextBlockUnpinned
370 PinnedBlock AcquirePin(const Block& block);
371
372 protected:
373 //! Determine current unpinned Block to deliver via NextBlock() or
374 //! NextBlockUnpinned().
375 Block MakeNextBlock();
376
377 private:
378 //! sentinel value for not changing the first_item item
379 static constexpr size_t keep_first_item = size_t(-1);
380
381 //! file to read blocks from
382 const File& file_;
383
384 //! local worker id reading the File
385 size_t local_worker_id_;
386
387 //! number of bytes of prefetch for reader
388 size_t prefetch_size_;
389
390 //! current prefetch operations
391 std::deque<PinRequestPtr> fetching_blocks_;
392
393 //! current number of bytes in prefetch
394 size_t fetching_bytes_;
395
396 //! number of the first block
397 size_t first_block_;
398
399 //! index of current block.
400 size_t current_block_;
401
402 //! offset of first item in first block read
403 size_t first_item_;
404 };
405
406 /*!
407 * A BlockSource to read and simultaneously consume Blocks from a File. The
408 * ConsumeFileBlockSource always returns the first block of the File and removes
409 * it, hence, consuming Blocks from the File.
410 *
411 * \attention The reader consumes the File's contents UNCONDITIONALLY, the File
412 * will always be emptied whether all items were read via the Reader or not.
413 */
414 class ConsumeFileBlockSource
415 {
416 public:
417 //! Start reading a File. Creates a source for the given file and set the
418 //! number of blocks that should be prefetched. 0 means that no blocks are
419 //! prefetched.
420 ConsumeFileBlockSource(
421 File* file, size_t local_worker_id,
422 size_t prefetch_size = File::default_prefetch_size_);
423
424 //! non-copyable: delete copy-constructor
425 ConsumeFileBlockSource(const ConsumeFileBlockSource&) = delete;
426 //! non-copyable: delete assignment operator
427 ConsumeFileBlockSource& operator = (const ConsumeFileBlockSource&) = delete;
428 //! move-constructor: default
429 ConsumeFileBlockSource(ConsumeFileBlockSource&& s);
430
431 //! Perform prefetch
432 void Prefetch(size_t prefetch_size);
433
434 //! Get the next block of file.
435 PinnedBlock NextBlock();
436
437 //! Get next block unpinned, used by GetItemBatch to read Blocks without
438 //! pinning them
439 Block NextBlockUnpinned();
440
441 //! Acquire Pin for Block returned from NextBlockUnpinned
442 PinnedBlock AcquirePin(const Block& block);
443
444 //! Consume unread blocks and reset File to zero items.
445 ~ConsumeFileBlockSource();
446
447 private:
448 //! file to consume blocks from (ptr to make moving easier)
449 File* file_;
450
451 //! local worker id reading the File
452 size_t local_worker_id_;
453
454 //! number of bytes of prefetch for reader
455 size_t prefetch_size_;
456
457 //! current prefetch operations
458 std::deque<PinRequestPtr> fetching_blocks_;
459
460 //! current number of bytes in prefetch
461 size_t fetching_bytes_;
462 };
463
464 //! Get BlockReader seeked to the corresponding item index
465 template <typename ItemType>
466 typename File::KeepReader
GetReaderAt(size_t index,size_t prefetch_size) const467 File::GetReaderAt(size_t index, size_t prefetch_size) const {
468 static constexpr bool debug = false;
469
470 // perform binary search for item block with largest exclusive size
471 // prefixsum less or equal to index.
472 auto it =
473 std::lower_bound(num_items_sum_.begin(), num_items_sum_.end(), index);
474
475 if (it == num_items_sum_.end())
476 die("Access beyond end of File?");
477
478 size_t begin_block = it - num_items_sum_.begin();
479
480 sLOG << "File::GetReaderAt()"
481 << "item" << index << "in block" << begin_block
482 << "psum" << num_items_sum_[begin_block]
483 << "first_item" << blocks_[begin_block].first_item_absolute();
484
485 // start Reader at given first valid item in located block
486 KeepReader fr(
487 KeepFileBlockSource(*this, local_worker_id_, prefetch_size,
488 begin_block,
489 blocks_[begin_block].first_item_absolute()));
490
491 // skip over extra items in beginning of block
492 size_t items_before = it == num_items_sum_.begin() ? 0 : *(it - 1);
493
494 sLOG << "File::GetReaderAt()"
495 << "items_before" << items_before << "index" << index
496 << "delta" << (index - items_before);
497 assert(items_before <= index);
498
499 // use fixed_size information to accelerate jump.
500 if (Serialization<KeepReader, ItemType>::is_fixed_size)
501 {
502 // fetch a Block to get typecode_verify flag
503 fr.HasNext();
504
505 const size_t skip_items = index - items_before;
506 const size_t bytes_per_item =
507 (fr.typecode_verify() ? sizeof(size_t) : 0)
508 + Serialization<KeepReader, ItemType>::fixed_size;
509
510 fr.Skip(skip_items, skip_items * bytes_per_item);
511 }
512 else
513 {
514 for (size_t i = items_before; i < index; ++i) {
515 if (!fr.HasNext())
516 die("Underflow in GetItemRange()");
517 fr.template Next<ItemType>();
518 }
519 }
520
521 sLOG << "File::GetReaderAt()"
522 << "after seek at" << fr.CopyBlock();
523
524 return fr;
525 }
526
527 template <typename ItemType>
GetItemAt(size_t index) const528 ItemType File::GetItemAt(size_t index) const {
529 KeepReader reader = this->GetReaderAt<ItemType>(index, /* prefetch */ 0);
530 return reader.Next<ItemType>();
531 }
532
533 template <typename ItemType, typename CompareFunction>
GetIndexOf(const ItemType & item,size_t tie,size_t left,size_t right,const CompareFunction & less) const534 size_t File::GetIndexOf(
535 const ItemType& item, size_t tie, size_t left, size_t right,
536 const CompareFunction& less) const {
537
538 static constexpr bool debug = false;
539
540 static_assert(
541 std::is_convertible<
542 bool, typename common::FunctionTraits<CompareFunction>::result_type
543 >::value,
544 "Comperator must return boolean.");
545
546 LOG << "File::GetIndexOf() looking for item " << item << " tie " << tie
547 << " in range [" << left << "," << right << ") ="
548 << " size " << right - left;
549
550 assert(left <= right);
551 assert(left <= num_items());
552 assert(right <= num_items());
553
554 // Use a binary search to find the item.
555 while (left < right) {
556 size_t mid = (right + left) >> 1;
557 LOG << "left: " << left << "right: " << right << "mid: " << mid;
558 ItemType cur = GetItemAt<ItemType>(mid);
559 LOG << "Item at mid: " << cur;
560 if (less(item, cur) ||
561 (!less(item, cur) && !less(cur, item) && tie <= mid)) {
562 right = mid;
563 }
564 else {
565 left = mid + 1;
566 }
567 }
568
569 LOG << "found insert position at: " << left;
570 return left;
571 }
572
573 //! Seek in File: return a Block range containing items begin, end of
574 //! given type.
575 template <typename ItemType>
GetItemRange(size_t begin,size_t end) const576 std::vector<Block> File::GetItemRange(size_t begin, size_t end) const {
577 assert(begin <= end);
578 // deliver array of remaining blocks
579 return GetReaderAt<ItemType>(begin)
580 .template GetItemBatch<ItemType>(end - begin);
581 }
582
583 //! Take a vector of Readers and prefetch equally from them
584 template <typename Reader>
StartPrefetch(std::vector<Reader> & readers,size_t prefetch_size)585 void StartPrefetch(std::vector<Reader>& readers, size_t prefetch_size) {
586 for (size_t p = default_block_size; p < prefetch_size;
587 p += default_block_size)
588 {
589 for (Reader& r : readers)
590 r.source().Prefetch(p);
591 }
592 for (Reader& r : readers)
593 r.source().Prefetch(prefetch_size);
594 }
595
596 //! \}
597
598 } // namespace data
599 } // namespace thrill
600
601 #endif // !THRILL_DATA_FILE_HEADER
602
603 /******************************************************************************/
604