1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 
7 #include <postgres.h>
8 #include <nodes/pg_list.h>
9 #include <libpq-fe.h>
10 #include <storage/latch.h>
11 #include <miscadmin.h>
12 #include <pgstat.h>
13 #include <fmgr.h>
14 #include <utils/lsyscache.h>
15 #include <catalog/pg_type.h>
16 #include <nodes/pathnodes.h>
17 
18 #include <annotations.h>
19 #include "async.h"
20 #include "connection.h"
21 #include "utils.h"
22 
23 /**
24  * State machine for AsyncRequest:
25  *
26  *   +-------------+           +--------------+        +--------------+
27  *   |             |           |              |        |              |
28  *   |  DEFERRED   +---------->+  EXECUTING   +------->+   COMPLETED  |
29  *   |             |           |              |        |              |
30  *   +-------------+           +--------------+        +--------------+
31  *
32  **/
33 
/*
 * Lifecycle states of an AsyncRequest. A request is created as DEFERRED,
 * becomes EXECUTING once it has been handed to libpq for sending, and is
 * marked COMPLETED when the connection reports no further results.
 */
typedef enum AsyncRequestState
{
	DEFERRED = 0,
	EXECUTING,
	COMPLETED,
} AsyncRequestState;
40 
41 typedef struct AsyncRequest
42 {
43 	const char *sql;
44 	TSConnection *conn;
45 	AsyncRequestState state;
46 	const char *stmt_name;
47 	int prep_stmt_params;
48 	async_response_callback response_cb;
49 	void *user_data; /* custom data saved with the request */
50 	StmtParams *params;
51 	int res_format; /* text or binary */
52 	bool is_xact_transition;
53 } AsyncRequest;
54 
55 typedef struct PreparedStmt
56 {
57 	const char *sql;
58 	TSConnection *conn;
59 	const char *stmt_name;
60 	int n_params;
61 } PreparedStmt;
62 
63 /* It is often useful to get the request along with the result in the response */
64 
65 typedef struct AsyncResponse
66 {
67 	AsyncResponseType type;
68 } AsyncResponse;
69 
70 typedef struct AsyncResponseResult
71 {
72 	AsyncResponse base;
73 	PGresult *result;
74 	AsyncRequest *request;
75 } AsyncResponseResult;
76 
77 typedef struct AsyncResponseCommunicationError
78 {
79 	AsyncResponse base;
80 	AsyncRequest *request;
81 } AsyncResponseCommunicationError;
82 
83 typedef struct AsyncResponseError
84 {
85 	AsyncResponse base;
86 	const char *errmsg;
87 } AsyncResponseError;
88 
89 typedef struct AsyncRequestSet
90 {
91 	List *requests;
92 } AsyncRequestSet;
93 
94 static AsyncRequest *
async_request_create(TSConnection * conn,const char * sql,const char * stmt_name,int prep_stmt_params,StmtParams * stmt_params,int res_format)95 async_request_create(TSConnection *conn, const char *sql, const char *stmt_name,
96 					 int prep_stmt_params, StmtParams *stmt_params, int res_format)
97 {
98 	AsyncRequest *req;
99 
100 	if (conn == NULL)
101 		elog(ERROR, "can't create AsyncRequest with NULL connection");
102 
103 	req = palloc0(sizeof(AsyncRequest));
104 	*req = (AsyncRequest){ .conn = conn,
105 						   .state = DEFERRED,
106 						   .sql = pstrdup(sql),
107 						   .stmt_name = stmt_name,
108 						   .params = stmt_params,
109 						   .prep_stmt_params = prep_stmt_params,
110 						   .res_format = res_format };
111 
112 	return req;
113 }
114 
115 static void
async_request_set_state(AsyncRequest * req,AsyncRequestState new_state)116 async_request_set_state(AsyncRequest *req, AsyncRequestState new_state)
117 {
118 	if (req->state != DEFERRED)
119 		Assert(req->state != new_state);
120 
121 #ifdef USE_ASSERT_CHECKING
122 	switch (new_state)
123 	{
124 		case DEFERRED:
125 			/* initial state */
126 			Assert(req->state == DEFERRED);
127 			break;
128 		case EXECUTING:
129 			Assert(req->state == DEFERRED);
130 			break;
131 		case COMPLETED:
132 			Assert(req->state == EXECUTING);
133 	}
134 #endif
135 	req->state = new_state;
136 }
137 
138 /* Send a request. In case there is an ongoing request for the connection,
139    we will not send the request but set its status to DEFERRED.
140    Getting a response from DEFERRED AsyncRequest will try sending it if
141    the connection is not in use.
142 
143    Note that we can only send one sql statement per request.
144    This is because we use `PQsendQueryParams` which uses the extended query protocol
145    instead of the simple one. The extended protocol does not support multiple
146    statements. In the future we can use a `PQsendQuery` variant for queries without parameters,
147    which can support multiple statements because it uses the simple protocol. But this is
148    an optimization for another time.
149 */
150 static AsyncRequest *
async_request_send_internal(AsyncRequest * req,int elevel)151 async_request_send_internal(AsyncRequest *req, int elevel)
152 {
153 	if (req->state != DEFERRED)
154 		elog(elevel, "can't send async request in state \"%d\"", req->state);
155 
156 	if (remote_connection_is_processing(req->conn))
157 		return req;
158 
159 	/* Send configuration parameters if necessary */
160 	remote_connection_configure_if_changed(req->conn);
161 
162 	if (req->stmt_name)
163 	{
164 		/*
165 		 * We intentionally do not specify parameter types here, but leave the
166 		 * data node to derive them by default.  This avoids possible problems
167 		 * with the data node using different type OIDs than we do.  All of
168 		 * the prepared statements we use in this module are simple enough that
169 		 * the data node will make the right choices.
170 		 */
171 		if (0 == PQsendPrepare(remote_connection_get_pg_conn(req->conn),
172 							   req->stmt_name,
173 							   req->sql,
174 							   req->prep_stmt_params,
175 							   NULL))
176 		{
177 			/*
178 			 * null is fine to pass down as the res, the connection error message
179 			 * will get through
180 			 */
181 			remote_connection_elog(req->conn, elevel);
182 			return NULL;
183 		}
184 	}
185 	else
186 	{
187 		if (0 == PQsendQueryParams(remote_connection_get_pg_conn(req->conn),
188 								   req->sql,
189 								   stmt_params_total_values(req->params),
190 								   /* param types - see note above */ NULL,
191 								   stmt_params_values(req->params),
192 								   stmt_params_lengths(req->params),
193 								   stmt_params_formats(req->params),
194 								   req->res_format))
195 		{
196 			/*
197 			 * null is fine to pass down as the res, the connection error message
198 			 * will get through
199 			 */
200 			remote_connection_elog(req->conn, elevel);
201 			return NULL;
202 		}
203 	}
204 	async_request_set_state(req, EXECUTING);
205 	remote_connection_set_status(req->conn, CONN_PROCESSING);
206 	return req;
207 }
208 
209 AsyncRequest *
async_request_send_with_stmt_params_elevel_res_format(TSConnection * conn,const char * sql_statement,StmtParams * params,int elevel,int res_format)210 async_request_send_with_stmt_params_elevel_res_format(TSConnection *conn, const char *sql_statement,
211 													  StmtParams *params, int elevel,
212 													  int res_format)
213 {
214 	AsyncRequest *req = async_request_create(conn, sql_statement, NULL, 0, params, res_format);
215 	req = async_request_send_internal(req, elevel);
216 	return req;
217 }
218 
219 AsyncRequest *
async_request_send_prepare(TSConnection * conn,const char * sql,int n_params)220 async_request_send_prepare(TSConnection *conn, const char *sql, int n_params)
221 {
222 	AsyncRequest *req;
223 	size_t stmt_name_len = NAMEDATALEN;
224 	char *stmt_name = palloc(sizeof(char) * stmt_name_len);
225 	int written;
226 
227 	/* Construct name we'll use for the prepared statement. */
228 	written =
229 		snprintf(stmt_name, stmt_name_len, "ts_prep_%u", remote_connection_get_prep_stmt_number());
230 
231 	if (written < 0 || written >= stmt_name_len)
232 		elog(ERROR, "cannot create prepared statement name");
233 
234 	req = async_request_create(conn, sql, stmt_name, n_params, NULL, FORMAT_TEXT);
235 	req = async_request_send_internal(req, ERROR);
236 
237 	return req;
238 }
239 
240 extern AsyncRequest *
async_request_send_prepared_stmt(PreparedStmt * stmt,const char * const * param_values)241 async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_values)
242 {
243 	AsyncRequest *req =
244 		async_request_create(stmt->conn,
245 							 stmt->sql,
246 							 NULL,
247 							 stmt->n_params,
248 							 stmt_params_create_from_values((const char **) param_values,
249 															stmt->n_params),
250 							 FORMAT_TEXT);
251 	return async_request_send_internal(req, ERROR);
252 }
253 
254 AsyncRequest *
async_request_send_prepared_stmt_with_params(PreparedStmt * stmt,StmtParams * params,int res_format)255 async_request_send_prepared_stmt_with_params(PreparedStmt *stmt, StmtParams *params, int res_format)
256 {
257 	AsyncRequest *req =
258 		async_request_create(stmt->conn, stmt->sql, NULL, stmt->n_params, params, res_format);
259 	return async_request_send_internal(req, ERROR);
260 }
261 
262 /* Set user data. Often it is useful to attach data with a request so
263    that it can later be fetched from the response. */
264 void
async_request_attach_user_data(AsyncRequest * req,void * user_data)265 async_request_attach_user_data(AsyncRequest *req, void *user_data)
266 {
267 	req->user_data = user_data;
268 }
269 
270 void
async_request_set_response_callback(AsyncRequest * req,async_response_callback cb,void * user_data)271 async_request_set_response_callback(AsyncRequest *req, async_response_callback cb, void *user_data)
272 {
273 	req->response_cb = cb;
274 	req->user_data = user_data;
275 }
276 
277 static AsyncResponseResult *
async_response_result_create(AsyncRequest * req,PGresult * res)278 async_response_result_create(AsyncRequest *req, PGresult *res)
279 {
280 	AsyncResponseResult *ares;
281 	AsyncResponseType type = RESPONSE_RESULT;
282 
283 	if (PQresultStatus(res) == PGRES_SINGLE_TUPLE)
284 		type = RESPONSE_ROW;
285 
286 	ares = palloc0(sizeof(AsyncResponseResult));
287 
288 	*ares = (AsyncResponseResult){
289 		.base = { .type = type },
290 		.request = req,
291 		.result = res,
292 	};
293 
294 	return ares;
295 }
296 
297 static AsyncResponseCommunicationError *
async_response_communication_error_create(AsyncRequest * req)298 async_response_communication_error_create(AsyncRequest *req)
299 {
300 	AsyncResponseCommunicationError *ares = palloc0(sizeof(AsyncResponseCommunicationError));
301 
302 	*ares = (AsyncResponseCommunicationError){
303 		.base = { .type = RESPONSE_COMMUNICATION_ERROR },
304 		.request = req,
305 	};
306 
307 	return ares;
308 }
309 
310 static AsyncResponse *
async_response_timeout_create()311 async_response_timeout_create()
312 {
313 	AsyncResponse *ares = palloc0(sizeof(AsyncResponse));
314 
315 	*ares = (AsyncResponse){
316 		.type = RESPONSE_TIMEOUT,
317 	};
318 
319 	return ares;
320 }
321 
322 static AsyncResponse *
async_response_error_create(const char * errmsg)323 async_response_error_create(const char *errmsg)
324 {
325 	AsyncResponseError *ares = palloc0(sizeof(AsyncResponseError));
326 
327 	*ares = (AsyncResponseError){
328 		.base = { .type = RESPONSE_ERROR },
329 		.errmsg = pstrdup(errmsg),
330 	};
331 
332 	return &ares->base;
333 }
334 
335 void
async_response_result_close(AsyncResponseResult * res)336 async_response_result_close(AsyncResponseResult *res)
337 {
338 	PQclear(res->result);
339 	pfree(res);
340 }
341 
342 /* Closes the async response. Note that `async_response_report_error` does this automatically. */
343 void
async_response_close(AsyncResponse * res)344 async_response_close(AsyncResponse *res)
345 {
346 	switch (res->type)
347 	{
348 		case RESPONSE_RESULT:
349 		case RESPONSE_ROW:
350 			async_response_result_close((AsyncResponseResult *) res);
351 			break;
352 		default:
353 			pfree(res);
354 			break;
355 	}
356 }
357 
358 AsyncResponseType
async_response_get_type(AsyncResponse * res)359 async_response_get_type(AsyncResponse *res)
360 {
361 	return res->type;
362 }
363 
364 /* get the user data attached to the corresponding request */
365 void *
async_response_result_get_user_data(AsyncResponseResult * res)366 async_response_result_get_user_data(AsyncResponseResult *res)
367 {
368 	return res->request->user_data;
369 }
370 
371 PGresult *
async_response_result_get_pg_result(AsyncResponseResult * res)372 async_response_result_get_pg_result(AsyncResponseResult *res)
373 {
374 	return res->result;
375 }
376 
377 AsyncRequest *
async_response_result_get_request(AsyncResponseResult * res)378 async_response_result_get_request(AsyncResponseResult *res)
379 {
380 	return res->request;
381 }
382 
383 bool
async_request_set_single_row_mode(AsyncRequest * req)384 async_request_set_single_row_mode(AsyncRequest *req)
385 {
386 	return remote_connection_set_single_row_mode(req->conn);
387 }
388 
389 TSConnection *
async_request_get_connection(AsyncRequest * req)390 async_request_get_connection(AsyncRequest *req)
391 {
392 	return req->conn;
393 }
394 
395 void
async_response_report_error(AsyncResponse * res,int elevel)396 async_response_report_error(AsyncResponse *res, int elevel)
397 {
398 	switch (res->type)
399 	{
400 		case RESPONSE_RESULT:
401 		case RESPONSE_ROW:
402 		{
403 			AsyncResponseResult *aresult = (AsyncResponseResult *) res;
404 			ExecStatusType status = PQresultStatus(aresult->result);
405 
406 			switch (status)
407 			{
408 				case PGRES_COMMAND_OK:
409 				case PGRES_TUPLES_OK:
410 				case PGRES_SINGLE_TUPLE:
411 					break;
412 				case PGRES_NONFATAL_ERROR:
413 				case PGRES_FATAL_ERROR:
414 					/* result is closed by remote_result_elog in case it throws
415 					 * error */
416 					remote_result_elog(aresult->result, elevel);
417 					break;
418 				default:
419 					PG_TRY();
420 					{
421 						elog(elevel, "unexpected response status %u", status);
422 					}
423 					PG_CATCH();
424 					{
425 						async_response_close(res);
426 						PG_RE_THROW();
427 					}
428 					PG_END_TRY();
429 			}
430 			break;
431 		}
432 		case RESPONSE_COMMUNICATION_ERROR:
433 			remote_connection_elog(((AsyncResponseCommunicationError *) res)->request->conn,
434 								   elevel);
435 			break;
436 		case RESPONSE_ERROR:
437 			elog(elevel, "%s", ((AsyncResponseError *) res)->errmsg);
438 			break;
439 		case RESPONSE_TIMEOUT:
440 			elog(elevel, "async operation timed out");
441 	}
442 }
443 
444 void
async_response_report_error_or_close(AsyncResponse * res,int elevel)445 async_response_report_error_or_close(AsyncResponse *res, int elevel)
446 {
447 	async_response_report_error(res, elevel);
448 	async_response_close(res);
449 }
450 
451 /*
452  * This is a convenience function to wait for a single result from a request.
453  * This function requires that the request is for a single SQL statement.
454  */
455 AsyncResponseResult *
async_request_wait_any_result(AsyncRequest * req)456 async_request_wait_any_result(AsyncRequest *req)
457 {
458 	AsyncRequestSet set = { 0 };
459 	AsyncResponseResult *result;
460 
461 	async_request_set_add(&set, req);
462 	result = async_request_set_wait_any_result(&set);
463 
464 	/* Should expect exactly one response */
465 	if (NULL == result)
466 		elog(ERROR, "remote request failed");
467 
468 	/* Make sure to drain the connection only if we've retrieved complete result set */
469 	if (result->base.type == RESPONSE_RESULT)
470 	{
471 		AsyncResponseResult *extra;
472 		bool got_extra = false;
473 
474 		/* Must drain any remaining result until NULL */
475 		while ((extra = async_request_set_wait_any_result(&set)))
476 		{
477 			async_response_result_close(extra);
478 			got_extra = true;
479 		}
480 
481 		if (got_extra)
482 		{
483 			async_response_result_close(result);
484 			elog(ERROR, "request must be for one sql statement");
485 		}
486 	}
487 
488 	return result;
489 }
490 
491 AsyncResponseResult *
async_request_wait_ok_result(AsyncRequest * req)492 async_request_wait_ok_result(AsyncRequest *req)
493 {
494 	AsyncResponseResult *res = async_request_wait_any_result(req);
495 
496 	if (PQresultStatus(res->result) != PGRES_COMMAND_OK &&
497 		PQresultStatus(res->result) != PGRES_TUPLES_OK)
498 	{
499 		async_response_report_error(&res->base, ERROR);
500 		Assert(false);
501 	}
502 
503 	return res;
504 }
505 
506 /*
507  * Get the result of an async request during cleanup.
508  *
509  * Cleanup is typically necessary for a query that is being interrupted by
510  * transaction abort, or a query that was initiated as part of transaction
511  * abort to get the remote side back to the appropriate state.
512  *
513  * endtime is the time at which we should give up and assume the remote
514  * side is dead.
515  *
516  * An AsyncReponse is always returned, indicating last PGresult received,
517  * a timeout, or error.
518  */
519 AsyncResponse *
async_request_cleanup_result(AsyncRequest * req,TimestampTz endtime)520 async_request_cleanup_result(AsyncRequest *req, TimestampTz endtime)
521 {
522 	TSConnection *conn = async_request_get_connection(req);
523 	PGresult *last_res = NULL;
524 	AsyncResponse *rsp = NULL;
525 
526 	switch (req->state)
527 	{
528 		case DEFERRED:
529 			if (remote_connection_is_processing(req->conn))
530 				return async_response_error_create("request already in progress");
531 
532 			req = async_request_send_internal(req, WARNING);
533 
534 			if (req == NULL)
535 				return async_response_error_create("failed to send deferred request");
536 
537 			Assert(req->state == EXECUTING);
538 			break;
539 		case EXECUTING:
540 			break;
541 		case COMPLETED:
542 			return async_response_error_create("request already completed");
543 	}
544 
545 	switch (remote_connection_drain(conn, endtime, &last_res))
546 	{
547 		case CONN_TIMEOUT:
548 			rsp = async_response_timeout_create();
549 			break;
550 		case CONN_DISCONNECT:
551 			rsp = &async_response_communication_error_create(req)->base;
552 			break;
553 		case CONN_NO_RESPONSE:
554 			rsp = async_response_error_create("no response during cleanup");
555 			break;
556 		case CONN_OK:
557 			Assert(last_res != NULL);
558 			rsp = &async_response_result_create(req, last_res)->base;
559 			break;
560 	}
561 
562 	Assert(rsp != NULL);
563 
564 	return rsp;
565 }
566 
567 void
async_request_wait_ok_command(AsyncRequest * req)568 async_request_wait_ok_command(AsyncRequest *req)
569 {
570 	AsyncResponseResult *res = async_request_wait_any_result(req);
571 
572 	if (PQresultStatus(res->result) != PGRES_COMMAND_OK)
573 	{
574 		async_response_report_error(&res->base, ERROR);
575 		Assert(false);
576 	}
577 
578 	async_response_result_close(res);
579 }
580 
581 PreparedStmt *
async_request_wait_prepared_statement(AsyncRequest * request)582 async_request_wait_prepared_statement(AsyncRequest *request)
583 {
584 	AsyncResponseResult *result;
585 	PreparedStmt *prep;
586 
587 	Assert(request->stmt_name != NULL);
588 
589 	result = async_request_wait_ok_result(request);
590 	prep = async_response_result_generate_prepared_stmt(result);
591 	async_response_result_close(result);
592 
593 	return prep;
594 }
595 
596 AsyncRequestSet *
async_request_set_create()597 async_request_set_create()
598 {
599 	return palloc0(sizeof(AsyncRequestSet));
600 }
601 
602 void
async_request_set_add(AsyncRequestSet * set,AsyncRequest * req)603 async_request_set_add(AsyncRequestSet *set, AsyncRequest *req)
604 {
605 	set->requests = list_append_unique_ptr(set->requests, req);
606 }
607 
608 static AsyncResponse *
get_single_response_nonblocking(AsyncRequestSet * set)609 get_single_response_nonblocking(AsyncRequestSet *set)
610 {
611 	ListCell *lc;
612 
613 	foreach (lc, set->requests)
614 	{
615 		AsyncRequest *req = lfirst(lc);
616 		PGconn *pg_conn = remote_connection_get_pg_conn(req->conn);
617 
618 		switch (req->state)
619 		{
620 			case DEFERRED:
621 				if (remote_connection_is_processing(req->conn))
622 					return async_response_error_create("request already in progress");
623 
624 				req = async_request_send_internal(req, WARNING);
625 
626 				if (req == NULL)
627 					return async_response_error_create("failed to send deferred request");
628 
629 				Assert(req->state == EXECUTING);
630 				TS_FALLTHROUGH;
631 			case EXECUTING:
632 				if (0 == PQisBusy(pg_conn))
633 				{
634 					PGresult *res = PQgetResult(pg_conn);
635 
636 					if (NULL == res)
637 					{
638 						/*
639 						 * NULL return means query is complete
640 						 */
641 						set->requests = list_delete_ptr(set->requests, req);
642 						remote_connection_set_status(req->conn, CONN_IDLE);
643 						async_request_set_state(req, COMPLETED);
644 
645 						/* set changed so rerun function */
646 						return get_single_response_nonblocking(set);
647 					}
648 					return &async_response_result_create(req, res)->base;
649 				}
650 				break;
651 			case COMPLETED:
652 				return async_response_error_create("request already completed");
653 		}
654 	}
655 
656 	return NULL;
657 }
658 
659 /*
660  * wait_to_consume_data waits until data is recieved and put into buffers
661  * so that it can be recieved without blocking by `get_single_response_nonblocking`
662  * or similar.
663  *
664  * Returns NULL on success or an "error" AsyncResponse
665  */
666 static AsyncResponse *
wait_to_consume_data(AsyncRequestSet * set,TimestampTz end_time)667 wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
668 {
669 	/*
670 	 * Looks like there is no good way to modify a WaitEventSet so we have to
671 	 * make a new one, otherwise we can't turn off wait events
672 	 */
673 	WaitEventSet *we_set;
674 	ListCell *lc;
675 	int rc;
676 	WaitEvent event;
677 	uint32 wait_event_info = PG_WAIT_EXTENSION;
678 	AsyncRequest *wait_req;
679 	AsyncResponse *result;
680 	long timeout_ms = -1L;
681 
682 	Assert(list_length(set->requests) > 0);
683 
684 	if (end_time != TS_NO_TIMEOUT)
685 	{
686 		TimestampTz now = GetCurrentTimestamp();
687 		long secs;
688 		int microsecs;
689 
690 		if (now >= end_time)
691 			return async_response_timeout_create();
692 
693 		TimestampDifference(now, end_time, &secs, &microsecs);
694 		timeout_ms = secs * 1000 + (microsecs / 1000);
695 	}
696 
697 	we_set = CreateWaitEventSet(CurrentMemoryContext, list_length(set->requests) + 1);
698 
699 	/* always wait for my latch */
700 	AddWaitEventToSet(we_set, WL_LATCH_SET, PGINVALID_SOCKET, (Latch *) MyLatch, NULL);
701 
702 	foreach (lc, set->requests)
703 	{
704 		AsyncRequest *req = lfirst(lc);
705 
706 		AddWaitEventToSet(we_set,
707 						  WL_SOCKET_READABLE,
708 						  PQsocket(remote_connection_get_pg_conn(req->conn)),
709 						  NULL,
710 						  req);
711 	}
712 
713 	while (true)
714 	{
715 		wait_req = NULL;
716 		rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, wait_event_info);
717 
718 		if (rc == 0)
719 		{
720 			result = async_response_timeout_create();
721 			break;
722 		}
723 
724 		CHECK_FOR_INTERRUPTS();
725 
726 		if (event.events & WL_LATCH_SET)
727 			ResetLatch(MyLatch);
728 		else if (event.events & WL_SOCKET_READABLE)
729 		{
730 			wait_req = event.user_data;
731 			Assert(wait_req != NULL);
732 
733 			if (0 == PQconsumeInput(remote_connection_get_pg_conn(wait_req->conn)))
734 			{
735 				/* remove connection from set */
736 				set->requests = list_delete_ptr(set->requests, wait_req);
737 				result = &async_response_communication_error_create(wait_req)->base;
738 				break;
739 			}
740 			result = NULL;
741 			break;
742 		}
743 		else
744 		{
745 			result = async_response_error_create("unexpected event");
746 			break;
747 		}
748 	}
749 
750 	FreeWaitEventSet(we_set);
751 	return result;
752 }
753 
754 /* Return NULL when nothing more to do in set */
755 AsyncResponse *
async_request_set_wait_any_response_deadline(AsyncRequestSet * set,TimestampTz endtime)756 async_request_set_wait_any_response_deadline(AsyncRequestSet *set, TimestampTz endtime)
757 {
758 	AsyncResponse *response;
759 
760 	while (true)
761 	{
762 		response = get_single_response_nonblocking(set);
763 
764 		if (response != NULL)
765 			break;
766 
767 		if (list_length(set->requests) == 0)
768 			/* nothing to wait on anymore */
769 			return NULL;
770 
771 		response = wait_to_consume_data(set, endtime);
772 
773 		if (response != NULL)
774 			break;
775 	}
776 
777 	/* Make sure callbacks are run when a response is received. For a timeout,
778 	 * we run the callbacks on all the requests the user has been waiting
779 	 * on. */
780 	if (NULL != response)
781 	{
782 		List *requests = NIL;
783 		ListCell *lc;
784 
785 		switch (response->type)
786 		{
787 			case RESPONSE_RESULT:
788 			case RESPONSE_ROW:
789 				requests = list_make1(((AsyncResponseResult *) response)->request);
790 				break;
791 			case RESPONSE_COMMUNICATION_ERROR:
792 				requests = list_make1(((AsyncResponseCommunicationError *) response)->request);
793 				break;
794 			case RESPONSE_ERROR:
795 			case RESPONSE_TIMEOUT:
796 				requests = set->requests;
797 				break;
798 		}
799 
800 		foreach (lc, requests)
801 		{
802 			AsyncRequest *req = lfirst(lc);
803 
804 			if (NULL != req->response_cb)
805 				req->response_cb(req, response, req->user_data);
806 		}
807 	}
808 
809 	return response;
810 }
811 
812 AsyncResponseResult *
async_request_set_wait_any_result(AsyncRequestSet * set)813 async_request_set_wait_any_result(AsyncRequestSet *set)
814 {
815 	AsyncResponse *res = async_request_set_wait_any_response(set);
816 
817 	if (res == NULL)
818 		return NULL;
819 
820 	if (!(RESPONSE_RESULT == res->type || RESPONSE_ROW == res->type))
821 		async_response_report_error(res, ERROR);
822 
823 	return (AsyncResponseResult *) res;
824 }
825 
826 AsyncResponseResult *
async_request_set_wait_ok_result(AsyncRequestSet * set)827 async_request_set_wait_ok_result(AsyncRequestSet *set)
828 {
829 	AsyncResponseResult *response_result = async_request_set_wait_any_result(set);
830 	ExecStatusType status;
831 
832 	if (response_result == NULL)
833 		return NULL;
834 
835 	status = PQresultStatus(response_result->result);
836 
837 	if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK)
838 	{
839 		async_response_report_error(&response_result->base, ERROR);
840 		Assert(false);
841 	}
842 
843 	return response_result;
844 }
845 
846 void
async_request_set_wait_all_ok_commands(AsyncRequestSet * set)847 async_request_set_wait_all_ok_commands(AsyncRequestSet *set)
848 {
849 	AsyncResponse *rsp;
850 	AsyncResponse *bad_rsp = NULL;
851 
852 	/* Drain all responses and record the first error */
853 	while ((rsp = async_request_set_wait_any_response(set)))
854 	{
855 		switch (async_response_get_type(rsp))
856 		{
857 			case RESPONSE_RESULT:
858 			case RESPONSE_ROW:
859 			{
860 				AsyncResponseResult *ar = (AsyncResponseResult *) rsp;
861 				ExecStatusType status = PQresultStatus(async_response_result_get_pg_result(ar));
862 
863 				if (status != PGRES_COMMAND_OK && bad_rsp == NULL)
864 					bad_rsp = rsp;
865 				else
866 					async_response_result_close(ar);
867 				break;
868 			}
869 			default:
870 				if (bad_rsp == NULL)
871 					bad_rsp = rsp;
872 				break;
873 		}
874 	}
875 
876 	/* Throw error once request set is drained */
877 	if (bad_rsp != NULL)
878 		async_response_report_error(bad_rsp, ERROR);
879 }
880 
881 void
async_request_discard_response(AsyncRequest * req)882 async_request_discard_response(AsyncRequest *req)
883 {
884 	AsyncResponseResult *result = NULL;
885 
886 	Assert(req != NULL);
887 
888 	do
889 	{
890 		/* for row-by-row fetching we need to loop until we consume the whole response */
891 		result = async_request_wait_any_result(req);
892 		if (result != NULL)
893 			async_response_result_close(result);
894 	} while (result != NULL && req->state != COMPLETED);
895 }
896 
897 void
prepared_stmt_close(PreparedStmt * stmt)898 prepared_stmt_close(PreparedStmt *stmt)
899 {
900 	char sql[64] = { '\0' };
901 	int ret;
902 
903 	ret = snprintf(sql, sizeof(sql), "DEALLOCATE %s", stmt->stmt_name);
904 
905 	if (ret < 0 || ret >= sizeof(sql))
906 		elog(ERROR, "could not create deallocate statement");
907 
908 	async_request_wait_ok_command(async_request_send(stmt->conn, sql));
909 }
910 
911 /* Request must have been generated by async_request_send_prepare() */
912 PreparedStmt *
async_response_result_generate_prepared_stmt(AsyncResponseResult * result)913 async_response_result_generate_prepared_stmt(AsyncResponseResult *result)
914 {
915 	PreparedStmt *prep;
916 
917 	if (PQresultStatus(result->result) != PGRES_COMMAND_OK)
918 		async_response_report_error(&result->base, ERROR);
919 
920 	prep = palloc0(sizeof(PreparedStmt));
921 
922 	*prep = (PreparedStmt){
923 		.conn = result->request->conn,
924 		.sql = result->request->sql,
925 		.stmt_name = result->request->stmt_name,
926 		.n_params = result->request->prep_stmt_params,
927 	};
928 
929 	return prep;
930 }
931