1 /*******************************************************************************
2 * thrill/api/read_lines.hpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
7 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
8 *
9 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10 ******************************************************************************/
11
12 #pragma once
13 #ifndef THRILL_API_READ_LINES_HEADER
14 #define THRILL_API_READ_LINES_HEADER
15
16 #include <thrill/api/context.hpp>
17 #include <thrill/api/dia.hpp>
18 #include <thrill/api/source_node.hpp>
19 #include <thrill/common/defines.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/string.hpp>
22 #include <thrill/common/system_exception.hpp>
23 #include <thrill/net/buffer_builder.hpp>
24 #include <thrill/vfs/file_io.hpp>
25
26 #include <tlx/string/join.hpp>
27
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 namespace thrill {
33 namespace api {
34
35 /*!
36 * A DIANode which performs a line-based Read operation. Reads a file from the
37 * file system and delivers it as a DIA.
38 *
39 * \ingroup api_layer
40 */
41 class ReadLinesNode final : public SourceNode<std::string>
42 {
43 static constexpr bool debug = false;
44
45 public:
46 using Super = SourceNode<std::string>;
47 using Super::context_;
48
49 //! Constructor for a ReadLinesNode. Sets the Context and file path.
ReadLinesNode(Context & ctx,const std::vector<std::string> & globlist,bool local_storage)50 ReadLinesNode(Context& ctx, const std::vector<std::string>& globlist,
51 bool local_storage)
52 : Super(ctx, "ReadLines"),
53 local_storage_(local_storage) {
54
55 filelist_ = vfs::Glob(globlist, vfs::GlobType::File);
56
57 if (filelist_.size() == 0)
58 die("ReadLines: no files found in globs: " + tlx::join(' ', globlist));
59
60 sLOG << "ReadLines: creating for" << globlist.size() << "globs"
61 << "matching" << filelist_.size() << "files";
62 }
63
64 //! Constructor for a ReadLinesNode. Sets the Context and file path.
ReadLinesNode(Context & ctx,const std::string & glob,bool local_storage)65 ReadLinesNode(Context& ctx, const std::string& glob, bool local_storage)
66 : ReadLinesNode(ctx, std::vector<std::string>{ glob }, local_storage)
67 { }
68
PushDataMemUse()69 DIAMemUse PushDataMemUse() final {
70 // InputLineIterators read files block-wise
71 return data::default_block_size;
72 }
73
PushData(bool)74 void PushData(bool /* consume */) final {
75 if (filelist_.contains_compressed) {
76 InputLineIteratorCompressed it(
77 filelist_, *this, local_storage_);
78
79 // Hook Read
80 while (it.HasNext()) {
81 this->PushItem(it.Next());
82 }
83 }
84 else {
85 InputLineIteratorUncompressed it(
86 filelist_, *this, local_storage_);
87
88 // Hook Read
89 while (it.HasNext()) {
90 this->PushItem(it.Next());
91 }
92 }
93 }
94
95 private:
96 vfs::FileList filelist_;
97
98 //! true, if files are on a local file system, false: common global file
99 //! system.
100 bool local_storage_;
101
102 class InputLineIterator
103 {
104 public:
InputLineIterator(const vfs::FileList & files,ReadLinesNode & node)105 InputLineIterator(const vfs::FileList& files,
106 ReadLinesNode& node)
107 : files_(files), node_(node) { }
108
109 //! non-copyable: delete copy-constructor
110 InputLineIterator(const InputLineIterator&) = delete;
111 //! non-copyable: delete assignment operator
112 InputLineIterator& operator = (const InputLineIterator&) = delete;
113 //! move-constructor: default
114 InputLineIterator(InputLineIterator&&) = default;
115 //! move-assignment operator: default
116 InputLineIterator& operator = (InputLineIterator&&) = default;
117
118 protected:
119 //! Block read size
120 const size_t read_size = data::default_block_size;
121 //! String, which Next() references to
122 std::string data_;
123 //! Input files with size prefixsum.
124 const vfs::FileList& files_;
125
126 //! Index of current file in files_
127 size_t file_nr_;
128 //! Byte buffer to create line std::string values.
129 net::BufferBuilder buffer_;
130 //! Start of next element in current buffer.
131 unsigned char* current_;
132 //! (exclusive) [begin,end) of local block
133 common::Range my_range_;
134 //! Reference to node
135 ReadLinesNode& node_;
136
137 common::StatsTimerStopped read_timer;
138
139 size_t total_bytes_ = 0;
140 size_t total_reads_ = 0;
141 size_t total_elements_ = 0;
142
ReadBlock(vfs::ReadStreamPtr & file,net::BufferBuilder & buffer)143 bool ReadBlock(vfs::ReadStreamPtr& file,
144 net::BufferBuilder& buffer) {
145 read_timer.Start();
146 ssize_t bytes = file->read(buffer.data(), read_size);
147 read_timer.Stop();
148 if (bytes < 0) {
149 throw common::ErrnoException("Read error");
150 }
151 buffer.set_size(bytes);
152 current_ = buffer.begin();
153 total_bytes_ += bytes;
154 total_reads_++;
155 LOG << "ReadLines: read block containing " << bytes << " bytes.";
156 return bytes > 0;
157 }
158
~InputLineIterator()159 ~InputLineIterator() {
160 node_.logger_
161 << "class" << "ReadLinesNode"
162 << "event" << "done"
163 << "total_bytes" << total_bytes_
164 << "total_reads" << total_reads_
165 << "total_lines" << total_elements_
166 << "read_time" << read_timer;
167 }
168 };
169
170 //! InputLineIterator gives you access to lines of a file
171 class InputLineIteratorUncompressed : public InputLineIterator
172 {
173 public:
174 //! Creates an instance of iterator that reads file line based
InputLineIteratorUncompressed(const vfs::FileList & files,ReadLinesNode & node,bool local_storage)175 InputLineIteratorUncompressed(const vfs::FileList& files,
176 ReadLinesNode& node, bool local_storage)
177 : InputLineIterator(files, node) {
178
179 // Go to start of 'local part'.
180 if (local_storage) {
181 my_range_ = node_.context_.CalculateLocalRangeOnHost(
182 files.total_size);
183 }
184 else {
185 my_range_ = node_.context_.CalculateLocalRange(
186 files.total_size);
187 }
188
189 assert(my_range_.begin <= my_range_.end);
190 if (my_range_.begin == my_range_.end) return;
191
192 file_nr_ = 0;
193
194 while (files_[file_nr_].size_inc_psum() <= my_range_.begin) {
195 file_nr_++;
196 }
197
198 offset_ = my_range_.begin - files_.size_ex_psum(file_nr_);
199
200 sLOG << "ReadLines: opening file" << file_nr_
201 << "my_range_" << my_range_ << "offset_" << offset_;
202
203 // find offset in current file:
204 // offset = start - sum of previous file sizes
205 stream_ = vfs::OpenReadStream(
206 files_[file_nr_].path, common::Range(offset_, 0));
207
208 buffer_.Reserve(read_size);
209 ReadBlock(stream_, buffer_);
210
211 if (offset_ != 0) {
212 bool found_n = false;
213
214 // find next newline, discard all previous data as previous
215 // worker already covers it
216 while (!found_n) {
217 while (current_ < buffer_.end()) {
218 if (TLX_UNLIKELY(*current_++ == '\n')) {
219 found_n = true;
220 break;
221 }
222 }
223 // no newline found: read new data into buffer_builder
224 if (!found_n) {
225 offset_ += buffer_.size();
226 if (!ReadBlock(stream_, buffer_)) {
227 // EOF = newline per definition
228 found_n = true;
229 }
230 }
231 }
232 }
233 data_.reserve(4 * 1024);
234 }
235
236 //! returns the next element if one exists
237 //!
238 //! does no checks whether a next element exists!
Next()239 const std::string& Next() {
240 total_elements_++;
241 data_.clear();
242 while (true) {
243 while (TLX_LIKELY(current_ < buffer_.end())) {
244 if (TLX_UNLIKELY(*current_ == '\n')) {
245 current_++;
246 return data_;
247 }
248 else {
249 data_.push_back(*current_++);
250 }
251 }
252 offset_ += buffer_.size();
253 if (!ReadBlock(stream_, buffer_)) {
254 LOG << "ReadLines: opening next file";
255
256 stream_->close();
257 file_nr_++;
258 offset_ = 0;
259
260 if (file_nr_ < files_.size()) {
261 stream_ = vfs::OpenReadStream(
262 files_[file_nr_].path, common::Range(offset_, 0));
263 offset_ += buffer_.size();
264 ReadBlock(stream_, buffer_);
265 }
266 else {
267 current_ = buffer_.begin() +
268 files_[file_nr_ - 1].size;
269 }
270
271 if (data_.length()) {
272 return data_;
273 }
274 }
275 }
276 }
277
278 //! returns true, if an element is available in local part
HasNext()279 bool HasNext() {
280 size_t pos = current_ - buffer_.begin();
281 assert(current_ >= buffer_.begin());
282 size_t global_index = offset_ + pos + files_.size_ex_psum(file_nr_);
283 return global_index < my_range_.end ||
284 (global_index == my_range_.end &&
285 files_[file_nr_].size > offset_ + pos);
286 }
287
288 private:
289 //! Offset of current block in stream_.
290 size_t offset_ = 0;
291 //! File handle to files_[file_nr_]
292 vfs::ReadStreamPtr stream_;
293 };
294
295 //! InputLineIterator gives you access to lines of a file
296 class InputLineIteratorCompressed : public InputLineIterator
297 {
298 public:
299 //! Creates an instance of iterator that reads file line based
InputLineIteratorCompressed(const vfs::FileList & files,ReadLinesNode & node,bool local_storage)300 InputLineIteratorCompressed(const vfs::FileList& files,
301 ReadLinesNode& node, bool local_storage)
302 : InputLineIterator(files, node) {
303
304 // Go to start of 'local part'.
305 if (local_storage) {
306 my_range_ = node_.context_.CalculateLocalRangeOnHost(
307 files.total_size);
308 }
309 else {
310 my_range_ = node_.context_.CalculateLocalRange(
311 files.total_size);
312 }
313
314 file_nr_ = 0;
315
316 while (file_nr_ < files_.size() &&
317 (files_[file_nr_].size_inc_psum() +
318 files_[file_nr_].size_ex_psum) / 2 <= my_range_.begin) {
319 ++file_nr_;
320 }
321
322 if (file_nr_ == files_.size()) {
323 LOG << "Start behind last file, not reading anything!";
324 return;
325 }
326
327 for (size_t i = file_nr_; i < files_.size(); i++) {
328 if ((files_[i].size_inc_psum() + files_[i].size_ex_psum) / 2
329 == my_range_.end) {
330 break;
331 }
332 if ((files_[i].size_inc_psum() + files_[i].size_ex_psum) / 2
333 > my_range_.end) {
334 my_range_.end = files_.size_ex_psum(i);
335 break;
336 }
337 }
338
339 if (my_range_.begin >= my_range_.end) {
340 // No local files, set buffer size to 2, so HasNext() does not try to read
341 LOG << "ReadLines: my_range " << my_range_;
342 buffer_.Reserve(2);
343 buffer_.set_size(2);
344 current_ = buffer_.begin();
345 return;
346 }
347
348 sLOG << "ReadLines: opening compressed file" << file_nr_
349 << "my_range" << my_range_;
350
351 stream_ = vfs::OpenReadStream(files_[file_nr_].path);
352
353 buffer_.Reserve(read_size);
354 ReadBlock(stream_, buffer_);
355 data_.reserve(4 * 1024);
356 }
357
358 //! returns the next element if one exists
359 //!
360 //! does no checks whether a next element exists!
Next()361 const std::string& Next() {
362 total_elements_++;
363 data_.clear();
364 while (true) {
365 while (current_ < buffer_.end()) {
366 if (TLX_UNLIKELY(*current_ == '\n')) {
367 current_++;
368 return data_;
369 }
370 else {
371 data_.push_back(*current_++);
372 }
373 }
374
375 if (!ReadBlock(stream_, buffer_)) {
376 LOG << "ReadLines: opening new compressed file!";
377 stream_->close();
378 file_nr_++;
379
380 if (file_nr_ < files_.size()) {
381 stream_ = vfs::OpenReadStream(files_[file_nr_].path);
382 ReadBlock(stream_, buffer_);
383 }
384 else {
385 LOG << "ReadLines: reached last file";
386 current_ = buffer_.begin();
387 }
388
389 if (data_.length()) {
390 LOG << "ReadLines: end - returning string of length"
391 << data_.length();
392 return data_;
393 }
394 }
395 }
396 }
397
398 //! returns true, if an element is available in local part
HasNext()399 bool HasNext() {
400 if (files_.size_ex_psum(file_nr_) >= my_range_.end) {
401 return false;
402 }
403
404 // if block is fully read, read next block. needs to be done here
405 // as HasNext() has to know if file is finished
406 // v-- no new line at end || v-- newline at end of file
407 if (current_ >= buffer_.end() || (current_ + 1 >= buffer_.end() && *current_ == '\n')) {
408 LOG << "ReadLines: new buffer in HasNext()";
409 ReadBlock(stream_, buffer_);
410 if (buffer_.size() > 1 || (buffer_.size() == 1 && buffer_[0] != '\n')) {
411 return true;
412 }
413 else {
414 LOG << "ReadLines: opening new file in HasNext()";
415 // already at last file
416 if (file_nr_ >= files_.size() - 1) {
417 return false;
418 }
419 stream_->close();
420 // if (this worker reads at least one more file)
421 if (my_range_.end > files_[file_nr_].size_inc_psum()) {
422 file_nr_++;
423 stream_ = vfs::OpenReadStream(files_[file_nr_].path);
424 ReadBlock(stream_, buffer_);
425 return true;
426 }
427 else {
428 return false;
429 }
430 }
431 }
432 else {
433 return true;
434 }
435 }
436
437 private:
438 //! File handle to files_[file_nr_]
439 vfs::ReadStreamPtr stream_;
440 };
441 };
442
443 /*!
444 * ReadLines is a DOp, which reads a file from the file system and
445 * creates an ordered DIA according to a given read function.
446 *
447 * \image html dia_ops/ReadLines.svg
448 *
449 * \param ctx Reference to the context object
450 * \param filepath Path of the file in the file system
451 *
452 * \ingroup dia_sources
453 */
ReadLines(Context & ctx,const std::string & filepath)454 DIA<std::string> ReadLines(Context& ctx, const std::string& filepath) {
455 return DIA<std::string>(
456 tlx::make_counting<ReadLinesNode>(
457 ctx, filepath, /* local_storage */ false));
458 }
459
460 /*!
461 * ReadLines is a DOp, which reads a file from the file system and
462 * creates an ordered DIA according to a given read function.
463 *
464 * \image html dia_ops/ReadLines.svg
465 *
466 * \param ctx Reference to the context object
467 * \param filepath Path of the file in the file system
468 *
469 * \ingroup dia_sources
470 */
ReadLines(struct LocalStorageTag,Context & ctx,const std::string & filepath)471 DIA<std::string> ReadLines(struct LocalStorageTag, Context& ctx,
472 const std::string& filepath) {
473 return DIA<std::string>(
474 tlx::make_counting<ReadLinesNode>(
475 ctx, filepath, /* local_storage */ true));
476 }
477
478 /*!
479 * ReadLines is a DOp, which reads a file from the file system and
480 * creates an ordered DIA according to a given read function.
481 *
482 * \image html dia_ops/ReadLines.svg
483 *
484 * \param ctx Reference to the context object
485 * \param filepaths Path of the file in the file system
486 *
487 * \ingroup dia_sources
488 */
ReadLines(Context & ctx,const std::vector<std::string> & filepaths)489 DIA<std::string> ReadLines(
490 Context& ctx, const std::vector<std::string>& filepaths) {
491 return DIA<std::string>(
492 tlx::make_counting<ReadLinesNode>(
493 ctx, filepaths, /* local_storage */ false));
494 }
495
496 /*!
497 * ReadLines is a DOp, which reads a file from the file system and
498 * creates an ordered DIA according to a given read function.
499 *
500 * \image html dia_ops/ReadLines.svg
501 *
502 * \param ctx Reference to the context object
503 * \param filepaths Path of the file in the file system
504 *
505 * \ingroup dia_sources
506 */
ReadLines(struct LocalStorageTag,Context & ctx,const std::vector<std::string> & filepaths)507 DIA<std::string> ReadLines(struct LocalStorageTag, Context& ctx,
508 const std::vector<std::string>& filepaths) {
509 return DIA<std::string>(
510 tlx::make_counting<ReadLinesNode>(
511 ctx, filepaths, /* local_storage */ true));
512 }
513
514 } // namespace api
515
516 //! imported from api namespace
517 using api::ReadLines;
518
519 } // namespace thrill
520
521 #endif // !THRILL_API_READ_LINES_HEADER
522
523 /******************************************************************************/
524