1 /*-------------------------------------------------------------------------
2  *
3  * libpq_pipeline.c
4  *		Verify libpq pipeline execution functionality
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *		src/test/modules/libpq_pipeline/libpq_pipeline.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <sys/time.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 #include "catalog/pg_type_d.h"
24 #include "common/fe_memutils.h"
25 #include "libpq-fe.h"
26 #include "pg_getopt.h"
27 #include "portability/instr_time.h"
28 
29 
30 static void exit_nicely(PGconn *conn);
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32 						   int numsent);
33 
34 const char *const progname = "libpq_pipeline";
35 
36 /* Options and defaults */
37 char	   *tracefile = NULL;	/* path to PQtrace() file */
38 
39 
/*
 * pg_debug(): printf-style trace output on stderr.  It is compiled in only
 * when DEBUG_OUTPUT is defined; otherwise every call expands to nothing.
 */
#ifndef DEBUG_OUTPUT
#define pg_debug(...)
#else
#define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
#endif
45 
/* Drops the demo table if it exists. */
static const char *const drop_table_sql = "DROP TABLE IF EXISTS pq_pipeline_demo";

/* Creates the unlogged demo table: serial key, int4 item number, int8 filler. */
static const char *const create_table_sql =
	"CREATE UNLOGGED TABLE pq_pipeline_demo("
	"id serial primary key, itemno integer,"
	"int8filler int8);";

/* One-parameter insert: only the item number is supplied. */
static const char *const insert_sql =
	"INSERT INTO pq_pipeline_demo(itemno) "
	"VALUES ($1)";

/* Two-parameter insert: item number plus the int8 filler column. */
static const char *const insert_sql2 =
	"INSERT INTO pq_pipeline_demo(itemno,int8filler) "
	"VALUES ($1, $2)";
55 
/*
 * Buffer sizes for the decimal text form of an int32 / int64, including room
 * for a leading minus sign and the terminating NUL:
 *
 *	int32: 10 digits + sign + NUL = 12
 *	int64: 19 digits + sign + NUL = 21
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
59 
60 static void
exit_nicely(PGconn * conn)61 exit_nicely(PGconn *conn)
62 {
63 	PQfinish(conn);
64 	exit(1);
65 }
66 
67 /*
68  * Print an error to stderr and terminate the program.
69  */
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
71 static void
pg_attribute_noreturn()72 pg_attribute_noreturn()
73 pg_fatal_impl(int line, const char *fmt,...)
74 {
75 	va_list		args;
76 
77 
78 	fflush(stdout);
79 
80 	fprintf(stderr, "\n%s:%d: ", progname, line);
81 	va_start(args, fmt);
82 	vfprintf(stderr, fmt, args);
83 	va_end(args);
84 	Assert(fmt[strlen(fmt) - 1] != '\n');
85 	fprintf(stderr, "\n");
86 	exit(1);
87 }
88 
89 static void
test_disallowed_in_pipeline(PGconn * conn)90 test_disallowed_in_pipeline(PGconn *conn)
91 {
92 	PGresult   *res = NULL;
93 
94 	fprintf(stderr, "test error cases... ");
95 
96 	if (PQisnonblocking(conn))
97 		pg_fatal("Expected blocking connection mode");
98 
99 	if (PQenterPipelineMode(conn) != 1)
100 		pg_fatal("Unable to enter pipeline mode");
101 
102 	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
103 		pg_fatal("Pipeline mode not activated properly");
104 
105 	/* PQexec should fail in pipeline mode */
106 	res = PQexec(conn, "SELECT 1");
107 	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
108 		pg_fatal("PQexec should fail in pipeline mode but succeeded");
109 
110 	/* Entering pipeline mode when already in pipeline mode is OK */
111 	if (PQenterPipelineMode(conn) != 1)
112 		pg_fatal("re-entering pipeline mode should be a no-op but failed");
113 
114 	if (PQisBusy(conn) != 0)
115 		pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
116 
117 	/* ok, back to normal command mode */
118 	if (PQexitPipelineMode(conn) != 1)
119 		pg_fatal("couldn't exit idle empty pipeline mode");
120 
121 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
122 		pg_fatal("Pipeline mode not terminated properly");
123 
124 	/* exiting pipeline mode when not in pipeline mode should be a no-op */
125 	if (PQexitPipelineMode(conn) != 1)
126 		pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
127 
128 	/* can now PQexec again */
129 	res = PQexec(conn, "SELECT 1");
130 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
131 		pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
132 				 PQerrorMessage(conn));
133 
134 	fprintf(stderr, "ok\n");
135 }
136 
137 static void
test_multi_pipelines(PGconn * conn)138 test_multi_pipelines(PGconn *conn)
139 {
140 	PGresult   *res = NULL;
141 	const char *dummy_params[1] = {"1"};
142 	Oid			dummy_param_oids[1] = {INT4OID};
143 
144 	fprintf(stderr, "multi pipeline... ");
145 
146 	/*
147 	 * Queue up a couple of small pipelines and process each without returning
148 	 * to command mode first.
149 	 */
150 	if (PQenterPipelineMode(conn) != 1)
151 		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
152 
153 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
154 						  dummy_params, NULL, NULL, 0) != 1)
155 		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
156 
157 	if (PQpipelineSync(conn) != 1)
158 		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
159 
160 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
161 						  dummy_params, NULL, NULL, 0) != 1)
162 		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
163 
164 	if (PQpipelineSync(conn) != 1)
165 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
166 
167 	/* OK, start processing the results */
168 	res = PQgetResult(conn);
169 	if (res == NULL)
170 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
171 				 PQerrorMessage(conn));
172 
173 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
174 		pg_fatal("Unexpected result code %s from first pipeline item",
175 				 PQresStatus(PQresultStatus(res)));
176 	PQclear(res);
177 	res = NULL;
178 
179 	if (PQgetResult(conn) != NULL)
180 		pg_fatal("PQgetResult returned something extra after first result");
181 
182 	if (PQexitPipelineMode(conn) != 0)
183 		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
184 
185 	res = PQgetResult(conn);
186 	if (res == NULL)
187 		pg_fatal("PQgetResult returned null when sync result expected: %s",
188 				 PQerrorMessage(conn));
189 
190 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
191 		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
192 				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
193 	PQclear(res);
194 
195 	/* second pipeline */
196 
197 	res = PQgetResult(conn);
198 	if (res == NULL)
199 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
200 				 PQerrorMessage(conn));
201 
202 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
203 		pg_fatal("Unexpected result code %s from second pipeline item",
204 				 PQresStatus(PQresultStatus(res)));
205 
206 	res = PQgetResult(conn);
207 	if (res != NULL)
208 		pg_fatal("Expected null result, got %s",
209 				 PQresStatus(PQresultStatus(res)));
210 
211 	res = PQgetResult(conn);
212 	if (res == NULL)
213 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
214 				 PQerrorMessage(conn));
215 
216 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
217 		pg_fatal("Unexpected result code %s from second pipeline sync",
218 				 PQresStatus(PQresultStatus(res)));
219 
220 	/* We're still in pipeline mode ... */
221 	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
222 		pg_fatal("Fell out of pipeline mode somehow");
223 
224 	/* until we end it, which we can safely do now */
225 	if (PQexitPipelineMode(conn) != 1)
226 		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
227 				 PQerrorMessage(conn));
228 
229 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
230 		pg_fatal("exiting pipeline mode didn't seem to work");
231 
232 	fprintf(stderr, "ok\n");
233 }
234 
235 /*
236  * Test behavior when a pipeline dispatches a number of commands that are
237  * not flushed by a sync point.
238  */
239 static void
test_nosync(PGconn * conn)240 test_nosync(PGconn *conn)
241 {
242 	int			numqueries = 10;
243 	int			results = 0;
244 	int			sock = PQsocket(conn);
245 
246 	fprintf(stderr, "nosync... ");
247 
248 	if (sock < 0)
249 		pg_fatal("invalid socket");
250 
251 	if (PQenterPipelineMode(conn) != 1)
252 		pg_fatal("could not enter pipeline mode");
253 	for (int i = 0; i < numqueries; i++)
254 	{
255 		fd_set		input_mask;
256 		struct timeval tv;
257 
258 		if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
259 							  0, NULL, NULL, NULL, NULL, 0) != 1)
260 			pg_fatal("error sending select: %s", PQerrorMessage(conn));
261 		PQflush(conn);
262 
263 		/*
264 		 * If the server has written anything to us, read (some of) it now.
265 		 */
266 		FD_ZERO(&input_mask);
267 		FD_SET(sock, &input_mask);
268 		tv.tv_sec = 0;
269 		tv.tv_usec = 0;
270 		if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
271 		{
272 			fprintf(stderr, "select() failed: %s\n", strerror(errno));
273 			exit_nicely(conn);
274 		}
275 		if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
276 			pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
277 	}
278 
279 	/* tell server to flush its output buffer */
280 	if (PQsendFlushRequest(conn) != 1)
281 		pg_fatal("failed to send flush request");
282 	PQflush(conn);
283 
284 	/* Now read all results */
285 	for (;;)
286 	{
287 		PGresult   *res;
288 
289 		res = PQgetResult(conn);
290 
291 		/* NULL results are only expected after TUPLES_OK */
292 		if (res == NULL)
293 			pg_fatal("got unexpected NULL result after %d results", results);
294 
295 		/* We expect exactly one TUPLES_OK result for each query we sent */
296 		if (PQresultStatus(res) == PGRES_TUPLES_OK)
297 		{
298 			PGresult   *res2;
299 
300 			/* and one NULL result should follow each */
301 			res2 = PQgetResult(conn);
302 			if (res2 != NULL)
303 				pg_fatal("expected NULL, got %s",
304 						 PQresStatus(PQresultStatus(res2)));
305 			PQclear(res);
306 			results++;
307 
308 			/* if we're done, we're done */
309 			if (results == numqueries)
310 				break;
311 
312 			continue;
313 		}
314 
315 		/* anything else is unexpected */
316 		pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
317 	}
318 
319 	fprintf(stderr, "ok\n");
320 }
321 
322 /*
323  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
324  * still have to get results for each pipeline item, but the item will just be
325  * a PGRES_PIPELINE_ABORTED code.
326  *
327  * This intentionally doesn't use a transaction to wrap the pipeline. You should
328  * usually use an xact, but in this case we want to observe the effects of each
329  * statement.
330  */
331 static void
test_pipeline_abort(PGconn * conn)332 test_pipeline_abort(PGconn *conn)
333 {
334 	PGresult   *res = NULL;
335 	const char *dummy_params[1] = {"1"};
336 	Oid			dummy_param_oids[1] = {INT4OID};
337 	int			i;
338 	int			gotrows;
339 	bool		goterror;
340 
341 	fprintf(stderr, "aborted pipeline... ");
342 
343 	res = PQexec(conn, drop_table_sql);
344 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
345 		pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
346 
347 	res = PQexec(conn, create_table_sql);
348 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
349 		pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
350 
351 	/*
352 	 * Queue up a couple of small pipelines and process each without returning
353 	 * to command mode first. Make sure the second operation in the first
354 	 * pipeline ERRORs.
355 	 */
356 	if (PQenterPipelineMode(conn) != 1)
357 		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
358 
359 	dummy_params[0] = "1";
360 	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
361 						  dummy_params, NULL, NULL, 0) != 1)
362 		pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
363 
364 	if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
365 						  1, dummy_param_oids, dummy_params,
366 						  NULL, NULL, 0) != 1)
367 		pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
368 
369 	dummy_params[0] = "2";
370 	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
371 						  dummy_params, NULL, NULL, 0) != 1)
372 		pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
373 
374 	if (PQpipelineSync(conn) != 1)
375 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
376 
377 	dummy_params[0] = "3";
378 	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
379 						  dummy_params, NULL, NULL, 0) != 1)
380 		pg_fatal("dispatching second-pipeline insert failed: %s",
381 				 PQerrorMessage(conn));
382 
383 	if (PQpipelineSync(conn) != 1)
384 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
385 
386 	/*
387 	 * OK, start processing the pipeline results.
388 	 *
389 	 * We should get a command-ok for the first query, then a fatal error and
390 	 * a pipeline aborted message for the second insert, a pipeline-end, then
391 	 * a command-ok and a pipeline-ok for the second pipeline operation.
392 	 */
393 	res = PQgetResult(conn);
394 	if (res == NULL)
395 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
396 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
397 		pg_fatal("Unexpected result status %s: %s",
398 				 PQresStatus(PQresultStatus(res)),
399 				 PQresultErrorMessage(res));
400 	PQclear(res);
401 
402 	/* NULL result to signal end-of-results for this command */
403 	if ((res = PQgetResult(conn)) != NULL)
404 		pg_fatal("Expected null result, got %s",
405 				 PQresStatus(PQresultStatus(res)));
406 
407 	/* Second query caused error, so we expect an error next */
408 	res = PQgetResult(conn);
409 	if (res == NULL)
410 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
411 	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
412 		pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
413 				 PQresStatus(PQresultStatus(res)));
414 	PQclear(res);
415 
416 	/* NULL result to signal end-of-results for this command */
417 	if ((res = PQgetResult(conn)) != NULL)
418 		pg_fatal("Expected null result, got %s",
419 				 PQresStatus(PQresultStatus(res)));
420 
421 	/*
422 	 * pipeline should now be aborted.
423 	 *
424 	 * Note that we could still queue more queries at this point if we wanted;
425 	 * they'd get added to a new third pipeline since we've already sent a
426 	 * second. The aborted flag relates only to the pipeline being received.
427 	 */
428 	if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
429 		pg_fatal("pipeline should be flagged as aborted but isn't");
430 
431 	/* third query in pipeline, the second insert */
432 	res = PQgetResult(conn);
433 	if (res == NULL)
434 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
435 	if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
436 		pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
437 				 PQresStatus(PQresultStatus(res)));
438 	PQclear(res);
439 
440 	/* NULL result to signal end-of-results for this command */
441 	if ((res = PQgetResult(conn)) != NULL)
442 		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
443 
444 	if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
445 		pg_fatal("pipeline should be flagged as aborted but isn't");
446 
447 	/* Ensure we're still in pipeline */
448 	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
449 		pg_fatal("Fell out of pipeline mode somehow");
450 
451 	/*
452 	 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
453 	 *
454 	 * (This is so clients know to start processing results normally again and
455 	 * can tell the difference between skipped commands and the sync.)
456 	 */
457 	res = PQgetResult(conn);
458 	if (res == NULL)
459 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
460 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
461 		pg_fatal("Unexpected result code from first pipeline sync\n"
462 				 "Expected PGRES_PIPELINE_SYNC, got %s",
463 				 PQresStatus(PQresultStatus(res)));
464 	PQclear(res);
465 
466 	if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
467 		pg_fatal("sync should've cleared the aborted flag but didn't");
468 
469 	/* We're still in pipeline mode... */
470 	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
471 		pg_fatal("Fell out of pipeline mode somehow");
472 
473 	/* the insert from the second pipeline */
474 	res = PQgetResult(conn);
475 	if (res == NULL)
476 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
477 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
478 		pg_fatal("Unexpected result code %s from first item in second pipeline",
479 				 PQresStatus(PQresultStatus(res)));
480 	PQclear(res);
481 
482 	/* Read the NULL result at the end of the command */
483 	if ((res = PQgetResult(conn)) != NULL)
484 		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
485 
486 	/* the second pipeline sync */
487 	if ((res = PQgetResult(conn)) == NULL)
488 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
489 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
490 		pg_fatal("Unexpected result code %s from second pipeline sync",
491 				 PQresStatus(PQresultStatus(res)));
492 	PQclear(res);
493 
494 	if ((res = PQgetResult(conn)) != NULL)
495 		pg_fatal("Expected null result, got %s: %s",
496 				 PQresStatus(PQresultStatus(res)),
497 				 PQerrorMessage(conn));
498 
499 	/* Try to send two queries in one command */
500 	if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
501 		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
502 	if (PQpipelineSync(conn) != 1)
503 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
504 	goterror = false;
505 	while ((res = PQgetResult(conn)) != NULL)
506 	{
507 		switch (PQresultStatus(res))
508 		{
509 			case PGRES_FATAL_ERROR:
510 				if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
511 					pg_fatal("expected error about multiple commands, got %s",
512 							 PQerrorMessage(conn));
513 				printf("got expected %s", PQerrorMessage(conn));
514 				goterror = true;
515 				break;
516 			default:
517 				pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
518 				break;
519 		}
520 	}
521 	if (!goterror)
522 		pg_fatal("did not get cannot-insert-multiple-commands error");
523 	res = PQgetResult(conn);
524 	if (res == NULL)
525 		pg_fatal("got NULL result");
526 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
527 		pg_fatal("Unexpected result code %s from pipeline sync",
528 				 PQresStatus(PQresultStatus(res)));
529 	fprintf(stderr, "ok\n");
530 
531 	/* Test single-row mode with an error partways */
532 	if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
533 		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
534 	if (PQpipelineSync(conn) != 1)
535 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
536 	PQsetSingleRowMode(conn);
537 	goterror = false;
538 	gotrows = 0;
539 	while ((res = PQgetResult(conn)) != NULL)
540 	{
541 		switch (PQresultStatus(res))
542 		{
543 			case PGRES_SINGLE_TUPLE:
544 				printf("got row: %s\n", PQgetvalue(res, 0, 0));
545 				gotrows++;
546 				break;
547 			case PGRES_FATAL_ERROR:
548 				if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
549 					pg_fatal("expected division-by-zero, got: %s (%s)",
550 							 PQerrorMessage(conn),
551 							 PQresultErrorField(res, PG_DIAG_SQLSTATE));
552 				printf("got expected division-by-zero\n");
553 				goterror = true;
554 				break;
555 			default:
556 				pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
557 		}
558 		PQclear(res);
559 	}
560 	if (!goterror)
561 		pg_fatal("did not get division-by-zero error");
562 	if (gotrows != 3)
563 		pg_fatal("did not get three rows");
564 	/* the third pipeline sync */
565 	if ((res = PQgetResult(conn)) == NULL)
566 		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
567 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
568 		pg_fatal("Unexpected result code %s from third pipeline sync",
569 				 PQresStatus(PQresultStatus(res)));
570 	PQclear(res);
571 
572 	/* We're still in pipeline mode... */
573 	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
574 		pg_fatal("Fell out of pipeline mode somehow");
575 
576 	/* until we end it, which we can safely do now */
577 	if (PQexitPipelineMode(conn) != 1)
578 		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
579 				 PQerrorMessage(conn));
580 
581 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
582 		pg_fatal("exiting pipeline mode didn't seem to work");
583 
584 	fprintf(stderr, "ok\n");
585 
586 	/*-
587 	 * Since we fired the pipelines off without a surrounding xact, the results
588 	 * should be:
589 	 *
590 	 * - Implicit xact started by server around 1st pipeline
591 	 * - First insert applied
592 	 * - Second statement aborted xact
593 	 * - Third insert skipped
594 	 * - Sync rolled back first implicit xact
595 	 * - Implicit xact created by server around 2nd pipeline
596 	 * - insert applied from 2nd pipeline
597 	 * - Sync commits 2nd xact
598 	 *
599 	 * So we should only have the value 3 that we inserted.
600 	 */
601 	res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
602 
603 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
604 		pg_fatal("Expected tuples, got %s: %s",
605 				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
606 	if (PQntuples(res) != 1)
607 		pg_fatal("expected 1 result, got %d", PQntuples(res));
608 	for (i = 0; i < PQntuples(res); i++)
609 	{
610 		const char *val = PQgetvalue(res, i, 0);
611 
612 		if (strcmp(val, "3") != 0)
613 			pg_fatal("expected only insert with value 3, got %s", val);
614 	}
615 
616 	PQclear(res);
617 }
618 
/*
 * Steps of the test_pipelined_insert state machine, in the order they are
 * sent and received.  The numeric order matters: the receive side advances
 * with ++ and compares against BI_DONE.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX = 0,			/* BEGIN TRANSACTION */
	BI_DROP_TABLE = 1,			/* drop the demo table */
	BI_CREATE_TABLE = 2,		/* create the demo table */
	BI_PREPARE = 3,				/* prepare the insert statement */
	BI_INSERT_ROWS = 4,			/* execute the prepared insert per row */
	BI_COMMIT_TX = 5,			/* COMMIT */
	BI_SYNC = 6,				/* pipeline sync */
	BI_DONE = 7					/* nothing left to do */
};
631 
632 static void
test_pipelined_insert(PGconn * conn,int n_rows)633 test_pipelined_insert(PGconn *conn, int n_rows)
634 {
635 	Oid			insert_param_oids[2] = {INT4OID, INT8OID};
636 	const char *insert_params[2];
637 	char		insert_param_0[MAXINTLEN];
638 	char		insert_param_1[MAXINT8LEN];
639 	enum PipelineInsertStep send_step = BI_BEGIN_TX,
640 				recv_step = BI_BEGIN_TX;
641 	int			rows_to_send,
642 				rows_to_receive;
643 
644 	insert_params[0] = insert_param_0;
645 	insert_params[1] = insert_param_1;
646 
647 	rows_to_send = rows_to_receive = n_rows;
648 
649 	/*
650 	 * Do a pipelined insert into a table created at the start of the pipeline
651 	 */
652 	if (PQenterPipelineMode(conn) != 1)
653 		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
654 
655 	while (send_step != BI_PREPARE)
656 	{
657 		const char *sql;
658 
659 		switch (send_step)
660 		{
661 			case BI_BEGIN_TX:
662 				sql = "BEGIN TRANSACTION";
663 				send_step = BI_DROP_TABLE;
664 				break;
665 
666 			case BI_DROP_TABLE:
667 				sql = drop_table_sql;
668 				send_step = BI_CREATE_TABLE;
669 				break;
670 
671 			case BI_CREATE_TABLE:
672 				sql = create_table_sql;
673 				send_step = BI_PREPARE;
674 				break;
675 
676 			default:
677 				pg_fatal("invalid state");
678 				sql = NULL;		/* keep compiler quiet */
679 		}
680 
681 		pg_debug("sending: %s\n", sql);
682 		if (PQsendQueryParams(conn, sql,
683 							  0, NULL, NULL, NULL, NULL, 0) != 1)
684 			pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
685 	}
686 
687 	Assert(send_step == BI_PREPARE);
688 	pg_debug("sending: %s\n", insert_sql2);
689 	if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
690 		pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
691 	send_step = BI_INSERT_ROWS;
692 
693 	/*
694 	 * Now we start inserting. We'll be sending enough data that we could fill
695 	 * our output buffer, so to avoid deadlocking we need to enter nonblocking
696 	 * mode and consume input while we send more output. As results of each
697 	 * query are processed we should pop them to allow processing of the next
698 	 * query. There's no need to finish the pipeline before processing
699 	 * results.
700 	 */
701 	if (PQsetnonblocking(conn, 1) != 0)
702 		pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
703 
704 	while (recv_step != BI_DONE)
705 	{
706 		int			sock;
707 		fd_set		input_mask;
708 		fd_set		output_mask;
709 
710 		sock = PQsocket(conn);
711 
712 		if (sock < 0)
713 			break;				/* shouldn't happen */
714 
715 		FD_ZERO(&input_mask);
716 		FD_SET(sock, &input_mask);
717 		FD_ZERO(&output_mask);
718 		FD_SET(sock, &output_mask);
719 
720 		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
721 		{
722 			fprintf(stderr, "select() failed: %s\n", strerror(errno));
723 			exit_nicely(conn);
724 		}
725 
726 		/*
727 		 * Process any results, so we keep the server's output buffer free
728 		 * flowing and it can continue to process input
729 		 */
730 		if (FD_ISSET(sock, &input_mask))
731 		{
732 			PQconsumeInput(conn);
733 
734 			/* Read until we'd block if we tried to read */
735 			while (!PQisBusy(conn) && recv_step < BI_DONE)
736 			{
737 				PGresult   *res;
738 				const char *cmdtag = "";
739 				const char *description = "";
740 				int			status;
741 
742 				/*
743 				 * Read next result.  If no more results from this query,
744 				 * advance to the next query
745 				 */
746 				res = PQgetResult(conn);
747 				if (res == NULL)
748 					continue;
749 
750 				status = PGRES_COMMAND_OK;
751 				switch (recv_step)
752 				{
753 					case BI_BEGIN_TX:
754 						cmdtag = "BEGIN";
755 						recv_step++;
756 						break;
757 					case BI_DROP_TABLE:
758 						cmdtag = "DROP TABLE";
759 						recv_step++;
760 						break;
761 					case BI_CREATE_TABLE:
762 						cmdtag = "CREATE TABLE";
763 						recv_step++;
764 						break;
765 					case BI_PREPARE:
766 						cmdtag = "";
767 						description = "PREPARE";
768 						recv_step++;
769 						break;
770 					case BI_INSERT_ROWS:
771 						cmdtag = "INSERT";
772 						rows_to_receive--;
773 						if (rows_to_receive == 0)
774 							recv_step++;
775 						break;
776 					case BI_COMMIT_TX:
777 						cmdtag = "COMMIT";
778 						recv_step++;
779 						break;
780 					case BI_SYNC:
781 						cmdtag = "";
782 						description = "SYNC";
783 						status = PGRES_PIPELINE_SYNC;
784 						recv_step++;
785 						break;
786 					case BI_DONE:
787 						/* unreachable */
788 						pg_fatal("unreachable state");
789 				}
790 
791 				if (PQresultStatus(res) != status)
792 					pg_fatal("%s reported status %s, expected %s\n"
793 							 "Error message: \"%s\"",
794 							 description, PQresStatus(PQresultStatus(res)),
795 							 PQresStatus(status), PQerrorMessage(conn));
796 
797 				if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
798 					pg_fatal("%s expected command tag '%s', got '%s'",
799 							 description, cmdtag, PQcmdStatus(res));
800 
801 				pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
802 
803 				PQclear(res);
804 			}
805 		}
806 
807 		/* Write more rows and/or the end pipeline message, if needed */
808 		if (FD_ISSET(sock, &output_mask))
809 		{
810 			PQflush(conn);
811 
812 			if (send_step == BI_INSERT_ROWS)
813 			{
814 				snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
815 				/* use up some buffer space with a wide value */
816 				snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
817 
818 				if (PQsendQueryPrepared(conn, "my_insert",
819 										2, insert_params, NULL, NULL, 0) == 1)
820 				{
821 					pg_debug("sent row %d\n", rows_to_send);
822 
823 					rows_to_send--;
824 					if (rows_to_send == 0)
825 						send_step++;
826 				}
827 				else
828 				{
829 					/*
830 					 * in nonblocking mode, so it's OK for an insert to fail
831 					 * to send
832 					 */
833 					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
834 							rows_to_send, PQerrorMessage(conn));
835 				}
836 			}
837 			else if (send_step == BI_COMMIT_TX)
838 			{
839 				if (PQsendQueryParams(conn, "COMMIT",
840 									  0, NULL, NULL, NULL, NULL, 0) == 1)
841 				{
842 					pg_debug("sent COMMIT\n");
843 					send_step++;
844 				}
845 				else
846 				{
847 					fprintf(stderr, "WARNING: failed to send commit: %s\n",
848 							PQerrorMessage(conn));
849 				}
850 			}
851 			else if (send_step == BI_SYNC)
852 			{
853 				if (PQpipelineSync(conn) == 1)
854 				{
855 					fprintf(stdout, "pipeline sync sent\n");
856 					send_step++;
857 				}
858 				else
859 				{
860 					fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
861 							PQerrorMessage(conn));
862 				}
863 			}
864 		}
865 	}
866 
867 	/* We've got the sync message and the pipeline should be done */
868 	if (PQexitPipelineMode(conn) != 1)
869 		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
870 				 PQerrorMessage(conn));
871 
872 	if (PQsetnonblocking(conn, 0) != 0)
873 		pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
874 
875 	fprintf(stderr, "ok\n");
876 }
877 
878 static void
test_prepared(PGconn * conn)879 test_prepared(PGconn *conn)
880 {
881 	PGresult   *res = NULL;
882 	Oid			param_oids[1] = {INT4OID};
883 	Oid			expected_oids[4];
884 	Oid			typ;
885 
886 	fprintf(stderr, "prepared... ");
887 
888 	if (PQenterPipelineMode(conn) != 1)
889 		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
890 	if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
891 					  "interval '1 sec'",
892 					  1, param_oids) != 1)
893 		pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
894 	expected_oids[0] = INT4OID;
895 	expected_oids[1] = TEXTOID;
896 	expected_oids[2] = NUMERICOID;
897 	expected_oids[3] = INTERVALOID;
898 	if (PQsendDescribePrepared(conn, "select_one") != 1)
899 		pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
900 	if (PQpipelineSync(conn) != 1)
901 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
902 
903 	res = PQgetResult(conn);
904 	if (res == NULL)
905 		pg_fatal("PQgetResult returned null");
906 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
907 		pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
908 	PQclear(res);
909 	res = PQgetResult(conn);
910 	if (res != NULL)
911 		pg_fatal("expected NULL result");
912 
913 	res = PQgetResult(conn);
914 	if (res == NULL)
915 		pg_fatal("PQgetResult returned NULL");
916 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
917 		pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
918 	if (PQnfields(res) != lengthof(expected_oids))
919 		pg_fatal("expected %d columns, got %d",
920 				 lengthof(expected_oids), PQnfields(res));
921 	for (int i = 0; i < PQnfields(res); i++)
922 	{
923 		typ = PQftype(res, i);
924 		if (typ != expected_oids[i])
925 			pg_fatal("field %d: expected type %u, got %u",
926 					 i, expected_oids[i], typ);
927 	}
928 	PQclear(res);
929 	res = PQgetResult(conn);
930 	if (res != NULL)
931 		pg_fatal("expected NULL result");
932 
933 	res = PQgetResult(conn);
934 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
935 		pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
936 
937 	if (PQexitPipelineMode(conn) != 1)
938 		pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
939 
940 	PQexec(conn, "BEGIN");
941 	PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
942 	PQenterPipelineMode(conn);
943 	if (PQsendDescribePortal(conn, "cursor_one") != 1)
944 		pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
945 	if (PQpipelineSync(conn) != 1)
946 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
947 	res = PQgetResult(conn);
948 	if (res == NULL)
949 		pg_fatal("expected NULL result");
950 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
951 		pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
952 
953 	typ = PQftype(res, 0);
954 	if (typ != INT4OID)
955 		pg_fatal("portal: expected type %u, got %u",
956 				 INT4OID, typ);
957 	PQclear(res);
958 	res = PQgetResult(conn);
959 	if (res != NULL)
960 		pg_fatal("expected NULL result");
961 	res = PQgetResult(conn);
962 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
963 		pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
964 
965 	if (PQexitPipelineMode(conn) != 1)
966 		pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
967 
968 	fprintf(stderr, "ok\n");
969 }
970 
971 static void
test_simple_pipeline(PGconn * conn)972 test_simple_pipeline(PGconn *conn)
973 {
974 	PGresult   *res = NULL;
975 	const char *dummy_params[1] = {"1"};
976 	Oid			dummy_param_oids[1] = {INT4OID};
977 
978 	fprintf(stderr, "simple pipeline... ");
979 
980 	/*
981 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
982 	 * process the results of as they come in.
983 	 *
984 	 * For a simple case we should be able to do this without interim
985 	 * processing of results since our output buffer will give us enough slush
986 	 * to work with and we won't block on sending. So blocking mode is fine.
987 	 */
988 	if (PQisnonblocking(conn))
989 		pg_fatal("Expected blocking connection mode");
990 
991 	if (PQenterPipelineMode(conn) != 1)
992 		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
993 
994 	if (PQsendQueryParams(conn, "SELECT $1",
995 						  1, dummy_param_oids, dummy_params,
996 						  NULL, NULL, 0) != 1)
997 		pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
998 
999 	if (PQexitPipelineMode(conn) != 0)
1000 		pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1001 
1002 	if (PQpipelineSync(conn) != 1)
1003 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1004 
1005 	res = PQgetResult(conn);
1006 	if (res == NULL)
1007 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1008 				 PQerrorMessage(conn));
1009 
1010 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1011 		pg_fatal("Unexpected result code %s from first pipeline item",
1012 				 PQresStatus(PQresultStatus(res)));
1013 
1014 	PQclear(res);
1015 	res = NULL;
1016 
1017 	if (PQgetResult(conn) != NULL)
1018 		pg_fatal("PQgetResult returned something extra after first query result.");
1019 
1020 	/*
1021 	 * Even though we've processed the result there's still a sync to come and
1022 	 * we can't exit pipeline mode yet
1023 	 */
1024 	if (PQexitPipelineMode(conn) != 0)
1025 		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1026 
1027 	res = PQgetResult(conn);
1028 	if (res == NULL)
1029 		pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1030 				 PQerrorMessage(conn));
1031 
1032 	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1033 		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1034 				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1035 
1036 	PQclear(res);
1037 	res = NULL;
1038 
1039 	if (PQgetResult(conn) != NULL)
1040 		pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1041 				 PQresStatus(PQresultStatus(res)));
1042 
1043 	/* We're still in pipeline mode... */
1044 	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1045 		pg_fatal("Fell out of pipeline mode somehow");
1046 
1047 	/* ... until we end it, which we can safely do now */
1048 	if (PQexitPipelineMode(conn) != 1)
1049 		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1050 				 PQerrorMessage(conn));
1051 
1052 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1053 		pg_fatal("Exiting pipeline mode didn't seem to work");
1054 
1055 	fprintf(stderr, "ok\n");
1056 }
1057 
1058 static void
test_singlerowmode(PGconn * conn)1059 test_singlerowmode(PGconn *conn)
1060 {
1061 	PGresult   *res;
1062 	int			i;
1063 	bool		pipeline_ended = false;
1064 
1065 	/* 1 pipeline, 3 queries in it */
1066 	if (PQenterPipelineMode(conn) != 1)
1067 		pg_fatal("failed to enter pipeline mode: %s",
1068 				 PQerrorMessage(conn));
1069 
1070 	for (i = 0; i < 3; i++)
1071 	{
1072 		char	   *param[1];
1073 
1074 		param[0] = psprintf("%d", 44 + i);
1075 
1076 		if (PQsendQueryParams(conn,
1077 							  "SELECT generate_series(42, $1)",
1078 							  1,
1079 							  NULL,
1080 							  (const char **) param,
1081 							  NULL,
1082 							  NULL,
1083 							  0) != 1)
1084 			pg_fatal("failed to send query: %s",
1085 					 PQerrorMessage(conn));
1086 		pfree(param[0]);
1087 	}
1088 	if (PQpipelineSync(conn) != 1)
1089 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1090 
1091 	for (i = 0; !pipeline_ended; i++)
1092 	{
1093 		bool		first = true;
1094 		bool		saw_ending_tuplesok;
1095 		bool		isSingleTuple = false;
1096 
1097 		/* Set single row mode for only first 2 SELECT queries */
1098 		if (i < 2)
1099 		{
1100 			if (PQsetSingleRowMode(conn) != 1)
1101 				pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1102 		}
1103 
1104 		/* Consume rows for this query */
1105 		saw_ending_tuplesok = false;
1106 		while ((res = PQgetResult(conn)) != NULL)
1107 		{
1108 			ExecStatusType est = PQresultStatus(res);
1109 
1110 			if (est == PGRES_PIPELINE_SYNC)
1111 			{
1112 				fprintf(stderr, "end of pipeline reached\n");
1113 				pipeline_ended = true;
1114 				PQclear(res);
1115 				if (i != 3)
1116 					pg_fatal("Expected three results, got %d", i);
1117 				break;
1118 			}
1119 
1120 			/* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1121 			if (first)
1122 			{
1123 				if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1124 					pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1125 							 i, PQresStatus(est));
1126 				if (i >= 2 && est != PGRES_TUPLES_OK)
1127 					pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1128 							 i, PQresStatus(est));
1129 				first = false;
1130 			}
1131 
1132 			fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1133 			switch (est)
1134 			{
1135 				case PGRES_TUPLES_OK:
1136 					fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1137 					saw_ending_tuplesok = true;
1138 					if (isSingleTuple)
1139 					{
1140 						if (PQntuples(res) == 0)
1141 							fprintf(stderr, "all tuples received in query %d\n", i);
1142 						else
1143 							pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1144 					}
1145 					break;
1146 
1147 				case PGRES_SINGLE_TUPLE:
1148 					isSingleTuple = true;
1149 					fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1150 					break;
1151 
1152 				default:
1153 					pg_fatal("unexpected");
1154 			}
1155 			PQclear(res);
1156 		}
1157 		if (!pipeline_ended && !saw_ending_tuplesok)
1158 			pg_fatal("didn't get expected terminating TUPLES_OK");
1159 	}
1160 
1161 	if (PQexitPipelineMode(conn) != 1)
1162 		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1163 }
1164 
1165 /*
1166  * Simple test to verify that a pipeline is discarded as a whole when there's
1167  * an error, ignoring transaction commands.
1168  */
1169 static void
test_transaction(PGconn * conn)1170 test_transaction(PGconn *conn)
1171 {
1172 	PGresult   *res;
1173 	bool		expect_null;
1174 	int			num_syncs = 0;
1175 
1176 	res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1177 				 "CREATE TABLE pq_pipeline_tst (id int)");
1178 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1179 		pg_fatal("failed to create test table: %s",
1180 				 PQerrorMessage(conn));
1181 	PQclear(res);
1182 
1183 	if (PQenterPipelineMode(conn) != 1)
1184 		pg_fatal("failed to enter pipeline mode: %s",
1185 				 PQerrorMessage(conn));
1186 	if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1187 		pg_fatal("could not send prepare on pipeline: %s",
1188 				 PQerrorMessage(conn));
1189 
1190 	if (PQsendQueryParams(conn,
1191 						  "BEGIN",
1192 						  0, NULL, NULL, NULL, NULL, 0) != 1)
1193 		pg_fatal("failed to send query: %s",
1194 				 PQerrorMessage(conn));
1195 	if (PQsendQueryParams(conn,
1196 						  "SELECT 0/0",
1197 						  0, NULL, NULL, NULL, NULL, 0) != 1)
1198 		pg_fatal("failed to send query: %s",
1199 				 PQerrorMessage(conn));
1200 
1201 	/*
1202 	 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1203 	 * get out of the pipeline-aborted state first.
1204 	 */
1205 	if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1206 		pg_fatal("failed to execute prepared: %s",
1207 				 PQerrorMessage(conn));
1208 
1209 	/* This insert fails because we're in pipeline-aborted state */
1210 	if (PQsendQueryParams(conn,
1211 						  "INSERT INTO pq_pipeline_tst VALUES (1)",
1212 						  0, NULL, NULL, NULL, NULL, 0) != 1)
1213 		pg_fatal("failed to send query: %s",
1214 				 PQerrorMessage(conn));
1215 	if (PQpipelineSync(conn) != 1)
1216 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1217 	num_syncs++;
1218 
1219 	/*
1220 	 * This insert fails even though the pipeline got a SYNC, because we're in
1221 	 * an aborted transaction
1222 	 */
1223 	if (PQsendQueryParams(conn,
1224 						  "INSERT INTO pq_pipeline_tst VALUES (2)",
1225 						  0, NULL, NULL, NULL, NULL, 0) != 1)
1226 		pg_fatal("failed to send query: %s",
1227 				 PQerrorMessage(conn));
1228 	if (PQpipelineSync(conn) != 1)
1229 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1230 	num_syncs++;
1231 
1232 	/*
1233 	 * Send ROLLBACK using prepared stmt. This one works because we just did
1234 	 * PQpipelineSync above.
1235 	 */
1236 	if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1237 		pg_fatal("failed to execute prepared: %s",
1238 				 PQerrorMessage(conn));
1239 
1240 	/*
1241 	 * Now that we're out of a transaction and in pipeline-good mode, this
1242 	 * insert works
1243 	 */
1244 	if (PQsendQueryParams(conn,
1245 						  "INSERT INTO pq_pipeline_tst VALUES (3)",
1246 						  0, NULL, NULL, NULL, NULL, 0) != 1)
1247 		pg_fatal("failed to send query: %s",
1248 				 PQerrorMessage(conn));
1249 	/* Send two syncs now -- match up to SYNC messages below */
1250 	if (PQpipelineSync(conn) != 1)
1251 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1252 	num_syncs++;
1253 	if (PQpipelineSync(conn) != 1)
1254 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1255 	num_syncs++;
1256 
1257 	expect_null = false;
1258 	for (int i = 0;; i++)
1259 	{
1260 		ExecStatusType restype;
1261 
1262 		res = PQgetResult(conn);
1263 		if (res == NULL)
1264 		{
1265 			printf("%d: got NULL result\n", i);
1266 			if (!expect_null)
1267 				pg_fatal("did not expect NULL here");
1268 			expect_null = false;
1269 			continue;
1270 		}
1271 		restype = PQresultStatus(res);
1272 		printf("%d: got status %s", i, PQresStatus(restype));
1273 		if (expect_null)
1274 			pg_fatal("expected NULL");
1275 		if (restype == PGRES_FATAL_ERROR)
1276 			printf("; error: %s", PQerrorMessage(conn));
1277 		else if (restype == PGRES_PIPELINE_ABORTED)
1278 		{
1279 			printf(": command didn't run because pipeline aborted\n");
1280 		}
1281 		else
1282 			printf("\n");
1283 		PQclear(res);
1284 
1285 		if (restype == PGRES_PIPELINE_SYNC)
1286 			num_syncs--;
1287 		else
1288 			expect_null = true;
1289 		if (num_syncs <= 0)
1290 			break;
1291 	}
1292 	if (PQgetResult(conn) != NULL)
1293 		pg_fatal("returned something extra after all the syncs: %s",
1294 				 PQresStatus(PQresultStatus(res)));
1295 
1296 	if (PQexitPipelineMode(conn) != 1)
1297 		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1298 
1299 	/* We expect to find one tuple containing the value "3" */
1300 	res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1301 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1302 		pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1303 	if (PQntuples(res) != 1)
1304 		pg_fatal("did not get 1 tuple");
1305 	if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1306 		pg_fatal("did not get expected tuple");
1307 	PQclear(res);
1308 
1309 	fprintf(stderr, "ok\n");
1310 }
1311 
1312 /*
1313  * In this test mode we send a stream of queries, with one in the middle
1314  * causing an error.  Verify that we can still send some more after the
1315  * error and have libpq work properly.
1316  */
1317 static void
test_uniqviol(PGconn * conn)1318 test_uniqviol(PGconn *conn)
1319 {
1320 	int			sock = PQsocket(conn);
1321 	PGresult   *res;
1322 	Oid			paramTypes[2] = {INT8OID, INT8OID};
1323 	const char *paramValues[2];
1324 	char		paramValue0[MAXINT8LEN];
1325 	char		paramValue1[MAXINT8LEN];
1326 	int			ctr = 0;
1327 	int			numsent = 0;
1328 	int			results = 0;
1329 	bool		read_done = false;
1330 	bool		write_done = false;
1331 	bool		error_sent = false;
1332 	bool		got_error = false;
1333 	int			switched = 0;
1334 	int			socketful = 0;
1335 	fd_set		in_fds;
1336 	fd_set		out_fds;
1337 
1338 	fprintf(stderr, "uniqviol ...");
1339 
1340 	PQsetnonblocking(conn, 1);
1341 
1342 	paramValues[0] = paramValue0;
1343 	paramValues[1] = paramValue1;
1344 	sprintf(paramValue1, "42");
1345 
1346 	res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1347 				 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1348 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1349 		pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1350 
1351 	res = PQexec(conn, "begin");
1352 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1353 		pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1354 
1355 	res = PQprepare(conn, "insertion",
1356 					"insert into ppln_uniqviol values ($1, $2) returning id",
1357 					2, paramTypes);
1358 	if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1359 		pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1360 
1361 	if (PQenterPipelineMode(conn) != 1)
1362 		pg_fatal("failed to enter pipeline mode");
1363 
1364 	while (!read_done)
1365 	{
1366 		/*
1367 		 * Avoid deadlocks by reading everything the server has sent before
1368 		 * sending anything.  (Special precaution is needed here to process
1369 		 * PQisBusy before testing the socket for read-readiness, because the
1370 		 * socket does not turn read-ready after "sending" queries in aborted
1371 		 * pipeline mode.)
1372 		 */
1373 		while (PQisBusy(conn) == 0)
1374 		{
1375 			bool		new_error;
1376 
1377 			if (results >= numsent)
1378 			{
1379 				if (write_done)
1380 					read_done = true;
1381 				break;
1382 			}
1383 
1384 			res = PQgetResult(conn);
1385 			new_error = process_result(conn, res, results, numsent);
1386 			if (new_error && got_error)
1387 				pg_fatal("got two errors");
1388 			got_error |= new_error;
1389 			if (results++ >= numsent - 1)
1390 			{
1391 				if (write_done)
1392 					read_done = true;
1393 				break;
1394 			}
1395 		}
1396 
1397 		if (read_done)
1398 			break;
1399 
1400 		FD_ZERO(&out_fds);
1401 		FD_SET(sock, &out_fds);
1402 
1403 		FD_ZERO(&in_fds);
1404 		FD_SET(sock, &in_fds);
1405 
1406 		if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1407 		{
1408 			if (errno == EINTR)
1409 				continue;
1410 			pg_fatal("select() failed: %m");
1411 		}
1412 
1413 		if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1414 			pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1415 
1416 		/*
1417 		 * If the socket is writable and we haven't finished sending queries,
1418 		 * send some.
1419 		 */
1420 		if (!write_done && FD_ISSET(sock, &out_fds))
1421 		{
1422 			for (;;)
1423 			{
1424 				int			flush;
1425 
1426 				/*
1427 				 * provoke uniqueness violation exactly once after having
1428 				 * switched to read mode.
1429 				 */
1430 				if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1431 				{
1432 					sprintf(paramValue0, "%d", numsent / 2);
1433 					fprintf(stderr, "E");
1434 					error_sent = true;
1435 				}
1436 				else
1437 				{
1438 					fprintf(stderr, ".");
1439 					sprintf(paramValue0, "%d", ctr++);
1440 				}
1441 
1442 				if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1443 					pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1444 				numsent++;
1445 
1446 				/* Are we done writing? */
1447 				if (socketful != 0 && numsent % socketful == 42 && error_sent)
1448 				{
1449 					if (PQsendFlushRequest(conn) != 1)
1450 						pg_fatal("failed to send flush request");
1451 					write_done = true;
1452 					fprintf(stderr, "\ndone writing\n");
1453 					PQflush(conn);
1454 					break;
1455 				}
1456 
1457 				/* is the outgoing socket full? */
1458 				flush = PQflush(conn);
1459 				if (flush == -1)
1460 					pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1461 				if (flush == 1)
1462 				{
1463 					if (socketful == 0)
1464 						socketful = numsent;
1465 					fprintf(stderr, "\nswitch to reading\n");
1466 					switched++;
1467 					break;
1468 				}
1469 			}
1470 		}
1471 	}
1472 
1473 	if (!got_error)
1474 		pg_fatal("did not get expected error");
1475 
1476 	fprintf(stderr, "ok\n");
1477 }
1478 
1479 /*
1480  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1481  * the expected NULL that should follow it.
1482  *
1483  * Returns true if we read a fatal error message, otherwise false.
1484  */
1485 static bool
process_result(PGconn * conn,PGresult * res,int results,int numsent)1486 process_result(PGconn *conn, PGresult *res, int results, int numsent)
1487 {
1488 	PGresult   *res2;
1489 	bool		got_error = false;
1490 
1491 	if (res == NULL)
1492 		pg_fatal("got unexpected NULL");
1493 
1494 	switch (PQresultStatus(res))
1495 	{
1496 		case PGRES_FATAL_ERROR:
1497 			got_error = true;
1498 			fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1499 			PQclear(res);
1500 
1501 			res2 = PQgetResult(conn);
1502 			if (res2 != NULL)
1503 				pg_fatal("expected NULL, got %s",
1504 						 PQresStatus(PQresultStatus(res2)));
1505 			break;
1506 
1507 		case PGRES_TUPLES_OK:
1508 			fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1509 			PQclear(res);
1510 
1511 			res2 = PQgetResult(conn);
1512 			if (res2 != NULL)
1513 				pg_fatal("expected NULL, got %s",
1514 						 PQresStatus(PQresultStatus(res2)));
1515 			break;
1516 
1517 		case PGRES_PIPELINE_ABORTED:
1518 			fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1519 			res2 = PQgetResult(conn);
1520 			if (res2 != NULL)
1521 				pg_fatal("expected NULL, got %s",
1522 						 PQresStatus(PQresultStatus(res2)));
1523 			break;
1524 
1525 		default:
1526 			pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1527 	}
1528 
1529 	return got_error;
1530 }
1531 
1532 
/*
 * Write the command-line help text to stderr.
 *
 * progname is the program name to show in the synopsis lines.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);

	/* synopsis */
	fputs("Usage:\n", out);
	fprintf(out, "  %s [OPTION] tests\n", progname);
	fprintf(out, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* option summary */
	fputs("\nOptions:\n", out);
	fputs("  -t TRACEFILE       generate a libpq trace to TRACEFILE\n", out);
	fputs("  -r NUMROWS         use NUMROWS as the test size\n", out);
}
1544 
/*
 * Print the names of all available tests to stdout, one per line.
 */
static void
print_test_list(void)
{
	/* NULL-terminated table; each entry already carries its newline */
	static const char *const test_lines[] = {
		"disallowed_in_pipeline\n",
		"multi_pipelines\n",
		"nosync\n",
		"pipeline_abort\n",
		"pipelined_insert\n",
		"prepared\n",
		"simple_pipeline\n",
		"singlerow\n",
		"transaction\n",
		"uniqviol\n",
		NULL
	};
	const char *const *line;

	for (line = test_lines; *line != NULL; line++)
		fputs(*line, stdout);
}
1559 
1560 int
main(int argc,char ** argv)1561 main(int argc, char **argv)
1562 {
1563 	const char *conninfo = "";
1564 	PGconn	   *conn;
1565 	FILE	   *trace;
1566 	char	   *testname;
1567 	int			numrows = 10000;
1568 	PGresult   *res;
1569 	int			c;
1570 
1571 	while ((c = getopt(argc, argv, "t:r:")) != -1)
1572 	{
1573 		switch (c)
1574 		{
1575 			case 't':			/* trace file */
1576 				tracefile = pg_strdup(optarg);
1577 				break;
1578 			case 'r':			/* numrows */
1579 				errno = 0;
1580 				numrows = strtol(optarg, NULL, 10);
1581 				if (errno != 0 || numrows <= 0)
1582 				{
1583 					fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1584 							optarg);
1585 					exit(1);
1586 				}
1587 				break;
1588 		}
1589 	}
1590 
1591 	if (optind < argc)
1592 	{
1593 		testname = pg_strdup(argv[optind]);
1594 		optind++;
1595 	}
1596 	else
1597 	{
1598 		usage(argv[0]);
1599 		exit(1);
1600 	}
1601 
1602 	if (strcmp(testname, "tests") == 0)
1603 	{
1604 		print_test_list();
1605 		exit(0);
1606 	}
1607 
1608 	if (optind < argc)
1609 	{
1610 		conninfo = pg_strdup(argv[optind]);
1611 		optind++;
1612 	}
1613 
1614 	/* Make a connection to the database */
1615 	conn = PQconnectdb(conninfo);
1616 	if (PQstatus(conn) != CONNECTION_OK)
1617 	{
1618 		fprintf(stderr, "Connection to database failed: %s\n",
1619 				PQerrorMessage(conn));
1620 		exit_nicely(conn);
1621 	}
1622 
1623 	res = PQexec(conn, "SET lc_messages TO \"C\"");
1624 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1625 		pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1626 	res = PQexec(conn, "SET force_parallel_mode = off");
1627 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1628 		pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
1629 
1630 	/* Set the trace file, if requested */
1631 	if (tracefile != NULL)
1632 	{
1633 		trace = fopen(tracefile, "w");
1634 		if (trace == NULL)
1635 			pg_fatal("could not open file \"%s\": %m", tracefile);
1636 
1637 		/* Make it line-buffered */
1638 		setvbuf(trace, NULL, PG_IOLBF, 0);
1639 
1640 		PQtrace(conn, trace);
1641 		PQsetTraceFlags(conn,
1642 						PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
1643 	}
1644 
1645 	if (strcmp(testname, "disallowed_in_pipeline") == 0)
1646 		test_disallowed_in_pipeline(conn);
1647 	else if (strcmp(testname, "multi_pipelines") == 0)
1648 		test_multi_pipelines(conn);
1649 	else if (strcmp(testname, "nosync") == 0)
1650 		test_nosync(conn);
1651 	else if (strcmp(testname, "pipeline_abort") == 0)
1652 		test_pipeline_abort(conn);
1653 	else if (strcmp(testname, "pipelined_insert") == 0)
1654 		test_pipelined_insert(conn, numrows);
1655 	else if (strcmp(testname, "prepared") == 0)
1656 		test_prepared(conn);
1657 	else if (strcmp(testname, "simple_pipeline") == 0)
1658 		test_simple_pipeline(conn);
1659 	else if (strcmp(testname, "singlerow") == 0)
1660 		test_singlerowmode(conn);
1661 	else if (strcmp(testname, "transaction") == 0)
1662 		test_transaction(conn);
1663 	else if (strcmp(testname, "uniqviol") == 0)
1664 		test_uniqviol(conn);
1665 	else
1666 	{
1667 		fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1668 		exit(1);
1669 	}
1670 
1671 	/* close the connection to the database and cleanup */
1672 	PQfinish(conn);
1673 	return 0;
1674 }
1675