1 /** Implementation of the pqxx::stream_from class.
2 *
3 * pqxx::stream_from enables optimized batch reads from a database table.
4 *
5 * Copyright (c) 2000-2020, Jeroen T. Vermeulen.
6 *
7 * See COPYING for copyright license. If you did not receive a file called
8 * COPYING with this source code, please notify the distributor of this
9 * mistake, or contact the author.
10 */
11 #include "pqxx-source.hxx"
12
13 #include "pqxx/stream_from"
14
15 #include "pqxx/internal/encodings.hxx"
16 #include "pqxx/internal/gates/connection-stream_from.hxx"
17 #include "pqxx/transaction_base.hxx"
18
19
compose_query(pqxx::transaction_base const & tx,std::string_view table,std::string const & columns)20 std::string pqxx::stream_from::compose_query(
21 pqxx::transaction_base const &tx, std::string_view table,
22 std::string const &columns)
23 {
24 constexpr std::string_view copy{"COPY "}, to_stdout{" TO STDOUT"};
25 auto const escaped_table{tx.quote_name(table)};
26 std::string command;
27 command.reserve(
28 std::size(copy) + std::size(escaped_table) + std::size(columns) + 2 +
29 std::size(to_stdout));
30 command += copy;
31 command += escaped_table;
32
33 if (not std::empty(columns))
34 {
35 command.push_back('(');
36 command += columns;
37 command.push_back(')');
38 }
39
40 command += to_stdout;
41 return command;
42 }
43
44
45 namespace
46 {
47 pqxx::internal::glyph_scanner_func *
get_scanner(pqxx::transaction_base const & tx)48 get_scanner(pqxx::transaction_base const &tx)
49 {
50 auto const group{pqxx::internal::enc_group(tx.conn().encoding_id())};
51 return pqxx::internal::get_glyph_scanner(group);
52 }
53 } // namespace
54
55
stream_from(transaction_base & tx,from_query_t,std::string_view query)56 pqxx::stream_from::stream_from(
57 transaction_base &tx, from_query_t, std::string_view query) :
58 namedclass{"stream_from"},
59 transactionfocus{tx},
60 m_glyph_scanner{get_scanner(tx)}
61 {
62 constexpr std::string_view copy{"COPY ("}, to_stdout{") TO STDOUT"};
63 std::string command;
64 command.reserve(std::size(copy) + std::size(query) + std::size(to_stdout));
65 command += copy;
66 command += query;
67 command += to_stdout;
68 tx.exec0(command);
69
70 register_me();
71 }
72
73
stream_from(transaction_base & tx,from_table_t,std::string_view table)74 pqxx::stream_from::stream_from(
75 transaction_base &tx, from_table_t, std::string_view table) :
76 namedclass{"stream_from", table},
77 transactionfocus{tx},
78 m_glyph_scanner{get_scanner(tx)}
79 {
80 auto const command{compose_query(tx, table, "")};
81 tx.exec0(command);
82 register_me();
83 }
84
85
stream_from(transaction_base & tx,std::string_view table,std::string && columns,from_table_t)86 pqxx::stream_from::stream_from(
87 transaction_base &tx, std::string_view table, std::string &&columns,
88 from_table_t) :
89 namedclass{"stream_from", table},
90 transactionfocus{tx},
91 m_glyph_scanner{get_scanner(tx)}
92 {
93 auto const command{compose_query(tx, table, columns)};
94 tx.exec0(command);
95 register_me();
96 }
97
98
~stream_from()99 pqxx::stream_from::~stream_from() noexcept
100 {
101 try
102 {
103 close();
104 }
105 catch (std::exception const &e)
106 {
107 reg_pending_error(e.what());
108 }
109 }
110
111
get_raw_line()112 pqxx::stream_from::raw_line pqxx::stream_from::get_raw_line()
113 {
114 if (*this)
115 {
116 internal::gate::connection_stream_from gate{m_trans.conn()};
117 try
118 {
119 raw_line line{gate.read_copy_line()};
120 if (line.first.get() == nullptr)
121 close();
122 return line;
123 }
124 catch (std::exception const &)
125 {
126 close();
127 throw;
128 }
129 }
130 else
131 {
132 return raw_line{};
133 }
134 }
135
136
close()137 void pqxx::stream_from::close()
138 {
139 if (not m_finished)
140 {
141 m_finished = true;
142 unregister_me();
143 }
144 }
145
146
complete()147 void pqxx::stream_from::complete()
148 {
149 if (m_finished)
150 return;
151 try
152 {
153 // Flush any remaining lines - libpq will automatically close the stream
154 // when it hits the end.
155 bool done{false};
156 while (not done)
157 {
158 auto [line, size] = get_raw_line();
159 ignore_unused(size);
160 done = not line.get();
161 }
162 }
163 catch (broken_connection const &)
164 {
165 close();
166 throw;
167 }
168 catch (std::exception const &e)
169 {
170 reg_pending_error(e.what());
171 }
172 close();
173 }
174
175
parse_line()176 void pqxx::stream_from::parse_line()
177 {
178 if (m_finished)
179 return;
180 auto const next_seq{m_glyph_scanner};
181
182 m_fields.clear();
183
184 auto const [line, line_size] = get_raw_line();
185 if (line.get() == nullptr)
186 m_finished = true;
187
188 // Make room for unescaping the line. It's a pessimistic size.
189 // Unusually, we're storing terminating zeroes *inside* the string.
190 // This is the only place where we modify m_row. MAKE SURE THE BUFFER DOES
191 // NOT GET RESIZED while we're working, because we're working with pointers
192 // into its buffer.
193 m_row.resize(line_size + 1);
194
195 char const *line_begin{line.get()};
196 char const *line_end{line_begin + line_size};
197 char const *read{line_begin};
198
199 // Output iterator for unescaped text.
200 char *write{m_row.data()};
201
202 // Beginning of current field in m_row, or nullptr for null fields.
203 char const *field_begin{write};
204
205 while (read < line_end)
206 {
207 auto const offset{static_cast<std::size_t>(read - line_begin)};
208 auto const glyph_end{line_begin + next_seq(line_begin, line_size, offset)};
209 if (glyph_end == read + 1)
210 {
211 // Single-byte character.
212 char c{*read++};
213 switch (c)
214 {
215 case '\t': // Field separator.
216 // End the field.
217 if (field_begin == nullptr)
218 {
219 m_fields.emplace_back();
220 }
221 else
222 {
223 // Would love to emplace_back() here, but gcc 9.1 warns about the
224 // constructor not throwing. It suggests adding "noexcept." Which
225 // we can hardly do, without std::string_view guaranteeing it.
226 m_fields.push_back(zview{field_begin, write - field_begin});
227 *write++ = '\0';
228 }
229 field_begin = write;
230 break;
231
232 case '\\': {
233 // Escape sequence.
234 if (read >= line_end)
235 throw failure{"Row ends in backslash"};
236
237 c = *read++;
238 switch (c)
239 {
240 case 'N':
241 // Null value.
242 if (write != field_begin)
243 throw failure{"Null sequence found in nonempty field"};
244 field_begin = nullptr;
245 // (If there's any characters _after_ the null we'll just crash.)
246 break;
247
248 case 'b': // Backspace.
249 *write++ = '\b';
250 break;
251 case 'f': // Form feed
252 *write++ = '\f';
253 break;
254 case 'n': // Line feed.
255 *write++ = '\n';
256 break;
257 case 'r': // Carriage return.
258 *write++ = '\r';
259 break;
260 case 't': // Horizontal tab.
261 *write++ = '\t';
262 break;
263 case 'v': // Vertical tab.
264 *write++ = '\v';
265 break;
266
267 default:
268 // Regular character ("self-escaped").
269 *write++ = c;
270 break;
271 }
272 }
273 break;
274
275 default: *write++ = c; break;
276 }
277 }
278 else
279 {
280 // Multi-byte sequence. Never treated specially, so just append.
281 while (read < glyph_end) *write++ = *read++;
282 }
283 }
284
285 // End the last field here.
286 if (field_begin == nullptr)
287 {
288 m_fields.emplace_back();
289 }
290 else
291 {
292 m_fields.push_back(zview{field_begin, write - field_begin});
293 *write++ = '\0';
294 }
295
296 // DO NOT shrink m_row to fit. We're carrying string_views pointing into
297 // the buffer. (Also, how useful would shrinking really be?)
298 }
299
300
read_row()301 std::vector<pqxx::zview> const *pqxx::stream_from::read_row()
302 {
303 parse_line();
304 return m_finished ? nullptr : &m_fields;
305 }
306