1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <utils/fmgrprotos.h>
8 #include <utils/builtins.h>
9 #include <utils/memutils.h>
10 #include <storage/procarray.h>
11 #include <foreign/foreign.h>
12 #include <foreign/fdwapi.h>
13 #include <miscadmin.h>
14 #include <access/reloptions.h>
15 #include <access/htup_details.h>
16 #include <catalog/pg_foreign_server.h>
17 #include <commands/dbcommands.h>
18 #include <nodes/makefuncs.h>
19 #include <nodes/pg_list.h>
20 #include <utils/guc.h>
21 #include <fmgr.h>
22 #include <funcapi.h>
23
24 #include "export.h"
25 #include "test_utils.h"
26 #include "connection.h"
27
28 TSConnection *
get_connection()29 get_connection()
30 {
31 return remote_connection_open_with_options(
32 "testdb",
33 list_make4(makeDefElem("user",
34 (Node *) makeString(GetUserNameFromId(GetUserId(), false)),
35 -1),
36 makeDefElem("host", (Node *) makeString("localhost"), -1),
37 makeDefElem("dbname", (Node *) makeString(get_database_name(MyDatabaseId)), -1),
38 makeDefElem("port",
39 (Node *) makeString(pstrdup(GetConfigOption("port", false, false))),
40 -1)),
41 false);
42 }
43
44 static void
test_options()45 test_options()
46 {
47 TestAssertTrue(remote_connection_valid_user_option("user"));
48 TestAssertTrue(!remote_connection_valid_user_option("port"));
49 TestAssertTrue(!remote_connection_valid_user_option("xxx"));
50 TestAssertTrue(!remote_connection_valid_user_option("fallback_application_name"));
51
52 TestAssertTrue(remote_connection_valid_node_option("port"));
53 TestAssertTrue(!remote_connection_valid_node_option("user"));
54 TestAssertTrue(!remote_connection_valid_node_option("xxx"));
55 TestAssertTrue(!remote_connection_valid_node_option("fallback_application_name"));
56 }
57
58 static void
test_numbers_associated_with_connections()59 test_numbers_associated_with_connections()
60 {
61 TSConnection *conn = get_connection();
62 TestAssertTrue(remote_connection_get_cursor_number() == 1);
63 TestAssertTrue(remote_connection_get_cursor_number() == 2);
64 TestAssertTrue(remote_connection_get_cursor_number() == 3);
65 remote_connection_reset_cursor_number();
66 TestAssertTrue(remote_connection_get_cursor_number() == 1);
67 TestAssertTrue(remote_connection_get_cursor_number() == 2);
68
69 TestAssertTrue(remote_connection_get_prep_stmt_number() == 1);
70 TestAssertTrue(remote_connection_get_prep_stmt_number() == 2);
71 TestAssertTrue(remote_connection_get_prep_stmt_number() == 3);
72 remote_connection_close(conn);
73 }
74
75 static void
test_simple_queries()76 test_simple_queries()
77 {
78 TSConnection *conn = get_connection();
79 PGresult *res;
80 remote_connection_exec(conn, "SELECT 1");
81 remote_connection_exec(conn, "SET search_path = pg_catalog");
82
83 res = remote_connection_exec(conn, "SELECT 1");
84 TestAssertTrue(PQresultStatus(res) == PGRES_TUPLES_OK);
85 PQclear(res);
86 res = remote_connection_exec(conn, "SELECT abc");
87 TestAssertTrue(PQresultStatus(res) != PGRES_TUPLES_OK);
88 PQclear(res);
89 res = remote_connection_exec(conn, "SET search_path = pg_catalog");
90 TestAssertTrue(PQresultStatus(res) == PGRES_COMMAND_OK);
91 PQclear(res);
92 res = remote_connection_exec(conn, "SET 123 = 123");
93 TestAssertTrue(PQresultStatus(res) != PGRES_COMMAND_OK);
94 PQclear(res);
95
96 remote_connection_cmd_ok(conn, "SET search_path = pg_catalog");
97 /* not a command */
98 TestEnsureError(remote_connection_cmd_ok(conn, "SELECT 1"));
99 remote_connection_close(conn);
100 }
101
102 #define ASSERT_NUM_OPEN_CONNECTIONS(stats, num) \
103 TestAssertTrue((((stats)->connections_created - (stats)->connections_closed) == num))
104 #define ASSERT_NUM_OPEN_RESULTS(stats, num) \
105 TestAssertTrue((((stats)->results_created - (stats)->results_cleared) == num))
106
107 static void
test_connection_and_result_leaks()108 test_connection_and_result_leaks()
109 {
110 TSConnection *conn, *subconn;
111 PGresult *res;
112 RemoteConnectionStats *stats;
113
114 stats = remote_connection_stats_get();
115 remote_connection_stats_reset();
116
117 conn = get_connection();
118 res = remote_connection_exec(conn, "SELECT 1");
119 remote_connection_close(conn);
120
121 ASSERT_NUM_OPEN_CONNECTIONS(stats, 0);
122 ASSERT_NUM_OPEN_RESULTS(stats, 0);
123
124 conn = get_connection();
125
126 ASSERT_NUM_OPEN_CONNECTIONS(stats, 1);
127
128 BeginInternalSubTransaction("conn leak test");
129
130 subconn = get_connection();
131 ASSERT_NUM_OPEN_CONNECTIONS(stats, 2);
132
133 remote_connection_exec(conn, "SELECT 1");
134 ASSERT_NUM_OPEN_RESULTS(stats, 1);
135
136 BeginInternalSubTransaction("conn leak test 2");
137
138 res = remote_connection_exec(subconn, "SELECT 1");
139 ASSERT_NUM_OPEN_RESULTS(stats, 2);
140
141 /* Explicitly close one result */
142 remote_result_close(res);
143
144 ASSERT_NUM_OPEN_RESULTS(stats, 1);
145
146 remote_connection_exec(subconn, "SELECT 1");
147 remote_connection_exec(conn, "SELECT 1");
148
149 ASSERT_NUM_OPEN_RESULTS(stats, 3);
150
151 RollbackAndReleaseCurrentSubTransaction();
152
153 /* Rollback should have cleared the two results created in the
154 * sub-transaction, but not the one created before the sub-transaction */
155 ASSERT_NUM_OPEN_RESULTS(stats, 1);
156
157 remote_connection_exec(subconn, "SELECT 1");
158 ASSERT_NUM_OPEN_RESULTS(stats, 2);
159
160 ReleaseCurrentSubTransaction();
161
162 /* Should only leave the original connection created before the first
163 * sub-transaction, but no results */
164 ASSERT_NUM_OPEN_CONNECTIONS(stats, 1);
165 ASSERT_NUM_OPEN_RESULTS(stats, 0);
166
167 remote_connection_stats_reset();
168 }
169
170 TS_FUNCTION_INFO_V1(ts_test_bad_remote_query);
171
172 /* Send a bad query that throws an exception without cleaning up connection or
173 * results. Together with get_connection_stats(), this should show that
174 * connections and results are automatically cleaned up. */
175 Datum
ts_test_bad_remote_query(PG_FUNCTION_ARGS)176 ts_test_bad_remote_query(PG_FUNCTION_ARGS)
177 {
178 TSConnection *conn;
179 PGresult *result;
180
181 conn = get_connection();
182 result = remote_connection_exec(conn, "BADY QUERY SHOULD THROW ERROR");
183 TestAssertTrue(PQresultStatus(result) == PGRES_FATAL_ERROR);
184 elog(ERROR, "bad query error thrown from test");
185
186 PG_RETURN_VOID();
187 }
188
189 enum Anum_connection_stats
190 {
191 Anum_connection_stats_connections_created = 1,
192 Anum_connection_stats_connections_closed,
193 Anum_connection_stats_results_created,
194 Anum_connection_stats_results_cleared,
195 Anum_connection_stats_max,
196 };
197
198 TS_FUNCTION_INFO_V1(ts_test_get_connection_stats);
199
200 Datum
ts_test_get_connection_stats(PG_FUNCTION_ARGS)201 ts_test_get_connection_stats(PG_FUNCTION_ARGS)
202 {
203 TupleDesc tupdesc;
204 RemoteConnectionStats *stats = remote_connection_stats_get();
205 Datum values[Anum_connection_stats_max];
206 bool nulls[Anum_connection_stats_max] = { false };
207 HeapTuple tuple;
208
209 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
210 ereport(ERROR,
211 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
212 errmsg("function returning record called in context "
213 "that cannot accept type record")));
214
215 values[Anum_connection_stats_connections_created - 1] =
216 Int64GetDatum((int64) stats->connections_created);
217 values[Anum_connection_stats_connections_closed - 1] =
218 Int64GetDatum((int64) stats->connections_closed);
219 values[Anum_connection_stats_results_created - 1] =
220 Int64GetDatum((int64) stats->results_created);
221 values[Anum_connection_stats_results_cleared - 1] =
222 Int64GetDatum((int64) stats->results_cleared);
223
224 tuple = heap_form_tuple(tupdesc, values, nulls);
225
226 PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
227 }
228
229 TS_FUNCTION_INFO_V1(ts_test_remote_connection);
230
231 Datum
ts_test_remote_connection(PG_FUNCTION_ARGS)232 ts_test_remote_connection(PG_FUNCTION_ARGS)
233 {
234 test_options();
235 test_numbers_associated_with_connections();
236 test_simple_queries();
237 test_connection_and_result_leaks();
238
239 PG_RETURN_VOID();
240 }
241
242 pid_t
remote_connection_get_remote_pid(const TSConnection * conn)243 remote_connection_get_remote_pid(const TSConnection *conn)
244 {
245 PGresult *res;
246 char *pid_string;
247 unsigned long pid_long;
248
249 res = PQexec(remote_connection_get_pg_conn(conn), "SELECT pg_backend_pid()");
250
251 if (PQresultStatus(res) != PGRES_TUPLES_OK)
252 return -1;
253
254 Assert(1 == PQntuples(res));
255 Assert(1 == PQnfields(res));
256
257 pid_string = PQgetvalue(res, 0, 0);
258 pid_long = strtol(pid_string, NULL, 10);
259
260 PQclear(res);
261 return pid_long;
262 }
263
264 char *
remote_connection_get_application_name(const TSConnection * conn)265 remote_connection_get_application_name(const TSConnection *conn)
266 {
267 PGresult *res;
268 char *app_name;
269
270 res = PQexec(remote_connection_get_pg_conn(conn),
271 "SELECT application_name "
272 "FROM pg_stat_activity "
273 "WHERE pid = pg_backend_pid()");
274
275 if (PQresultStatus(res) != PGRES_TUPLES_OK)
276 return 0;
277
278 Assert(1 == PQntuples(res));
279 Assert(1 == PQnfields(res));
280
281 app_name = pstrdup(PQgetvalue(res, 0, 0));
282
283 PQclear(res);
284 return app_name;
285 }
286