1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6
7 #include <postgres.h>
8 #include <nodes/pg_list.h>
9 #include <libpq-fe.h>
10 #include <storage/latch.h>
11 #include <miscadmin.h>
12 #include <pgstat.h>
13 #include <fmgr.h>
14 #include <utils/lsyscache.h>
15 #include <catalog/pg_type.h>
16 #include <nodes/pathnodes.h>
17
18 #include <annotations.h>
19 #include "async.h"
20 #include "connection.h"
21 #include "utils.h"
22
23 /**
24 * State machine for AsyncRequest:
25 *
26 * +-------------+ +--------------+ +--------------+
27 * | | | | | |
28 * | DEFERRED +---------->+ EXECUTING +------->+ COMPLETED |
29 * | | | | | |
30 * +-------------+ +--------------+ +--------------+
31 *
32 **/
33
/* Lifecycle states of an AsyncRequest, in the order a request moves through them. */
typedef enum AsyncRequestState
{
	DEFERRED,  /* created, but not (yet) sent on the connection */
	EXECUTING, /* sent; results may still be pending */
	COMPLETED, /* the connection returned its final (NULL) result */
} AsyncRequestState;
40
41 typedef struct AsyncRequest
42 {
43 const char *sql;
44 TSConnection *conn;
45 AsyncRequestState state;
46 const char *stmt_name;
47 int prep_stmt_params;
48 async_response_callback response_cb;
49 void *user_data; /* custom data saved with the request */
50 StmtParams *params;
51 int res_format; /* text or binary */
52 bool is_xact_transition;
53 } AsyncRequest;
54
55 typedef struct PreparedStmt
56 {
57 const char *sql;
58 TSConnection *conn;
59 const char *stmt_name;
60 int n_params;
61 } PreparedStmt;
62
63 /* It is often useful to get the request along with the result in the response */
64
65 typedef struct AsyncResponse
66 {
67 AsyncResponseType type;
68 } AsyncResponse;
69
70 typedef struct AsyncResponseResult
71 {
72 AsyncResponse base;
73 PGresult *result;
74 AsyncRequest *request;
75 } AsyncResponseResult;
76
77 typedef struct AsyncResponseCommunicationError
78 {
79 AsyncResponse base;
80 AsyncRequest *request;
81 } AsyncResponseCommunicationError;
82
83 typedef struct AsyncResponseError
84 {
85 AsyncResponse base;
86 const char *errmsg;
87 } AsyncResponseError;
88
89 typedef struct AsyncRequestSet
90 {
91 List *requests;
92 } AsyncRequestSet;
93
94 static AsyncRequest *
async_request_create(TSConnection * conn,const char * sql,const char * stmt_name,int prep_stmt_params,StmtParams * stmt_params,int res_format)95 async_request_create(TSConnection *conn, const char *sql, const char *stmt_name,
96 int prep_stmt_params, StmtParams *stmt_params, int res_format)
97 {
98 AsyncRequest *req;
99
100 if (conn == NULL)
101 elog(ERROR, "can't create AsyncRequest with NULL connection");
102
103 req = palloc0(sizeof(AsyncRequest));
104 *req = (AsyncRequest){ .conn = conn,
105 .state = DEFERRED,
106 .sql = pstrdup(sql),
107 .stmt_name = stmt_name,
108 .params = stmt_params,
109 .prep_stmt_params = prep_stmt_params,
110 .res_format = res_format };
111
112 return req;
113 }
114
115 static void
async_request_set_state(AsyncRequest * req,AsyncRequestState new_state)116 async_request_set_state(AsyncRequest *req, AsyncRequestState new_state)
117 {
118 if (req->state != DEFERRED)
119 Assert(req->state != new_state);
120
121 #ifdef USE_ASSERT_CHECKING
122 switch (new_state)
123 {
124 case DEFERRED:
125 /* initial state */
126 Assert(req->state == DEFERRED);
127 break;
128 case EXECUTING:
129 Assert(req->state == DEFERRED);
130 break;
131 case COMPLETED:
132 Assert(req->state == EXECUTING);
133 }
134 #endif
135 req->state = new_state;
136 }
137
138 /* Send a request. In case there is an ongoing request for the connection,
139 we will not send the request but set its status to DEFERRED.
140 Getting a response from DEFERRED AsyncRequest will try sending it if
141 the connection is not in use.
142
143 Note that we can only send one sql statement per request.
144 This is because we use `PQsendQueryParams` which uses the extended query protocol
145 instead of the simple one. The extended protocol does not support multiple
146 statements. In the future we can use a `PQsendQuery` variant for queries without parameters,
147 which can support multiple statements because it uses the simple protocol. But this is
148 an optimization for another time.
149 */
150 static AsyncRequest *
async_request_send_internal(AsyncRequest * req,int elevel)151 async_request_send_internal(AsyncRequest *req, int elevel)
152 {
153 if (req->state != DEFERRED)
154 elog(elevel, "can't send async request in state \"%d\"", req->state);
155
156 if (remote_connection_is_processing(req->conn))
157 return req;
158
159 /* Send configuration parameters if necessary */
160 remote_connection_configure_if_changed(req->conn);
161
162 if (req->stmt_name)
163 {
164 /*
165 * We intentionally do not specify parameter types here, but leave the
166 * data node to derive them by default. This avoids possible problems
167 * with the data node using different type OIDs than we do. All of
168 * the prepared statements we use in this module are simple enough that
169 * the data node will make the right choices.
170 */
171 if (0 == PQsendPrepare(remote_connection_get_pg_conn(req->conn),
172 req->stmt_name,
173 req->sql,
174 req->prep_stmt_params,
175 NULL))
176 {
177 /*
178 * null is fine to pass down as the res, the connection error message
179 * will get through
180 */
181 remote_connection_elog(req->conn, elevel);
182 return NULL;
183 }
184 }
185 else
186 {
187 if (0 == PQsendQueryParams(remote_connection_get_pg_conn(req->conn),
188 req->sql,
189 stmt_params_total_values(req->params),
190 /* param types - see note above */ NULL,
191 stmt_params_values(req->params),
192 stmt_params_lengths(req->params),
193 stmt_params_formats(req->params),
194 req->res_format))
195 {
196 /*
197 * null is fine to pass down as the res, the connection error message
198 * will get through
199 */
200 remote_connection_elog(req->conn, elevel);
201 return NULL;
202 }
203 }
204 async_request_set_state(req, EXECUTING);
205 remote_connection_set_status(req->conn, CONN_PROCESSING);
206 return req;
207 }
208
209 AsyncRequest *
async_request_send_with_stmt_params_elevel_res_format(TSConnection * conn,const char * sql_statement,StmtParams * params,int elevel,int res_format)210 async_request_send_with_stmt_params_elevel_res_format(TSConnection *conn, const char *sql_statement,
211 StmtParams *params, int elevel,
212 int res_format)
213 {
214 AsyncRequest *req = async_request_create(conn, sql_statement, NULL, 0, params, res_format);
215 req = async_request_send_internal(req, elevel);
216 return req;
217 }
218
219 AsyncRequest *
async_request_send_prepare(TSConnection * conn,const char * sql,int n_params)220 async_request_send_prepare(TSConnection *conn, const char *sql, int n_params)
221 {
222 AsyncRequest *req;
223 size_t stmt_name_len = NAMEDATALEN;
224 char *stmt_name = palloc(sizeof(char) * stmt_name_len);
225 int written;
226
227 /* Construct name we'll use for the prepared statement. */
228 written =
229 snprintf(stmt_name, stmt_name_len, "ts_prep_%u", remote_connection_get_prep_stmt_number());
230
231 if (written < 0 || written >= stmt_name_len)
232 elog(ERROR, "cannot create prepared statement name");
233
234 req = async_request_create(conn, sql, stmt_name, n_params, NULL, FORMAT_TEXT);
235 req = async_request_send_internal(req, ERROR);
236
237 return req;
238 }
239
240 extern AsyncRequest *
async_request_send_prepared_stmt(PreparedStmt * stmt,const char * const * param_values)241 async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_values)
242 {
243 AsyncRequest *req =
244 async_request_create(stmt->conn,
245 stmt->sql,
246 NULL,
247 stmt->n_params,
248 stmt_params_create_from_values((const char **) param_values,
249 stmt->n_params),
250 FORMAT_TEXT);
251 return async_request_send_internal(req, ERROR);
252 }
253
254 AsyncRequest *
async_request_send_prepared_stmt_with_params(PreparedStmt * stmt,StmtParams * params,int res_format)255 async_request_send_prepared_stmt_with_params(PreparedStmt *stmt, StmtParams *params, int res_format)
256 {
257 AsyncRequest *req =
258 async_request_create(stmt->conn, stmt->sql, NULL, stmt->n_params, params, res_format);
259 return async_request_send_internal(req, ERROR);
260 }
261
262 /* Set user data. Often it is useful to attach data with a request so
263 that it can later be fetched from the response. */
264 void
async_request_attach_user_data(AsyncRequest * req,void * user_data)265 async_request_attach_user_data(AsyncRequest *req, void *user_data)
266 {
267 req->user_data = user_data;
268 }
269
270 void
async_request_set_response_callback(AsyncRequest * req,async_response_callback cb,void * user_data)271 async_request_set_response_callback(AsyncRequest *req, async_response_callback cb, void *user_data)
272 {
273 req->response_cb = cb;
274 req->user_data = user_data;
275 }
276
277 static AsyncResponseResult *
async_response_result_create(AsyncRequest * req,PGresult * res)278 async_response_result_create(AsyncRequest *req, PGresult *res)
279 {
280 AsyncResponseResult *ares;
281 AsyncResponseType type = RESPONSE_RESULT;
282
283 if (PQresultStatus(res) == PGRES_SINGLE_TUPLE)
284 type = RESPONSE_ROW;
285
286 ares = palloc0(sizeof(AsyncResponseResult));
287
288 *ares = (AsyncResponseResult){
289 .base = { .type = type },
290 .request = req,
291 .result = res,
292 };
293
294 return ares;
295 }
296
297 static AsyncResponseCommunicationError *
async_response_communication_error_create(AsyncRequest * req)298 async_response_communication_error_create(AsyncRequest *req)
299 {
300 AsyncResponseCommunicationError *ares = palloc0(sizeof(AsyncResponseCommunicationError));
301
302 *ares = (AsyncResponseCommunicationError){
303 .base = { .type = RESPONSE_COMMUNICATION_ERROR },
304 .request = req,
305 };
306
307 return ares;
308 }
309
310 static AsyncResponse *
async_response_timeout_create()311 async_response_timeout_create()
312 {
313 AsyncResponse *ares = palloc0(sizeof(AsyncResponse));
314
315 *ares = (AsyncResponse){
316 .type = RESPONSE_TIMEOUT,
317 };
318
319 return ares;
320 }
321
322 static AsyncResponse *
async_response_error_create(const char * errmsg)323 async_response_error_create(const char *errmsg)
324 {
325 AsyncResponseError *ares = palloc0(sizeof(AsyncResponseError));
326
327 *ares = (AsyncResponseError){
328 .base = { .type = RESPONSE_ERROR },
329 .errmsg = pstrdup(errmsg),
330 };
331
332 return &ares->base;
333 }
334
335 void
async_response_result_close(AsyncResponseResult * res)336 async_response_result_close(AsyncResponseResult *res)
337 {
338 PQclear(res->result);
339 pfree(res);
340 }
341
342 /* Closes the async response. Note that `async_response_report_error` does this automatically. */
343 void
async_response_close(AsyncResponse * res)344 async_response_close(AsyncResponse *res)
345 {
346 switch (res->type)
347 {
348 case RESPONSE_RESULT:
349 case RESPONSE_ROW:
350 async_response_result_close((AsyncResponseResult *) res);
351 break;
352 default:
353 pfree(res);
354 break;
355 }
356 }
357
358 AsyncResponseType
async_response_get_type(AsyncResponse * res)359 async_response_get_type(AsyncResponse *res)
360 {
361 return res->type;
362 }
363
364 /* get the user data attached to the corresponding request */
365 void *
async_response_result_get_user_data(AsyncResponseResult * res)366 async_response_result_get_user_data(AsyncResponseResult *res)
367 {
368 return res->request->user_data;
369 }
370
371 PGresult *
async_response_result_get_pg_result(AsyncResponseResult * res)372 async_response_result_get_pg_result(AsyncResponseResult *res)
373 {
374 return res->result;
375 }
376
377 AsyncRequest *
async_response_result_get_request(AsyncResponseResult * res)378 async_response_result_get_request(AsyncResponseResult *res)
379 {
380 return res->request;
381 }
382
383 bool
async_request_set_single_row_mode(AsyncRequest * req)384 async_request_set_single_row_mode(AsyncRequest *req)
385 {
386 return remote_connection_set_single_row_mode(req->conn);
387 }
388
389 TSConnection *
async_request_get_connection(AsyncRequest * req)390 async_request_get_connection(AsyncRequest *req)
391 {
392 return req->conn;
393 }
394
395 void
async_response_report_error(AsyncResponse * res,int elevel)396 async_response_report_error(AsyncResponse *res, int elevel)
397 {
398 switch (res->type)
399 {
400 case RESPONSE_RESULT:
401 case RESPONSE_ROW:
402 {
403 AsyncResponseResult *aresult = (AsyncResponseResult *) res;
404 ExecStatusType status = PQresultStatus(aresult->result);
405
406 switch (status)
407 {
408 case PGRES_COMMAND_OK:
409 case PGRES_TUPLES_OK:
410 case PGRES_SINGLE_TUPLE:
411 break;
412 case PGRES_NONFATAL_ERROR:
413 case PGRES_FATAL_ERROR:
414 /* result is closed by remote_result_elog in case it throws
415 * error */
416 remote_result_elog(aresult->result, elevel);
417 break;
418 default:
419 PG_TRY();
420 {
421 elog(elevel, "unexpected response status %u", status);
422 }
423 PG_CATCH();
424 {
425 async_response_close(res);
426 PG_RE_THROW();
427 }
428 PG_END_TRY();
429 }
430 break;
431 }
432 case RESPONSE_COMMUNICATION_ERROR:
433 remote_connection_elog(((AsyncResponseCommunicationError *) res)->request->conn,
434 elevel);
435 break;
436 case RESPONSE_ERROR:
437 elog(elevel, "%s", ((AsyncResponseError *) res)->errmsg);
438 break;
439 case RESPONSE_TIMEOUT:
440 elog(elevel, "async operation timed out");
441 }
442 }
443
444 void
async_response_report_error_or_close(AsyncResponse * res,int elevel)445 async_response_report_error_or_close(AsyncResponse *res, int elevel)
446 {
447 async_response_report_error(res, elevel);
448 async_response_close(res);
449 }
450
451 /*
452 * This is a convenience function to wait for a single result from a request.
453 * This function requires that the request is for a single SQL statement.
454 */
455 AsyncResponseResult *
async_request_wait_any_result(AsyncRequest * req)456 async_request_wait_any_result(AsyncRequest *req)
457 {
458 AsyncRequestSet set = { 0 };
459 AsyncResponseResult *result;
460
461 async_request_set_add(&set, req);
462 result = async_request_set_wait_any_result(&set);
463
464 /* Should expect exactly one response */
465 if (NULL == result)
466 elog(ERROR, "remote request failed");
467
468 /* Make sure to drain the connection only if we've retrieved complete result set */
469 if (result->base.type == RESPONSE_RESULT)
470 {
471 AsyncResponseResult *extra;
472 bool got_extra = false;
473
474 /* Must drain any remaining result until NULL */
475 while ((extra = async_request_set_wait_any_result(&set)))
476 {
477 async_response_result_close(extra);
478 got_extra = true;
479 }
480
481 if (got_extra)
482 {
483 async_response_result_close(result);
484 elog(ERROR, "request must be for one sql statement");
485 }
486 }
487
488 return result;
489 }
490
491 AsyncResponseResult *
async_request_wait_ok_result(AsyncRequest * req)492 async_request_wait_ok_result(AsyncRequest *req)
493 {
494 AsyncResponseResult *res = async_request_wait_any_result(req);
495
496 if (PQresultStatus(res->result) != PGRES_COMMAND_OK &&
497 PQresultStatus(res->result) != PGRES_TUPLES_OK)
498 {
499 async_response_report_error(&res->base, ERROR);
500 Assert(false);
501 }
502
503 return res;
504 }
505
506 /*
507 * Get the result of an async request during cleanup.
508 *
509 * Cleanup is typically necessary for a query that is being interrupted by
510 * transaction abort, or a query that was initiated as part of transaction
511 * abort to get the remote side back to the appropriate state.
512 *
513 * endtime is the time at which we should give up and assume the remote
514 * side is dead.
515 *
516 * An AsyncReponse is always returned, indicating last PGresult received,
517 * a timeout, or error.
518 */
519 AsyncResponse *
async_request_cleanup_result(AsyncRequest * req,TimestampTz endtime)520 async_request_cleanup_result(AsyncRequest *req, TimestampTz endtime)
521 {
522 TSConnection *conn = async_request_get_connection(req);
523 PGresult *last_res = NULL;
524 AsyncResponse *rsp = NULL;
525
526 switch (req->state)
527 {
528 case DEFERRED:
529 if (remote_connection_is_processing(req->conn))
530 return async_response_error_create("request already in progress");
531
532 req = async_request_send_internal(req, WARNING);
533
534 if (req == NULL)
535 return async_response_error_create("failed to send deferred request");
536
537 Assert(req->state == EXECUTING);
538 break;
539 case EXECUTING:
540 break;
541 case COMPLETED:
542 return async_response_error_create("request already completed");
543 }
544
545 switch (remote_connection_drain(conn, endtime, &last_res))
546 {
547 case CONN_TIMEOUT:
548 rsp = async_response_timeout_create();
549 break;
550 case CONN_DISCONNECT:
551 rsp = &async_response_communication_error_create(req)->base;
552 break;
553 case CONN_NO_RESPONSE:
554 rsp = async_response_error_create("no response during cleanup");
555 break;
556 case CONN_OK:
557 Assert(last_res != NULL);
558 rsp = &async_response_result_create(req, last_res)->base;
559 break;
560 }
561
562 Assert(rsp != NULL);
563
564 return rsp;
565 }
566
567 void
async_request_wait_ok_command(AsyncRequest * req)568 async_request_wait_ok_command(AsyncRequest *req)
569 {
570 AsyncResponseResult *res = async_request_wait_any_result(req);
571
572 if (PQresultStatus(res->result) != PGRES_COMMAND_OK)
573 {
574 async_response_report_error(&res->base, ERROR);
575 Assert(false);
576 }
577
578 async_response_result_close(res);
579 }
580
581 PreparedStmt *
async_request_wait_prepared_statement(AsyncRequest * request)582 async_request_wait_prepared_statement(AsyncRequest *request)
583 {
584 AsyncResponseResult *result;
585 PreparedStmt *prep;
586
587 Assert(request->stmt_name != NULL);
588
589 result = async_request_wait_ok_result(request);
590 prep = async_response_result_generate_prepared_stmt(result);
591 async_response_result_close(result);
592
593 return prep;
594 }
595
596 AsyncRequestSet *
async_request_set_create()597 async_request_set_create()
598 {
599 return palloc0(sizeof(AsyncRequestSet));
600 }
601
602 void
async_request_set_add(AsyncRequestSet * set,AsyncRequest * req)603 async_request_set_add(AsyncRequestSet *set, AsyncRequest *req)
604 {
605 set->requests = list_append_unique_ptr(set->requests, req);
606 }
607
608 static AsyncResponse *
get_single_response_nonblocking(AsyncRequestSet * set)609 get_single_response_nonblocking(AsyncRequestSet *set)
610 {
611 ListCell *lc;
612
613 foreach (lc, set->requests)
614 {
615 AsyncRequest *req = lfirst(lc);
616 PGconn *pg_conn = remote_connection_get_pg_conn(req->conn);
617
618 switch (req->state)
619 {
620 case DEFERRED:
621 if (remote_connection_is_processing(req->conn))
622 return async_response_error_create("request already in progress");
623
624 req = async_request_send_internal(req, WARNING);
625
626 if (req == NULL)
627 return async_response_error_create("failed to send deferred request");
628
629 Assert(req->state == EXECUTING);
630 TS_FALLTHROUGH;
631 case EXECUTING:
632 if (0 == PQisBusy(pg_conn))
633 {
634 PGresult *res = PQgetResult(pg_conn);
635
636 if (NULL == res)
637 {
638 /*
639 * NULL return means query is complete
640 */
641 set->requests = list_delete_ptr(set->requests, req);
642 remote_connection_set_status(req->conn, CONN_IDLE);
643 async_request_set_state(req, COMPLETED);
644
645 /* set changed so rerun function */
646 return get_single_response_nonblocking(set);
647 }
648 return &async_response_result_create(req, res)->base;
649 }
650 break;
651 case COMPLETED:
652 return async_response_error_create("request already completed");
653 }
654 }
655
656 return NULL;
657 }
658
659 /*
660 * wait_to_consume_data waits until data is recieved and put into buffers
661 * so that it can be recieved without blocking by `get_single_response_nonblocking`
662 * or similar.
663 *
664 * Returns NULL on success or an "error" AsyncResponse
665 */
666 static AsyncResponse *
wait_to_consume_data(AsyncRequestSet * set,TimestampTz end_time)667 wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
668 {
669 /*
670 * Looks like there is no good way to modify a WaitEventSet so we have to
671 * make a new one, otherwise we can't turn off wait events
672 */
673 WaitEventSet *we_set;
674 ListCell *lc;
675 int rc;
676 WaitEvent event;
677 uint32 wait_event_info = PG_WAIT_EXTENSION;
678 AsyncRequest *wait_req;
679 AsyncResponse *result;
680 long timeout_ms = -1L;
681
682 Assert(list_length(set->requests) > 0);
683
684 if (end_time != TS_NO_TIMEOUT)
685 {
686 TimestampTz now = GetCurrentTimestamp();
687 long secs;
688 int microsecs;
689
690 if (now >= end_time)
691 return async_response_timeout_create();
692
693 TimestampDifference(now, end_time, &secs, µsecs);
694 timeout_ms = secs * 1000 + (microsecs / 1000);
695 }
696
697 we_set = CreateWaitEventSet(CurrentMemoryContext, list_length(set->requests) + 1);
698
699 /* always wait for my latch */
700 AddWaitEventToSet(we_set, WL_LATCH_SET, PGINVALID_SOCKET, (Latch *) MyLatch, NULL);
701
702 foreach (lc, set->requests)
703 {
704 AsyncRequest *req = lfirst(lc);
705
706 AddWaitEventToSet(we_set,
707 WL_SOCKET_READABLE,
708 PQsocket(remote_connection_get_pg_conn(req->conn)),
709 NULL,
710 req);
711 }
712
713 while (true)
714 {
715 wait_req = NULL;
716 rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, wait_event_info);
717
718 if (rc == 0)
719 {
720 result = async_response_timeout_create();
721 break;
722 }
723
724 CHECK_FOR_INTERRUPTS();
725
726 if (event.events & WL_LATCH_SET)
727 ResetLatch(MyLatch);
728 else if (event.events & WL_SOCKET_READABLE)
729 {
730 wait_req = event.user_data;
731 Assert(wait_req != NULL);
732
733 if (0 == PQconsumeInput(remote_connection_get_pg_conn(wait_req->conn)))
734 {
735 /* remove connection from set */
736 set->requests = list_delete_ptr(set->requests, wait_req);
737 result = &async_response_communication_error_create(wait_req)->base;
738 break;
739 }
740 result = NULL;
741 break;
742 }
743 else
744 {
745 result = async_response_error_create("unexpected event");
746 break;
747 }
748 }
749
750 FreeWaitEventSet(we_set);
751 return result;
752 }
753
754 /* Return NULL when nothing more to do in set */
755 AsyncResponse *
async_request_set_wait_any_response_deadline(AsyncRequestSet * set,TimestampTz endtime)756 async_request_set_wait_any_response_deadline(AsyncRequestSet *set, TimestampTz endtime)
757 {
758 AsyncResponse *response;
759
760 while (true)
761 {
762 response = get_single_response_nonblocking(set);
763
764 if (response != NULL)
765 break;
766
767 if (list_length(set->requests) == 0)
768 /* nothing to wait on anymore */
769 return NULL;
770
771 response = wait_to_consume_data(set, endtime);
772
773 if (response != NULL)
774 break;
775 }
776
777 /* Make sure callbacks are run when a response is received. For a timeout,
778 * we run the callbacks on all the requests the user has been waiting
779 * on. */
780 if (NULL != response)
781 {
782 List *requests = NIL;
783 ListCell *lc;
784
785 switch (response->type)
786 {
787 case RESPONSE_RESULT:
788 case RESPONSE_ROW:
789 requests = list_make1(((AsyncResponseResult *) response)->request);
790 break;
791 case RESPONSE_COMMUNICATION_ERROR:
792 requests = list_make1(((AsyncResponseCommunicationError *) response)->request);
793 break;
794 case RESPONSE_ERROR:
795 case RESPONSE_TIMEOUT:
796 requests = set->requests;
797 break;
798 }
799
800 foreach (lc, requests)
801 {
802 AsyncRequest *req = lfirst(lc);
803
804 if (NULL != req->response_cb)
805 req->response_cb(req, response, req->user_data);
806 }
807 }
808
809 return response;
810 }
811
812 AsyncResponseResult *
async_request_set_wait_any_result(AsyncRequestSet * set)813 async_request_set_wait_any_result(AsyncRequestSet *set)
814 {
815 AsyncResponse *res = async_request_set_wait_any_response(set);
816
817 if (res == NULL)
818 return NULL;
819
820 if (!(RESPONSE_RESULT == res->type || RESPONSE_ROW == res->type))
821 async_response_report_error(res, ERROR);
822
823 return (AsyncResponseResult *) res;
824 }
825
826 AsyncResponseResult *
async_request_set_wait_ok_result(AsyncRequestSet * set)827 async_request_set_wait_ok_result(AsyncRequestSet *set)
828 {
829 AsyncResponseResult *response_result = async_request_set_wait_any_result(set);
830 ExecStatusType status;
831
832 if (response_result == NULL)
833 return NULL;
834
835 status = PQresultStatus(response_result->result);
836
837 if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK)
838 {
839 async_response_report_error(&response_result->base, ERROR);
840 Assert(false);
841 }
842
843 return response_result;
844 }
845
846 void
async_request_set_wait_all_ok_commands(AsyncRequestSet * set)847 async_request_set_wait_all_ok_commands(AsyncRequestSet *set)
848 {
849 AsyncResponse *rsp;
850 AsyncResponse *bad_rsp = NULL;
851
852 /* Drain all responses and record the first error */
853 while ((rsp = async_request_set_wait_any_response(set)))
854 {
855 switch (async_response_get_type(rsp))
856 {
857 case RESPONSE_RESULT:
858 case RESPONSE_ROW:
859 {
860 AsyncResponseResult *ar = (AsyncResponseResult *) rsp;
861 ExecStatusType status = PQresultStatus(async_response_result_get_pg_result(ar));
862
863 if (status != PGRES_COMMAND_OK && bad_rsp == NULL)
864 bad_rsp = rsp;
865 else
866 async_response_result_close(ar);
867 break;
868 }
869 default:
870 if (bad_rsp == NULL)
871 bad_rsp = rsp;
872 break;
873 }
874 }
875
876 /* Throw error once request set is drained */
877 if (bad_rsp != NULL)
878 async_response_report_error(bad_rsp, ERROR);
879 }
880
881 void
async_request_discard_response(AsyncRequest * req)882 async_request_discard_response(AsyncRequest *req)
883 {
884 AsyncResponseResult *result = NULL;
885
886 Assert(req != NULL);
887
888 do
889 {
890 /* for row-by-row fetching we need to loop until we consume the whole response */
891 result = async_request_wait_any_result(req);
892 if (result != NULL)
893 async_response_result_close(result);
894 } while (result != NULL && req->state != COMPLETED);
895 }
896
897 void
prepared_stmt_close(PreparedStmt * stmt)898 prepared_stmt_close(PreparedStmt *stmt)
899 {
900 char sql[64] = { '\0' };
901 int ret;
902
903 ret = snprintf(sql, sizeof(sql), "DEALLOCATE %s", stmt->stmt_name);
904
905 if (ret < 0 || ret >= sizeof(sql))
906 elog(ERROR, "could not create deallocate statement");
907
908 async_request_wait_ok_command(async_request_send(stmt->conn, sql));
909 }
910
911 /* Request must have been generated by async_request_send_prepare() */
912 PreparedStmt *
async_response_result_generate_prepared_stmt(AsyncResponseResult * result)913 async_response_result_generate_prepared_stmt(AsyncResponseResult *result)
914 {
915 PreparedStmt *prep;
916
917 if (PQresultStatus(result->result) != PGRES_COMMAND_OK)
918 async_response_report_error(&result->base, ERROR);
919
920 prep = palloc0(sizeof(PreparedStmt));
921
922 *prep = (PreparedStmt){
923 .conn = result->request->conn,
924 .sql = result->request->sql,
925 .stmt_name = result->request->stmt_name,
926 .n_params = result->request->prep_stmt_params,
927 };
928
929 return prep;
930 }
931