1 /** Implementation of the pqxx::stream_from class.
2  *
3  * pqxx::stream_from enables optimized batch reads from a database table.
4  *
5  * Copyright (c) 2000-2020, Jeroen T. Vermeulen.
6  *
7  * See COPYING for copyright license.  If you did not receive a file called
8  * COPYING with this source code, please notify the distributor of this
9  * mistake, or contact the author.
10  */
11 #include "pqxx-source.hxx"
12 
13 #include "pqxx/stream_from"
14 
15 #include "pqxx/internal/encodings.hxx"
16 #include "pqxx/internal/gates/connection-stream_from.hxx"
17 #include "pqxx/transaction_base.hxx"
18 
19 
compose_query(pqxx::transaction_base const & tx,std::string_view table,std::string const & columns)20 std::string pqxx::stream_from::compose_query(
21   pqxx::transaction_base const &tx, std::string_view table,
22   std::string const &columns)
23 {
24   constexpr std::string_view copy{"COPY "}, to_stdout{" TO STDOUT"};
25   auto const escaped_table{tx.quote_name(table)};
26   std::string command;
27   command.reserve(
28     std::size(copy) + std::size(escaped_table) + std::size(columns) + 2 +
29     std::size(to_stdout));
30   command += copy;
31   command += escaped_table;
32 
33   if (not std::empty(columns))
34   {
35     command.push_back('(');
36     command += columns;
37     command.push_back(')');
38   }
39 
40   command += to_stdout;
41   return command;
42 }
43 
44 
45 namespace
46 {
47 pqxx::internal::glyph_scanner_func *
get_scanner(pqxx::transaction_base const & tx)48 get_scanner(pqxx::transaction_base const &tx)
49 {
50   auto const group{pqxx::internal::enc_group(tx.conn().encoding_id())};
51   return pqxx::internal::get_glyph_scanner(group);
52 }
53 } // namespace
54 
55 
stream_from(transaction_base & tx,from_query_t,std::string_view query)56 pqxx::stream_from::stream_from(
57   transaction_base &tx, from_query_t, std::string_view query) :
58         namedclass{"stream_from"},
59         transactionfocus{tx},
60         m_glyph_scanner{get_scanner(tx)}
61 {
62   constexpr std::string_view copy{"COPY ("}, to_stdout{") TO STDOUT"};
63   std::string command;
64   command.reserve(std::size(copy) + std::size(query) + std::size(to_stdout));
65   command += copy;
66   command += query;
67   command += to_stdout;
68   tx.exec0(command);
69 
70   register_me();
71 }
72 
73 
stream_from(transaction_base & tx,from_table_t,std::string_view table)74 pqxx::stream_from::stream_from(
75   transaction_base &tx, from_table_t, std::string_view table) :
76         namedclass{"stream_from", table},
77         transactionfocus{tx},
78         m_glyph_scanner{get_scanner(tx)}
79 {
80   auto const command{compose_query(tx, table, "")};
81   tx.exec0(command);
82   register_me();
83 }
84 
85 
stream_from(transaction_base & tx,std::string_view table,std::string && columns,from_table_t)86 pqxx::stream_from::stream_from(
87   transaction_base &tx, std::string_view table, std::string &&columns,
88   from_table_t) :
89         namedclass{"stream_from", table},
90         transactionfocus{tx},
91         m_glyph_scanner{get_scanner(tx)}
92 {
93   auto const command{compose_query(tx, table, columns)};
94   tx.exec0(command);
95   register_me();
96 }
97 
98 
~stream_from()99 pqxx::stream_from::~stream_from() noexcept
100 {
101   try
102   {
103     close();
104   }
105   catch (std::exception const &e)
106   {
107     reg_pending_error(e.what());
108   }
109 }
110 
111 
get_raw_line()112 pqxx::stream_from::raw_line pqxx::stream_from::get_raw_line()
113 {
114   if (*this)
115   {
116     internal::gate::connection_stream_from gate{m_trans.conn()};
117     try
118     {
119       raw_line line{gate.read_copy_line()};
120       if (line.first.get() == nullptr)
121         close();
122       return line;
123     }
124     catch (std::exception const &)
125     {
126       close();
127       throw;
128     }
129   }
130   else
131   {
132     return raw_line{};
133   }
134 }
135 
136 
close()137 void pqxx::stream_from::close()
138 {
139   if (not m_finished)
140   {
141     m_finished = true;
142     unregister_me();
143   }
144 }
145 
146 
complete()147 void pqxx::stream_from::complete()
148 {
149   if (m_finished)
150     return;
151   try
152   {
153     // Flush any remaining lines - libpq will automatically close the stream
154     // when it hits the end.
155     bool done{false};
156     while (not done)
157     {
158       auto [line, size] = get_raw_line();
159       ignore_unused(size);
160       done = not line.get();
161     }
162   }
163   catch (broken_connection const &)
164   {
165     close();
166     throw;
167   }
168   catch (std::exception const &e)
169   {
170     reg_pending_error(e.what());
171   }
172   close();
173 }
174 
175 
parse_line()176 void pqxx::stream_from::parse_line()
177 {
178   if (m_finished)
179     return;
180   auto const next_seq{m_glyph_scanner};
181 
182   m_fields.clear();
183 
184   auto const [line, line_size] = get_raw_line();
185   if (line.get() == nullptr)
186     m_finished = true;
187 
188   // Make room for unescaping the line.  It's a pessimistic size.
189   // Unusually, we're storing terminating zeroes *inside* the string.
190   // This is the only place where we modify m_row.  MAKE SURE THE BUFFER DOES
191   // NOT GET RESIZED while we're working, because we're working with pointers
192   // into its buffer.
193   m_row.resize(line_size + 1);
194 
195   char const *line_begin{line.get()};
196   char const *line_end{line_begin + line_size};
197   char const *read{line_begin};
198 
199   // Output iterator for unescaped text.
200   char *write{m_row.data()};
201 
202   // Beginning of current field in m_row, or nullptr for null fields.
203   char const *field_begin{write};
204 
205   while (read < line_end)
206   {
207     auto const offset{static_cast<std::size_t>(read - line_begin)};
208     auto const glyph_end{line_begin + next_seq(line_begin, line_size, offset)};
209     if (glyph_end == read + 1)
210     {
211       // Single-byte character.
212       char c{*read++};
213       switch (c)
214       {
215       case '\t': // Field separator.
216         // End the field.
217         if (field_begin == nullptr)
218         {
219           m_fields.emplace_back();
220         }
221         else
222         {
223           // Would love to emplace_back() here, but gcc 9.1 warns about the
224           // constructor not throwing.  It suggests adding "noexcept."  Which
225           // we can hardly do, without std::string_view guaranteeing it.
226           m_fields.push_back(zview{field_begin, write - field_begin});
227           *write++ = '\0';
228         }
229         field_begin = write;
230         break;
231 
232       case '\\': {
233         // Escape sequence.
234         if (read >= line_end)
235           throw failure{"Row ends in backslash"};
236 
237         c = *read++;
238         switch (c)
239         {
240         case 'N':
241           // Null value.
242           if (write != field_begin)
243             throw failure{"Null sequence found in nonempty field"};
244           field_begin = nullptr;
245           // (If there's any characters _after_ the null we'll just crash.)
246           break;
247 
248         case 'b': // Backspace.
249           *write++ = '\b';
250           break;
251         case 'f': // Form feed
252           *write++ = '\f';
253           break;
254         case 'n': // Line feed.
255           *write++ = '\n';
256           break;
257         case 'r': // Carriage return.
258           *write++ = '\r';
259           break;
260         case 't': // Horizontal tab.
261           *write++ = '\t';
262           break;
263         case 'v': // Vertical tab.
264           *write++ = '\v';
265           break;
266 
267         default:
268           // Regular character ("self-escaped").
269           *write++ = c;
270           break;
271         }
272       }
273       break;
274 
275       default: *write++ = c; break;
276       }
277     }
278     else
279     {
280       // Multi-byte sequence.  Never treated specially, so just append.
281       while (read < glyph_end) *write++ = *read++;
282     }
283   }
284 
285   // End the last field here.
286   if (field_begin == nullptr)
287   {
288     m_fields.emplace_back();
289   }
290   else
291   {
292     m_fields.push_back(zview{field_begin, write - field_begin});
293     *write++ = '\0';
294   }
295 
296   // DO NOT shrink m_row to fit.  We're carrying string_views pointing into
297   // the buffer.  (Also, how useful would shrinking really be?)
298 }
299 
300 
read_row()301 std::vector<pqxx::zview> const *pqxx::stream_from::read_row()
302 {
303   parse_line();
304   return m_finished ? nullptr : &m_fields;
305 }
306