1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <utils/fmgrprotos.h>
8 #include <utils/builtins.h>
9 #include <utils/memutils.h>
10 #include <storage/procarray.h>
11 #include <foreign/foreign.h>
12 #include <foreign/fdwapi.h>
13 #include <miscadmin.h>
14 #include <access/reloptions.h>
15 #include <access/htup_details.h>
16 #include <catalog/pg_foreign_server.h>
17 #include <commands/dbcommands.h>
18 #include <nodes/makefuncs.h>
19 #include <nodes/pg_list.h>
20 #include <utils/guc.h>
21 #include <fmgr.h>
22 #include <funcapi.h>
23 
24 #include "export.h"
25 #include "test_utils.h"
26 #include "connection.h"
27 
28 TSConnection *
get_connection()29 get_connection()
30 {
31 	return remote_connection_open_with_options(
32 		"testdb",
33 		list_make4(makeDefElem("user",
34 							   (Node *) makeString(GetUserNameFromId(GetUserId(), false)),
35 							   -1),
36 				   makeDefElem("host", (Node *) makeString("localhost"), -1),
37 				   makeDefElem("dbname", (Node *) makeString(get_database_name(MyDatabaseId)), -1),
38 				   makeDefElem("port",
39 							   (Node *) makeString(pstrdup(GetConfigOption("port", false, false))),
40 							   -1)),
41 		false);
42 }
43 
/*
 * Check which option names the user-option and node-option validators
 * accept. The test expects "user" to be a user option only, "port" to be a
 * node option only, and both "xxx" and "fallback_application_name" to be
 * rejected by both validators.
 */
static void
test_options()
{
	/* Validator for user options */
	TestAssertTrue(remote_connection_valid_user_option("user"));
	TestAssertTrue(!remote_connection_valid_user_option("port"));
	TestAssertTrue(!remote_connection_valid_user_option("xxx"));
	TestAssertTrue(!remote_connection_valid_user_option("fallback_application_name"));

	/* Validator for node options */
	TestAssertTrue(remote_connection_valid_node_option("port"));
	TestAssertTrue(!remote_connection_valid_node_option("user"));
	TestAssertTrue(!remote_connection_valid_node_option("xxx"));
	TestAssertTrue(!remote_connection_valid_node_option("fallback_application_name"));
}
57 
58 static void
test_numbers_associated_with_connections()59 test_numbers_associated_with_connections()
60 {
61 	TSConnection *conn = get_connection();
62 	TestAssertTrue(remote_connection_get_cursor_number() == 1);
63 	TestAssertTrue(remote_connection_get_cursor_number() == 2);
64 	TestAssertTrue(remote_connection_get_cursor_number() == 3);
65 	remote_connection_reset_cursor_number();
66 	TestAssertTrue(remote_connection_get_cursor_number() == 1);
67 	TestAssertTrue(remote_connection_get_cursor_number() == 2);
68 
69 	TestAssertTrue(remote_connection_get_prep_stmt_number() == 1);
70 	TestAssertTrue(remote_connection_get_prep_stmt_number() == 2);
71 	TestAssertTrue(remote_connection_get_prep_stmt_number() == 3);
72 	remote_connection_close(conn);
73 }
74 
75 static void
test_simple_queries()76 test_simple_queries()
77 {
78 	TSConnection *conn = get_connection();
79 	PGresult *res;
80 	remote_connection_exec(conn, "SELECT 1");
81 	remote_connection_exec(conn, "SET search_path = pg_catalog");
82 
83 	res = remote_connection_exec(conn, "SELECT 1");
84 	TestAssertTrue(PQresultStatus(res) == PGRES_TUPLES_OK);
85 	PQclear(res);
86 	res = remote_connection_exec(conn, "SELECT abc");
87 	TestAssertTrue(PQresultStatus(res) != PGRES_TUPLES_OK);
88 	PQclear(res);
89 	res = remote_connection_exec(conn, "SET search_path = pg_catalog");
90 	TestAssertTrue(PQresultStatus(res) == PGRES_COMMAND_OK);
91 	PQclear(res);
92 	res = remote_connection_exec(conn, "SET 123 = 123");
93 	TestAssertTrue(PQresultStatus(res) != PGRES_COMMAND_OK);
94 	PQclear(res);
95 
96 	remote_connection_cmd_ok(conn, "SET search_path = pg_catalog");
97 	/* not a command */
98 	TestEnsureError(remote_connection_cmd_ok(conn, "SELECT 1"));
99 	remote_connection_close(conn);
100 }
101 
/*
 * Assert that the number of open connections recorded in "stats"
 * (connections created minus connections closed) equals "num".
 *
 * Both arguments are parenthesized in the expansion so that an expression
 * argument (e.g. one using an operator that binds less tightly than ==) is
 * compared as a whole.
 */
#define ASSERT_NUM_OPEN_CONNECTIONS(stats, num)                                                    \
	TestAssertTrue((((stats)->connections_created - (stats)->connections_closed) == (num)))

/*
 * Assert that the number of open results recorded in "stats" (results
 * created minus results cleared) equals "num".
 */
#define ASSERT_NUM_OPEN_RESULTS(stats, num)                                                        \
	TestAssertTrue((((stats)->results_created - (stats)->results_cleared) == (num)))
106 
107 static void
test_connection_and_result_leaks()108 test_connection_and_result_leaks()
109 {
110 	TSConnection *conn, *subconn;
111 	PGresult *res;
112 	RemoteConnectionStats *stats;
113 
114 	stats = remote_connection_stats_get();
115 	remote_connection_stats_reset();
116 
117 	conn = get_connection();
118 	res = remote_connection_exec(conn, "SELECT 1");
119 	remote_connection_close(conn);
120 
121 	ASSERT_NUM_OPEN_CONNECTIONS(stats, 0);
122 	ASSERT_NUM_OPEN_RESULTS(stats, 0);
123 
124 	conn = get_connection();
125 
126 	ASSERT_NUM_OPEN_CONNECTIONS(stats, 1);
127 
128 	BeginInternalSubTransaction("conn leak test");
129 
130 	subconn = get_connection();
131 	ASSERT_NUM_OPEN_CONNECTIONS(stats, 2);
132 
133 	remote_connection_exec(conn, "SELECT 1");
134 	ASSERT_NUM_OPEN_RESULTS(stats, 1);
135 
136 	BeginInternalSubTransaction("conn leak test 2");
137 
138 	res = remote_connection_exec(subconn, "SELECT 1");
139 	ASSERT_NUM_OPEN_RESULTS(stats, 2);
140 
141 	/* Explicitly close one result */
142 	remote_result_close(res);
143 
144 	ASSERT_NUM_OPEN_RESULTS(stats, 1);
145 
146 	remote_connection_exec(subconn, "SELECT 1");
147 	remote_connection_exec(conn, "SELECT 1");
148 
149 	ASSERT_NUM_OPEN_RESULTS(stats, 3);
150 
151 	RollbackAndReleaseCurrentSubTransaction();
152 
153 	/* Rollback should have cleared the two results created in the
154 	 * sub-transaction, but not the one created before the sub-transaction */
155 	ASSERT_NUM_OPEN_RESULTS(stats, 1);
156 
157 	remote_connection_exec(subconn, "SELECT 1");
158 	ASSERT_NUM_OPEN_RESULTS(stats, 2);
159 
160 	ReleaseCurrentSubTransaction();
161 
162 	/* Should only leave the original connection created before the first
163 	 * sub-transaction, but no results */
164 	ASSERT_NUM_OPEN_CONNECTIONS(stats, 1);
165 	ASSERT_NUM_OPEN_RESULTS(stats, 0);
166 
167 	remote_connection_stats_reset();
168 }
169 
170 TS_FUNCTION_INFO_V1(ts_test_bad_remote_query);
171 
172 /* Send a bad query that throws an exception without cleaning up connection or
173  * results. Together with get_connection_stats(), this should show that
174  * connections and results are automatically cleaned up. */
175 Datum
ts_test_bad_remote_query(PG_FUNCTION_ARGS)176 ts_test_bad_remote_query(PG_FUNCTION_ARGS)
177 {
178 	TSConnection *conn;
179 	PGresult *result;
180 
181 	conn = get_connection();
182 	result = remote_connection_exec(conn, "BADY QUERY SHOULD THROW ERROR");
183 	TestAssertTrue(PQresultStatus(result) == PGRES_FATAL_ERROR);
184 	elog(ERROR, "bad query error thrown from test");
185 
186 	PG_RETURN_VOID();
187 }
188 
/*
 * 1-based attribute numbers of the columns in the tuple returned by
 * ts_test_get_connection_stats(). The final member is one past the last
 * column and is used to size the values/nulls arrays.
 */
enum Anum_connection_stats
{
	Anum_connection_stats_connections_created = 1,
	Anum_connection_stats_connections_closed = 2,
	Anum_connection_stats_results_created = 3,
	Anum_connection_stats_results_cleared = 4,
	Anum_connection_stats_max = 5,
};
197 
198 TS_FUNCTION_INFO_V1(ts_test_get_connection_stats);
199 
200 Datum
ts_test_get_connection_stats(PG_FUNCTION_ARGS)201 ts_test_get_connection_stats(PG_FUNCTION_ARGS)
202 {
203 	TupleDesc tupdesc;
204 	RemoteConnectionStats *stats = remote_connection_stats_get();
205 	Datum values[Anum_connection_stats_max];
206 	bool nulls[Anum_connection_stats_max] = { false };
207 	HeapTuple tuple;
208 
209 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
210 		ereport(ERROR,
211 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
212 				 errmsg("function returning record called in context "
213 						"that cannot accept type record")));
214 
215 	values[Anum_connection_stats_connections_created - 1] =
216 		Int64GetDatum((int64) stats->connections_created);
217 	values[Anum_connection_stats_connections_closed - 1] =
218 		Int64GetDatum((int64) stats->connections_closed);
219 	values[Anum_connection_stats_results_created - 1] =
220 		Int64GetDatum((int64) stats->results_created);
221 	values[Anum_connection_stats_results_cleared - 1] =
222 		Int64GetDatum((int64) stats->results_cleared);
223 
224 	tuple = heap_form_tuple(tupdesc, values, nulls);
225 
226 	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
227 }
228 
229 TS_FUNCTION_INFO_V1(ts_test_remote_connection);
230 
231 Datum
ts_test_remote_connection(PG_FUNCTION_ARGS)232 ts_test_remote_connection(PG_FUNCTION_ARGS)
233 {
234 	test_options();
235 	test_numbers_associated_with_connections();
236 	test_simple_queries();
237 	test_connection_and_result_leaks();
238 
239 	PG_RETURN_VOID();
240 }
241 
242 pid_t
remote_connection_get_remote_pid(const TSConnection * conn)243 remote_connection_get_remote_pid(const TSConnection *conn)
244 {
245 	PGresult *res;
246 	char *pid_string;
247 	unsigned long pid_long;
248 
249 	res = PQexec(remote_connection_get_pg_conn(conn), "SELECT pg_backend_pid()");
250 
251 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
252 		return -1;
253 
254 	Assert(1 == PQntuples(res));
255 	Assert(1 == PQnfields(res));
256 
257 	pid_string = PQgetvalue(res, 0, 0);
258 	pid_long = strtol(pid_string, NULL, 10);
259 
260 	PQclear(res);
261 	return pid_long;
262 }
263 
264 char *
remote_connection_get_application_name(const TSConnection * conn)265 remote_connection_get_application_name(const TSConnection *conn)
266 {
267 	PGresult *res;
268 	char *app_name;
269 
270 	res = PQexec(remote_connection_get_pg_conn(conn),
271 				 "SELECT application_name "
272 				 "FROM pg_stat_activity "
273 				 "WHERE pid = pg_backend_pid()");
274 
275 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
276 		return 0;
277 
278 	Assert(1 == PQntuples(res));
279 	Assert(1 == PQnfields(res));
280 
281 	app_name = pstrdup(PQgetvalue(res, 0, 0));
282 
283 	PQclear(res);
284 	return app_name;
285 }
286