1 /** Implementation of the pqxx::pipeline class.
2  *
3  * Throughput-optimized query interface.
4  *
5  * Copyright (c) 2000-2020, Jeroen T. Vermeulen.
6  *
7  * See COPYING for copyright license.  If you did not receive a file called
8  * COPYING with this source code, please notify the distributor of this
9  * mistake, or contact the author.
10  */
11 #include "pqxx-source.hxx"
12 
13 #include <iterator>
14 
15 #include "pqxx/dbtransaction"
16 #include "pqxx/pipeline"
17 #include "pqxx/separated_list"
18 
19 #include "pqxx/internal/gates/connection-pipeline.hxx"
20 #include "pqxx/internal/gates/result-creation.hxx"
21 #include "pqxx/internal/gates/result-pipeline.hxx"
22 
23 
namespace
{
// Joins consecutive statements inside one batched query string.
std::string const theSeparator{"; "};

// The value selected by the dummy query; checked when its result arrives.
std::string const theDummyValue{"1"};

// Trivial statement put in front of multi-query batches.  If it fails, the
// batch as a whole was rejected rather than one particular query in it.
std::string const theDummyQuery{
  std::string{"SELECT "}.append(theDummyValue).append(theSeparator)};
} // namespace
30 
31 
init()32 void pqxx::pipeline::init()
33 {
34   m_encoding = internal::enc_group(m_trans.conn().encoding_id());
35   m_issuedrange = make_pair(std::end(m_queries), std::end(m_queries));
36   attach();
37 }
38 
39 
~pipeline()40 pqxx::pipeline::~pipeline() noexcept
41 {
42   try
43   {
44     cancel();
45   }
46   catch (std::exception const &)
47   {}
48   detach();
49 }
50 
51 
attach()52 void pqxx::pipeline::attach()
53 {
54   if (not registered())
55     register_me();
56 }
57 
58 
detach()59 void pqxx::pipeline::detach()
60 {
61   if (registered())
62     unregister_me();
63 }
64 
65 
insert(std::string_view q)66 pqxx::pipeline::query_id pqxx::pipeline::insert(std::string_view q)
67 {
68   attach();
69   query_id const qid{generate_id()};
70   auto const i{m_queries.insert(std::make_pair(qid, Query(q))).first};
71 
72   if (m_issuedrange.second == std::end(m_queries))
73   {
74     m_issuedrange.second = i;
75     if (m_issuedrange.first == std::end(m_queries))
76       m_issuedrange.first = i;
77   }
78   m_num_waiting++;
79 
80   if (m_num_waiting > m_retain)
81   {
82     if (have_pending())
83       receive_if_available();
84     if (not have_pending())
85       issue();
86   }
87 
88   return qid;
89 }
90 
91 
complete()92 void pqxx::pipeline::complete()
93 {
94   if (have_pending())
95     receive(m_issuedrange.second);
96   if (m_num_waiting and (m_error == qid_limit()))
97   {
98     issue();
99     receive(std::end(m_queries));
100   }
101   detach();
102 }
103 
104 
flush()105 void pqxx::pipeline::flush()
106 {
107   if (not std::empty(m_queries))
108   {
109     if (have_pending())
110       receive(m_issuedrange.second);
111     m_issuedrange.first = m_issuedrange.second = std::end(m_queries);
112     m_num_waiting = 0;
113     m_dummy_pending = false;
114     m_queries.clear();
115   }
116   detach();
117 }
118 
119 
cancel()120 void pqxx::pipeline::cancel()
121 {
122   while (have_pending())
123   {
124     pqxx::internal::gate::connection_pipeline(m_trans.conn()).cancel_query();
125     auto canceled_query{m_issuedrange.first};
126     ++m_issuedrange.first;
127     m_queries.erase(canceled_query);
128   }
129 }
130 
131 
is_finished(pipeline::query_id q) const132 bool pqxx::pipeline::is_finished(pipeline::query_id q) const
133 {
134   if (m_queries.find(q) == std::end(m_queries))
135     throw std::logic_error{
136       "Requested status for unknown query '" + to_string(q) + "'."};
137   return (QueryMap::const_iterator(m_issuedrange.first) ==
138           std::end(m_queries)) or
139          (q < m_issuedrange.first->first and q < m_error);
140 }
141 
142 
retrieve()143 std::pair<pqxx::pipeline::query_id, pqxx::result> pqxx::pipeline::retrieve()
144 {
145   if (std::empty(m_queries))
146     throw std::logic_error{"Attempt to retrieve result from empty pipeline."};
147   return retrieve(std::begin(m_queries));
148 }
149 
150 
retain(int retain_max)151 int pqxx::pipeline::retain(int retain_max)
152 {
153   if (retain_max < 0)
154     throw range_error{
155       "Attempt to make pipeline retain " + to_string(retain_max) + " queries"};
156 
157   int const oldvalue{m_retain};
158   m_retain = retain_max;
159 
160   if (m_num_waiting >= m_retain)
161     resume();
162 
163   return oldvalue;
164 }
165 
166 
resume()167 void pqxx::pipeline::resume()
168 {
169   if (have_pending())
170     receive_if_available();
171   if (not have_pending() and m_num_waiting)
172   {
173     issue();
174     receive_if_available();
175   }
176 }
177 
178 
generate_id()179 pqxx::pipeline::query_id pqxx::pipeline::generate_id()
180 {
181   if (m_q_id == qid_limit())
182     throw std::overflow_error{"Too many queries went through pipeline."};
183   ++m_q_id;
184   return m_q_id;
185 }
186 
187 
issue()188 void pqxx::pipeline::issue()
189 {
190   // Retrieve that null result for the last query, if needed.
191   obtain_result();
192 
193   // Don't issue anything if we've encountered an error.
194   if (m_error < qid_limit())
195     return;
196 
197   // Start with oldest query (lowest id) not in previous issue range.
198   auto oldest{m_issuedrange.second};
199 
200   // Construct cumulative query string for entire batch.
201   auto cum{separated_list(
202     theSeparator, oldest, std::end(m_queries),
203     [](QueryMap::const_iterator i) { return i->second.query; })};
204   auto const num_issued{
205     QueryMap::size_type(std::distance(oldest, std::end(m_queries)))};
206   bool const prepend_dummy{num_issued > 1};
207   if (prepend_dummy)
208     cum = theDummyQuery + cum;
209 
210   pqxx::internal::gate::connection_pipeline{m_trans.conn()}.start_exec(
211     cum.c_str());
212 
213   // Since we managed to send out these queries, update state to reflect this.
214   m_dummy_pending = prepend_dummy;
215   m_issuedrange.first = oldest;
216   m_issuedrange.second = std::end(m_queries);
217   m_num_waiting -= check_cast<int>(num_issued, "pipeline issue()");
218 }
219 
220 
internal_error(std::string const & err)221 void pqxx::pipeline::internal_error(std::string const &err)
222 {
223   set_error_at(0);
224   throw pqxx::internal_error{err};
225 }
226 
227 
obtain_result(bool expect_none)228 bool pqxx::pipeline::obtain_result(bool expect_none)
229 {
230   pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
231   auto const r{gate.get_result()};
232   if (r == nullptr)
233   {
234     if (have_pending() and not expect_none)
235     {
236       set_error_at(m_issuedrange.first->first);
237       m_issuedrange.second = m_issuedrange.first;
238     }
239     return false;
240   }
241 
242   result const res{pqxx::internal::gate::result_creation::create(
243     r, std::begin(m_queries)->second.query, m_encoding)};
244 
245   if (not have_pending())
246   {
247     set_error_at(std::begin(m_queries)->first);
248     throw std::logic_error{
249       "Got more results from pipeline than there were queries."};
250   }
251 
252   // Must be the result for the oldest pending query.
253   if (not std::empty(m_issuedrange.first->second.res))
254     internal_error("Multiple results for one query.");
255 
256   m_issuedrange.first->second.res = res;
257   ++m_issuedrange.first;
258 
259   return true;
260 }
261 
262 
obtain_dummy()263 void pqxx::pipeline::obtain_dummy()
264 {
265   // Allocate once, re-use across invocations.
266   static auto const text{
267     std::make_shared<std::string>("[DUMMY PIPELINE QUERY]")};
268 
269   pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
270   auto const r{gate.get_result()};
271   m_dummy_pending = false;
272 
273   if (r == nullptr)
274     internal_error(
275       "Pipeline got no result from backend when it expected one.");
276 
277   result R{pqxx::internal::gate::result_creation::create(r, text, m_encoding)};
278 
279   bool OK{false};
280   try
281   {
282     pqxx::internal::gate::result_creation{R}.check_status();
283     OK = true;
284   }
285   catch (sql_error const &)
286   {}
287   if (OK)
288   {
289     if (std::size(R) > 1)
290       internal_error("Unexpected result for dummy query in pipeline.");
291 
292     if (R.at(0).at(0).as<std::string>() != theDummyValue)
293       internal_error("Dummy query in pipeline returned unexpected value.");
294     return;
295   }
296 
297   // XXX: Can we actually re-issue statements after a failure?
298   /* Execution of this batch failed.
299    *
300    * When we send multiple statements in one go, the backend treats them as a
301    * single transaction.  So the entire batch was effectively rolled back.
302    *
303    * Since none of the queries in the batch were actually executed, we can
304    * afford to replay them one by one until we find the exact query that
305    * caused the error.  This gives us not only a more specific error message
306    * to report, but also tells us which query to report it for.
307    */
308   // First, give the whole batch the same syntax error message, in case all
309   // else is going to fail.
310   for (auto i{m_issuedrange.first}; i != m_issuedrange.second; ++i)
311     i->second.res = R;
312 
313   // Remember where the end of this batch was
314   auto const stop{m_issuedrange.second};
315 
316   // Retrieve that null result for the last query, if needed
317   obtain_result(true);
318 
319   // Reset internal state to forget botched batch attempt
320   m_num_waiting += check_cast<int>(
321     std::distance(m_issuedrange.first, stop), "pipeline obtain_dummy()");
322   m_issuedrange.second = m_issuedrange.first;
323 
324   // Issue queries in failed batch one at a time.
325   unregister_me();
326   try
327   {
328     do
329     {
330       m_num_waiting--;
331       auto const query{*m_issuedrange.first->second.query};
332       auto &holder{m_issuedrange.first->second};
333       holder.res = m_trans.exec(query);
334       pqxx::internal::gate::result_creation{holder.res}.check_status();
335       ++m_issuedrange.first;
336     } while (m_issuedrange.first != stop);
337   }
338   catch (std::exception const &)
339   {
340     auto const thud{m_issuedrange.first->first};
341     ++m_issuedrange.first;
342     m_issuedrange.second = m_issuedrange.first;
343     auto q{m_issuedrange.first};
344     set_error_at((q == std::end(m_queries)) ? thud + 1 : q->first);
345   }
346 }
347 
348 
349 std::pair<pqxx::pipeline::query_id, pqxx::result>
retrieve(pipeline::QueryMap::iterator q)350 pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)
351 {
352   if (q == std::end(m_queries))
353     throw std::logic_error{"Attempt to retrieve result for unknown query."};
354 
355   if (q->first >= m_error)
356     throw std::runtime_error{
357       "Could not complete query in pipeline due to error in earlier query."};
358 
359   // If query hasn't issued yet, do it now.
360   if (
361     m_issuedrange.second != std::end(m_queries) and
362     (q->first >= m_issuedrange.second->first))
363   {
364     if (have_pending())
365       receive(m_issuedrange.second);
366     if (m_error == qid_limit())
367       issue();
368   }
369 
370   // If result not in yet, get it; else get at least whatever's convenient.
371   if (have_pending())
372   {
373     if (q->first >= m_issuedrange.first->first)
374     {
375       auto suc{q};
376       ++suc;
377       receive(suc);
378     }
379     else
380     {
381       receive_if_available();
382     }
383   }
384 
385   if (q->first >= m_error)
386     throw std::runtime_error{
387       "Could not complete query in pipeline due to error in earlier query."};
388 
389   // Don't leave the backend idle if there are queries waiting to be issued.
390   if (m_num_waiting and not have_pending() and (m_error == qid_limit()))
391     issue();
392 
393   result const R{q->second.res};
394   auto const P{std::make_pair(q->first, R)};
395 
396   m_queries.erase(q);
397 
398   pqxx::internal::gate::result_creation{R}.check_status();
399   return P;
400 }
401 
402 
get_further_available_results()403 void pqxx::pipeline::get_further_available_results()
404 {
405   pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
406   while (not gate.is_busy() and obtain_result())
407     if (not gate.consume_input())
408       throw broken_connection{};
409 }
410 
411 
receive_if_available()412 void pqxx::pipeline::receive_if_available()
413 {
414   pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
415   if (not gate.consume_input())
416     throw broken_connection{};
417   if (gate.is_busy())
418     return;
419 
420   if (m_dummy_pending)
421     obtain_dummy();
422   if (have_pending())
423     get_further_available_results();
424 }
425 
426 
receive(pipeline::QueryMap::const_iterator stop)427 void pqxx::pipeline::receive(pipeline::QueryMap::const_iterator stop)
428 {
429   if (m_dummy_pending)
430     obtain_dummy();
431 
432   while (obtain_result() and
433          QueryMap::const_iterator{m_issuedrange.first} != stop)
434     ;
435 
436   // Also haul in any remaining "targets of opportunity".
437   if (QueryMap::const_iterator{m_issuedrange.first} == stop)
438     get_further_available_results();
439 }
440