1 /*
2  * PL/Proxy - easy access to partitioned database.
3  *
4  * Copyright (c) 2006-2020 PL/Proxy Authors
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 /*
20  * Actual execution logic is here.
21  *
 * - Tag the particular databases the query must be sent to.
23  * - Send the query.
24  * - Fetch the results.
25  */
26 
27 #include "plproxy.h"
28 
29 #include <sys/time.h>
30 
31 /* find poll() */
32 #if defined(HAVE_POLL_H)
33 #include <poll.h>
34 #elif defined(WIN32)
35 #define poll(fds, nfds, timeout_ms) WSAPoll(fds, nfds, timeout_ms)
36 #elif !defined(POLLIN)
37 #error "PL/Proxy requires poll() API"
38 #endif
39 
40 /* some error happened */
41 static void
conn_error(ProxyFunction * func,ProxyConnection * conn,const char * desc)42 conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
43 {
44 	plproxy_error(func, "[%s] %s: %s",
45 				  PQdb(conn->cur->db), desc, PQerrorMessage(conn->cur->db));
46 }
47 
/*
 * Compare two version strings of the form "MAJ.MIN[.PATCH...]" and report
 * whether their "MAJ.MIN" prefixes are equal.
 *
 * Comparison stops successfully at the second dot, so anything after
 * "MAJ.MIN" is ignored.  A string that ends right after "MAJ.MIN" also
 * matches one that continues with ".PATCH".  Any other difference,
 * including a differing length before that point, means no match.
 */
static bool
cmp_branch(const char *this, const char *that)
{
	int			dots_seen = 0;
	size_t		pos = 0;

	while (this[pos] != '\0' || that[pos] != '\0')
	{
		char		a = this[pos];
		char		b = that[pos];

		/* one side is just "MAJ.MIN", the other continues with ".xxx" */
		if (dots_seen > 0)
		{
			if (a == '.' && b == '\0')
				return true;
			if (b == '.' && a == '\0')
				return true;
		}

		if (a != b)
			return false;

		if (a == '.')
		{
			/* second dot: major and minor matched */
			if (dots_seen > 0)
				return true;
			dots_seen++;
		}
		pos++;
	}
	return true;
}
73 
74 static void
flush_connection(ProxyFunction * func,ProxyConnection * conn)75 flush_connection(ProxyFunction *func, ProxyConnection *conn)
76 {
77 	int res;
78 
79 	/* flush it down */
80 	res = PQflush(conn->cur->db);
81 
82 	/* set actual state */
83 	if (res > 0)
84 		conn->cur->state = C_QUERY_WRITE;
85 	else if (res == 0)
86 		conn->cur->state = C_QUERY_READ;
87 	else
88 		conn_error(func, conn, "PQflush");
89 }
90 
91 /*
92  * Small sanity checking for new connections.
93  *
94  * Current checks:
95  * - Does there happen any encoding conversations?
96  * - Difference in standard_conforming_strings.
97  */
98 static int
tune_connection(ProxyFunction * func,ProxyConnection * conn)99 tune_connection(ProxyFunction *func, ProxyConnection *conn)
100 {
101 	const char *this_enc, *dst_enc;
102 	const char *dst_ver;
103 	StringInfo	sql = NULL;
104 
105 	/*
106 	 * check if target server has same backend version.
107 	 */
108 	dst_ver = PQparameterStatus(conn->cur->db, "server_version");
109 	conn->cur->same_ver = cmp_branch(dst_ver, PG_VERSION);
110 
111 	/*
112 	 * Make sure remote I/O is done using local server_encoding.
113 	 */
114 	this_enc = GetDatabaseEncodingName();
115 	dst_enc = PQparameterStatus(conn->cur->db, "client_encoding");
116 	if (dst_enc && strcmp(this_enc, dst_enc))
117 	{
118 		if (!sql)
119 			sql = makeStringInfo();
120 		appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc);
121 	}
122 
123 	/*
124 	 * if second time in this function, they should be active already.
125 	 */
126 	if (sql && conn->cur->tuning)
127 	{
128 		/* display SET query */
129 		appendStringInfo(sql, "-- does not seem to apply");
130 		conn_error(func, conn, sql->data);
131 	}
132 
133 	/*
134 	 * send tuning query
135 	 */
136 	if (sql)
137 	{
138 		conn->cur->tuning = 1;
139 		conn->cur->state = C_QUERY_WRITE;
140 		if (!PQsendQuery(conn->cur->db, sql->data))
141 			conn_error(func, conn, "PQsendQuery");
142 		pfree(sql->data);
143 		pfree(sql);
144 
145 		flush_connection(func, conn);
146 		return 1;
147 	}
148 
149 	conn->cur->tuning = 0;
150 	return 0;
151 }
152 
153 /* send the query to server connection */
154 static void
send_query(ProxyFunction * func,ProxyConnection * conn,const char ** values,int * plengths,int * pformats)155 send_query(ProxyFunction *func, ProxyConnection *conn,
156 		   const char **values, int *plengths, int *pformats)
157 {
158 	int			res;
159 	struct timeval now;
160 	ProxyQuery *q = func->remote_sql;
161 	ProxyConfig *cf = &func->cur_cluster->config;
162 	int			binary_result = 0;
163 
164 	gettimeofday(&now, NULL);
165 	conn->cur->query_time = now.tv_sec;
166 
167 	tune_connection(func, conn);
168 	if (conn->cur->tuning)
169 		return;
170 
171 	/* use binary result only on same backend ver */
172 	if (cf->disable_binary == 0 && conn->cur->same_ver)
173 	{
174 		/* binary recv for non-record types */
175 		if (func->ret_scalar)
176 		{
177 			if (func->ret_scalar->has_recv)
178 				binary_result = 1;
179 		}
180 		else
181 		{
182 			if (func->ret_composite->use_binary)
183 				binary_result = 1;
184 		}
185 	}
186 
187 	/* send query */
188 	conn->cur->state = C_QUERY_WRITE;
189 	res = PQsendQueryParams(conn->cur->db, q->sql, q->arg_count,
190 							NULL,		/* paramTypes */
191 							values,		/* paramValues */
192 							plengths,	/* paramLengths */
193 							pformats,	/* paramFormats */
194 							binary_result);		/* resultformat, 0-text, 1-bin */
195 	if (!res)
196 		conn_error(func, conn, "PQsendQueryParams");
197 
198 	/* flush it down */
199 	flush_connection(func, conn);
200 }
201 
202 /* returns false of conn should be dropped */
203 static bool
check_old_conn(ProxyFunction * func,ProxyConnection * conn,struct timeval * now)204 check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
205 {
206 	time_t		t;
207 	int			res;
208 	struct pollfd	pfd;
209 	ProxyConfig *cf = &func->cur_cluster->config;
210 
211 	if (PQstatus(conn->cur->db) != CONNECTION_OK)
212 		return false;
213 
214 	/* check if too old */
215 	if (cf->connection_lifetime > 0)
216 	{
217 		t = now->tv_sec - conn->cur->connect_time;
218 		if (t >= cf->connection_lifetime)
219 			return false;
220 	}
221 
222 	/* how long ts been idle */
223 	t = now->tv_sec - conn->cur->query_time;
224 	if (t < PLPROXY_IDLE_CONN_CHECK)
225 		return true;
226 
227 	/*
228 	 * Simple way to check if old connection is stable - look if there
229 	 * are events pending.  If there are drop the connection.
230 	 */
231 intr_loop:
232 	pfd.fd = PQsocket(conn->cur->db);
233 	pfd.events = POLLIN;
234 	pfd.revents = 0;
235 
236 	res = poll(&pfd, 1, 0);
237 	if (res > 0)
238 	{
239 		elog(WARNING, "PL/Proxy: detected unstable connection");
240 		return false;
241 	}
242 	else if (res < 0)
243 	{
244 		if (errno == EINTR)
245 			goto intr_loop;
246 		plproxy_error(func, "check_old_conn: select failed: %s",
247 					  strerror(errno));
248 	}
249 
250 	/* seems ok */
251 	return true;
252 }
253 
254 static void
handle_notice(void * arg,const PGresult * res)255 handle_notice(void *arg, const PGresult *res)
256 {
257 	ProxyConnection *conn = arg;
258 	ProxyCluster *cluster = conn->cluster;
259 	plproxy_remote_error(cluster->cur_func, conn, res, false);
260 }
261 
262 static const char *
get_connstr(ProxyConnection * conn)263 get_connstr(ProxyConnection *conn)
264 {
265 	StringInfoData cstr;
266 	ConnUserInfo *info = conn->cluster->cur_userinfo;
267 
268 	if (strstr(conn->connstr, "user=") != NULL)
269 		return pstrdup(conn->connstr);
270 
271 	initStringInfo(&cstr);
272 	appendStringInfoString(&cstr, conn->connstr);
273 	if (info->extra_connstr)
274 		appendStringInfo(&cstr, " %s", info->extra_connstr);
275 	else
276 		plproxy_append_cstr_option(&cstr, "user", info->username);
277 	return cstr.data;
278 }
279 
280 /* check existing conn status or launch new conn */
281 static void
prepare_conn(ProxyFunction * func,ProxyConnection * conn)282 prepare_conn(ProxyFunction *func, ProxyConnection *conn)
283 {
284 	struct timeval now;
285 	const char *connstr;
286 
287 	gettimeofday(&now, NULL);
288 
289 	conn->cur->waitCancel = 0;
290 
291 	/* state should be C_READY or C_NONE */
292 	switch (conn->cur->state)
293 	{
294 		case C_DONE:
295 			conn->cur->state = C_READY;
296 			/* fallthrough */
297 		case C_READY:
298 			if (check_old_conn(func, conn, &now))
299 				return;
300 			/* fallthrough */
301 		case C_CONNECT_READ:
302 		case C_CONNECT_WRITE:
303 		case C_QUERY_READ:
304 		case C_QUERY_WRITE:
305 			/* close rotten connection */
306 			elog(NOTICE, "PL/Proxy: dropping stale conn");
307 			plproxy_disconnect(conn->cur);
308 		case C_NONE:
309 			break;
310 	}
311 
312 	conn->cur->connect_time = now.tv_sec;
313 
314 	/* launch new connection */
315 	connstr = get_connstr(conn);
316 	conn->cur->db = PQconnectStart(connstr);
317 	if (conn->cur->db == NULL)
318 		plproxy_error(func, "No memory for PGconn");
319 
320 	/* tag connection dirty */
321 	conn->cur->state = C_CONNECT_WRITE;
322 
323 	if (PQstatus(conn->cur->db) == CONNECTION_BAD)
324 		conn_error(func, conn, "PQconnectStart");
325 
326 	/* override default notice handler */
327 	PQsetNoticeReceiver(conn->cur->db, handle_notice, conn);
328 }
329 
330 /*
331  * Connection has a resultset avalable, fetch it.
332  *
333  * Returns true if there may be more results coming,
334  * false if all done.
335  */
336 static bool
another_result(ProxyFunction * func,ProxyConnection * conn)337 another_result(ProxyFunction *func, ProxyConnection *conn)
338 {
339 	PGresult   *res;
340 
341 	/* got one */
342 	res = PQgetResult(conn->cur->db);
343 	if (res == NULL)
344 	{
345 		conn->cur->waitCancel = 0;
346 		if (conn->cur->tuning)
347 			conn->cur->state = C_READY;
348 		else
349 			conn->cur->state = C_DONE;
350 		return false;
351 	}
352 
353 	/* ignore result when waiting for cancel */
354 	if (conn->cur->waitCancel)
355 	{
356 		PQclear(res);
357 		return true;
358 	}
359 
360 	switch (PQresultStatus(res))
361 	{
362 		case PGRES_TUPLES_OK:
363 			if (conn->res)
364 			{
365 				PQclear(res);
366 				conn_error(func, conn, "double result?");
367 			}
368 			conn->res = res;
369 			break;
370 		case PGRES_COMMAND_OK:
371 			PQclear(res);
372 			break;
373 		case PGRES_FATAL_ERROR:
374 			if (conn->res)
375 				PQclear(conn->res);
376 			conn->res = res;
377 
378 			plproxy_remote_error(func, conn, res, true);
379 			break;
380 		default:
381 			if (conn->res)
382 				PQclear(conn->res);
383 			conn->res = res;
384 
385 			plproxy_error(func, "Unexpected result type: %s", PQresStatus(PQresultStatus(res)));
386 			break;
387 	}
388 	return true;
389 }
390 
391 /*
392  * Called when select() told that conn is avail for reading/writing.
393  *
394  * It should call postgres handlers and then change state if needed.
395  */
396 static void
handle_conn(ProxyFunction * func,ProxyConnection * conn)397 handle_conn(ProxyFunction *func, ProxyConnection *conn)
398 {
399 	int			res;
400 	PostgresPollingStatusType poll_res;
401 
402 	switch (conn->cur->state)
403 	{
404 		case C_CONNECT_READ:
405 		case C_CONNECT_WRITE:
406 			poll_res = PQconnectPoll(conn->cur->db);
407 			switch (poll_res)
408 			{
409 				case PGRES_POLLING_WRITING:
410 					conn->cur->state = C_CONNECT_WRITE;
411 					break;
412 				case PGRES_POLLING_READING:
413 					conn->cur->state = C_CONNECT_READ;
414 					break;
415 				case PGRES_POLLING_OK:
416 					conn->cur->state = C_READY;
417 					break;
418 				case PGRES_POLLING_ACTIVE:
419 				case PGRES_POLLING_FAILED:
420 					conn_error(func, conn, "PQconnectPoll");
421 			}
422 			break;
423 		case C_QUERY_WRITE:
424 			flush_connection(func, conn);
425 			break;
426 		case C_QUERY_READ:
427 			res = PQconsumeInput(conn->cur->db);
428 			if (res == 0)
429 				conn_error(func, conn, "PQconsumeInput");
430 
431 			/* loop until PQgetResult returns NULL */
432 			while (1)
433 			{
434 				/* if PQisBusy, then incomplete result */
435 				if (PQisBusy(conn->cur->db))
436 					break;
437 
438 				/* got one */
439 				if (!another_result(func, conn))
440 					break;
441 			}
442 		case C_NONE:
443 		case C_DONE:
444 		case C_READY:
445 			break;
446 	}
447 }
448 
449 /*
450  * Check if tagged connections have interesting events.
451  *
452  * Currenly uses select() as it should be enough
453  * on small number of sockets.
454  */
455 static int
poll_conns(ProxyFunction * func,ProxyCluster * cluster)456 poll_conns(ProxyFunction *func, ProxyCluster *cluster)
457 {
458 	static struct pollfd *pfd_cache = NULL;
459 	static int pfd_allocated = 0;
460 
461 	int			i,
462 				res,
463 				fd;
464 	ProxyConnection *conn;
465 	struct pollfd *pf;
466 	int numfds = 0;
467 	int ev = 0;
468 
469 	if (pfd_allocated < cluster->active_count)
470 	{
471 		struct pollfd *tmp;
472 		int num = cluster->active_count;
473 		if (num < 64)
474 			num = 64;
475 		if (pfd_cache == NULL)
476 			tmp = malloc(num * sizeof(struct pollfd));
477 		else
478 			tmp = realloc(pfd_cache, num * sizeof(struct pollfd));
479 		if (!tmp)
480 			elog(ERROR, "no mem for pollfd cache");
481 		pfd_cache = tmp;
482 		pfd_allocated = num;
483 	}
484 
485 	for (i = 0; i < cluster->active_count; i++)
486 	{
487 		conn = cluster->active_list[i];
488 		if (!conn->run_tag)
489 			continue;
490 
491 		/* decide what to do */
492 		switch (conn->cur->state)
493 		{
494 			case C_DONE:
495 			case C_READY:
496 			case C_NONE:
497 				continue;
498 			case C_CONNECT_READ:
499 			case C_QUERY_READ:
500 				ev = POLLIN;
501 				break;
502 			case C_CONNECT_WRITE:
503 			case C_QUERY_WRITE:
504 				ev = POLLOUT;
505 				break;
506 		}
507 
508 		/* add fd to proper set */
509 		pf = pfd_cache + numfds++;
510 		pf->fd = PQsocket(conn->cur->db);
511 		pf->events = ev;
512 		pf->revents = 0;
513 	}
514 
515 	/* wait for events */
516 	res = poll(pfd_cache, numfds, 1000);
517 	if (res == 0)
518 		return 0;
519 	if (res < 0)
520 	{
521 		if (errno == EINTR)
522 			return 0;
523 		plproxy_error(func, "poll() failed: %s", strerror(errno));
524 	}
525 
526 	/* now recheck the conns */
527 	pf = pfd_cache;
528 	for (i = 0; i < cluster->active_count; i++)
529 	{
530 		conn = cluster->active_list[i];
531 		if (!conn->run_tag)
532 			continue;
533 
534 		switch (conn->cur->state)
535 		{
536 			case C_DONE:
537 			case C_READY:
538 			case C_NONE:
539 				continue;
540 			case C_CONNECT_READ:
541 			case C_QUERY_READ:
542 			case C_CONNECT_WRITE:
543 			case C_QUERY_WRITE:
544 				break;
545 		}
546 
547 		/*
548 		 * they should be in same order as called,
549 		 */
550 		fd = PQsocket(conn->cur->db);
551 		if (pf->fd != fd)
552 			elog(WARNING, "fd order from poll() is messed up?");
553 
554 		if (pf->revents)
555 			handle_conn(func, conn);
556 
557 		pf++;
558 	}
559 	return 1;
560 }
561 
562 /* Check if some operation has gone over limit */
563 static void
check_timeouts(ProxyFunction * func,ProxyCluster * cluster,ProxyConnection * conn,time_t now)564 check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn, time_t now)
565 {
566 	ProxyConfig *cf = &cluster->config;
567 
568 	switch (conn->cur->state)
569 	{
570 		case C_CONNECT_READ:
571 		case C_CONNECT_WRITE:
572 			if (cf->connect_timeout <= 0)
573 				break;
574 			if (now - conn->cur->connect_time <= cf->connect_timeout)
575 				break;
576 			plproxy_error(func, "connect timeout to: %s", conn->connstr);
577 			break;
578 
579 		case C_QUERY_READ:
580 		case C_QUERY_WRITE:
581 			if (cf->query_timeout <= 0)
582 				break;
583 			if (now - conn->cur->query_time <= cf->query_timeout)
584 				break;
585 			plproxy_error(func, "query timeout");
586 			break;
587 		default:
588 			break;
589 	}
590 }
591 
592 /* Run the query on all tagged connections in parallel */
593 static void
remote_execute(ProxyFunction * func)594 remote_execute(ProxyFunction *func)
595 {
596 	ExecStatusType err;
597 	ProxyConnection *conn;
598 	ProxyCluster *cluster = func->cur_cluster;
599 	int			i,
600 				pending = 0;
601 	struct timeval now;
602 
603 	/* either launch connection or send query */
604 	for (i = 0; i < cluster->active_count; i++)
605 	{
606 		conn = cluster->active_list[i];
607 		if (!conn->run_tag)
608 			continue;
609 
610 		/* check if conn is alive, and launch if not */
611 		prepare_conn(func, conn);
612 		pending++;
613 
614 		/* if conn is ready, then send query away */
615 		if (conn->cur->state == C_READY)
616 			send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
617 	}
618 
619 	/* now loop until all results are arrived */
620 	while (pending)
621 	{
622 		/* allow postgres to cancel processing */
623 		CHECK_FOR_INTERRUPTS();
624 
625 		/* wait for events */
626 		if (poll_conns(func, cluster) == 0)
627 			continue;
628 
629 		/* recheck */
630 		pending = 0;
631 		gettimeofday(&now, NULL);
632 		for (i = 0; i < cluster->active_count; i++)
633 		{
634 			conn = cluster->active_list[i];
635 			if (!conn->run_tag)
636 				continue;
637 
638 			/* login finished, send query */
639 			if (conn->cur->state == C_READY)
640 				send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
641 
642 			if (conn->cur->state != C_DONE)
643 				pending++;
644 
645 			check_timeouts(func, cluster, conn, now.tv_sec);
646 		}
647 	}
648 
649 	/* review results, calculate total */
650 	for (i = 0; i < cluster->active_count; i++)
651 	{
652 		conn = cluster->active_list[i];
653 
654 		if ((conn->run_tag || conn->res)
655 			&& !(conn->run_tag && conn->res))
656 			plproxy_error(func, "run_tag does not match res");
657 
658 		if (!conn->run_tag)
659 			continue;
660 
661 		if (conn->cur->state != C_DONE)
662 			plproxy_error(func, "Unfinished connection");
663 		if (conn->res == NULL)
664 			plproxy_error(func, "Lost result");
665 
666 		err = PQresultStatus(conn->res);
667 		if (err != PGRES_TUPLES_OK)
668 			plproxy_error(func, "Remote error: %s",
669 						  PQresultErrorMessage(conn->res));
670 
671 		cluster->ret_total += PQntuples(conn->res);
672 	}
673 }
674 
675 static void
remote_wait_for_cancel(ProxyFunction * func)676 remote_wait_for_cancel(ProxyFunction *func)
677 {
678 	ProxyConnection *conn;
679 	ProxyCluster *cluster = func->cur_cluster;
680 	int			i,
681 				pending;
682 	struct timeval now;
683 
684 	/* now loop until all results are arrived */
685 	while (1)
686 	{
687 		/* allow postgres to cancel processing */
688 		CHECK_FOR_INTERRUPTS();
689 
690 		/* recheck */
691 		pending = 0;
692 		gettimeofday(&now, NULL);
693 		for (i = 0; i < cluster->active_count; i++)
694 		{
695 			conn = cluster->active_list[i];
696 			if (!conn->run_tag)
697 				continue;
698 
699 			if (conn->cur->state == C_QUERY_READ)
700 				pending++;
701 			check_timeouts(func, cluster, conn, now.tv_sec);
702 		}
703 		if (!pending)
704 			break;
705 
706 		/* wait for events */
707 		poll_conns(func, cluster);
708 	}
709 
710 	/* review results, calculate total */
711 	for (i = 0; i < cluster->active_count; i++)
712 	{
713 		conn = cluster->active_list[i];
714 
715 		if (!conn->run_tag)
716 			continue;
717 
718 		if (conn->cur->state != C_DONE && conn->cur->state != C_NONE)
719 			plproxy_error(func, "Unfinished connection: %d", conn->cur->state);
720 		if (conn->res != NULL)
721 		{
722 			PQclear(conn->res);
723 			conn->res = NULL;
724 		}
725 	}
726 }
727 
728 static void
remote_cancel(ProxyFunction * func)729 remote_cancel(ProxyFunction *func)
730 {
731 	ProxyConnection *conn;
732 	ProxyCluster *cluster = func->cur_cluster;
733 	PGcancel *cancel;
734 	char errbuf[256];
735 	int ret;
736 	int i;
737 
738 	if (cluster == NULL)
739 		return;
740 
741 	for (i = 0; i < cluster->active_count; i++)
742 	{
743 		conn = cluster->active_list[i];
744 		switch (conn->cur->state)
745 		{
746 			case C_NONE:
747 			case C_READY:
748 			case C_DONE:
749 				break;
750 			case C_QUERY_WRITE:
751 			case C_CONNECT_READ:
752 			case C_CONNECT_WRITE:
753 				plproxy_disconnect(conn->cur);
754 				break;
755 			case C_QUERY_READ:
756 				cancel = PQgetCancel(conn->cur->db);
757 				if (cancel == NULL)
758 				{
759 					elog(NOTICE, "Invalid connection!");
760 					continue;
761 				}
762 				ret = PQcancel(cancel, errbuf, sizeof(errbuf));
763 				PQfreeCancel(cancel);
764 				if (ret == 0)
765 					elog(NOTICE, "Cancel query failed!");
766 				else
767 					conn->cur->waitCancel = 1;
768 				break;
769 		}
770 	}
771 
772 	remote_wait_for_cancel(func);
773 }
774 
775 /*
776  * Tag & move tagged connections to active list
777  */
778 
tag_part(struct ProxyCluster * cluster,int64 hash,int tag)779 static void tag_part(struct ProxyCluster *cluster, int64 hash, int tag)
780 {
781 	ProxyConnection *conn;
782 	int64 idx;
783 
784 	/* map hash to connection index */
785 	if (cluster->config.modular_mapping) {
786 		if (hash < 0)
787 			idx = -(hash % cluster->part_count);
788 		else
789 			idx = hash % cluster->part_count;
790 	} else {
791 		idx = hash & cluster->part_mask;
792 	}
793 	conn = cluster->part_map[idx];
794 
795 	if (!conn->run_tag)
796 		plproxy_activate_connection(conn);
797 
798 	conn->run_tag = tag;
799 }
800 
801 /*
802  * Run hash function and tag connections. If any of the hash function
803  * arguments are mentioned in the split_arrays an element of the array
804  * is used instead of the actual array.
805  */
806 static void
tag_hash_partitions(ProxyFunction * func,FunctionCallInfo fcinfo,int tag,DatumArray ** array_params,int array_row)807 tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
808 					DatumArray **array_params, int array_row)
809 {
810 	int			i;
811 	TupleDesc	desc;
812 	Oid			htype;
813 	ProxyCluster *cluster = func->cur_cluster;
814 
815 	/* execute cached plan */
816 	plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row);
817 
818 	/* get header */
819 	desc = SPI_tuptable->tupdesc;
820 	htype = SPI_gettypeid(desc, 1);
821 
822 	/* tag connections */
823 	for (i = 0; i < SPI_processed; i++)
824 	{
825 		bool		isnull;
826 		int64		hashval = 0;
827 		HeapTuple	row = SPI_tuptable->vals[i];
828 		Datum		val = SPI_getbinval(row, desc, 1, &isnull);
829 
830 		if (isnull)
831 			plproxy_error(func, "Hash function returned NULL");
832 
833 		if (htype == INT4OID)
834 			hashval = DatumGetInt32(val);
835 		else if (htype == INT8OID)
836 			hashval = DatumGetInt64(val);
837 		else if (htype == INT2OID)
838 			hashval = DatumGetInt16(val);
839 		else
840 			plproxy_error(func, "Hash result must be int2, int4 or int8");
841 
842 		tag_part(cluster, hashval, tag);
843 	}
844 
845 	/* sanity check */
846 	if (SPI_processed == 0 || SPI_processed > 1)
847 		if (!fcinfo->flinfo->fn_retset)
848 			plproxy_error(func, "Only set-returning function"
849 						  " allows hashcount <> 1");
850 }
851 
852 /*
853  * Deconstruct an array type to array of Datums, note NULL elements
854  * and determine the element type information.
855  */
856 static DatumArray *
make_datum_array(ProxyFunction * func,ArrayType * v,ProxyType * array_type)857 make_datum_array(ProxyFunction *func, ArrayType *v, ProxyType *array_type)
858 {
859 	DatumArray	   *da = palloc0(sizeof(*da));
860 
861 	da->type = plproxy_get_elem_type(func, array_type, true);
862 
863 	if (v)
864 		deconstruct_array(v,
865 						  da->type->type_oid, da->type->length, da->type->by_value,
866 						  da->type->alignment,
867 						  &da->values, &da->nulls, &da->elem_count);
868 	return da;
869 }
870 
871 /*
872  * Evaluate the run condition. Tag the matching connections with the specified
873  * tag.
874  *
875  * Note that we don't allow nested plproxy calls on the same cluster (ie.
876  * remote hash functions). The cluster and connection state are global and
877  * would easily get messed up.
878  */
879 static void
tag_run_on_partitions(ProxyFunction * func,FunctionCallInfo fcinfo,int tag,DatumArray ** array_params,int array_row)880 tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
881 					  DatumArray **array_params, int array_row)
882 {
883 	ProxyCluster   *cluster = func->cur_cluster;
884 	int				i;
885 
886 	switch (func->run_type)
887 	{
888 		case R_HASH:
889 			tag_hash_partitions(func, fcinfo, tag, array_params, array_row);
890 			break;
891 		case R_ALL:
892 			for (i = 0; i < cluster->part_count; i++)
893 				tag_part(cluster, i, tag);
894 			break;
895 		case R_EXACT:
896 			i = func->exact_nr;
897 			if (i < 0 || i >= cluster->part_count)
898 				plproxy_error(func, "part number out of range");
899 			tag_part(cluster, i, tag);
900 			break;
901 		case R_ANY:
902 			tag_part(cluster, random(), tag);
903 			break;
904 		default:
905 			plproxy_error(func, "uninitialized run_type");
906 	}
907 }
908 
909 /*
910  * Tag the partitions to be run on, if split is requested prepare the
911  * per-partition split array parameters.
912  *
913  * This is done by looping over all of the split arrays side-by-side, for each
914  * tuple see if it satisfies the RUN ON condition. If so, copy the tuple
915  * to the partition's private array parameters.
916  */
917 static void
prepare_and_tag_partitions(ProxyFunction * func,FunctionCallInfo fcinfo)918 prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
919 {
920 	int					i, row, col;
921 	int					split_array_len = -1;
922 	int					split_array_count = 0;
923 	ProxyCluster	   *cluster = func->cur_cluster;
924 	DatumArray		   *arrays_to_split[FUNC_MAX_ARGS];
925 
926 	/*
927 	 * See if we have any arrays to split. If so, make them manageable by
928 	 * converting them to Datum arrays. During the process verify that all
929 	 * the arrays are of the same length.
930 	 */
931 	for (i = 0; i < func->arg_count; i++)
932 	{
933 		ArrayType	   *v;
934 
935 		if (!IS_SPLIT_ARG(func, i))
936 		{
937 			arrays_to_split[i] = NULL;
938 			continue;
939 		}
940 
941 		if (PG_ARGISNULL(i))
942 			v = NULL;
943 		else
944 		{
945 			v = PG_GETARG_ARRAYTYPE_P(i);
946 
947 			if (ARR_NDIM(v) > 1)
948 				plproxy_error(func, "split multi-dimensional arrays are not supported");
949 		}
950 
951 		arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]);
952 
953 		/* Check that the element counts match */
954 		if (split_array_len < 0)
955 			split_array_len = arrays_to_split[i]->elem_count;
956 		else if (arrays_to_split[i]->elem_count != split_array_len)
957 			plproxy_error(func, "split arrays must be of identical lengths");
958 
959 		++split_array_count;
960 	}
961 
962 	/* If nothing to split, just tag the partitions and be done with it */
963 	if (!split_array_count)
964 	{
965 		tag_run_on_partitions(func, fcinfo, 1, NULL, 0);
966 		return;
967 	}
968 
969 	/* Need to split, evaluate the RUN ON condition for each of the elements. */
970 	for (row = 0; row < split_array_len; row++)
971 	{
972 		int		part;
973 		int		my_tag = row+1;
974 
975 		/*
976 		 * Tag the run-on partitions with a tag that allows us us to identify
977 		 * which partitions need the set of elements from this row.
978 		 */
979 		tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row);
980 
981 		/* Add the array elements to the partitions tagged in previous step */
982 		for (part = 0; part < cluster->active_count; part++)
983 		{
984 			ProxyConnection	   *conn = cluster->active_list[part];
985 
986 			if (conn->run_tag != my_tag)
987 				continue;
988 
989 			if (!conn->bstate)
990 				conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate));
991 
992 			/* Add this set of elements to the partition specific arrays */
993 			for (col = 0; col < func->arg_count; col++)
994 			{
995 				if (!IS_SPLIT_ARG(func, col))
996 					continue;
997 
998 				conn->bstate[col] = accumArrayResult(conn->bstate[col],
999 													 arrays_to_split[col]->values[row],
1000 													 arrays_to_split[col]->nulls[row],
1001 													 arrays_to_split[col]->type->type_oid,
1002 													 CurrentMemoryContext);
1003 			}
1004 		}
1005 	}
1006 
1007 	/*
1008 	 * Finally, copy the accumulated arrays to the actual connections
1009 	 * to be used as parameters.
1010 	 */
1011 	for (i = 0; i < cluster->active_count; i++)
1012 	{
1013 		ProxyConnection *conn = cluster->active_list[i];
1014 
1015 		if (!conn->run_tag)
1016 			continue;
1017 
1018 		conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params));
1019 
1020 		for (col = 0; col < func->arg_count; col++)
1021 		{
1022 			if (!IS_SPLIT_ARG(func, col))
1023 				conn->split_params[col] = PointerGetDatum(NULL);
1024 			else
1025 				conn->split_params[col] = makeArrayResult(conn->bstate[col],
1026 														  CurrentMemoryContext);
1027 		}
1028 	}
1029 }
1030 
1031 /*
1032  * Prepare parameters for the query.
1033  */
1034 static void
prepare_query_parameters(ProxyFunction * func,FunctionCallInfo fcinfo)1035 prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo)
1036 {
1037 	int				i;
1038 	ProxyCluster   *cluster = func->cur_cluster;
1039 
1040 	for (i = 0; i < func->remote_sql->arg_count; i++)
1041 	{
1042 		int			idx = func->remote_sql->arg_lookup[i];
1043 		bool		bin = cluster->config.disable_binary ? 0 : 1;
1044 		const char *fixed_param_val = NULL;
1045 		int			fixed_param_len, fixed_param_fmt;
1046 		int			part;
1047 
1048 		/* Avoid doing multiple conversions for fixed parameters */
1049 		if (!IS_SPLIT_ARG(func, idx) && !PG_ARGISNULL(idx))
1050 		{
1051 			fixed_param_val = plproxy_send_type(func->arg_types[idx],
1052 												PG_GETARG_DATUM(idx),
1053 												bin,
1054 												&fixed_param_len,
1055 												&fixed_param_fmt);
1056 		}
1057 
1058 		/* Add the parameters to partitions */
1059 		for (part = 0; part < cluster->active_count; part++)
1060 		{
1061 			ProxyConnection *conn = cluster->active_list[part];
1062 
1063 			if (!conn->run_tag)
1064 				continue;
1065 
1066 			if (PG_ARGISNULL(idx))
1067 			{
1068 				conn->param_values[i] = NULL;
1069 				conn->param_lengths[i] = 0;
1070 				conn->param_formats[i] = 0;
1071 			}
1072 			else
1073 			{
1074 				if (IS_SPLIT_ARG(func, idx))
1075 				{
1076 					conn->param_values[i] = plproxy_send_type(func->arg_types[idx],
1077 															  conn->split_params[idx],
1078 															  bin,
1079 															  &conn->param_lengths[i],
1080 															  &conn->param_formats[i]);
1081 				}
1082 				else
1083 				{
1084 					conn->param_values[i] = fixed_param_val;
1085 					conn->param_lengths[i] = fixed_param_len;
1086 					conn->param_formats[i] = fixed_param_fmt;
1087 				}
1088 			}
1089 		}
1090 	}
1091 }
1092 
1093 /* Clean old results and prepare for new one */
1094 void
plproxy_clean_results(ProxyCluster * cluster)1095 plproxy_clean_results(ProxyCluster *cluster)
1096 {
1097 	int					i;
1098 	ProxyConnection	   *conn;
1099 
1100 	if (!cluster)
1101 		return;
1102 
1103 	cluster->ret_total = 0;
1104 	cluster->ret_cur_conn = 0;
1105 
1106 	for (i = 0; i < cluster->active_count; i++)
1107 	{
1108 		conn = cluster->active_list[i];
1109 		if (conn->res)
1110 		{
1111 			PQclear(conn->res);
1112 			conn->res = NULL;
1113 		}
1114 		conn->pos = 0;
1115 		conn->run_tag = 0;
1116 		conn->bstate = NULL;
1117 		conn->cur = NULL;
1118 		cluster->active_list[i] = NULL;
1119 	}
1120 
1121 	/* reset active_list */
1122 	cluster->active_count = 0;
1123 
1124 	/* conn state checks are done in prepare_conn */
1125 }
1126 
1127 /* Drop one connection */
plproxy_disconnect(ProxyConnectionState * cur)1128 void plproxy_disconnect(ProxyConnectionState *cur)
1129 {
1130 	if (cur->db)
1131 		PQfinish(cur->db);
1132 	cur->db = NULL;
1133 	cur->state = C_NONE;
1134 	cur->tuning = 0;
1135 	cur->connect_time = 0;
1136 	cur->query_time = 0;
1137 	cur->same_ver = 0;
1138 	cur->tuning = 0;
1139 	cur->waitCancel = 0;
1140 }
1141 
1142 /* Select partitions and execute query on them */
1143 void
plproxy_exec(ProxyFunction * func,FunctionCallInfo fcinfo)1144 plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
1145 {
1146 	/*
1147 	 * Prepare parameters and run query.  On cancel, send cancel request to
1148 	 * partitions too.
1149 	 */
1150 	PG_TRY();
1151 	{
1152 		func->cur_cluster->busy = true;
1153 		func->cur_cluster->cur_func = func;
1154 
1155 		/* clean old results */
1156 		plproxy_clean_results(func->cur_cluster);
1157 
1158 		/* tag the partitions and prepare per-partition parameters */
1159 		prepare_and_tag_partitions(func, fcinfo);
1160 
1161 		/* prepare the target query parameters */
1162 		prepare_query_parameters(func, fcinfo);
1163 
1164 		remote_execute(func);
1165 
1166 		func->cur_cluster->busy = false;
1167 	}
1168 	PG_CATCH();
1169 	{
1170 		func->cur_cluster->busy = false;
1171 
1172 		if (geterrcode() == ERRCODE_QUERY_CANCELED)
1173 			remote_cancel(func);
1174 
1175 		/* plproxy_remote_error() cannot clean itself, do it here */
1176 		plproxy_clean_results(func->cur_cluster);
1177 
1178 		PG_RE_THROW();
1179 	}
1180 	PG_END_TRY();
1181 }
1182 
1183