1 #include <cpp11/as.hpp>
2 #include <cpp11/function.hpp>
3
4 #include "connection.h"
5 #include "fixed_width_index_connection.h"
6 #include "r_utils.h"
7 #include "unicode_fopen.h"
8 #include <array>
9 #include <fstream>
10 #include <future> // std::async, std::future
11 #include <utility>
12
13 #ifdef VROOM_LOG
14 #include "spdlog/sinks/basic_file_sink.h" // support for basic file logging
15 #include "spdlog/spdlog.h"
16 #endif
17
18 using namespace vroom;
19
fixed_width_index_connection(SEXP in,std::vector<int> col_starts,std::vector<int> col_ends,bool trim_ws,const size_t skip,const char * comment,const bool skip_empty_rows,const size_t n_max,const bool progress,const size_t chunk_size)20 fixed_width_index_connection::fixed_width_index_connection(
21 SEXP in,
22 std::vector<int> col_starts,
23 std::vector<int> col_ends,
24 bool trim_ws,
25 const size_t skip,
26 const char* comment,
27 const bool skip_empty_rows,
28 const size_t n_max,
29 const bool progress,
30 const size_t chunk_size) {
31
32 col_starts_ = std::move(col_starts);
33 col_ends_ = std::move(col_ends);
34 trim_ws_ = trim_ws;
35
36 filename_ =
37 cpp11::as_cpp<std::string>(cpp11::package("vroom")["vroom_tempfile"]());
38
39 std::FILE* out = unicode_fopen(filename_.c_str(), "wb");
40
41 auto con = R_GetConnection(in);
42
43 bool should_open = !is_open(in);
44 if (should_open) {
45 cpp11::package("base")["open"](in, "rb");
46 }
47
48 std::array<std::vector<char>, 2> buf = {
49 std::vector<char>(chunk_size), std::vector<char>(chunk_size)};
50
51 // A buf index that alternates between 0,1
52 auto i = 0;
53
54 newlines_.reserve(128);
55
56 size_t sz = R_ReadConnection(con, buf[i].data(), chunk_size - 1);
57 buf[i][sz] = '\0';
58
59 // Parse header
60 size_t start = find_first_line(
61 buf[i],
62 skip,
63 comment,
64 skip_empty_rows,
65 /* embedded_nl */ false,
66 /* quote */ '\0');
67
68 // Check for windows newlines
69 size_t first_nl;
70 newline_type nl_type;
71 std::tie(first_nl, nl_type) = find_next_newline(
72 buf[i],
73 start,
74 comment,
75 skip_empty_rows,
76 false,
77 /* quote */ '\0');
78
79 bool n_max_set = n_max != static_cast<size_t>(-1);
80
81 std::unique_ptr<RProgress::RProgress> pb = nullptr;
82 if (progress) {
83 pb = std::unique_ptr<RProgress::RProgress>(
84 new RProgress::RProgress(get_pb_format("connection"), 1e12));
85 pb->tick(start);
86 }
87
88 size_t total_read = 0;
89 std::future<void> parse_fut;
90 std::future<void> write_fut;
91 size_t lines_read = 0;
92 size_t lines_remaining = n_max;
93 std::unique_ptr<RProgress::RProgress> empty_pb = nullptr;
94
95 if (n_max > 0) {
96 newlines_.push_back(start - 1);
97 }
98
99 while (sz > 0) {
100 if (parse_fut.valid()) {
101 parse_fut.wait();
102 }
103 if (lines_read >= lines_remaining) {
104 break;
105 }
106 lines_remaining -= lines_read;
107
108 parse_fut = std::async([&, i, start, total_read, sz] {
109 lines_read = index_region(
110 buf[i],
111 newlines_,
112 start,
113 sz,
114 total_read,
115 comment,
116 skip_empty_rows,
117 lines_remaining,
118 empty_pb);
119 });
120
121 if (write_fut.valid()) {
122 write_fut.wait();
123 }
124 write_fut = std::async(
125 [&, i, sz] { std::fwrite(buf[i].data(), sizeof(char), sz, out); });
126
127 if (progress) {
128 pb->tick(sz);
129 }
130
131 total_read += sz;
132
133 i = (i + 1) % 2;
134 sz = R_ReadConnection(con, buf[i].data(), chunk_size - 1);
135 if (sz > 0) {
136 buf[i][sz] = '\0';
137 }
138
139 start = 0;
140
141 SPDLOG_DEBUG("first_nl_loc: {0} size: {1}", start, sz);
142 }
143
144 if (parse_fut.valid()) {
145 parse_fut.wait();
146 }
147 if (write_fut.valid()) {
148 write_fut.wait();
149 }
150
151 std::fclose(out);
152
153 if (progress) {
154 pb->update(1);
155 }
156
157 /* raw connections are always created as open, but we should close them */
158 bool should_close = should_open || Rf_inherits(in, "rawConnection");
159 if (should_close) {
160 cpp11::package("base")["close"](in);
161 }
162
163 std::error_code error;
164 if (n_max != 0) {
165 mmap_ = make_mmap_source(filename_.c_str(), error);
166 if (error) {
167 cpp11::stop("%s", error.message().c_str());
168 }
169 }
170
171 char last_char = mmap_[mmap_.size() - 1];
172 bool ends_with_newline = last_char == '\n' || last_char == '\r';
173 if (!n_max_set && !ends_with_newline) {
174 newlines_.push_back(mmap_.size());
175 }
176
177 #ifdef VROOM_LOG
178 auto log = spdlog::basic_logger_mt(
179 "basic_logger", "logs/fixed_width_index_connection.idx", true);
180 log->set_level(spdlog::level::debug);
181 for (auto&& v : newlines_) {
182 SPDLOG_LOGGER_DEBUG(log, "{}", v);
183 }
184 SPDLOG_LOGGER_DEBUG(log, "end of idx {0:x}", (size_t)&newlines_);
185 spdlog::drop("basic_logger");
186 #endif
187 }
188