1 /*******************************************************************************
2 * thrill/api/write_binary.hpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
8 *
9 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10 ******************************************************************************/
11
12 #pragma once
13 #ifndef THRILL_API_WRITE_BINARY_HEADER
14 #define THRILL_API_WRITE_BINARY_HEADER
15
16 #include <thrill/api/action_node.hpp>
17 #include <thrill/api/context.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/common/string.hpp>
20 #include <thrill/data/block_sink.hpp>
21 #include <thrill/data/block_writer.hpp>
22 #include <thrill/vfs/file_io.hpp>
23 #include <tlx/math/round_to_power_of_two.hpp>
24
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28
29 namespace thrill {
30 namespace api {
31
32 /*!
33 * \ingroup api_layer
34 */
35 template <typename ValueType>
36 class WriteBinaryNode final : public ActionNode
37 {
38 static constexpr bool debug = false;
39
40 public:
41 using Super = ActionNode;
42 using Super::context_;
43
44 template <typename ParentDIA>
WriteBinaryNode(const ParentDIA & parent,const std::string & path_out,size_t max_file_size)45 WriteBinaryNode(const ParentDIA& parent,
46 const std::string& path_out,
47 size_t max_file_size)
48 : ActionNode(parent.ctx(), "WriteBinary",
49 { parent.id() }, { parent.node() }),
50 out_pathbase_(path_out),
51 max_file_size_(max_file_size) {
52 sLOG << "Creating write node.";
53
54 block_size_ = std::min(data::default_block_size,
55 tlx::round_up_to_power_of_two(max_file_size));
56 sLOG << "block_size_" << block_size_;
57
__anon9d58cefb0102(const ValueType& input) 58 auto pre_op_fn = [=](const ValueType& input) {
59 return PreOp(input);
60 };
61 // close the function stack with our pre op and register it at parent
62 // node for output
63 auto lop_chain = parent.stack().push(pre_op_fn).fold();
64 parent.node()->AddChild(this, lop_chain);
65 }
66
PreOpMemUse()67 DIAMemUse PreOpMemUse() final {
68 return data::default_block_size;
69 }
70
71 //! writer preop: put item into file, create files as needed.
PreOp(const ValueType & input)72 void PreOp(const ValueType& input) {
73 stats_total_elements_++;
74
75 if (!writer_) OpenNextFile();
76
77 try {
78 writer_->PutNoSelfVerify(input);
79 }
80 catch (data::FullException&) {
81 // sink is full. flush it. and repeat, which opens new file.
82 OpenNextFile();
83
84 try {
85 writer_->PutNoSelfVerify(input);
86 }
87 catch (data::FullException&) {
88 throw std::runtime_error(
89 "Error in WriteBinary: "
90 "an item is larger than the file size limit");
91 }
92 }
93 }
94
95 //! Closes the output file
StopPreOp(size_t)96 void StopPreOp(size_t /* parent_index */) final {
97 sLOG << "closing file" << out_pathbase_;
98 writer_.reset();
99
100 Super::logger_
101 << "class" << "WriteBinaryNode"
102 << "total_elements" << stats_total_elements_
103 << "total_writes" << stats_total_writes_;
104 }
105
Execute()106 void Execute() final { }
107
108 private:
109 //! Implements BlockSink class writing to files with size limit.
110 class SysFileSink final : public data::BoundedBlockSink
111 {
112 public:
SysFileSink(api::Context & context,size_t local_worker_id,const std::string & path,size_t max_file_size,size_t & stats_total_elements,size_t & stats_total_writes)113 SysFileSink(api::Context& context,
114 size_t local_worker_id,
115 const std::string& path, size_t max_file_size,
116 size_t& stats_total_elements,
117 size_t& stats_total_writes)
118 : BlockSink(context.block_pool(), local_worker_id),
119 BoundedBlockSink(context.block_pool(), local_worker_id, max_file_size),
120 stream_(vfs::OpenWriteStream(path)),
121 stats_total_elements_(stats_total_elements),
122 stats_total_writes_(stats_total_writes) { }
123
AppendPinnedBlock(data::PinnedBlock && b,bool)124 void AppendPinnedBlock(
125 data::PinnedBlock&& b, bool /* is_last_block */) final {
126 sLOG << "SysFileSink::AppendBlock()" << b;
127 stats_total_writes_++;
128 stream_->write(b.data_begin(), b.size());
129 }
130
AppendBlock(const data::Block & block,bool is_last_block)131 void AppendBlock(const data::Block& block, bool is_last_block) {
132 return AppendPinnedBlock(
133 block.PinWait(local_worker_id()), is_last_block);
134 }
135
AppendBlock(data::Block && block,bool is_last_block)136 void AppendBlock(data::Block&& block, bool is_last_block) {
137 return AppendPinnedBlock(
138 block.PinWait(local_worker_id()), is_last_block);
139 }
140
Close()141 void Close() final {
142 stream_->close();
143 }
144
145 private:
146 vfs::WriteStreamPtr stream_;
147 size_t& stats_total_elements_;
148 size_t& stats_total_writes_;
149 };
150
151 using Writer = data::BlockWriter<SysFileSink>;
152
153 //! Base path of the output file.
154 std::string out_pathbase_;
155
156 //! File serial number for this worker
157 size_t out_serial_ = 0;
158
159 //! Maximum file size
160 size_t max_file_size_;
161
162 //! Block size used by BlockWriter
163 size_t block_size_ = data::default_block_size;
164
165 //! BlockWriter to sink.
166 std::unique_ptr<Writer> writer_;
167
168 size_t stats_total_elements_ = 0;
169 size_t stats_total_writes_ = 0;
170
171 //! Function to create sink_ and writer_ for next file
OpenNextFile()172 void OpenNextFile() {
173 writer_.reset();
174
175 // construct path from pattern containing ### and $$$
176 std::string out_path = vfs::FillFilePattern(
177 out_pathbase_, context_.my_rank(), out_serial_++);
178
179 sLOG << "OpenNextFile() out_path" << out_path;
180
181 writer_ = std::make_unique<Writer>(
182 SysFileSink(
183 context_, context_.local_worker_id(),
184 out_path, max_file_size_,
185 stats_total_elements_, stats_total_writes_),
186 block_size_);
187 }
188 };
189
190 template <typename ValueType, typename Stack>
WriteBinary(const std::string & filepath,size_t max_file_size) const191 void DIA<ValueType, Stack>::WriteBinary(
192 const std::string& filepath, size_t max_file_size) const {
193
194 using WriteBinaryNode = api::WriteBinaryNode<ValueType>;
195
196 auto node = tlx::make_counting<WriteBinaryNode>(
197 *this, filepath, max_file_size);
198
199 node->RunScope();
200 }
201
202 template <typename ValueType, typename Stack>
WriteBinaryFuture(const std::string & filepath,size_t max_file_size) const203 Future<void> DIA<ValueType, Stack>::WriteBinaryFuture(
204 const std::string& filepath, size_t max_file_size) const {
205
206 using WriteBinaryNode = api::WriteBinaryNode<ValueType>;
207
208 auto node = tlx::make_counting<WriteBinaryNode>(
209 *this, filepath, max_file_size);
210
211 return Future<void>(node);
212 }
213
214 } // namespace api
215 } // namespace thrill
216
217 #endif // !THRILL_API_WRITE_BINARY_HEADER
218
219 /******************************************************************************/
220