1 #include <cpp11/as.hpp>
2 #include <cpp11/function.hpp>
3 
4 #include "connection.h"
5 #include "fixed_width_index_connection.h"
6 #include "r_utils.h"
7 #include "unicode_fopen.h"
8 #include <array>
9 #include <fstream>
10 #include <future> // std::async, std::future
11 #include <utility>
12 
13 #ifdef VROOM_LOG
14 #include "spdlog/sinks/basic_file_sink.h" // support for basic file logging
15 #include "spdlog/spdlog.h"
16 #endif
17 
18 using namespace vroom;
19 
fixed_width_index_connection(SEXP in,std::vector<int> col_starts,std::vector<int> col_ends,bool trim_ws,const size_t skip,const char * comment,const bool skip_empty_rows,const size_t n_max,const bool progress,const size_t chunk_size)20 fixed_width_index_connection::fixed_width_index_connection(
21     SEXP in,
22     std::vector<int> col_starts,
23     std::vector<int> col_ends,
24     bool trim_ws,
25     const size_t skip,
26     const char* comment,
27     const bool skip_empty_rows,
28     const size_t n_max,
29     const bool progress,
30     const size_t chunk_size) {
31 
32   col_starts_ = std::move(col_starts);
33   col_ends_ = std::move(col_ends);
34   trim_ws_ = trim_ws;
35 
36   filename_ =
37       cpp11::as_cpp<std::string>(cpp11::package("vroom")["vroom_tempfile"]());
38 
39   std::FILE* out = unicode_fopen(filename_.c_str(), "wb");
40 
41   auto con = R_GetConnection(in);
42 
43   bool should_open = !is_open(in);
44   if (should_open) {
45     cpp11::package("base")["open"](in, "rb");
46   }
47 
48   std::array<std::vector<char>, 2> buf = {
49       std::vector<char>(chunk_size), std::vector<char>(chunk_size)};
50 
51   // A buf index that alternates between 0,1
52   auto i = 0;
53 
54   newlines_.reserve(128);
55 
56   size_t sz = R_ReadConnection(con, buf[i].data(), chunk_size - 1);
57   buf[i][sz] = '\0';
58 
59   // Parse header
60   size_t start = find_first_line(
61       buf[i],
62       skip,
63       comment,
64       skip_empty_rows,
65       /* embedded_nl */ false,
66       /* quote */ '\0');
67 
68   // Check for windows newlines
69   size_t first_nl;
70   newline_type nl_type;
71   std::tie(first_nl, nl_type) = find_next_newline(
72       buf[i],
73       start,
74       comment,
75       skip_empty_rows,
76       false,
77       /* quote */ '\0');
78 
79   bool n_max_set = n_max != static_cast<size_t>(-1);
80 
81   std::unique_ptr<RProgress::RProgress> pb = nullptr;
82   if (progress) {
83     pb = std::unique_ptr<RProgress::RProgress>(
84         new RProgress::RProgress(get_pb_format("connection"), 1e12));
85     pb->tick(start);
86   }
87 
88   size_t total_read = 0;
89   std::future<void> parse_fut;
90   std::future<void> write_fut;
91   size_t lines_read = 0;
92   size_t lines_remaining = n_max;
93   std::unique_ptr<RProgress::RProgress> empty_pb = nullptr;
94 
95   if (n_max > 0) {
96     newlines_.push_back(start - 1);
97   }
98 
99   while (sz > 0) {
100     if (parse_fut.valid()) {
101       parse_fut.wait();
102     }
103     if (lines_read >= lines_remaining) {
104       break;
105     }
106     lines_remaining -= lines_read;
107 
108     parse_fut = std::async([&, i, start, total_read, sz] {
109       lines_read = index_region(
110           buf[i],
111           newlines_,
112           start,
113           sz,
114           total_read,
115           comment,
116           skip_empty_rows,
117           lines_remaining,
118           empty_pb);
119     });
120 
121     if (write_fut.valid()) {
122       write_fut.wait();
123     }
124     write_fut = std::async(
125         [&, i, sz] { std::fwrite(buf[i].data(), sizeof(char), sz, out); });
126 
127     if (progress) {
128       pb->tick(sz);
129     }
130 
131     total_read += sz;
132 
133     i = (i + 1) % 2;
134     sz = R_ReadConnection(con, buf[i].data(), chunk_size - 1);
135     if (sz > 0) {
136       buf[i][sz] = '\0';
137     }
138 
139     start = 0;
140 
141     SPDLOG_DEBUG("first_nl_loc: {0} size: {1}", start, sz);
142   }
143 
144   if (parse_fut.valid()) {
145     parse_fut.wait();
146   }
147   if (write_fut.valid()) {
148     write_fut.wait();
149   }
150 
151   std::fclose(out);
152 
153   if (progress) {
154     pb->update(1);
155   }
156 
157   /* raw connections are always created as open, but we should close them */
158   bool should_close = should_open || Rf_inherits(in, "rawConnection");
159   if (should_close) {
160     cpp11::package("base")["close"](in);
161   }
162 
163   std::error_code error;
164   if (n_max != 0) {
165     mmap_ = make_mmap_source(filename_.c_str(), error);
166     if (error) {
167       cpp11::stop("%s", error.message().c_str());
168     }
169   }
170 
171   char last_char = mmap_[mmap_.size() - 1];
172   bool ends_with_newline = last_char == '\n' || last_char == '\r';
173   if (!n_max_set && !ends_with_newline) {
174     newlines_.push_back(mmap_.size());
175   }
176 
177 #ifdef VROOM_LOG
178   auto log = spdlog::basic_logger_mt(
179       "basic_logger", "logs/fixed_width_index_connection.idx", true);
180   log->set_level(spdlog::level::debug);
181   for (auto&& v : newlines_) {
182     SPDLOG_LOGGER_DEBUG(log, "{}", v);
183   }
184   SPDLOG_LOGGER_DEBUG(log, "end of idx {0:x}", (size_t)&newlines_);
185   spdlog::drop("basic_logger");
186 #endif
187 }
188