1 /*******************************************************************************
2  * thrill/api/read_lines.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
7  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_READ_LINES_HEADER
14 #define THRILL_API_READ_LINES_HEADER
15 
16 #include <thrill/api/context.hpp>
17 #include <thrill/api/dia.hpp>
18 #include <thrill/api/source_node.hpp>
19 #include <thrill/common/defines.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/string.hpp>
22 #include <thrill/common/system_exception.hpp>
23 #include <thrill/net/buffer_builder.hpp>
24 #include <thrill/vfs/file_io.hpp>
25 
26 #include <tlx/string/join.hpp>
27 
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 namespace thrill {
33 namespace api {
34 
35 /*!
36  * A DIANode which performs a line-based Read operation. Reads a file from the
37  * file system and delivers it as a DIA.
38  *
39  * \ingroup api_layer
40  */
41 class ReadLinesNode final : public SourceNode<std::string>
42 {
43     static constexpr bool debug = false;
44 
45 public:
46     using Super = SourceNode<std::string>;
47     using Super::context_;
48 
49     //! Constructor for a ReadLinesNode. Sets the Context and file path.
ReadLinesNode(Context & ctx,const std::vector<std::string> & globlist,bool local_storage)50     ReadLinesNode(Context& ctx, const std::vector<std::string>& globlist,
51                   bool local_storage)
52         : Super(ctx, "ReadLines"),
53           local_storage_(local_storage) {
54 
55         filelist_ = vfs::Glob(globlist, vfs::GlobType::File);
56 
57         if (filelist_.size() == 0)
58             die("ReadLines: no files found in globs: " + tlx::join(' ', globlist));
59 
60         sLOG << "ReadLines: creating for" << globlist.size() << "globs"
61              << "matching" << filelist_.size() << "files";
62     }
63 
64     //! Constructor for a ReadLinesNode. Sets the Context and file path.
ReadLinesNode(Context & ctx,const std::string & glob,bool local_storage)65     ReadLinesNode(Context& ctx, const std::string& glob, bool local_storage)
66         : ReadLinesNode(ctx, std::vector<std::string>{ glob }, local_storage)
67     { }
68 
PushDataMemUse()69     DIAMemUse PushDataMemUse() final {
70         // InputLineIterators read files block-wise
71         return data::default_block_size;
72     }
73 
PushData(bool)74     void PushData(bool /* consume */) final {
75         if (filelist_.contains_compressed) {
76             InputLineIteratorCompressed it(
77                 filelist_, *this, local_storage_);
78 
79             // Hook Read
80             while (it.HasNext()) {
81                 this->PushItem(it.Next());
82             }
83         }
84         else {
85             InputLineIteratorUncompressed it(
86                 filelist_, *this, local_storage_);
87 
88             // Hook Read
89             while (it.HasNext()) {
90                 this->PushItem(it.Next());
91             }
92         }
93     }
94 
95 private:
96     vfs::FileList filelist_;
97 
98     //! true, if files are on a local file system, false: common global file
99     //! system.
100     bool local_storage_;
101 
102     class InputLineIterator
103     {
104     public:
InputLineIterator(const vfs::FileList & files,ReadLinesNode & node)105         InputLineIterator(const vfs::FileList& files,
106                           ReadLinesNode& node)
107             : files_(files), node_(node) { }
108 
109         //! non-copyable: delete copy-constructor
110         InputLineIterator(const InputLineIterator&) = delete;
111         //! non-copyable: delete assignment operator
112         InputLineIterator& operator = (const InputLineIterator&) = delete;
113         //! move-constructor: default
114         InputLineIterator(InputLineIterator&&) = default;
115         //! move-assignment operator: default
116         InputLineIterator& operator = (InputLineIterator&&) = default;
117 
118     protected:
119         //! Block read size
120         const size_t read_size = data::default_block_size;
121         //! String, which Next() references to
122         std::string data_;
123         //! Input files with size prefixsum.
124         const vfs::FileList& files_;
125 
126         //! Index of current file in files_
127         size_t file_nr_;
128         //! Byte buffer to create line std::string values.
129         net::BufferBuilder buffer_;
130         //! Start of next element in current buffer.
131         unsigned char* current_;
132         //! (exclusive) [begin,end) of local block
133         common::Range my_range_;
134         //! Reference to node
135         ReadLinesNode& node_;
136 
137         common::StatsTimerStopped read_timer;
138 
139         size_t total_bytes_ = 0;
140         size_t total_reads_ = 0;
141         size_t total_elements_ = 0;
142 
ReadBlock(vfs::ReadStreamPtr & file,net::BufferBuilder & buffer)143         bool ReadBlock(vfs::ReadStreamPtr& file,
144                        net::BufferBuilder& buffer) {
145             read_timer.Start();
146             ssize_t bytes = file->read(buffer.data(), read_size);
147             read_timer.Stop();
148             if (bytes < 0) {
149                 throw common::ErrnoException("Read error");
150             }
151             buffer.set_size(bytes);
152             current_ = buffer.begin();
153             total_bytes_ += bytes;
154             total_reads_++;
155             LOG << "ReadLines: read block containing " << bytes << " bytes.";
156             return bytes > 0;
157         }
158 
~InputLineIterator()159         ~InputLineIterator() {
160             node_.logger_
161                 << "class" << "ReadLinesNode"
162                 << "event" << "done"
163                 << "total_bytes" << total_bytes_
164                 << "total_reads" << total_reads_
165                 << "total_lines" << total_elements_
166                 << "read_time" << read_timer;
167         }
168     };
169 
170     //! InputLineIterator gives you access to lines of a file
171     class InputLineIteratorUncompressed : public InputLineIterator
172     {
173     public:
174         //! Creates an instance of iterator that reads file line based
InputLineIteratorUncompressed(const vfs::FileList & files,ReadLinesNode & node,bool local_storage)175         InputLineIteratorUncompressed(const vfs::FileList& files,
176                                       ReadLinesNode& node, bool local_storage)
177             : InputLineIterator(files, node) {
178 
179             // Go to start of 'local part'.
180             if (local_storage) {
181                 my_range_ = node_.context_.CalculateLocalRangeOnHost(
182                     files.total_size);
183             }
184             else {
185                 my_range_ = node_.context_.CalculateLocalRange(
186                     files.total_size);
187             }
188 
189             assert(my_range_.begin <= my_range_.end);
190             if (my_range_.begin == my_range_.end) return;
191 
192             file_nr_ = 0;
193 
194             while (files_[file_nr_].size_inc_psum() <= my_range_.begin) {
195                 file_nr_++;
196             }
197 
198             offset_ = my_range_.begin - files_.size_ex_psum(file_nr_);
199 
200             sLOG << "ReadLines: opening file" << file_nr_
201                  << "my_range_" << my_range_ << "offset_" << offset_;
202 
203             // find offset in current file:
204             // offset = start - sum of previous file sizes
205             stream_ = vfs::OpenReadStream(
206                 files_[file_nr_].path, common::Range(offset_, 0));
207 
208             buffer_.Reserve(read_size);
209             ReadBlock(stream_, buffer_);
210 
211             if (offset_ != 0) {
212                 bool found_n = false;
213 
214                 // find next newline, discard all previous data as previous
215                 // worker already covers it
216                 while (!found_n) {
217                     while (current_ < buffer_.end()) {
218                         if (TLX_UNLIKELY(*current_++ == '\n')) {
219                             found_n = true;
220                             break;
221                         }
222                     }
223                     // no newline found: read new data into buffer_builder
224                     if (!found_n) {
225                         offset_ += buffer_.size();
226                         if (!ReadBlock(stream_, buffer_)) {
227                             // EOF = newline per definition
228                             found_n = true;
229                         }
230                     }
231                 }
232             }
233             data_.reserve(4 * 1024);
234         }
235 
236         //! returns the next element if one exists
237         //!
238         //! does no checks whether a next element exists!
Next()239         const std::string& Next() {
240             total_elements_++;
241             data_.clear();
242             while (true) {
243                 while (TLX_LIKELY(current_ < buffer_.end())) {
244                     if (TLX_UNLIKELY(*current_ == '\n')) {
245                         current_++;
246                         return data_;
247                     }
248                     else {
249                         data_.push_back(*current_++);
250                     }
251                 }
252                 offset_ += buffer_.size();
253                 if (!ReadBlock(stream_, buffer_)) {
254                     LOG << "ReadLines: opening next file";
255 
256                     stream_->close();
257                     file_nr_++;
258                     offset_ = 0;
259 
260                     if (file_nr_ < files_.size()) {
261                         stream_ = vfs::OpenReadStream(
262                             files_[file_nr_].path, common::Range(offset_, 0));
263                         offset_ += buffer_.size();
264                         ReadBlock(stream_, buffer_);
265                     }
266                     else {
267                         current_ = buffer_.begin() +
268                                    files_[file_nr_ - 1].size;
269                     }
270 
271                     if (data_.length()) {
272                         return data_;
273                     }
274                 }
275             }
276         }
277 
278         //! returns true, if an element is available in local part
HasNext()279         bool HasNext() {
280             size_t pos = current_ - buffer_.begin();
281             assert(current_ >= buffer_.begin());
282             size_t global_index = offset_ + pos + files_.size_ex_psum(file_nr_);
283             return global_index < my_range_.end ||
284                    (global_index == my_range_.end &&
285                     files_[file_nr_].size > offset_ + pos);
286         }
287 
288     private:
289         //! Offset of current block in stream_.
290         size_t offset_ = 0;
291         //! File handle to files_[file_nr_]
292         vfs::ReadStreamPtr stream_;
293     };
294 
295     //! InputLineIterator gives you access to lines of a file
296     class InputLineIteratorCompressed : public InputLineIterator
297     {
298     public:
299         //! Creates an instance of iterator that reads file line based
InputLineIteratorCompressed(const vfs::FileList & files,ReadLinesNode & node,bool local_storage)300         InputLineIteratorCompressed(const vfs::FileList& files,
301                                     ReadLinesNode& node, bool local_storage)
302             : InputLineIterator(files, node) {
303 
304             // Go to start of 'local part'.
305             if (local_storage) {
306                 my_range_ = node_.context_.CalculateLocalRangeOnHost(
307                     files.total_size);
308             }
309             else {
310                 my_range_ = node_.context_.CalculateLocalRange(
311                     files.total_size);
312             }
313 
314             file_nr_ = 0;
315 
316             while (file_nr_ < files_.size() &&
317                    (files_[file_nr_].size_inc_psum() +
318                     files_[file_nr_].size_ex_psum) / 2 <= my_range_.begin) {
319                 ++file_nr_;
320             }
321 
322             if (file_nr_ == files_.size()) {
323                 LOG << "Start behind last file, not reading anything!";
324                 return;
325             }
326 
327             for (size_t i = file_nr_; i < files_.size(); i++) {
328                 if ((files_[i].size_inc_psum() + files_[i].size_ex_psum) / 2
329                     == my_range_.end) {
330                     break;
331                 }
332                 if ((files_[i].size_inc_psum() + files_[i].size_ex_psum) / 2
333                     > my_range_.end) {
334                     my_range_.end = files_.size_ex_psum(i);
335                     break;
336                 }
337             }
338 
339             if (my_range_.begin >= my_range_.end) {
340                 // No local files, set buffer size to 2, so HasNext() does not try to read
341                 LOG << "ReadLines: my_range " << my_range_;
342                 buffer_.Reserve(2);
343                 buffer_.set_size(2);
344                 current_ = buffer_.begin();
345                 return;
346             }
347 
348             sLOG << "ReadLines: opening compressed file" << file_nr_
349                  << "my_range" << my_range_;
350 
351             stream_ = vfs::OpenReadStream(files_[file_nr_].path);
352 
353             buffer_.Reserve(read_size);
354             ReadBlock(stream_, buffer_);
355             data_.reserve(4 * 1024);
356         }
357 
358         //! returns the next element if one exists
359         //!
360         //! does no checks whether a next element exists!
Next()361         const std::string& Next() {
362             total_elements_++;
363             data_.clear();
364             while (true) {
365                 while (current_ < buffer_.end()) {
366                     if (TLX_UNLIKELY(*current_ == '\n')) {
367                         current_++;
368                         return data_;
369                     }
370                     else {
371                         data_.push_back(*current_++);
372                     }
373                 }
374 
375                 if (!ReadBlock(stream_, buffer_)) {
376                     LOG << "ReadLines: opening new compressed file!";
377                     stream_->close();
378                     file_nr_++;
379 
380                     if (file_nr_ < files_.size()) {
381                         stream_ = vfs::OpenReadStream(files_[file_nr_].path);
382                         ReadBlock(stream_, buffer_);
383                     }
384                     else {
385                         LOG << "ReadLines: reached last file";
386                         current_ = buffer_.begin();
387                     }
388 
389                     if (data_.length()) {
390                         LOG << "ReadLines: end - returning string of length"
391                             << data_.length();
392                         return data_;
393                     }
394                 }
395             }
396         }
397 
398         //! returns true, if an element is available in local part
HasNext()399         bool HasNext() {
400             if (files_.size_ex_psum(file_nr_) >= my_range_.end) {
401                 return false;
402             }
403 
404             // if block is fully read, read next block. needs to be done here
405             // as HasNext() has to know if file is finished
406             //         v-- no new line at end ||   v-- newline at end of file
407             if (current_ >= buffer_.end() || (current_ + 1 >= buffer_.end() && *current_ == '\n')) {
408                 LOG << "ReadLines: new buffer in HasNext()";
409                 ReadBlock(stream_, buffer_);
410                 if (buffer_.size() > 1 || (buffer_.size() == 1 && buffer_[0] != '\n')) {
411                     return true;
412                 }
413                 else {
414                     LOG << "ReadLines: opening new file in HasNext()";
415                     // already at last file
416                     if (file_nr_ >= files_.size() - 1) {
417                         return false;
418                     }
419                     stream_->close();
420                     // if (this worker reads at least one more file)
421                     if (my_range_.end > files_[file_nr_].size_inc_psum()) {
422                         file_nr_++;
423                         stream_ = vfs::OpenReadStream(files_[file_nr_].path);
424                         ReadBlock(stream_, buffer_);
425                         return true;
426                     }
427                     else {
428                         return false;
429                     }
430                 }
431             }
432             else {
433                 return true;
434             }
435         }
436 
437     private:
438         //! File handle to files_[file_nr_]
439         vfs::ReadStreamPtr stream_;
440     };
441 };
442 
443 /*!
444  * ReadLines is a DOp, which reads a file from the file system and
445  * creates an ordered DIA according to a given read function.
446  *
447  * \image html dia_ops/ReadLines.svg
448  *
449  * \param ctx Reference to the context object
450  * \param filepath Path of the file in the file system
451  *
452  * \ingroup dia_sources
453  */
ReadLines(Context & ctx,const std::string & filepath)454 DIA<std::string> ReadLines(Context& ctx, const std::string& filepath) {
455     return DIA<std::string>(
456         tlx::make_counting<ReadLinesNode>(
457             ctx, filepath, /* local_storage */ false));
458 }
459 
460 /*!
461  * ReadLines is a DOp, which reads a file from the file system and
462  * creates an ordered DIA according to a given read function.
463  *
464  * \image html dia_ops/ReadLines.svg
465  *
466  * \param ctx Reference to the context object
467  * \param filepath Path of the file in the file system
468  *
469  * \ingroup dia_sources
470  */
ReadLines(struct LocalStorageTag,Context & ctx,const std::string & filepath)471 DIA<std::string> ReadLines(struct LocalStorageTag, Context& ctx,
472                            const std::string& filepath) {
473     return DIA<std::string>(
474         tlx::make_counting<ReadLinesNode>(
475             ctx, filepath, /* local_storage */ true));
476 }
477 
478 /*!
479  * ReadLines is a DOp, which reads a file from the file system and
480  * creates an ordered DIA according to a given read function.
481  *
482  * \image html dia_ops/ReadLines.svg
483  *
484  * \param ctx Reference to the context object
485  * \param filepaths Path of the file in the file system
486  *
487  * \ingroup dia_sources
488  */
ReadLines(Context & ctx,const std::vector<std::string> & filepaths)489 DIA<std::string> ReadLines(
490     Context& ctx, const std::vector<std::string>& filepaths) {
491     return DIA<std::string>(
492         tlx::make_counting<ReadLinesNode>(
493             ctx, filepaths, /* local_storage */ false));
494 }
495 
496 /*!
497  * ReadLines is a DOp, which reads a file from the file system and
498  * creates an ordered DIA according to a given read function.
499  *
500  * \image html dia_ops/ReadLines.svg
501  *
502  * \param ctx Reference to the context object
503  * \param filepaths Path of the file in the file system
504  *
505  * \ingroup dia_sources
506  */
ReadLines(struct LocalStorageTag,Context & ctx,const std::vector<std::string> & filepaths)507 DIA<std::string> ReadLines(struct LocalStorageTag, Context& ctx,
508                            const std::vector<std::string>& filepaths) {
509     return DIA<std::string>(
510         tlx::make_counting<ReadLinesNode>(
511             ctx, filepaths, /* local_storage */ true));
512 }
513 
514 } // namespace api
515 
516 //! imported from api namespace
517 using api::ReadLines;
518 
519 } // namespace thrill
520 
521 #endif // !THRILL_API_READ_LINES_HEADER
522 
523 /******************************************************************************/
524