1 /*******************************************************************************
2  * thrill/data/mix_block_queue.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <thrill/data/mix_block_queue.hpp>
12 #include <thrill/data/mix_stream.hpp>
13 
14 #include <vector>
15 
16 namespace thrill {
17 namespace data {
18 
19 /******************************************************************************/
20 // MixBlockQueue
21 
MixBlockQueue(BlockPool & block_pool,size_t num_workers,size_t local_worker_id,size_t dia_id)22 MixBlockQueue::MixBlockQueue(BlockPool& block_pool, size_t num_workers,
23                              size_t local_worker_id, size_t dia_id)
24     : block_pool_(block_pool),
25       local_worker_id_(local_worker_id),
26       num_workers_(num_workers),
27       write_closed_(num_workers) {
28     queues_.reserve(num_workers);
29     for (size_t w = 0; w < num_workers; ++w) {
30         queues_.emplace_back(block_pool_, local_worker_id, dia_id);
31     }
32 }
33 
set_dia_id(size_t dia_id)34 void MixBlockQueue::set_dia_id(size_t dia_id) {
35     for (size_t i = 0; i < queues_.size(); ++i) {
36         queues_[i].set_dia_id(dia_id);
37     }
38 }
39 
AppendBlock(size_t src,const Block & block)40 void MixBlockQueue::AppendBlock(size_t src, const Block& block) {
41     LOG << "MixBlockQueue::AppendBlock"
42         << " src=" << src << " block=" << block;
43     mix_queue_.emplace(SrcBlockPair { src, block });
44 }
45 
AppendBlock(size_t src,Block && block)46 void MixBlockQueue::AppendBlock(size_t src, Block&& block) {
47     LOG << "MixBlockQueue::AppendBlock"
48         << " src=" << src << " block=" << block;
49     mix_queue_.emplace(SrcBlockPair { src, std::move(block) });
50 }
51 
Close(size_t src)52 void MixBlockQueue::Close(size_t src) {
53     LOG << "MixBlockQueue::Close()"
54         << " src=" << src
55         << " local_worker_id_=" << local_worker_id_
56         << " --write_open_count_=" << write_open_count_ - 1;
57     assert(!write_closed_[src]);
58     write_closed_[src] = true;
59     --write_open_count_;
60 
61     // enqueue a closing Block.
62     mix_queue_.emplace(SrcBlockPair { src, Block() });
63 }
64 
is_queue_closed(size_t src)65 bool MixBlockQueue::is_queue_closed(size_t src) {
66     return write_closed_[src];
67 }
68 
Pop()69 MixBlockQueue::SrcBlockPair MixBlockQueue::Pop() {
70     if (read_open_ == 0)
71         return SrcBlockPair {
72             size_t(-1), Block()
73         };
74     SrcBlockPair b;
75     mix_queue_.pop(b);
76     if (!b.block.IsValid()) {
77         LOG << "MixBlockQueue()"
78             << " read_open_ " << read_open_ << " -> " << read_open_ - 1;
79         --read_open_;
80     }
81     return b;
82 }
83 
84 /******************************************************************************/
85 // MixBlockQueueReader
86 
MixBlockQueueReader(MixBlockQueue & mix_queue,bool consume,size_t local_worker_id)87 MixBlockQueueReader::MixBlockQueueReader(
88     MixBlockQueue& mix_queue, bool consume, size_t local_worker_id)
89     : mix_queue_(mix_queue),
90       reread_(mix_queue.read_closed()) {
91 
92     if (!reread_) {
93         readers_.reserve(mix_queue_.num_workers_);
94         available_at_.resize(mix_queue_.num_workers_, 0);
95 
96         for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
97             readers_.emplace_back(
98                 mix_queue_.queues_[w].GetReader(consume, local_worker_id));
99         }
100     }
101     else {
102         // construct vector of BlockSources to read from queues_.
103         std::vector<DynBlockSource> result;
104         for (size_t w = 0; w < mix_queue_.num_workers_; ++w) {
105             result.emplace_back(mix_queue_.queues_[w].GetBlockSource(
106                                     consume, local_worker_id));
107         }
108         // move BlockQueueSources into concatenation BlockSource, and to Reader.
109         cat_reader_ = CatBlockReader(CatBlockSource(std::move(result)));
110     }
111 }
112 
~MixBlockQueueReader()113 MixBlockQueueReader::~MixBlockQueueReader() {
114     // TODO(tb)
115 }
116 
PullBlock()117 bool MixBlockQueueReader::PullBlock() {
118     // no full item available: get next block from mix queue
119     while (available_ == 0) {
120 
121         MixBlockQueue::SrcBlockPair src_blk = mix_queue_.Pop();
122         LOG << "MixBlockQueueReader::PullBlock()"
123             << " still open_=" << open_
124             << " src=" << src_blk.src << " block=" << src_blk.block
125             << " selected_=" << selected_
126             << " available_=" << available_
127             << " available_at_[src]=" << available_at_[src_blk.src];
128 
129         assert(src_blk.src < readers_.size());
130 
131         if (src_blk.block.IsValid()) {
132             // block for this reader.
133             selected_ = src_blk.src;
134 
135             size_t num_items = src_blk.block.num_items();
136 
137             // save block with data for reader
138             mix_queue_.queues_[src_blk.src].AppendBlock(
139                 std::move(src_blk.block), /* is_last_block */ false);
140 
141             // add available items: one less than in the blocks.
142             available_at_[src_blk.src] += num_items;
143             available_ = available_at_[src_blk.src] - 1;
144             available_at_[src_blk.src] -= available_;
145         }
146         else {
147             // close block received: maybe get last item
148             assert(open_ > 0);
149             --open_;
150 
151             // save block with data for reader
152             mix_queue_.queues_[src_blk.src].AppendBlock(
153                 std::move(src_blk.block), /* is_last_block */ false);
154 
155             // check if we can still read the last item
156             if (available_at_[src_blk.src]) {
157                 assert(available_at_[src_blk.src] == 1);
158                 selected_ = src_blk.src;
159                 available_ = available_at_[src_blk.src];
160                 available_at_[src_blk.src] -= available_;
161             }
162             else if (open_ == 0) return false;
163         }
164 
165         LOG << "MixBlockQueueReader::PullBlock() afterwards"
166             << " selected_=" << selected_
167             << " available_=" << available_
168             << " available_at_[src]=" << available_at_[src_blk.src];
169     }
170     return true;
171 }
172 
173 } // namespace data
174 } // namespace thrill
175 
176 /******************************************************************************/
177