1 /** Implementation of the pqxx::pipeline class.
2 *
3 * Throughput-optimized query interface.
4 *
5 * Copyright (c) 2000-2020, Jeroen T. Vermeulen.
6 *
7 * See COPYING for copyright license. If you did not receive a file called
8 * COPYING with this source code, please notify the distributor of this
9 * mistake, or contact the author.
10 */
11 #include "pqxx-source.hxx"
12
13 #include <iterator>
14
15 #include "pqxx/dbtransaction"
16 #include "pqxx/pipeline"
17 #include "pqxx/separated_list"
18
19 #include "pqxx/internal/gates/connection-pipeline.hxx"
20 #include "pqxx/internal/gates/result-creation.hxx"
21 #include "pqxx/internal/gates/result-pipeline.hxx"
22
23
namespace
{
// Placed between statements when a batch of queries is sent as one string.
std::string const theSeparator("; ");
// The value selected by the dummy query; obtain_dummy() compares against it.
std::string const theDummyValue("1");
// Prepended by issue() to batches of more than one statement.
std::string const theDummyQuery{
  std::string{"SELECT "}.append(theDummyValue).append(theSeparator)};
} // namespace
30
31
init()32 void pqxx::pipeline::init()
33 {
34 m_encoding = internal::enc_group(m_trans.conn().encoding_id());
35 m_issuedrange = make_pair(std::end(m_queries), std::end(m_queries));
36 attach();
37 }
38
39
~pipeline()40 pqxx::pipeline::~pipeline() noexcept
41 {
42 try
43 {
44 cancel();
45 }
46 catch (std::exception const &)
47 {}
48 detach();
49 }
50
51
attach()52 void pqxx::pipeline::attach()
53 {
54 if (not registered())
55 register_me();
56 }
57
58
detach()59 void pqxx::pipeline::detach()
60 {
61 if (registered())
62 unregister_me();
63 }
64
65
insert(std::string_view q)66 pqxx::pipeline::query_id pqxx::pipeline::insert(std::string_view q)
67 {
68 attach();
69 query_id const qid{generate_id()};
70 auto const i{m_queries.insert(std::make_pair(qid, Query(q))).first};
71
72 if (m_issuedrange.second == std::end(m_queries))
73 {
74 m_issuedrange.second = i;
75 if (m_issuedrange.first == std::end(m_queries))
76 m_issuedrange.first = i;
77 }
78 m_num_waiting++;
79
80 if (m_num_waiting > m_retain)
81 {
82 if (have_pending())
83 receive_if_available();
84 if (not have_pending())
85 issue();
86 }
87
88 return qid;
89 }
90
91
complete()92 void pqxx::pipeline::complete()
93 {
94 if (have_pending())
95 receive(m_issuedrange.second);
96 if (m_num_waiting and (m_error == qid_limit()))
97 {
98 issue();
99 receive(std::end(m_queries));
100 }
101 detach();
102 }
103
104
flush()105 void pqxx::pipeline::flush()
106 {
107 if (not std::empty(m_queries))
108 {
109 if (have_pending())
110 receive(m_issuedrange.second);
111 m_issuedrange.first = m_issuedrange.second = std::end(m_queries);
112 m_num_waiting = 0;
113 m_dummy_pending = false;
114 m_queries.clear();
115 }
116 detach();
117 }
118
119
cancel()120 void pqxx::pipeline::cancel()
121 {
122 while (have_pending())
123 {
124 pqxx::internal::gate::connection_pipeline(m_trans.conn()).cancel_query();
125 auto canceled_query{m_issuedrange.first};
126 ++m_issuedrange.first;
127 m_queries.erase(canceled_query);
128 }
129 }
130
131
is_finished(pipeline::query_id q) const132 bool pqxx::pipeline::is_finished(pipeline::query_id q) const
133 {
134 if (m_queries.find(q) == std::end(m_queries))
135 throw std::logic_error{
136 "Requested status for unknown query '" + to_string(q) + "'."};
137 return (QueryMap::const_iterator(m_issuedrange.first) ==
138 std::end(m_queries)) or
139 (q < m_issuedrange.first->first and q < m_error);
140 }
141
142
retrieve()143 std::pair<pqxx::pipeline::query_id, pqxx::result> pqxx::pipeline::retrieve()
144 {
145 if (std::empty(m_queries))
146 throw std::logic_error{"Attempt to retrieve result from empty pipeline."};
147 return retrieve(std::begin(m_queries));
148 }
149
150
retain(int retain_max)151 int pqxx::pipeline::retain(int retain_max)
152 {
153 if (retain_max < 0)
154 throw range_error{
155 "Attempt to make pipeline retain " + to_string(retain_max) + " queries"};
156
157 int const oldvalue{m_retain};
158 m_retain = retain_max;
159
160 if (m_num_waiting >= m_retain)
161 resume();
162
163 return oldvalue;
164 }
165
166
resume()167 void pqxx::pipeline::resume()
168 {
169 if (have_pending())
170 receive_if_available();
171 if (not have_pending() and m_num_waiting)
172 {
173 issue();
174 receive_if_available();
175 }
176 }
177
178
generate_id()179 pqxx::pipeline::query_id pqxx::pipeline::generate_id()
180 {
181 if (m_q_id == qid_limit())
182 throw std::overflow_error{"Too many queries went through pipeline."};
183 ++m_q_id;
184 return m_q_id;
185 }
186
187
issue()188 void pqxx::pipeline::issue()
189 {
190 // Retrieve that null result for the last query, if needed.
191 obtain_result();
192
193 // Don't issue anything if we've encountered an error.
194 if (m_error < qid_limit())
195 return;
196
197 // Start with oldest query (lowest id) not in previous issue range.
198 auto oldest{m_issuedrange.second};
199
200 // Construct cumulative query string for entire batch.
201 auto cum{separated_list(
202 theSeparator, oldest, std::end(m_queries),
203 [](QueryMap::const_iterator i) { return i->second.query; })};
204 auto const num_issued{
205 QueryMap::size_type(std::distance(oldest, std::end(m_queries)))};
206 bool const prepend_dummy{num_issued > 1};
207 if (prepend_dummy)
208 cum = theDummyQuery + cum;
209
210 pqxx::internal::gate::connection_pipeline{m_trans.conn()}.start_exec(
211 cum.c_str());
212
213 // Since we managed to send out these queries, update state to reflect this.
214 m_dummy_pending = prepend_dummy;
215 m_issuedrange.first = oldest;
216 m_issuedrange.second = std::end(m_queries);
217 m_num_waiting -= check_cast<int>(num_issued, "pipeline issue()");
218 }
219
220
internal_error(std::string const & err)221 void pqxx::pipeline::internal_error(std::string const &err)
222 {
223 set_error_at(0);
224 throw pqxx::internal_error{err};
225 }
226
227
obtain_result(bool expect_none)228 bool pqxx::pipeline::obtain_result(bool expect_none)
229 {
230 pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
231 auto const r{gate.get_result()};
232 if (r == nullptr)
233 {
234 if (have_pending() and not expect_none)
235 {
236 set_error_at(m_issuedrange.first->first);
237 m_issuedrange.second = m_issuedrange.first;
238 }
239 return false;
240 }
241
242 result const res{pqxx::internal::gate::result_creation::create(
243 r, std::begin(m_queries)->second.query, m_encoding)};
244
245 if (not have_pending())
246 {
247 set_error_at(std::begin(m_queries)->first);
248 throw std::logic_error{
249 "Got more results from pipeline than there were queries."};
250 }
251
252 // Must be the result for the oldest pending query.
253 if (not std::empty(m_issuedrange.first->second.res))
254 internal_error("Multiple results for one query.");
255
256 m_issuedrange.first->second.res = res;
257 ++m_issuedrange.first;
258
259 return true;
260 }
261
262
obtain_dummy()263 void pqxx::pipeline::obtain_dummy()
264 {
265 // Allocate once, re-use across invocations.
266 static auto const text{
267 std::make_shared<std::string>("[DUMMY PIPELINE QUERY]")};
268
269 pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
270 auto const r{gate.get_result()};
271 m_dummy_pending = false;
272
273 if (r == nullptr)
274 internal_error(
275 "Pipeline got no result from backend when it expected one.");
276
277 result R{pqxx::internal::gate::result_creation::create(r, text, m_encoding)};
278
279 bool OK{false};
280 try
281 {
282 pqxx::internal::gate::result_creation{R}.check_status();
283 OK = true;
284 }
285 catch (sql_error const &)
286 {}
287 if (OK)
288 {
289 if (std::size(R) > 1)
290 internal_error("Unexpected result for dummy query in pipeline.");
291
292 if (R.at(0).at(0).as<std::string>() != theDummyValue)
293 internal_error("Dummy query in pipeline returned unexpected value.");
294 return;
295 }
296
297 // XXX: Can we actually re-issue statements after a failure?
298 /* Execution of this batch failed.
299 *
300 * When we send multiple statements in one go, the backend treats them as a
301 * single transaction. So the entire batch was effectively rolled back.
302 *
303 * Since none of the queries in the batch were actually executed, we can
304 * afford to replay them one by one until we find the exact query that
305 * caused the error. This gives us not only a more specific error message
306 * to report, but also tells us which query to report it for.
307 */
308 // First, give the whole batch the same syntax error message, in case all
309 // else is going to fail.
310 for (auto i{m_issuedrange.first}; i != m_issuedrange.second; ++i)
311 i->second.res = R;
312
313 // Remember where the end of this batch was
314 auto const stop{m_issuedrange.second};
315
316 // Retrieve that null result for the last query, if needed
317 obtain_result(true);
318
319 // Reset internal state to forget botched batch attempt
320 m_num_waiting += check_cast<int>(
321 std::distance(m_issuedrange.first, stop), "pipeline obtain_dummy()");
322 m_issuedrange.second = m_issuedrange.first;
323
324 // Issue queries in failed batch one at a time.
325 unregister_me();
326 try
327 {
328 do
329 {
330 m_num_waiting--;
331 auto const query{*m_issuedrange.first->second.query};
332 auto &holder{m_issuedrange.first->second};
333 holder.res = m_trans.exec(query);
334 pqxx::internal::gate::result_creation{holder.res}.check_status();
335 ++m_issuedrange.first;
336 } while (m_issuedrange.first != stop);
337 }
338 catch (std::exception const &)
339 {
340 auto const thud{m_issuedrange.first->first};
341 ++m_issuedrange.first;
342 m_issuedrange.second = m_issuedrange.first;
343 auto q{m_issuedrange.first};
344 set_error_at((q == std::end(m_queries)) ? thud + 1 : q->first);
345 }
346 }
347
348
349 std::pair<pqxx::pipeline::query_id, pqxx::result>
retrieve(pipeline::QueryMap::iterator q)350 pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)
351 {
352 if (q == std::end(m_queries))
353 throw std::logic_error{"Attempt to retrieve result for unknown query."};
354
355 if (q->first >= m_error)
356 throw std::runtime_error{
357 "Could not complete query in pipeline due to error in earlier query."};
358
359 // If query hasn't issued yet, do it now.
360 if (
361 m_issuedrange.second != std::end(m_queries) and
362 (q->first >= m_issuedrange.second->first))
363 {
364 if (have_pending())
365 receive(m_issuedrange.second);
366 if (m_error == qid_limit())
367 issue();
368 }
369
370 // If result not in yet, get it; else get at least whatever's convenient.
371 if (have_pending())
372 {
373 if (q->first >= m_issuedrange.first->first)
374 {
375 auto suc{q};
376 ++suc;
377 receive(suc);
378 }
379 else
380 {
381 receive_if_available();
382 }
383 }
384
385 if (q->first >= m_error)
386 throw std::runtime_error{
387 "Could not complete query in pipeline due to error in earlier query."};
388
389 // Don't leave the backend idle if there are queries waiting to be issued.
390 if (m_num_waiting and not have_pending() and (m_error == qid_limit()))
391 issue();
392
393 result const R{q->second.res};
394 auto const P{std::make_pair(q->first, R)};
395
396 m_queries.erase(q);
397
398 pqxx::internal::gate::result_creation{R}.check_status();
399 return P;
400 }
401
402
get_further_available_results()403 void pqxx::pipeline::get_further_available_results()
404 {
405 pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
406 while (not gate.is_busy() and obtain_result())
407 if (not gate.consume_input())
408 throw broken_connection{};
409 }
410
411
receive_if_available()412 void pqxx::pipeline::receive_if_available()
413 {
414 pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
415 if (not gate.consume_input())
416 throw broken_connection{};
417 if (gate.is_busy())
418 return;
419
420 if (m_dummy_pending)
421 obtain_dummy();
422 if (have_pending())
423 get_further_available_results();
424 }
425
426
receive(pipeline::QueryMap::const_iterator stop)427 void pqxx::pipeline::receive(pipeline::QueryMap::const_iterator stop)
428 {
429 if (m_dummy_pending)
430 obtain_dummy();
431
432 while (obtain_result() and
433 QueryMap::const_iterator{m_issuedrange.first} != stop)
434 ;
435
436 // Also haul in any remaining "targets of opportunity".
437 if (QueryMap::const_iterator{m_issuedrange.first} == stop)
438 get_further_available_results();
439 }
440