1 /*******************************************************************************
2 * thrill/data/mix_block_queue.cpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9 ******************************************************************************/
10
11 #include <thrill/data/mix_block_queue.hpp>
12 #include <thrill/data/mix_stream.hpp>
13
14 #include <vector>
15
16 namespace thrill {
17 namespace data {
18
19 /******************************************************************************/
20 // MixBlockQueue
21
MixBlockQueue(BlockPool & block_pool,size_t num_workers,size_t local_worker_id,size_t dia_id)22 MixBlockQueue::MixBlockQueue(BlockPool& block_pool, size_t num_workers,
23 size_t local_worker_id, size_t dia_id)
24 : block_pool_(block_pool),
25 local_worker_id_(local_worker_id),
26 num_workers_(num_workers),
27 write_closed_(num_workers) {
28 queues_.reserve(num_workers);
29 for (size_t w = 0; w < num_workers; ++w) {
30 queues_.emplace_back(block_pool_, local_worker_id, dia_id);
31 }
32 }
33
set_dia_id(size_t dia_id)34 void MixBlockQueue::set_dia_id(size_t dia_id) {
35 for (size_t i = 0; i < queues_.size(); ++i) {
36 queues_[i].set_dia_id(dia_id);
37 }
38 }
39
AppendBlock(size_t src,const Block & block)40 void MixBlockQueue::AppendBlock(size_t src, const Block& block) {
41 LOG << "MixBlockQueue::AppendBlock"
42 << " src=" << src << " block=" << block;
43 mix_queue_.emplace(SrcBlockPair { src, block });
44 }
45
AppendBlock(size_t src,Block && block)46 void MixBlockQueue::AppendBlock(size_t src, Block&& block) {
47 LOG << "MixBlockQueue::AppendBlock"
48 << " src=" << src << " block=" << block;
49 mix_queue_.emplace(SrcBlockPair { src, std::move(block) });
50 }
51
Close(size_t src)52 void MixBlockQueue::Close(size_t src) {
53 LOG << "MixBlockQueue::Close()"
54 << " src=" << src
55 << " local_worker_id_=" << local_worker_id_
56 << " --write_open_count_=" << write_open_count_ - 1;
57 assert(!write_closed_[src]);
58 write_closed_[src] = true;
59 --write_open_count_;
60
61 // enqueue a closing Block.
62 mix_queue_.emplace(SrcBlockPair { src, Block() });
63 }
64
is_queue_closed(size_t src)65 bool MixBlockQueue::is_queue_closed(size_t src) {
66 return write_closed_[src];
67 }
68
Pop()69 MixBlockQueue::SrcBlockPair MixBlockQueue::Pop() {
70 if (read_open_ == 0)
71 return SrcBlockPair {
72 size_t(-1), Block()
73 };
74 SrcBlockPair b;
75 mix_queue_.pop(b);
76 if (!b.block.IsValid()) {
77 LOG << "MixBlockQueue()"
78 << " read_open_ " << read_open_ << " -> " << read_open_ - 1;
79 --read_open_;
80 }
81 return b;
82 }
83
84 /******************************************************************************/
85 // MixBlockQueueReader
86
MixBlockQueueReader(MixBlockQueue & mix_queue,bool consume,size_t local_worker_id)87 MixBlockQueueReader::MixBlockQueueReader(
88 MixBlockQueue& mix_queue, bool consume, size_t local_worker_id)
89 : mix_queue_(mix_queue),
90 reread_(mix_queue.read_closed()) {
91
92 if (!reread_) {
93 readers_.reserve(mix_queue_.num_workers_);
94 available_at_.resize(mix_queue_.num_workers_, 0);
95
96 for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
97 readers_.emplace_back(
98 mix_queue_.queues_[w].GetReader(consume, local_worker_id));
99 }
100 }
101 else {
102 // construct vector of BlockSources to read from queues_.
103 std::vector<DynBlockSource> result;
104 for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
105 result.emplace_back(mix_queue_.queues_[w].GetBlockSource(
106 consume, local_worker_id));
107 }
108 // move BlockQueueSources into concatenation BlockSource, and to Reader.
109 cat_reader_ = CatBlockReader(CatBlockSource(std::move(result)));
110 }
111 }
112
~MixBlockQueueReader()113 MixBlockQueueReader::~MixBlockQueueReader() {
114 // TODO(tb)
115 }
116
PullBlock()117 bool MixBlockQueueReader::PullBlock() {
118 // no full item available: get next block from mix queue
119 while (available_ == 0) {
120
121 MixBlockQueue::SrcBlockPair src_blk = mix_queue_.Pop();
122 LOG << "MixBlockQueueReader::PullBlock()"
123 << " still open_=" << open_
124 << " src=" << src_blk.src << " block=" << src_blk.block
125 << " selected_=" << selected_
126 << " available_=" << available_
127 << " available_at_[src]=" << available_at_[src_blk.src];
128
129 assert(src_blk.src < readers_.size());
130
131 if (src_blk.block.IsValid()) {
132 // block for this reader.
133 selected_ = src_blk.src;
134
135 size_t num_items = src_blk.block.num_items();
136
137 // save block with data for reader
138 mix_queue_.queues_[src_blk.src].AppendBlock(
139 std::move(src_blk.block), /* is_last_block */ false);
140
141 // add available items: one less than in the blocks.
142 available_at_[src_blk.src] += num_items;
143 available_ = available_at_[src_blk.src] - 1;
144 available_at_[src_blk.src] -= available_;
145 }
146 else {
147 // close block received: maybe get last item
148 assert(open_ > 0);
149 --open_;
150
151 // save block with data for reader
152 mix_queue_.queues_[src_blk.src].AppendBlock(
153 std::move(src_blk.block), /* is_last_block */ false);
154
155 // check if we can still read the last item
156 if (available_at_[src_blk.src]) {
157 assert(available_at_[src_blk.src] == 1);
158 selected_ = src_blk.src;
159 available_ = available_at_[src_blk.src];
160 available_at_[src_blk.src] -= available_;
161 }
162 else if (open_ == 0) return false;
163 }
164
165 LOG << "MixBlockQueueReader::PullBlock() afterwards"
166 << " selected_=" << selected_
167 << " available_=" << available_
168 << " available_at_[src]=" << available_at_[src_blk.src];
169 }
170 return true;
171 }
172
173 } // namespace data
174 } // namespace thrill
175
176 /******************************************************************************/
177