1 /*
2 * PL/Proxy - easy access to partitioned database.
3 *
4 * Copyright (c) 2006-2020 PL/Proxy Authors
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
/*
 * Actual execution logic is here.
 *
 * - Tag particular databases, where query must be sent.
 * - Send the query.
 * - Fetch the results.
 */
26
27 #include "plproxy.h"
28
29 #include <sys/time.h>
30
31 /* find poll() */
32 #if defined(HAVE_POLL_H)
33 #include <poll.h>
34 #elif defined(WIN32)
35 #define poll(fds, nfds, timeout_ms) WSAPoll(fds, nfds, timeout_ms)
36 #elif !defined(POLLIN)
37 #error "PL/Proxy requires poll() API"
38 #endif
39
40 /* some error happened */
41 static void
conn_error(ProxyFunction * func,ProxyConnection * conn,const char * desc)42 conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
43 {
44 plproxy_error(func, "[%s] %s: %s",
45 PQdb(conn->cur->db), desc, PQerrorMessage(conn->cur->db));
46 }
47
/*
 * Compare if major/minor match.  Works on "MAJ.MIN.*" version strings.
 *
 * Returns true when both strings agree up to (and including) the second
 * dot, or when one of them is just "MAJ.MIN" and the other one continues
 * with ".something".  Any other difference, including different lengths
 * before the second dot, yields false.
 *
 * A NULL argument means the version is unknown; it never matches anything,
 * so false is returned instead of dereferencing the pointer.
 */
static bool
cmp_branch(const char *this, const char *that)
{
	int			dot = 0;
	int			i;

	/* unknown version cannot be assumed to be the same branch */
	if (this == NULL || that == NULL)
		return false;

	for (i = 0; this[i] || that[i]; i++)
	{
		/* allow just maj.min version on either side */
		if (dot && this[i] == '.' && !that[i])
			return true;
		if (dot && that[i] == '.' && !this[i])
			return true;

		/* compare, different length is also handled here */
		if (this[i] != that[i])
			return false;

		/* stop on second dot: 'dot++' is still 0 for the first one */
		if (this[i] == '.' && dot++)
			return true;
	}
	return true;
}
73
74 static void
flush_connection(ProxyFunction * func,ProxyConnection * conn)75 flush_connection(ProxyFunction *func, ProxyConnection *conn)
76 {
77 int res;
78
79 /* flush it down */
80 res = PQflush(conn->cur->db);
81
82 /* set actual state */
83 if (res > 0)
84 conn->cur->state = C_QUERY_WRITE;
85 else if (res == 0)
86 conn->cur->state = C_QUERY_READ;
87 else
88 conn_error(func, conn, "PQflush");
89 }
90
91 /*
92 * Small sanity checking for new connections.
93 *
94 * Current checks:
95 * - Does there happen any encoding conversations?
96 * - Difference in standard_conforming_strings.
97 */
98 static int
tune_connection(ProxyFunction * func,ProxyConnection * conn)99 tune_connection(ProxyFunction *func, ProxyConnection *conn)
100 {
101 const char *this_enc, *dst_enc;
102 const char *dst_ver;
103 StringInfo sql = NULL;
104
105 /*
106 * check if target server has same backend version.
107 */
108 dst_ver = PQparameterStatus(conn->cur->db, "server_version");
109 conn->cur->same_ver = cmp_branch(dst_ver, PG_VERSION);
110
111 /*
112 * Make sure remote I/O is done using local server_encoding.
113 */
114 this_enc = GetDatabaseEncodingName();
115 dst_enc = PQparameterStatus(conn->cur->db, "client_encoding");
116 if (dst_enc && strcmp(this_enc, dst_enc))
117 {
118 if (!sql)
119 sql = makeStringInfo();
120 appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc);
121 }
122
123 /*
124 * if second time in this function, they should be active already.
125 */
126 if (sql && conn->cur->tuning)
127 {
128 /* display SET query */
129 appendStringInfo(sql, "-- does not seem to apply");
130 conn_error(func, conn, sql->data);
131 }
132
133 /*
134 * send tuning query
135 */
136 if (sql)
137 {
138 conn->cur->tuning = 1;
139 conn->cur->state = C_QUERY_WRITE;
140 if (!PQsendQuery(conn->cur->db, sql->data))
141 conn_error(func, conn, "PQsendQuery");
142 pfree(sql->data);
143 pfree(sql);
144
145 flush_connection(func, conn);
146 return 1;
147 }
148
149 conn->cur->tuning = 0;
150 return 0;
151 }
152
153 /* send the query to server connection */
154 static void
send_query(ProxyFunction * func,ProxyConnection * conn,const char ** values,int * plengths,int * pformats)155 send_query(ProxyFunction *func, ProxyConnection *conn,
156 const char **values, int *plengths, int *pformats)
157 {
158 int res;
159 struct timeval now;
160 ProxyQuery *q = func->remote_sql;
161 ProxyConfig *cf = &func->cur_cluster->config;
162 int binary_result = 0;
163
164 gettimeofday(&now, NULL);
165 conn->cur->query_time = now.tv_sec;
166
167 tune_connection(func, conn);
168 if (conn->cur->tuning)
169 return;
170
171 /* use binary result only on same backend ver */
172 if (cf->disable_binary == 0 && conn->cur->same_ver)
173 {
174 /* binary recv for non-record types */
175 if (func->ret_scalar)
176 {
177 if (func->ret_scalar->has_recv)
178 binary_result = 1;
179 }
180 else
181 {
182 if (func->ret_composite->use_binary)
183 binary_result = 1;
184 }
185 }
186
187 /* send query */
188 conn->cur->state = C_QUERY_WRITE;
189 res = PQsendQueryParams(conn->cur->db, q->sql, q->arg_count,
190 NULL, /* paramTypes */
191 values, /* paramValues */
192 plengths, /* paramLengths */
193 pformats, /* paramFormats */
194 binary_result); /* resultformat, 0-text, 1-bin */
195 if (!res)
196 conn_error(func, conn, "PQsendQueryParams");
197
198 /* flush it down */
199 flush_connection(func, conn);
200 }
201
202 /* returns false of conn should be dropped */
203 static bool
check_old_conn(ProxyFunction * func,ProxyConnection * conn,struct timeval * now)204 check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
205 {
206 time_t t;
207 int res;
208 struct pollfd pfd;
209 ProxyConfig *cf = &func->cur_cluster->config;
210
211 if (PQstatus(conn->cur->db) != CONNECTION_OK)
212 return false;
213
214 /* check if too old */
215 if (cf->connection_lifetime > 0)
216 {
217 t = now->tv_sec - conn->cur->connect_time;
218 if (t >= cf->connection_lifetime)
219 return false;
220 }
221
222 /* how long ts been idle */
223 t = now->tv_sec - conn->cur->query_time;
224 if (t < PLPROXY_IDLE_CONN_CHECK)
225 return true;
226
227 /*
228 * Simple way to check if old connection is stable - look if there
229 * are events pending. If there are drop the connection.
230 */
231 intr_loop:
232 pfd.fd = PQsocket(conn->cur->db);
233 pfd.events = POLLIN;
234 pfd.revents = 0;
235
236 res = poll(&pfd, 1, 0);
237 if (res > 0)
238 {
239 elog(WARNING, "PL/Proxy: detected unstable connection");
240 return false;
241 }
242 else if (res < 0)
243 {
244 if (errno == EINTR)
245 goto intr_loop;
246 plproxy_error(func, "check_old_conn: select failed: %s",
247 strerror(errno));
248 }
249
250 /* seems ok */
251 return true;
252 }
253
254 static void
handle_notice(void * arg,const PGresult * res)255 handle_notice(void *arg, const PGresult *res)
256 {
257 ProxyConnection *conn = arg;
258 ProxyCluster *cluster = conn->cluster;
259 plproxy_remote_error(cluster->cur_func, conn, res, false);
260 }
261
262 static const char *
get_connstr(ProxyConnection * conn)263 get_connstr(ProxyConnection *conn)
264 {
265 StringInfoData cstr;
266 ConnUserInfo *info = conn->cluster->cur_userinfo;
267
268 if (strstr(conn->connstr, "user=") != NULL)
269 return pstrdup(conn->connstr);
270
271 initStringInfo(&cstr);
272 appendStringInfoString(&cstr, conn->connstr);
273 if (info->extra_connstr)
274 appendStringInfo(&cstr, " %s", info->extra_connstr);
275 else
276 plproxy_append_cstr_option(&cstr, "user", info->username);
277 return cstr.data;
278 }
279
280 /* check existing conn status or launch new conn */
281 static void
prepare_conn(ProxyFunction * func,ProxyConnection * conn)282 prepare_conn(ProxyFunction *func, ProxyConnection *conn)
283 {
284 struct timeval now;
285 const char *connstr;
286
287 gettimeofday(&now, NULL);
288
289 conn->cur->waitCancel = 0;
290
291 /* state should be C_READY or C_NONE */
292 switch (conn->cur->state)
293 {
294 case C_DONE:
295 conn->cur->state = C_READY;
296 /* fallthrough */
297 case C_READY:
298 if (check_old_conn(func, conn, &now))
299 return;
300 /* fallthrough */
301 case C_CONNECT_READ:
302 case C_CONNECT_WRITE:
303 case C_QUERY_READ:
304 case C_QUERY_WRITE:
305 /* close rotten connection */
306 elog(NOTICE, "PL/Proxy: dropping stale conn");
307 plproxy_disconnect(conn->cur);
308 case C_NONE:
309 break;
310 }
311
312 conn->cur->connect_time = now.tv_sec;
313
314 /* launch new connection */
315 connstr = get_connstr(conn);
316 conn->cur->db = PQconnectStart(connstr);
317 if (conn->cur->db == NULL)
318 plproxy_error(func, "No memory for PGconn");
319
320 /* tag connection dirty */
321 conn->cur->state = C_CONNECT_WRITE;
322
323 if (PQstatus(conn->cur->db) == CONNECTION_BAD)
324 conn_error(func, conn, "PQconnectStart");
325
326 /* override default notice handler */
327 PQsetNoticeReceiver(conn->cur->db, handle_notice, conn);
328 }
329
330 /*
331 * Connection has a resultset avalable, fetch it.
332 *
333 * Returns true if there may be more results coming,
334 * false if all done.
335 */
336 static bool
another_result(ProxyFunction * func,ProxyConnection * conn)337 another_result(ProxyFunction *func, ProxyConnection *conn)
338 {
339 PGresult *res;
340
341 /* got one */
342 res = PQgetResult(conn->cur->db);
343 if (res == NULL)
344 {
345 conn->cur->waitCancel = 0;
346 if (conn->cur->tuning)
347 conn->cur->state = C_READY;
348 else
349 conn->cur->state = C_DONE;
350 return false;
351 }
352
353 /* ignore result when waiting for cancel */
354 if (conn->cur->waitCancel)
355 {
356 PQclear(res);
357 return true;
358 }
359
360 switch (PQresultStatus(res))
361 {
362 case PGRES_TUPLES_OK:
363 if (conn->res)
364 {
365 PQclear(res);
366 conn_error(func, conn, "double result?");
367 }
368 conn->res = res;
369 break;
370 case PGRES_COMMAND_OK:
371 PQclear(res);
372 break;
373 case PGRES_FATAL_ERROR:
374 if (conn->res)
375 PQclear(conn->res);
376 conn->res = res;
377
378 plproxy_remote_error(func, conn, res, true);
379 break;
380 default:
381 if (conn->res)
382 PQclear(conn->res);
383 conn->res = res;
384
385 plproxy_error(func, "Unexpected result type: %s", PQresStatus(PQresultStatus(res)));
386 break;
387 }
388 return true;
389 }
390
391 /*
392 * Called when select() told that conn is avail for reading/writing.
393 *
394 * It should call postgres handlers and then change state if needed.
395 */
396 static void
handle_conn(ProxyFunction * func,ProxyConnection * conn)397 handle_conn(ProxyFunction *func, ProxyConnection *conn)
398 {
399 int res;
400 PostgresPollingStatusType poll_res;
401
402 switch (conn->cur->state)
403 {
404 case C_CONNECT_READ:
405 case C_CONNECT_WRITE:
406 poll_res = PQconnectPoll(conn->cur->db);
407 switch (poll_res)
408 {
409 case PGRES_POLLING_WRITING:
410 conn->cur->state = C_CONNECT_WRITE;
411 break;
412 case PGRES_POLLING_READING:
413 conn->cur->state = C_CONNECT_READ;
414 break;
415 case PGRES_POLLING_OK:
416 conn->cur->state = C_READY;
417 break;
418 case PGRES_POLLING_ACTIVE:
419 case PGRES_POLLING_FAILED:
420 conn_error(func, conn, "PQconnectPoll");
421 }
422 break;
423 case C_QUERY_WRITE:
424 flush_connection(func, conn);
425 break;
426 case C_QUERY_READ:
427 res = PQconsumeInput(conn->cur->db);
428 if (res == 0)
429 conn_error(func, conn, "PQconsumeInput");
430
431 /* loop until PQgetResult returns NULL */
432 while (1)
433 {
434 /* if PQisBusy, then incomplete result */
435 if (PQisBusy(conn->cur->db))
436 break;
437
438 /* got one */
439 if (!another_result(func, conn))
440 break;
441 }
442 case C_NONE:
443 case C_DONE:
444 case C_READY:
445 break;
446 }
447 }
448
449 /*
450 * Check if tagged connections have interesting events.
451 *
452 * Currenly uses select() as it should be enough
453 * on small number of sockets.
454 */
455 static int
poll_conns(ProxyFunction * func,ProxyCluster * cluster)456 poll_conns(ProxyFunction *func, ProxyCluster *cluster)
457 {
458 static struct pollfd *pfd_cache = NULL;
459 static int pfd_allocated = 0;
460
461 int i,
462 res,
463 fd;
464 ProxyConnection *conn;
465 struct pollfd *pf;
466 int numfds = 0;
467 int ev = 0;
468
469 if (pfd_allocated < cluster->active_count)
470 {
471 struct pollfd *tmp;
472 int num = cluster->active_count;
473 if (num < 64)
474 num = 64;
475 if (pfd_cache == NULL)
476 tmp = malloc(num * sizeof(struct pollfd));
477 else
478 tmp = realloc(pfd_cache, num * sizeof(struct pollfd));
479 if (!tmp)
480 elog(ERROR, "no mem for pollfd cache");
481 pfd_cache = tmp;
482 pfd_allocated = num;
483 }
484
485 for (i = 0; i < cluster->active_count; i++)
486 {
487 conn = cluster->active_list[i];
488 if (!conn->run_tag)
489 continue;
490
491 /* decide what to do */
492 switch (conn->cur->state)
493 {
494 case C_DONE:
495 case C_READY:
496 case C_NONE:
497 continue;
498 case C_CONNECT_READ:
499 case C_QUERY_READ:
500 ev = POLLIN;
501 break;
502 case C_CONNECT_WRITE:
503 case C_QUERY_WRITE:
504 ev = POLLOUT;
505 break;
506 }
507
508 /* add fd to proper set */
509 pf = pfd_cache + numfds++;
510 pf->fd = PQsocket(conn->cur->db);
511 pf->events = ev;
512 pf->revents = 0;
513 }
514
515 /* wait for events */
516 res = poll(pfd_cache, numfds, 1000);
517 if (res == 0)
518 return 0;
519 if (res < 0)
520 {
521 if (errno == EINTR)
522 return 0;
523 plproxy_error(func, "poll() failed: %s", strerror(errno));
524 }
525
526 /* now recheck the conns */
527 pf = pfd_cache;
528 for (i = 0; i < cluster->active_count; i++)
529 {
530 conn = cluster->active_list[i];
531 if (!conn->run_tag)
532 continue;
533
534 switch (conn->cur->state)
535 {
536 case C_DONE:
537 case C_READY:
538 case C_NONE:
539 continue;
540 case C_CONNECT_READ:
541 case C_QUERY_READ:
542 case C_CONNECT_WRITE:
543 case C_QUERY_WRITE:
544 break;
545 }
546
547 /*
548 * they should be in same order as called,
549 */
550 fd = PQsocket(conn->cur->db);
551 if (pf->fd != fd)
552 elog(WARNING, "fd order from poll() is messed up?");
553
554 if (pf->revents)
555 handle_conn(func, conn);
556
557 pf++;
558 }
559 return 1;
560 }
561
562 /* Check if some operation has gone over limit */
563 static void
check_timeouts(ProxyFunction * func,ProxyCluster * cluster,ProxyConnection * conn,time_t now)564 check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn, time_t now)
565 {
566 ProxyConfig *cf = &cluster->config;
567
568 switch (conn->cur->state)
569 {
570 case C_CONNECT_READ:
571 case C_CONNECT_WRITE:
572 if (cf->connect_timeout <= 0)
573 break;
574 if (now - conn->cur->connect_time <= cf->connect_timeout)
575 break;
576 plproxy_error(func, "connect timeout to: %s", conn->connstr);
577 break;
578
579 case C_QUERY_READ:
580 case C_QUERY_WRITE:
581 if (cf->query_timeout <= 0)
582 break;
583 if (now - conn->cur->query_time <= cf->query_timeout)
584 break;
585 plproxy_error(func, "query timeout");
586 break;
587 default:
588 break;
589 }
590 }
591
592 /* Run the query on all tagged connections in parallel */
593 static void
remote_execute(ProxyFunction * func)594 remote_execute(ProxyFunction *func)
595 {
596 ExecStatusType err;
597 ProxyConnection *conn;
598 ProxyCluster *cluster = func->cur_cluster;
599 int i,
600 pending = 0;
601 struct timeval now;
602
603 /* either launch connection or send query */
604 for (i = 0; i < cluster->active_count; i++)
605 {
606 conn = cluster->active_list[i];
607 if (!conn->run_tag)
608 continue;
609
610 /* check if conn is alive, and launch if not */
611 prepare_conn(func, conn);
612 pending++;
613
614 /* if conn is ready, then send query away */
615 if (conn->cur->state == C_READY)
616 send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
617 }
618
619 /* now loop until all results are arrived */
620 while (pending)
621 {
622 /* allow postgres to cancel processing */
623 CHECK_FOR_INTERRUPTS();
624
625 /* wait for events */
626 if (poll_conns(func, cluster) == 0)
627 continue;
628
629 /* recheck */
630 pending = 0;
631 gettimeofday(&now, NULL);
632 for (i = 0; i < cluster->active_count; i++)
633 {
634 conn = cluster->active_list[i];
635 if (!conn->run_tag)
636 continue;
637
638 /* login finished, send query */
639 if (conn->cur->state == C_READY)
640 send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
641
642 if (conn->cur->state != C_DONE)
643 pending++;
644
645 check_timeouts(func, cluster, conn, now.tv_sec);
646 }
647 }
648
649 /* review results, calculate total */
650 for (i = 0; i < cluster->active_count; i++)
651 {
652 conn = cluster->active_list[i];
653
654 if ((conn->run_tag || conn->res)
655 && !(conn->run_tag && conn->res))
656 plproxy_error(func, "run_tag does not match res");
657
658 if (!conn->run_tag)
659 continue;
660
661 if (conn->cur->state != C_DONE)
662 plproxy_error(func, "Unfinished connection");
663 if (conn->res == NULL)
664 plproxy_error(func, "Lost result");
665
666 err = PQresultStatus(conn->res);
667 if (err != PGRES_TUPLES_OK)
668 plproxy_error(func, "Remote error: %s",
669 PQresultErrorMessage(conn->res));
670
671 cluster->ret_total += PQntuples(conn->res);
672 }
673 }
674
675 static void
remote_wait_for_cancel(ProxyFunction * func)676 remote_wait_for_cancel(ProxyFunction *func)
677 {
678 ProxyConnection *conn;
679 ProxyCluster *cluster = func->cur_cluster;
680 int i,
681 pending;
682 struct timeval now;
683
684 /* now loop until all results are arrived */
685 while (1)
686 {
687 /* allow postgres to cancel processing */
688 CHECK_FOR_INTERRUPTS();
689
690 /* recheck */
691 pending = 0;
692 gettimeofday(&now, NULL);
693 for (i = 0; i < cluster->active_count; i++)
694 {
695 conn = cluster->active_list[i];
696 if (!conn->run_tag)
697 continue;
698
699 if (conn->cur->state == C_QUERY_READ)
700 pending++;
701 check_timeouts(func, cluster, conn, now.tv_sec);
702 }
703 if (!pending)
704 break;
705
706 /* wait for events */
707 poll_conns(func, cluster);
708 }
709
710 /* review results, calculate total */
711 for (i = 0; i < cluster->active_count; i++)
712 {
713 conn = cluster->active_list[i];
714
715 if (!conn->run_tag)
716 continue;
717
718 if (conn->cur->state != C_DONE && conn->cur->state != C_NONE)
719 plproxy_error(func, "Unfinished connection: %d", conn->cur->state);
720 if (conn->res != NULL)
721 {
722 PQclear(conn->res);
723 conn->res = NULL;
724 }
725 }
726 }
727
728 static void
remote_cancel(ProxyFunction * func)729 remote_cancel(ProxyFunction *func)
730 {
731 ProxyConnection *conn;
732 ProxyCluster *cluster = func->cur_cluster;
733 PGcancel *cancel;
734 char errbuf[256];
735 int ret;
736 int i;
737
738 if (cluster == NULL)
739 return;
740
741 for (i = 0; i < cluster->active_count; i++)
742 {
743 conn = cluster->active_list[i];
744 switch (conn->cur->state)
745 {
746 case C_NONE:
747 case C_READY:
748 case C_DONE:
749 break;
750 case C_QUERY_WRITE:
751 case C_CONNECT_READ:
752 case C_CONNECT_WRITE:
753 plproxy_disconnect(conn->cur);
754 break;
755 case C_QUERY_READ:
756 cancel = PQgetCancel(conn->cur->db);
757 if (cancel == NULL)
758 {
759 elog(NOTICE, "Invalid connection!");
760 continue;
761 }
762 ret = PQcancel(cancel, errbuf, sizeof(errbuf));
763 PQfreeCancel(cancel);
764 if (ret == 0)
765 elog(NOTICE, "Cancel query failed!");
766 else
767 conn->cur->waitCancel = 1;
768 break;
769 }
770 }
771
772 remote_wait_for_cancel(func);
773 }
774
775 /*
776 * Tag & move tagged connections to active list
777 */
778
tag_part(struct ProxyCluster * cluster,int64 hash,int tag)779 static void tag_part(struct ProxyCluster *cluster, int64 hash, int tag)
780 {
781 ProxyConnection *conn;
782 int64 idx;
783
784 /* map hash to connection index */
785 if (cluster->config.modular_mapping) {
786 if (hash < 0)
787 idx = -(hash % cluster->part_count);
788 else
789 idx = hash % cluster->part_count;
790 } else {
791 idx = hash & cluster->part_mask;
792 }
793 conn = cluster->part_map[idx];
794
795 if (!conn->run_tag)
796 plproxy_activate_connection(conn);
797
798 conn->run_tag = tag;
799 }
800
801 /*
802 * Run hash function and tag connections. If any of the hash function
803 * arguments are mentioned in the split_arrays an element of the array
804 * is used instead of the actual array.
805 */
806 static void
tag_hash_partitions(ProxyFunction * func,FunctionCallInfo fcinfo,int tag,DatumArray ** array_params,int array_row)807 tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
808 DatumArray **array_params, int array_row)
809 {
810 int i;
811 TupleDesc desc;
812 Oid htype;
813 ProxyCluster *cluster = func->cur_cluster;
814
815 /* execute cached plan */
816 plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row);
817
818 /* get header */
819 desc = SPI_tuptable->tupdesc;
820 htype = SPI_gettypeid(desc, 1);
821
822 /* tag connections */
823 for (i = 0; i < SPI_processed; i++)
824 {
825 bool isnull;
826 int64 hashval = 0;
827 HeapTuple row = SPI_tuptable->vals[i];
828 Datum val = SPI_getbinval(row, desc, 1, &isnull);
829
830 if (isnull)
831 plproxy_error(func, "Hash function returned NULL");
832
833 if (htype == INT4OID)
834 hashval = DatumGetInt32(val);
835 else if (htype == INT8OID)
836 hashval = DatumGetInt64(val);
837 else if (htype == INT2OID)
838 hashval = DatumGetInt16(val);
839 else
840 plproxy_error(func, "Hash result must be int2, int4 or int8");
841
842 tag_part(cluster, hashval, tag);
843 }
844
845 /* sanity check */
846 if (SPI_processed == 0 || SPI_processed > 1)
847 if (!fcinfo->flinfo->fn_retset)
848 plproxy_error(func, "Only set-returning function"
849 " allows hashcount <> 1");
850 }
851
852 /*
853 * Deconstruct an array type to array of Datums, note NULL elements
854 * and determine the element type information.
855 */
856 static DatumArray *
make_datum_array(ProxyFunction * func,ArrayType * v,ProxyType * array_type)857 make_datum_array(ProxyFunction *func, ArrayType *v, ProxyType *array_type)
858 {
859 DatumArray *da = palloc0(sizeof(*da));
860
861 da->type = plproxy_get_elem_type(func, array_type, true);
862
863 if (v)
864 deconstruct_array(v,
865 da->type->type_oid, da->type->length, da->type->by_value,
866 da->type->alignment,
867 &da->values, &da->nulls, &da->elem_count);
868 return da;
869 }
870
871 /*
872 * Evaluate the run condition. Tag the matching connections with the specified
873 * tag.
874 *
875 * Note that we don't allow nested plproxy calls on the same cluster (ie.
876 * remote hash functions). The cluster and connection state are global and
877 * would easily get messed up.
878 */
879 static void
tag_run_on_partitions(ProxyFunction * func,FunctionCallInfo fcinfo,int tag,DatumArray ** array_params,int array_row)880 tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
881 DatumArray **array_params, int array_row)
882 {
883 ProxyCluster *cluster = func->cur_cluster;
884 int i;
885
886 switch (func->run_type)
887 {
888 case R_HASH:
889 tag_hash_partitions(func, fcinfo, tag, array_params, array_row);
890 break;
891 case R_ALL:
892 for (i = 0; i < cluster->part_count; i++)
893 tag_part(cluster, i, tag);
894 break;
895 case R_EXACT:
896 i = func->exact_nr;
897 if (i < 0 || i >= cluster->part_count)
898 plproxy_error(func, "part number out of range");
899 tag_part(cluster, i, tag);
900 break;
901 case R_ANY:
902 tag_part(cluster, random(), tag);
903 break;
904 default:
905 plproxy_error(func, "uninitialized run_type");
906 }
907 }
908
909 /*
910 * Tag the partitions to be run on, if split is requested prepare the
911 * per-partition split array parameters.
912 *
913 * This is done by looping over all of the split arrays side-by-side, for each
914 * tuple see if it satisfies the RUN ON condition. If so, copy the tuple
915 * to the partition's private array parameters.
916 */
917 static void
prepare_and_tag_partitions(ProxyFunction * func,FunctionCallInfo fcinfo)918 prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
919 {
920 int i, row, col;
921 int split_array_len = -1;
922 int split_array_count = 0;
923 ProxyCluster *cluster = func->cur_cluster;
924 DatumArray *arrays_to_split[FUNC_MAX_ARGS];
925
926 /*
927 * See if we have any arrays to split. If so, make them manageable by
928 * converting them to Datum arrays. During the process verify that all
929 * the arrays are of the same length.
930 */
931 for (i = 0; i < func->arg_count; i++)
932 {
933 ArrayType *v;
934
935 if (!IS_SPLIT_ARG(func, i))
936 {
937 arrays_to_split[i] = NULL;
938 continue;
939 }
940
941 if (PG_ARGISNULL(i))
942 v = NULL;
943 else
944 {
945 v = PG_GETARG_ARRAYTYPE_P(i);
946
947 if (ARR_NDIM(v) > 1)
948 plproxy_error(func, "split multi-dimensional arrays are not supported");
949 }
950
951 arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]);
952
953 /* Check that the element counts match */
954 if (split_array_len < 0)
955 split_array_len = arrays_to_split[i]->elem_count;
956 else if (arrays_to_split[i]->elem_count != split_array_len)
957 plproxy_error(func, "split arrays must be of identical lengths");
958
959 ++split_array_count;
960 }
961
962 /* If nothing to split, just tag the partitions and be done with it */
963 if (!split_array_count)
964 {
965 tag_run_on_partitions(func, fcinfo, 1, NULL, 0);
966 return;
967 }
968
969 /* Need to split, evaluate the RUN ON condition for each of the elements. */
970 for (row = 0; row < split_array_len; row++)
971 {
972 int part;
973 int my_tag = row+1;
974
975 /*
976 * Tag the run-on partitions with a tag that allows us us to identify
977 * which partitions need the set of elements from this row.
978 */
979 tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row);
980
981 /* Add the array elements to the partitions tagged in previous step */
982 for (part = 0; part < cluster->active_count; part++)
983 {
984 ProxyConnection *conn = cluster->active_list[part];
985
986 if (conn->run_tag != my_tag)
987 continue;
988
989 if (!conn->bstate)
990 conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate));
991
992 /* Add this set of elements to the partition specific arrays */
993 for (col = 0; col < func->arg_count; col++)
994 {
995 if (!IS_SPLIT_ARG(func, col))
996 continue;
997
998 conn->bstate[col] = accumArrayResult(conn->bstate[col],
999 arrays_to_split[col]->values[row],
1000 arrays_to_split[col]->nulls[row],
1001 arrays_to_split[col]->type->type_oid,
1002 CurrentMemoryContext);
1003 }
1004 }
1005 }
1006
1007 /*
1008 * Finally, copy the accumulated arrays to the actual connections
1009 * to be used as parameters.
1010 */
1011 for (i = 0; i < cluster->active_count; i++)
1012 {
1013 ProxyConnection *conn = cluster->active_list[i];
1014
1015 if (!conn->run_tag)
1016 continue;
1017
1018 conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params));
1019
1020 for (col = 0; col < func->arg_count; col++)
1021 {
1022 if (!IS_SPLIT_ARG(func, col))
1023 conn->split_params[col] = PointerGetDatum(NULL);
1024 else
1025 conn->split_params[col] = makeArrayResult(conn->bstate[col],
1026 CurrentMemoryContext);
1027 }
1028 }
1029 }
1030
1031 /*
1032 * Prepare parameters for the query.
1033 */
1034 static void
prepare_query_parameters(ProxyFunction * func,FunctionCallInfo fcinfo)1035 prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo)
1036 {
1037 int i;
1038 ProxyCluster *cluster = func->cur_cluster;
1039
1040 for (i = 0; i < func->remote_sql->arg_count; i++)
1041 {
1042 int idx = func->remote_sql->arg_lookup[i];
1043 bool bin = cluster->config.disable_binary ? 0 : 1;
1044 const char *fixed_param_val = NULL;
1045 int fixed_param_len, fixed_param_fmt;
1046 int part;
1047
1048 /* Avoid doing multiple conversions for fixed parameters */
1049 if (!IS_SPLIT_ARG(func, idx) && !PG_ARGISNULL(idx))
1050 {
1051 fixed_param_val = plproxy_send_type(func->arg_types[idx],
1052 PG_GETARG_DATUM(idx),
1053 bin,
1054 &fixed_param_len,
1055 &fixed_param_fmt);
1056 }
1057
1058 /* Add the parameters to partitions */
1059 for (part = 0; part < cluster->active_count; part++)
1060 {
1061 ProxyConnection *conn = cluster->active_list[part];
1062
1063 if (!conn->run_tag)
1064 continue;
1065
1066 if (PG_ARGISNULL(idx))
1067 {
1068 conn->param_values[i] = NULL;
1069 conn->param_lengths[i] = 0;
1070 conn->param_formats[i] = 0;
1071 }
1072 else
1073 {
1074 if (IS_SPLIT_ARG(func, idx))
1075 {
1076 conn->param_values[i] = plproxy_send_type(func->arg_types[idx],
1077 conn->split_params[idx],
1078 bin,
1079 &conn->param_lengths[i],
1080 &conn->param_formats[i]);
1081 }
1082 else
1083 {
1084 conn->param_values[i] = fixed_param_val;
1085 conn->param_lengths[i] = fixed_param_len;
1086 conn->param_formats[i] = fixed_param_fmt;
1087 }
1088 }
1089 }
1090 }
1091 }
1092
1093 /* Clean old results and prepare for new one */
1094 void
plproxy_clean_results(ProxyCluster * cluster)1095 plproxy_clean_results(ProxyCluster *cluster)
1096 {
1097 int i;
1098 ProxyConnection *conn;
1099
1100 if (!cluster)
1101 return;
1102
1103 cluster->ret_total = 0;
1104 cluster->ret_cur_conn = 0;
1105
1106 for (i = 0; i < cluster->active_count; i++)
1107 {
1108 conn = cluster->active_list[i];
1109 if (conn->res)
1110 {
1111 PQclear(conn->res);
1112 conn->res = NULL;
1113 }
1114 conn->pos = 0;
1115 conn->run_tag = 0;
1116 conn->bstate = NULL;
1117 conn->cur = NULL;
1118 cluster->active_list[i] = NULL;
1119 }
1120
1121 /* reset active_list */
1122 cluster->active_count = 0;
1123
1124 /* conn state checks are done in prepare_conn */
1125 }
1126
1127 /* Drop one connection */
plproxy_disconnect(ProxyConnectionState * cur)1128 void plproxy_disconnect(ProxyConnectionState *cur)
1129 {
1130 if (cur->db)
1131 PQfinish(cur->db);
1132 cur->db = NULL;
1133 cur->state = C_NONE;
1134 cur->tuning = 0;
1135 cur->connect_time = 0;
1136 cur->query_time = 0;
1137 cur->same_ver = 0;
1138 cur->tuning = 0;
1139 cur->waitCancel = 0;
1140 }
1141
1142 /* Select partitions and execute query on them */
1143 void
plproxy_exec(ProxyFunction * func,FunctionCallInfo fcinfo)1144 plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
1145 {
1146 /*
1147 * Prepare parameters and run query. On cancel, send cancel request to
1148 * partitions too.
1149 */
1150 PG_TRY();
1151 {
1152 func->cur_cluster->busy = true;
1153 func->cur_cluster->cur_func = func;
1154
1155 /* clean old results */
1156 plproxy_clean_results(func->cur_cluster);
1157
1158 /* tag the partitions and prepare per-partition parameters */
1159 prepare_and_tag_partitions(func, fcinfo);
1160
1161 /* prepare the target query parameters */
1162 prepare_query_parameters(func, fcinfo);
1163
1164 remote_execute(func);
1165
1166 func->cur_cluster->busy = false;
1167 }
1168 PG_CATCH();
1169 {
1170 func->cur_cluster->busy = false;
1171
1172 if (geterrcode() == ERRCODE_QUERY_CANCELED)
1173 remote_cancel(func);
1174
1175 /* plproxy_remote_error() cannot clean itself, do it here */
1176 plproxy_clean_results(func->cur_cluster);
1177
1178 PG_RE_THROW();
1179 }
1180 PG_END_TRY();
1181 }
1182
1183