/*******************************************************************************
 * thrill/api/write_binary.hpp
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/

#pragma once
#ifndef THRILL_API_WRITE_BINARY_HEADER
#define THRILL_API_WRITE_BINARY_HEADER

#include <thrill/api/action_node.hpp>
#include <thrill/api/context.hpp>
#include <thrill/api/dia.hpp>
#include <thrill/common/string.hpp>
#include <thrill/data/block_sink.hpp>
#include <thrill/data/block_writer.hpp>
#include <thrill/vfs/file_io.hpp>
#include <tlx/math/round_to_power_of_two.hpp>

#include <algorithm>
#include <string>
#include <vector>

namespace thrill {
namespace api {
31 
32 /*!
33  * \ingroup api_layer
34  */
35 template <typename ValueType>
36 class WriteBinaryNode final : public ActionNode
37 {
38     static constexpr bool debug = false;
39 
40 public:
41     using Super = ActionNode;
42     using Super::context_;
43 
44     template <typename ParentDIA>
WriteBinaryNode(const ParentDIA & parent,const std::string & path_out,size_t max_file_size)45     WriteBinaryNode(const ParentDIA& parent,
46                     const std::string& path_out,
47                     size_t max_file_size)
48         : ActionNode(parent.ctx(), "WriteBinary",
49                      { parent.id() }, { parent.node() }),
50           out_pathbase_(path_out),
51           max_file_size_(max_file_size) {
52         sLOG << "Creating write node.";
53 
54         block_size_ = std::min(data::default_block_size,
55                                tlx::round_up_to_power_of_two(max_file_size));
56         sLOG << "block_size_" << block_size_;
57 
__anon9d58cefb0102(const ValueType& input) 58         auto pre_op_fn = [=](const ValueType& input) {
59                              return PreOp(input);
60                          };
61         // close the function stack with our pre op and register it at parent
62         // node for output
63         auto lop_chain = parent.stack().push(pre_op_fn).fold();
64         parent.node()->AddChild(this, lop_chain);
65     }
66 
PreOpMemUse()67     DIAMemUse PreOpMemUse() final {
68         return data::default_block_size;
69     }
70 
71     //! writer preop: put item into file, create files as needed.
PreOp(const ValueType & input)72     void PreOp(const ValueType& input) {
73         stats_total_elements_++;
74 
75         if (!writer_) OpenNextFile();
76 
77         try {
78             writer_->PutNoSelfVerify(input);
79         }
80         catch (data::FullException&) {
81             // sink is full. flush it. and repeat, which opens new file.
82             OpenNextFile();
83 
84             try {
85                 writer_->PutNoSelfVerify(input);
86             }
87             catch (data::FullException&) {
88                 throw std::runtime_error(
89                           "Error in WriteBinary: "
90                           "an item is larger than the file size limit");
91             }
92         }
93     }
94 
95     //! Closes the output file
StopPreOp(size_t)96     void StopPreOp(size_t /* parent_index */) final {
97         sLOG << "closing file" << out_pathbase_;
98         writer_.reset();
99 
100         Super::logger_
101             << "class" << "WriteBinaryNode"
102             << "total_elements" << stats_total_elements_
103             << "total_writes" << stats_total_writes_;
104     }
105 
Execute()106     void Execute() final { }
107 
108 private:
109     //! Implements BlockSink class writing to files with size limit.
110     class SysFileSink final : public data::BoundedBlockSink
111     {
112     public:
SysFileSink(api::Context & context,size_t local_worker_id,const std::string & path,size_t max_file_size,size_t & stats_total_elements,size_t & stats_total_writes)113         SysFileSink(api::Context& context,
114                     size_t local_worker_id,
115                     const std::string& path, size_t max_file_size,
116                     size_t& stats_total_elements,
117                     size_t& stats_total_writes)
118             : BlockSink(context.block_pool(), local_worker_id),
119               BoundedBlockSink(context.block_pool(), local_worker_id, max_file_size),
120               stream_(vfs::OpenWriteStream(path)),
121               stats_total_elements_(stats_total_elements),
122               stats_total_writes_(stats_total_writes) { }
123 
AppendPinnedBlock(data::PinnedBlock && b,bool)124         void AppendPinnedBlock(
125             data::PinnedBlock&& b, bool /* is_last_block */) final {
126             sLOG << "SysFileSink::AppendBlock()" << b;
127             stats_total_writes_++;
128             stream_->write(b.data_begin(), b.size());
129         }
130 
AppendBlock(const data::Block & block,bool is_last_block)131         void AppendBlock(const data::Block& block, bool is_last_block) {
132             return AppendPinnedBlock(
133                 block.PinWait(local_worker_id()), is_last_block);
134         }
135 
AppendBlock(data::Block && block,bool is_last_block)136         void AppendBlock(data::Block&& block, bool is_last_block) {
137             return AppendPinnedBlock(
138                 block.PinWait(local_worker_id()), is_last_block);
139         }
140 
Close()141         void Close() final {
142             stream_->close();
143         }
144 
145     private:
146         vfs::WriteStreamPtr stream_;
147         size_t& stats_total_elements_;
148         size_t& stats_total_writes_;
149     };
150 
151     using Writer = data::BlockWriter<SysFileSink>;
152 
153     //! Base path of the output file.
154     std::string out_pathbase_;
155 
156     //! File serial number for this worker
157     size_t out_serial_ = 0;
158 
159     //! Maximum file size
160     size_t max_file_size_;
161 
162     //! Block size used by BlockWriter
163     size_t block_size_ = data::default_block_size;
164 
165     //! BlockWriter to sink.
166     std::unique_ptr<Writer> writer_;
167 
168     size_t stats_total_elements_ = 0;
169     size_t stats_total_writes_ = 0;
170 
171     //! Function to create sink_ and writer_ for next file
OpenNextFile()172     void OpenNextFile() {
173         writer_.reset();
174 
175         // construct path from pattern containing ### and $$$
176         std::string out_path = vfs::FillFilePattern(
177             out_pathbase_, context_.my_rank(), out_serial_++);
178 
179         sLOG << "OpenNextFile() out_path" << out_path;
180 
181         writer_ = std::make_unique<Writer>(
182             SysFileSink(
183                 context_, context_.local_worker_id(),
184                 out_path, max_file_size_,
185                 stats_total_elements_, stats_total_writes_),
186             block_size_);
187     }
188 };
189 
190 template <typename ValueType, typename Stack>
WriteBinary(const std::string & filepath,size_t max_file_size) const191 void DIA<ValueType, Stack>::WriteBinary(
192     const std::string& filepath, size_t max_file_size) const {
193 
194     using WriteBinaryNode = api::WriteBinaryNode<ValueType>;
195 
196     auto node = tlx::make_counting<WriteBinaryNode>(
197         *this, filepath, max_file_size);
198 
199     node->RunScope();
200 }
201 
202 template <typename ValueType, typename Stack>
WriteBinaryFuture(const std::string & filepath,size_t max_file_size) const203 Future<void> DIA<ValueType, Stack>::WriteBinaryFuture(
204     const std::string& filepath, size_t max_file_size) const {
205 
206     using WriteBinaryNode = api::WriteBinaryNode<ValueType>;
207 
208     auto node = tlx::make_counting<WriteBinaryNode>(
209         *this, filepath, max_file_size);
210 
211     return Future<void>(node);
212 }
213 
} // namespace api
} // namespace thrill

#endif // !THRILL_API_WRITE_BINARY_HEADER

/******************************************************************************/
220