1 /*-------------------------------------------------------------------------
2 *
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres_fe.h"
17
18 #include <sys/time.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22
23 #include "catalog/pg_type_d.h"
24 #include "common/fe_memutils.h"
25 #include "libpq-fe.h"
26 #include "pg_getopt.h"
27 #include "portability/instr_time.h"
28
29
30 static void exit_nicely(PGconn *conn);
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32 int numsent);
33
34 const char *const progname = "libpq_pipeline";
35
36 /* Options and defaults */
37 char *tracefile = NULL; /* path to PQtrace() file */
38
39
40 #ifdef DEBUG_OUTPUT
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 #else
43 #define pg_debug(...)
44 #endif
45
46 static const char *const drop_table_sql =
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql =
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 "int8filler int8);";
51 static const char *const insert_sql =
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 static const char *const insert_sql2 =
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55
56 /* max char length of an int32/64, plus sign and null terminator */
57 #define MAXINTLEN 12
58 #define MAXINT8LEN 20
59
60 static void
exit_nicely(PGconn * conn)61 exit_nicely(PGconn *conn)
62 {
63 PQfinish(conn);
64 exit(1);
65 }
66
67 /*
68 * Print an error to stderr and terminate the program.
69 */
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
71 static void
pg_attribute_noreturn()72 pg_attribute_noreturn()
73 pg_fatal_impl(int line, const char *fmt,...)
74 {
75 va_list args;
76
77
78 fflush(stdout);
79
80 fprintf(stderr, "\n%s:%d: ", progname, line);
81 va_start(args, fmt);
82 vfprintf(stderr, fmt, args);
83 va_end(args);
84 Assert(fmt[strlen(fmt) - 1] != '\n');
85 fprintf(stderr, "\n");
86 exit(1);
87 }
88
89 static void
test_disallowed_in_pipeline(PGconn * conn)90 test_disallowed_in_pipeline(PGconn *conn)
91 {
92 PGresult *res = NULL;
93
94 fprintf(stderr, "test error cases... ");
95
96 if (PQisnonblocking(conn))
97 pg_fatal("Expected blocking connection mode");
98
99 if (PQenterPipelineMode(conn) != 1)
100 pg_fatal("Unable to enter pipeline mode");
101
102 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
103 pg_fatal("Pipeline mode not activated properly");
104
105 /* PQexec should fail in pipeline mode */
106 res = PQexec(conn, "SELECT 1");
107 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
108 pg_fatal("PQexec should fail in pipeline mode but succeeded");
109
110 /* Entering pipeline mode when already in pipeline mode is OK */
111 if (PQenterPipelineMode(conn) != 1)
112 pg_fatal("re-entering pipeline mode should be a no-op but failed");
113
114 if (PQisBusy(conn) != 0)
115 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
116
117 /* ok, back to normal command mode */
118 if (PQexitPipelineMode(conn) != 1)
119 pg_fatal("couldn't exit idle empty pipeline mode");
120
121 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
122 pg_fatal("Pipeline mode not terminated properly");
123
124 /* exiting pipeline mode when not in pipeline mode should be a no-op */
125 if (PQexitPipelineMode(conn) != 1)
126 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
127
128 /* can now PQexec again */
129 res = PQexec(conn, "SELECT 1");
130 if (PQresultStatus(res) != PGRES_TUPLES_OK)
131 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
132 PQerrorMessage(conn));
133
134 fprintf(stderr, "ok\n");
135 }
136
137 static void
test_multi_pipelines(PGconn * conn)138 test_multi_pipelines(PGconn *conn)
139 {
140 PGresult *res = NULL;
141 const char *dummy_params[1] = {"1"};
142 Oid dummy_param_oids[1] = {INT4OID};
143
144 fprintf(stderr, "multi pipeline... ");
145
146 /*
147 * Queue up a couple of small pipelines and process each without returning
148 * to command mode first.
149 */
150 if (PQenterPipelineMode(conn) != 1)
151 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
152
153 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
154 dummy_params, NULL, NULL, 0) != 1)
155 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
156
157 if (PQpipelineSync(conn) != 1)
158 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
159
160 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
161 dummy_params, NULL, NULL, 0) != 1)
162 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
163
164 if (PQpipelineSync(conn) != 1)
165 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
166
167 /* OK, start processing the results */
168 res = PQgetResult(conn);
169 if (res == NULL)
170 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
171 PQerrorMessage(conn));
172
173 if (PQresultStatus(res) != PGRES_TUPLES_OK)
174 pg_fatal("Unexpected result code %s from first pipeline item",
175 PQresStatus(PQresultStatus(res)));
176 PQclear(res);
177 res = NULL;
178
179 if (PQgetResult(conn) != NULL)
180 pg_fatal("PQgetResult returned something extra after first result");
181
182 if (PQexitPipelineMode(conn) != 0)
183 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
184
185 res = PQgetResult(conn);
186 if (res == NULL)
187 pg_fatal("PQgetResult returned null when sync result expected: %s",
188 PQerrorMessage(conn));
189
190 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
191 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
192 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
193 PQclear(res);
194
195 /* second pipeline */
196
197 res = PQgetResult(conn);
198 if (res == NULL)
199 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
200 PQerrorMessage(conn));
201
202 if (PQresultStatus(res) != PGRES_TUPLES_OK)
203 pg_fatal("Unexpected result code %s from second pipeline item",
204 PQresStatus(PQresultStatus(res)));
205
206 res = PQgetResult(conn);
207 if (res != NULL)
208 pg_fatal("Expected null result, got %s",
209 PQresStatus(PQresultStatus(res)));
210
211 res = PQgetResult(conn);
212 if (res == NULL)
213 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
214 PQerrorMessage(conn));
215
216 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
217 pg_fatal("Unexpected result code %s from second pipeline sync",
218 PQresStatus(PQresultStatus(res)));
219
220 /* We're still in pipeline mode ... */
221 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
222 pg_fatal("Fell out of pipeline mode somehow");
223
224 /* until we end it, which we can safely do now */
225 if (PQexitPipelineMode(conn) != 1)
226 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
227 PQerrorMessage(conn));
228
229 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
230 pg_fatal("exiting pipeline mode didn't seem to work");
231
232 fprintf(stderr, "ok\n");
233 }
234
235 /*
236 * Test behavior when a pipeline dispatches a number of commands that are
237 * not flushed by a sync point.
238 */
239 static void
test_nosync(PGconn * conn)240 test_nosync(PGconn *conn)
241 {
242 int numqueries = 10;
243 int results = 0;
244 int sock = PQsocket(conn);
245
246 fprintf(stderr, "nosync... ");
247
248 if (sock < 0)
249 pg_fatal("invalid socket");
250
251 if (PQenterPipelineMode(conn) != 1)
252 pg_fatal("could not enter pipeline mode");
253 for (int i = 0; i < numqueries; i++)
254 {
255 fd_set input_mask;
256 struct timeval tv;
257
258 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
259 0, NULL, NULL, NULL, NULL, 0) != 1)
260 pg_fatal("error sending select: %s", PQerrorMessage(conn));
261 PQflush(conn);
262
263 /*
264 * If the server has written anything to us, read (some of) it now.
265 */
266 FD_ZERO(&input_mask);
267 FD_SET(sock, &input_mask);
268 tv.tv_sec = 0;
269 tv.tv_usec = 0;
270 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
271 {
272 fprintf(stderr, "select() failed: %s\n", strerror(errno));
273 exit_nicely(conn);
274 }
275 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
276 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
277 }
278
279 /* tell server to flush its output buffer */
280 if (PQsendFlushRequest(conn) != 1)
281 pg_fatal("failed to send flush request");
282 PQflush(conn);
283
284 /* Now read all results */
285 for (;;)
286 {
287 PGresult *res;
288
289 res = PQgetResult(conn);
290
291 /* NULL results are only expected after TUPLES_OK */
292 if (res == NULL)
293 pg_fatal("got unexpected NULL result after %d results", results);
294
295 /* We expect exactly one TUPLES_OK result for each query we sent */
296 if (PQresultStatus(res) == PGRES_TUPLES_OK)
297 {
298 PGresult *res2;
299
300 /* and one NULL result should follow each */
301 res2 = PQgetResult(conn);
302 if (res2 != NULL)
303 pg_fatal("expected NULL, got %s",
304 PQresStatus(PQresultStatus(res2)));
305 PQclear(res);
306 results++;
307
308 /* if we're done, we're done */
309 if (results == numqueries)
310 break;
311
312 continue;
313 }
314
315 /* anything else is unexpected */
316 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
317 }
318
319 fprintf(stderr, "ok\n");
320 }
321
322 /*
323 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
324 * still have to get results for each pipeline item, but the item will just be
325 * a PGRES_PIPELINE_ABORTED code.
326 *
327 * This intentionally doesn't use a transaction to wrap the pipeline. You should
328 * usually use an xact, but in this case we want to observe the effects of each
329 * statement.
330 */
331 static void
test_pipeline_abort(PGconn * conn)332 test_pipeline_abort(PGconn *conn)
333 {
334 PGresult *res = NULL;
335 const char *dummy_params[1] = {"1"};
336 Oid dummy_param_oids[1] = {INT4OID};
337 int i;
338 int gotrows;
339 bool goterror;
340
341 fprintf(stderr, "aborted pipeline... ");
342
343 res = PQexec(conn, drop_table_sql);
344 if (PQresultStatus(res) != PGRES_COMMAND_OK)
345 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
346
347 res = PQexec(conn, create_table_sql);
348 if (PQresultStatus(res) != PGRES_COMMAND_OK)
349 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
350
351 /*
352 * Queue up a couple of small pipelines and process each without returning
353 * to command mode first. Make sure the second operation in the first
354 * pipeline ERRORs.
355 */
356 if (PQenterPipelineMode(conn) != 1)
357 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
358
359 dummy_params[0] = "1";
360 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
361 dummy_params, NULL, NULL, 0) != 1)
362 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
363
364 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
365 1, dummy_param_oids, dummy_params,
366 NULL, NULL, 0) != 1)
367 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
368
369 dummy_params[0] = "2";
370 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
371 dummy_params, NULL, NULL, 0) != 1)
372 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
373
374 if (PQpipelineSync(conn) != 1)
375 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
376
377 dummy_params[0] = "3";
378 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
379 dummy_params, NULL, NULL, 0) != 1)
380 pg_fatal("dispatching second-pipeline insert failed: %s",
381 PQerrorMessage(conn));
382
383 if (PQpipelineSync(conn) != 1)
384 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
385
386 /*
387 * OK, start processing the pipeline results.
388 *
389 * We should get a command-ok for the first query, then a fatal error and
390 * a pipeline aborted message for the second insert, a pipeline-end, then
391 * a command-ok and a pipeline-ok for the second pipeline operation.
392 */
393 res = PQgetResult(conn);
394 if (res == NULL)
395 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
396 if (PQresultStatus(res) != PGRES_COMMAND_OK)
397 pg_fatal("Unexpected result status %s: %s",
398 PQresStatus(PQresultStatus(res)),
399 PQresultErrorMessage(res));
400 PQclear(res);
401
402 /* NULL result to signal end-of-results for this command */
403 if ((res = PQgetResult(conn)) != NULL)
404 pg_fatal("Expected null result, got %s",
405 PQresStatus(PQresultStatus(res)));
406
407 /* Second query caused error, so we expect an error next */
408 res = PQgetResult(conn);
409 if (res == NULL)
410 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
411 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
412 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
413 PQresStatus(PQresultStatus(res)));
414 PQclear(res);
415
416 /* NULL result to signal end-of-results for this command */
417 if ((res = PQgetResult(conn)) != NULL)
418 pg_fatal("Expected null result, got %s",
419 PQresStatus(PQresultStatus(res)));
420
421 /*
422 * pipeline should now be aborted.
423 *
424 * Note that we could still queue more queries at this point if we wanted;
425 * they'd get added to a new third pipeline since we've already sent a
426 * second. The aborted flag relates only to the pipeline being received.
427 */
428 if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
429 pg_fatal("pipeline should be flagged as aborted but isn't");
430
431 /* third query in pipeline, the second insert */
432 res = PQgetResult(conn);
433 if (res == NULL)
434 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
435 if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
436 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
437 PQresStatus(PQresultStatus(res)));
438 PQclear(res);
439
440 /* NULL result to signal end-of-results for this command */
441 if ((res = PQgetResult(conn)) != NULL)
442 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
443
444 if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
445 pg_fatal("pipeline should be flagged as aborted but isn't");
446
447 /* Ensure we're still in pipeline */
448 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
449 pg_fatal("Fell out of pipeline mode somehow");
450
451 /*
452 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
453 *
454 * (This is so clients know to start processing results normally again and
455 * can tell the difference between skipped commands and the sync.)
456 */
457 res = PQgetResult(conn);
458 if (res == NULL)
459 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
460 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
461 pg_fatal("Unexpected result code from first pipeline sync\n"
462 "Expected PGRES_PIPELINE_SYNC, got %s",
463 PQresStatus(PQresultStatus(res)));
464 PQclear(res);
465
466 if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
467 pg_fatal("sync should've cleared the aborted flag but didn't");
468
469 /* We're still in pipeline mode... */
470 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
471 pg_fatal("Fell out of pipeline mode somehow");
472
473 /* the insert from the second pipeline */
474 res = PQgetResult(conn);
475 if (res == NULL)
476 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
477 if (PQresultStatus(res) != PGRES_COMMAND_OK)
478 pg_fatal("Unexpected result code %s from first item in second pipeline",
479 PQresStatus(PQresultStatus(res)));
480 PQclear(res);
481
482 /* Read the NULL result at the end of the command */
483 if ((res = PQgetResult(conn)) != NULL)
484 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
485
486 /* the second pipeline sync */
487 if ((res = PQgetResult(conn)) == NULL)
488 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
489 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
490 pg_fatal("Unexpected result code %s from second pipeline sync",
491 PQresStatus(PQresultStatus(res)));
492 PQclear(res);
493
494 if ((res = PQgetResult(conn)) != NULL)
495 pg_fatal("Expected null result, got %s: %s",
496 PQresStatus(PQresultStatus(res)),
497 PQerrorMessage(conn));
498
499 /* Try to send two queries in one command */
500 if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
501 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
502 if (PQpipelineSync(conn) != 1)
503 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
504 goterror = false;
505 while ((res = PQgetResult(conn)) != NULL)
506 {
507 switch (PQresultStatus(res))
508 {
509 case PGRES_FATAL_ERROR:
510 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
511 pg_fatal("expected error about multiple commands, got %s",
512 PQerrorMessage(conn));
513 printf("got expected %s", PQerrorMessage(conn));
514 goterror = true;
515 break;
516 default:
517 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
518 break;
519 }
520 }
521 if (!goterror)
522 pg_fatal("did not get cannot-insert-multiple-commands error");
523 res = PQgetResult(conn);
524 if (res == NULL)
525 pg_fatal("got NULL result");
526 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
527 pg_fatal("Unexpected result code %s from pipeline sync",
528 PQresStatus(PQresultStatus(res)));
529 fprintf(stderr, "ok\n");
530
531 /* Test single-row mode with an error partways */
532 if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
533 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
534 if (PQpipelineSync(conn) != 1)
535 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
536 PQsetSingleRowMode(conn);
537 goterror = false;
538 gotrows = 0;
539 while ((res = PQgetResult(conn)) != NULL)
540 {
541 switch (PQresultStatus(res))
542 {
543 case PGRES_SINGLE_TUPLE:
544 printf("got row: %s\n", PQgetvalue(res, 0, 0));
545 gotrows++;
546 break;
547 case PGRES_FATAL_ERROR:
548 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
549 pg_fatal("expected division-by-zero, got: %s (%s)",
550 PQerrorMessage(conn),
551 PQresultErrorField(res, PG_DIAG_SQLSTATE));
552 printf("got expected division-by-zero\n");
553 goterror = true;
554 break;
555 default:
556 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
557 }
558 PQclear(res);
559 }
560 if (!goterror)
561 pg_fatal("did not get division-by-zero error");
562 if (gotrows != 3)
563 pg_fatal("did not get three rows");
564 /* the third pipeline sync */
565 if ((res = PQgetResult(conn)) == NULL)
566 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
567 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
568 pg_fatal("Unexpected result code %s from third pipeline sync",
569 PQresStatus(PQresultStatus(res)));
570 PQclear(res);
571
572 /* We're still in pipeline mode... */
573 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
574 pg_fatal("Fell out of pipeline mode somehow");
575
576 /* until we end it, which we can safely do now */
577 if (PQexitPipelineMode(conn) != 1)
578 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
579 PQerrorMessage(conn));
580
581 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
582 pg_fatal("exiting pipeline mode didn't seem to work");
583
584 fprintf(stderr, "ok\n");
585
586 /*-
587 * Since we fired the pipelines off without a surrounding xact, the results
588 * should be:
589 *
590 * - Implicit xact started by server around 1st pipeline
591 * - First insert applied
592 * - Second statement aborted xact
593 * - Third insert skipped
594 * - Sync rolled back first implicit xact
595 * - Implicit xact created by server around 2nd pipeline
596 * - insert applied from 2nd pipeline
597 * - Sync commits 2nd xact
598 *
599 * So we should only have the value 3 that we inserted.
600 */
601 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
602
603 if (PQresultStatus(res) != PGRES_TUPLES_OK)
604 pg_fatal("Expected tuples, got %s: %s",
605 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
606 if (PQntuples(res) != 1)
607 pg_fatal("expected 1 result, got %d", PQntuples(res));
608 for (i = 0; i < PQntuples(res); i++)
609 {
610 const char *val = PQgetvalue(res, i, 0);
611
612 if (strcmp(val, "3") != 0)
613 pg_fatal("expected only insert with value 3, got %s", val);
614 }
615
616 PQclear(res);
617 }
618
619 /* State machine enum for test_pipelined_insert */
620 enum PipelineInsertStep
621 {
622 BI_BEGIN_TX,
623 BI_DROP_TABLE,
624 BI_CREATE_TABLE,
625 BI_PREPARE,
626 BI_INSERT_ROWS,
627 BI_COMMIT_TX,
628 BI_SYNC,
629 BI_DONE
630 };
631
632 static void
test_pipelined_insert(PGconn * conn,int n_rows)633 test_pipelined_insert(PGconn *conn, int n_rows)
634 {
635 Oid insert_param_oids[2] = {INT4OID, INT8OID};
636 const char *insert_params[2];
637 char insert_param_0[MAXINTLEN];
638 char insert_param_1[MAXINT8LEN];
639 enum PipelineInsertStep send_step = BI_BEGIN_TX,
640 recv_step = BI_BEGIN_TX;
641 int rows_to_send,
642 rows_to_receive;
643
644 insert_params[0] = insert_param_0;
645 insert_params[1] = insert_param_1;
646
647 rows_to_send = rows_to_receive = n_rows;
648
649 /*
650 * Do a pipelined insert into a table created at the start of the pipeline
651 */
652 if (PQenterPipelineMode(conn) != 1)
653 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
654
655 while (send_step != BI_PREPARE)
656 {
657 const char *sql;
658
659 switch (send_step)
660 {
661 case BI_BEGIN_TX:
662 sql = "BEGIN TRANSACTION";
663 send_step = BI_DROP_TABLE;
664 break;
665
666 case BI_DROP_TABLE:
667 sql = drop_table_sql;
668 send_step = BI_CREATE_TABLE;
669 break;
670
671 case BI_CREATE_TABLE:
672 sql = create_table_sql;
673 send_step = BI_PREPARE;
674 break;
675
676 default:
677 pg_fatal("invalid state");
678 sql = NULL; /* keep compiler quiet */
679 }
680
681 pg_debug("sending: %s\n", sql);
682 if (PQsendQueryParams(conn, sql,
683 0, NULL, NULL, NULL, NULL, 0) != 1)
684 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
685 }
686
687 Assert(send_step == BI_PREPARE);
688 pg_debug("sending: %s\n", insert_sql2);
689 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
690 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
691 send_step = BI_INSERT_ROWS;
692
693 /*
694 * Now we start inserting. We'll be sending enough data that we could fill
695 * our output buffer, so to avoid deadlocking we need to enter nonblocking
696 * mode and consume input while we send more output. As results of each
697 * query are processed we should pop them to allow processing of the next
698 * query. There's no need to finish the pipeline before processing
699 * results.
700 */
701 if (PQsetnonblocking(conn, 1) != 0)
702 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
703
704 while (recv_step != BI_DONE)
705 {
706 int sock;
707 fd_set input_mask;
708 fd_set output_mask;
709
710 sock = PQsocket(conn);
711
712 if (sock < 0)
713 break; /* shouldn't happen */
714
715 FD_ZERO(&input_mask);
716 FD_SET(sock, &input_mask);
717 FD_ZERO(&output_mask);
718 FD_SET(sock, &output_mask);
719
720 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
721 {
722 fprintf(stderr, "select() failed: %s\n", strerror(errno));
723 exit_nicely(conn);
724 }
725
726 /*
727 * Process any results, so we keep the server's output buffer free
728 * flowing and it can continue to process input
729 */
730 if (FD_ISSET(sock, &input_mask))
731 {
732 PQconsumeInput(conn);
733
734 /* Read until we'd block if we tried to read */
735 while (!PQisBusy(conn) && recv_step < BI_DONE)
736 {
737 PGresult *res;
738 const char *cmdtag = "";
739 const char *description = "";
740 int status;
741
742 /*
743 * Read next result. If no more results from this query,
744 * advance to the next query
745 */
746 res = PQgetResult(conn);
747 if (res == NULL)
748 continue;
749
750 status = PGRES_COMMAND_OK;
751 switch (recv_step)
752 {
753 case BI_BEGIN_TX:
754 cmdtag = "BEGIN";
755 recv_step++;
756 break;
757 case BI_DROP_TABLE:
758 cmdtag = "DROP TABLE";
759 recv_step++;
760 break;
761 case BI_CREATE_TABLE:
762 cmdtag = "CREATE TABLE";
763 recv_step++;
764 break;
765 case BI_PREPARE:
766 cmdtag = "";
767 description = "PREPARE";
768 recv_step++;
769 break;
770 case BI_INSERT_ROWS:
771 cmdtag = "INSERT";
772 rows_to_receive--;
773 if (rows_to_receive == 0)
774 recv_step++;
775 break;
776 case BI_COMMIT_TX:
777 cmdtag = "COMMIT";
778 recv_step++;
779 break;
780 case BI_SYNC:
781 cmdtag = "";
782 description = "SYNC";
783 status = PGRES_PIPELINE_SYNC;
784 recv_step++;
785 break;
786 case BI_DONE:
787 /* unreachable */
788 pg_fatal("unreachable state");
789 }
790
791 if (PQresultStatus(res) != status)
792 pg_fatal("%s reported status %s, expected %s\n"
793 "Error message: \"%s\"",
794 description, PQresStatus(PQresultStatus(res)),
795 PQresStatus(status), PQerrorMessage(conn));
796
797 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
798 pg_fatal("%s expected command tag '%s', got '%s'",
799 description, cmdtag, PQcmdStatus(res));
800
801 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
802
803 PQclear(res);
804 }
805 }
806
807 /* Write more rows and/or the end pipeline message, if needed */
808 if (FD_ISSET(sock, &output_mask))
809 {
810 PQflush(conn);
811
812 if (send_step == BI_INSERT_ROWS)
813 {
814 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
815 /* use up some buffer space with a wide value */
816 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
817
818 if (PQsendQueryPrepared(conn, "my_insert",
819 2, insert_params, NULL, NULL, 0) == 1)
820 {
821 pg_debug("sent row %d\n", rows_to_send);
822
823 rows_to_send--;
824 if (rows_to_send == 0)
825 send_step++;
826 }
827 else
828 {
829 /*
830 * in nonblocking mode, so it's OK for an insert to fail
831 * to send
832 */
833 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
834 rows_to_send, PQerrorMessage(conn));
835 }
836 }
837 else if (send_step == BI_COMMIT_TX)
838 {
839 if (PQsendQueryParams(conn, "COMMIT",
840 0, NULL, NULL, NULL, NULL, 0) == 1)
841 {
842 pg_debug("sent COMMIT\n");
843 send_step++;
844 }
845 else
846 {
847 fprintf(stderr, "WARNING: failed to send commit: %s\n",
848 PQerrorMessage(conn));
849 }
850 }
851 else if (send_step == BI_SYNC)
852 {
853 if (PQpipelineSync(conn) == 1)
854 {
855 fprintf(stdout, "pipeline sync sent\n");
856 send_step++;
857 }
858 else
859 {
860 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
861 PQerrorMessage(conn));
862 }
863 }
864 }
865 }
866
867 /* We've got the sync message and the pipeline should be done */
868 if (PQexitPipelineMode(conn) != 1)
869 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
870 PQerrorMessage(conn));
871
872 if (PQsetnonblocking(conn, 0) != 0)
873 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
874
875 fprintf(stderr, "ok\n");
876 }
877
878 static void
test_prepared(PGconn * conn)879 test_prepared(PGconn *conn)
880 {
881 PGresult *res = NULL;
882 Oid param_oids[1] = {INT4OID};
883 Oid expected_oids[4];
884 Oid typ;
885
886 fprintf(stderr, "prepared... ");
887
888 if (PQenterPipelineMode(conn) != 1)
889 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
890 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
891 "interval '1 sec'",
892 1, param_oids) != 1)
893 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
894 expected_oids[0] = INT4OID;
895 expected_oids[1] = TEXTOID;
896 expected_oids[2] = NUMERICOID;
897 expected_oids[3] = INTERVALOID;
898 if (PQsendDescribePrepared(conn, "select_one") != 1)
899 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
900 if (PQpipelineSync(conn) != 1)
901 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
902
903 res = PQgetResult(conn);
904 if (res == NULL)
905 pg_fatal("PQgetResult returned null");
906 if (PQresultStatus(res) != PGRES_COMMAND_OK)
907 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
908 PQclear(res);
909 res = PQgetResult(conn);
910 if (res != NULL)
911 pg_fatal("expected NULL result");
912
913 res = PQgetResult(conn);
914 if (res == NULL)
915 pg_fatal("PQgetResult returned NULL");
916 if (PQresultStatus(res) != PGRES_COMMAND_OK)
917 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
918 if (PQnfields(res) != lengthof(expected_oids))
919 pg_fatal("expected %d columns, got %d",
920 lengthof(expected_oids), PQnfields(res));
921 for (int i = 0; i < PQnfields(res); i++)
922 {
923 typ = PQftype(res, i);
924 if (typ != expected_oids[i])
925 pg_fatal("field %d: expected type %u, got %u",
926 i, expected_oids[i], typ);
927 }
928 PQclear(res);
929 res = PQgetResult(conn);
930 if (res != NULL)
931 pg_fatal("expected NULL result");
932
933 res = PQgetResult(conn);
934 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
935 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
936
937 if (PQexitPipelineMode(conn) != 1)
938 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
939
940 PQexec(conn, "BEGIN");
941 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
942 PQenterPipelineMode(conn);
943 if (PQsendDescribePortal(conn, "cursor_one") != 1)
944 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
945 if (PQpipelineSync(conn) != 1)
946 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
947 res = PQgetResult(conn);
948 if (res == NULL)
949 pg_fatal("expected NULL result");
950 if (PQresultStatus(res) != PGRES_COMMAND_OK)
951 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
952
953 typ = PQftype(res, 0);
954 if (typ != INT4OID)
955 pg_fatal("portal: expected type %u, got %u",
956 INT4OID, typ);
957 PQclear(res);
958 res = PQgetResult(conn);
959 if (res != NULL)
960 pg_fatal("expected NULL result");
961 res = PQgetResult(conn);
962 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
963 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
964
965 if (PQexitPipelineMode(conn) != 1)
966 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
967
968 fprintf(stderr, "ok\n");
969 }
970
971 static void
test_simple_pipeline(PGconn * conn)972 test_simple_pipeline(PGconn *conn)
973 {
974 PGresult *res = NULL;
975 const char *dummy_params[1] = {"1"};
976 Oid dummy_param_oids[1] = {INT4OID};
977
978 fprintf(stderr, "simple pipeline... ");
979
980 /*
981 * Enter pipeline mode and dispatch a set of operations, which we'll then
982 * process the results of as they come in.
983 *
984 * For a simple case we should be able to do this without interim
985 * processing of results since our output buffer will give us enough slush
986 * to work with and we won't block on sending. So blocking mode is fine.
987 */
988 if (PQisnonblocking(conn))
989 pg_fatal("Expected blocking connection mode");
990
991 if (PQenterPipelineMode(conn) != 1)
992 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
993
994 if (PQsendQueryParams(conn, "SELECT $1",
995 1, dummy_param_oids, dummy_params,
996 NULL, NULL, 0) != 1)
997 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
998
999 if (PQexitPipelineMode(conn) != 0)
1000 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1001
1002 if (PQpipelineSync(conn) != 1)
1003 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1004
1005 res = PQgetResult(conn);
1006 if (res == NULL)
1007 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1008 PQerrorMessage(conn));
1009
1010 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1011 pg_fatal("Unexpected result code %s from first pipeline item",
1012 PQresStatus(PQresultStatus(res)));
1013
1014 PQclear(res);
1015 res = NULL;
1016
1017 if (PQgetResult(conn) != NULL)
1018 pg_fatal("PQgetResult returned something extra after first query result.");
1019
1020 /*
1021 * Even though we've processed the result there's still a sync to come and
1022 * we can't exit pipeline mode yet
1023 */
1024 if (PQexitPipelineMode(conn) != 0)
1025 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1026
1027 res = PQgetResult(conn);
1028 if (res == NULL)
1029 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1030 PQerrorMessage(conn));
1031
1032 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1033 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1034 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1035
1036 PQclear(res);
1037 res = NULL;
1038
1039 if (PQgetResult(conn) != NULL)
1040 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1041 PQresStatus(PQresultStatus(res)));
1042
1043 /* We're still in pipeline mode... */
1044 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1045 pg_fatal("Fell out of pipeline mode somehow");
1046
1047 /* ... until we end it, which we can safely do now */
1048 if (PQexitPipelineMode(conn) != 1)
1049 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1050 PQerrorMessage(conn));
1051
1052 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1053 pg_fatal("Exiting pipeline mode didn't seem to work");
1054
1055 fprintf(stderr, "ok\n");
1056 }
1057
1058 static void
test_singlerowmode(PGconn * conn)1059 test_singlerowmode(PGconn *conn)
1060 {
1061 PGresult *res;
1062 int i;
1063 bool pipeline_ended = false;
1064
1065 /* 1 pipeline, 3 queries in it */
1066 if (PQenterPipelineMode(conn) != 1)
1067 pg_fatal("failed to enter pipeline mode: %s",
1068 PQerrorMessage(conn));
1069
1070 for (i = 0; i < 3; i++)
1071 {
1072 char *param[1];
1073
1074 param[0] = psprintf("%d", 44 + i);
1075
1076 if (PQsendQueryParams(conn,
1077 "SELECT generate_series(42, $1)",
1078 1,
1079 NULL,
1080 (const char **) param,
1081 NULL,
1082 NULL,
1083 0) != 1)
1084 pg_fatal("failed to send query: %s",
1085 PQerrorMessage(conn));
1086 pfree(param[0]);
1087 }
1088 if (PQpipelineSync(conn) != 1)
1089 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1090
1091 for (i = 0; !pipeline_ended; i++)
1092 {
1093 bool first = true;
1094 bool saw_ending_tuplesok;
1095 bool isSingleTuple = false;
1096
1097 /* Set single row mode for only first 2 SELECT queries */
1098 if (i < 2)
1099 {
1100 if (PQsetSingleRowMode(conn) != 1)
1101 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1102 }
1103
1104 /* Consume rows for this query */
1105 saw_ending_tuplesok = false;
1106 while ((res = PQgetResult(conn)) != NULL)
1107 {
1108 ExecStatusType est = PQresultStatus(res);
1109
1110 if (est == PGRES_PIPELINE_SYNC)
1111 {
1112 fprintf(stderr, "end of pipeline reached\n");
1113 pipeline_ended = true;
1114 PQclear(res);
1115 if (i != 3)
1116 pg_fatal("Expected three results, got %d", i);
1117 break;
1118 }
1119
1120 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1121 if (first)
1122 {
1123 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1124 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1125 i, PQresStatus(est));
1126 if (i >= 2 && est != PGRES_TUPLES_OK)
1127 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1128 i, PQresStatus(est));
1129 first = false;
1130 }
1131
1132 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1133 switch (est)
1134 {
1135 case PGRES_TUPLES_OK:
1136 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1137 saw_ending_tuplesok = true;
1138 if (isSingleTuple)
1139 {
1140 if (PQntuples(res) == 0)
1141 fprintf(stderr, "all tuples received in query %d\n", i);
1142 else
1143 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1144 }
1145 break;
1146
1147 case PGRES_SINGLE_TUPLE:
1148 isSingleTuple = true;
1149 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1150 break;
1151
1152 default:
1153 pg_fatal("unexpected");
1154 }
1155 PQclear(res);
1156 }
1157 if (!pipeline_ended && !saw_ending_tuplesok)
1158 pg_fatal("didn't get expected terminating TUPLES_OK");
1159 }
1160
1161 if (PQexitPipelineMode(conn) != 1)
1162 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1163 }
1164
1165 /*
1166 * Simple test to verify that a pipeline is discarded as a whole when there's
1167 * an error, ignoring transaction commands.
1168 */
1169 static void
test_transaction(PGconn * conn)1170 test_transaction(PGconn *conn)
1171 {
1172 PGresult *res;
1173 bool expect_null;
1174 int num_syncs = 0;
1175
1176 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1177 "CREATE TABLE pq_pipeline_tst (id int)");
1178 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1179 pg_fatal("failed to create test table: %s",
1180 PQerrorMessage(conn));
1181 PQclear(res);
1182
1183 if (PQenterPipelineMode(conn) != 1)
1184 pg_fatal("failed to enter pipeline mode: %s",
1185 PQerrorMessage(conn));
1186 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1187 pg_fatal("could not send prepare on pipeline: %s",
1188 PQerrorMessage(conn));
1189
1190 if (PQsendQueryParams(conn,
1191 "BEGIN",
1192 0, NULL, NULL, NULL, NULL, 0) != 1)
1193 pg_fatal("failed to send query: %s",
1194 PQerrorMessage(conn));
1195 if (PQsendQueryParams(conn,
1196 "SELECT 0/0",
1197 0, NULL, NULL, NULL, NULL, 0) != 1)
1198 pg_fatal("failed to send query: %s",
1199 PQerrorMessage(conn));
1200
1201 /*
1202 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1203 * get out of the pipeline-aborted state first.
1204 */
1205 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1206 pg_fatal("failed to execute prepared: %s",
1207 PQerrorMessage(conn));
1208
1209 /* This insert fails because we're in pipeline-aborted state */
1210 if (PQsendQueryParams(conn,
1211 "INSERT INTO pq_pipeline_tst VALUES (1)",
1212 0, NULL, NULL, NULL, NULL, 0) != 1)
1213 pg_fatal("failed to send query: %s",
1214 PQerrorMessage(conn));
1215 if (PQpipelineSync(conn) != 1)
1216 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1217 num_syncs++;
1218
1219 /*
1220 * This insert fails even though the pipeline got a SYNC, because we're in
1221 * an aborted transaction
1222 */
1223 if (PQsendQueryParams(conn,
1224 "INSERT INTO pq_pipeline_tst VALUES (2)",
1225 0, NULL, NULL, NULL, NULL, 0) != 1)
1226 pg_fatal("failed to send query: %s",
1227 PQerrorMessage(conn));
1228 if (PQpipelineSync(conn) != 1)
1229 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1230 num_syncs++;
1231
1232 /*
1233 * Send ROLLBACK using prepared stmt. This one works because we just did
1234 * PQpipelineSync above.
1235 */
1236 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1237 pg_fatal("failed to execute prepared: %s",
1238 PQerrorMessage(conn));
1239
1240 /*
1241 * Now that we're out of a transaction and in pipeline-good mode, this
1242 * insert works
1243 */
1244 if (PQsendQueryParams(conn,
1245 "INSERT INTO pq_pipeline_tst VALUES (3)",
1246 0, NULL, NULL, NULL, NULL, 0) != 1)
1247 pg_fatal("failed to send query: %s",
1248 PQerrorMessage(conn));
1249 /* Send two syncs now -- match up to SYNC messages below */
1250 if (PQpipelineSync(conn) != 1)
1251 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1252 num_syncs++;
1253 if (PQpipelineSync(conn) != 1)
1254 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1255 num_syncs++;
1256
1257 expect_null = false;
1258 for (int i = 0;; i++)
1259 {
1260 ExecStatusType restype;
1261
1262 res = PQgetResult(conn);
1263 if (res == NULL)
1264 {
1265 printf("%d: got NULL result\n", i);
1266 if (!expect_null)
1267 pg_fatal("did not expect NULL here");
1268 expect_null = false;
1269 continue;
1270 }
1271 restype = PQresultStatus(res);
1272 printf("%d: got status %s", i, PQresStatus(restype));
1273 if (expect_null)
1274 pg_fatal("expected NULL");
1275 if (restype == PGRES_FATAL_ERROR)
1276 printf("; error: %s", PQerrorMessage(conn));
1277 else if (restype == PGRES_PIPELINE_ABORTED)
1278 {
1279 printf(": command didn't run because pipeline aborted\n");
1280 }
1281 else
1282 printf("\n");
1283 PQclear(res);
1284
1285 if (restype == PGRES_PIPELINE_SYNC)
1286 num_syncs--;
1287 else
1288 expect_null = true;
1289 if (num_syncs <= 0)
1290 break;
1291 }
1292 if (PQgetResult(conn) != NULL)
1293 pg_fatal("returned something extra after all the syncs: %s",
1294 PQresStatus(PQresultStatus(res)));
1295
1296 if (PQexitPipelineMode(conn) != 1)
1297 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1298
1299 /* We expect to find one tuple containing the value "3" */
1300 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1301 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1302 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1303 if (PQntuples(res) != 1)
1304 pg_fatal("did not get 1 tuple");
1305 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1306 pg_fatal("did not get expected tuple");
1307 PQclear(res);
1308
1309 fprintf(stderr, "ok\n");
1310 }
1311
1312 /*
1313 * In this test mode we send a stream of queries, with one in the middle
1314 * causing an error. Verify that we can still send some more after the
1315 * error and have libpq work properly.
1316 */
1317 static void
test_uniqviol(PGconn * conn)1318 test_uniqviol(PGconn *conn)
1319 {
1320 int sock = PQsocket(conn);
1321 PGresult *res;
1322 Oid paramTypes[2] = {INT8OID, INT8OID};
1323 const char *paramValues[2];
1324 char paramValue0[MAXINT8LEN];
1325 char paramValue1[MAXINT8LEN];
1326 int ctr = 0;
1327 int numsent = 0;
1328 int results = 0;
1329 bool read_done = false;
1330 bool write_done = false;
1331 bool error_sent = false;
1332 bool got_error = false;
1333 int switched = 0;
1334 int socketful = 0;
1335 fd_set in_fds;
1336 fd_set out_fds;
1337
1338 fprintf(stderr, "uniqviol ...");
1339
1340 PQsetnonblocking(conn, 1);
1341
1342 paramValues[0] = paramValue0;
1343 paramValues[1] = paramValue1;
1344 sprintf(paramValue1, "42");
1345
1346 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1347 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1348 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1349 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1350
1351 res = PQexec(conn, "begin");
1352 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1353 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1354
1355 res = PQprepare(conn, "insertion",
1356 "insert into ppln_uniqviol values ($1, $2) returning id",
1357 2, paramTypes);
1358 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1359 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1360
1361 if (PQenterPipelineMode(conn) != 1)
1362 pg_fatal("failed to enter pipeline mode");
1363
1364 while (!read_done)
1365 {
1366 /*
1367 * Avoid deadlocks by reading everything the server has sent before
1368 * sending anything. (Special precaution is needed here to process
1369 * PQisBusy before testing the socket for read-readiness, because the
1370 * socket does not turn read-ready after "sending" queries in aborted
1371 * pipeline mode.)
1372 */
1373 while (PQisBusy(conn) == 0)
1374 {
1375 bool new_error;
1376
1377 if (results >= numsent)
1378 {
1379 if (write_done)
1380 read_done = true;
1381 break;
1382 }
1383
1384 res = PQgetResult(conn);
1385 new_error = process_result(conn, res, results, numsent);
1386 if (new_error && got_error)
1387 pg_fatal("got two errors");
1388 got_error |= new_error;
1389 if (results++ >= numsent - 1)
1390 {
1391 if (write_done)
1392 read_done = true;
1393 break;
1394 }
1395 }
1396
1397 if (read_done)
1398 break;
1399
1400 FD_ZERO(&out_fds);
1401 FD_SET(sock, &out_fds);
1402
1403 FD_ZERO(&in_fds);
1404 FD_SET(sock, &in_fds);
1405
1406 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1407 {
1408 if (errno == EINTR)
1409 continue;
1410 pg_fatal("select() failed: %m");
1411 }
1412
1413 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1414 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1415
1416 /*
1417 * If the socket is writable and we haven't finished sending queries,
1418 * send some.
1419 */
1420 if (!write_done && FD_ISSET(sock, &out_fds))
1421 {
1422 for (;;)
1423 {
1424 int flush;
1425
1426 /*
1427 * provoke uniqueness violation exactly once after having
1428 * switched to read mode.
1429 */
1430 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1431 {
1432 sprintf(paramValue0, "%d", numsent / 2);
1433 fprintf(stderr, "E");
1434 error_sent = true;
1435 }
1436 else
1437 {
1438 fprintf(stderr, ".");
1439 sprintf(paramValue0, "%d", ctr++);
1440 }
1441
1442 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1443 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1444 numsent++;
1445
1446 /* Are we done writing? */
1447 if (socketful != 0 && numsent % socketful == 42 && error_sent)
1448 {
1449 if (PQsendFlushRequest(conn) != 1)
1450 pg_fatal("failed to send flush request");
1451 write_done = true;
1452 fprintf(stderr, "\ndone writing\n");
1453 PQflush(conn);
1454 break;
1455 }
1456
1457 /* is the outgoing socket full? */
1458 flush = PQflush(conn);
1459 if (flush == -1)
1460 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1461 if (flush == 1)
1462 {
1463 if (socketful == 0)
1464 socketful = numsent;
1465 fprintf(stderr, "\nswitch to reading\n");
1466 switched++;
1467 break;
1468 }
1469 }
1470 }
1471 }
1472
1473 if (!got_error)
1474 pg_fatal("did not get expected error");
1475
1476 fprintf(stderr, "ok\n");
1477 }
1478
1479 /*
1480 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1481 * the expected NULL that should follow it.
1482 *
1483 * Returns true if we read a fatal error message, otherwise false.
1484 */
1485 static bool
process_result(PGconn * conn,PGresult * res,int results,int numsent)1486 process_result(PGconn *conn, PGresult *res, int results, int numsent)
1487 {
1488 PGresult *res2;
1489 bool got_error = false;
1490
1491 if (res == NULL)
1492 pg_fatal("got unexpected NULL");
1493
1494 switch (PQresultStatus(res))
1495 {
1496 case PGRES_FATAL_ERROR:
1497 got_error = true;
1498 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1499 PQclear(res);
1500
1501 res2 = PQgetResult(conn);
1502 if (res2 != NULL)
1503 pg_fatal("expected NULL, got %s",
1504 PQresStatus(PQresultStatus(res2)));
1505 break;
1506
1507 case PGRES_TUPLES_OK:
1508 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1509 PQclear(res);
1510
1511 res2 = PQgetResult(conn);
1512 if (res2 != NULL)
1513 pg_fatal("expected NULL, got %s",
1514 PQresStatus(PQresultStatus(res2)));
1515 break;
1516
1517 case PGRES_PIPELINE_ABORTED:
1518 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1519 res2 = PQgetResult(conn);
1520 if (res2 != NULL)
1521 pg_fatal("expected NULL, got %s",
1522 PQresStatus(PQresultStatus(res2)));
1523 break;
1524
1525 default:
1526 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1527 }
1528
1529 return got_error;
1530 }
1531
1532
1533 static void
usage(const char * progname)1534 usage(const char *progname)
1535 {
1536 fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
1537 fprintf(stderr, "Usage:\n");
1538 fprintf(stderr, " %s [OPTION] tests\n", progname);
1539 fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
1540 fprintf(stderr, "\nOptions:\n");
1541 fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
1542 fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
1543 }
1544
1545 static void
print_test_list(void)1546 print_test_list(void)
1547 {
1548 printf("disallowed_in_pipeline\n");
1549 printf("multi_pipelines\n");
1550 printf("nosync\n");
1551 printf("pipeline_abort\n");
1552 printf("pipelined_insert\n");
1553 printf("prepared\n");
1554 printf("simple_pipeline\n");
1555 printf("singlerow\n");
1556 printf("transaction\n");
1557 printf("uniqviol\n");
1558 }
1559
1560 int
main(int argc,char ** argv)1561 main(int argc, char **argv)
1562 {
1563 const char *conninfo = "";
1564 PGconn *conn;
1565 FILE *trace;
1566 char *testname;
1567 int numrows = 10000;
1568 PGresult *res;
1569 int c;
1570
1571 while ((c = getopt(argc, argv, "t:r:")) != -1)
1572 {
1573 switch (c)
1574 {
1575 case 't': /* trace file */
1576 tracefile = pg_strdup(optarg);
1577 break;
1578 case 'r': /* numrows */
1579 errno = 0;
1580 numrows = strtol(optarg, NULL, 10);
1581 if (errno != 0 || numrows <= 0)
1582 {
1583 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1584 optarg);
1585 exit(1);
1586 }
1587 break;
1588 }
1589 }
1590
1591 if (optind < argc)
1592 {
1593 testname = pg_strdup(argv[optind]);
1594 optind++;
1595 }
1596 else
1597 {
1598 usage(argv[0]);
1599 exit(1);
1600 }
1601
1602 if (strcmp(testname, "tests") == 0)
1603 {
1604 print_test_list();
1605 exit(0);
1606 }
1607
1608 if (optind < argc)
1609 {
1610 conninfo = pg_strdup(argv[optind]);
1611 optind++;
1612 }
1613
1614 /* Make a connection to the database */
1615 conn = PQconnectdb(conninfo);
1616 if (PQstatus(conn) != CONNECTION_OK)
1617 {
1618 fprintf(stderr, "Connection to database failed: %s\n",
1619 PQerrorMessage(conn));
1620 exit_nicely(conn);
1621 }
1622
1623 res = PQexec(conn, "SET lc_messages TO \"C\"");
1624 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1625 pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1626 res = PQexec(conn, "SET force_parallel_mode = off");
1627 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1628 pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
1629
1630 /* Set the trace file, if requested */
1631 if (tracefile != NULL)
1632 {
1633 trace = fopen(tracefile, "w");
1634 if (trace == NULL)
1635 pg_fatal("could not open file \"%s\": %m", tracefile);
1636
1637 /* Make it line-buffered */
1638 setvbuf(trace, NULL, PG_IOLBF, 0);
1639
1640 PQtrace(conn, trace);
1641 PQsetTraceFlags(conn,
1642 PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
1643 }
1644
1645 if (strcmp(testname, "disallowed_in_pipeline") == 0)
1646 test_disallowed_in_pipeline(conn);
1647 else if (strcmp(testname, "multi_pipelines") == 0)
1648 test_multi_pipelines(conn);
1649 else if (strcmp(testname, "nosync") == 0)
1650 test_nosync(conn);
1651 else if (strcmp(testname, "pipeline_abort") == 0)
1652 test_pipeline_abort(conn);
1653 else if (strcmp(testname, "pipelined_insert") == 0)
1654 test_pipelined_insert(conn, numrows);
1655 else if (strcmp(testname, "prepared") == 0)
1656 test_prepared(conn);
1657 else if (strcmp(testname, "simple_pipeline") == 0)
1658 test_simple_pipeline(conn);
1659 else if (strcmp(testname, "singlerow") == 0)
1660 test_singlerowmode(conn);
1661 else if (strcmp(testname, "transaction") == 0)
1662 test_transaction(conn);
1663 else if (strcmp(testname, "uniqviol") == 0)
1664 test_uniqviol(conn);
1665 else
1666 {
1667 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1668 exit(1);
1669 }
1670
1671 /* close the connection to the database and cleanup */
1672 PQfinish(conn);
1673 return 0;
1674 }
1675