1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 
7 /*
8  * This file contains source code that was copied and/or modified from
9  * the PostgreSQL database, which is licensed under the open-source
10  * PostgreSQL License. Please see the NOTICE at the top level
11  * directory for a copy of the PostgreSQL License.
12  */
13 #include <postgres.h>
14 #include <access/htup_details.h>
15 #include <access/sysattr.h>
16 #include <catalog/pg_type.h>
17 #include <libpq-fe.h>
18 #include <miscadmin.h>
19 #include <optimizer/restrictinfo.h>
20 #include <parser/parsetree.h>
21 #include <utils/builtins.h>
22 #include <utils/float.h>
23 #include <utils/guc.h>
24 #include <utils/memutils.h>
25 #include <utils/rel.h>
26 #include <utils/syscache.h>
27 
28 #include <guc.h>
29 #include "utils.h"
30 #include "compat/compat.h"
31 #include "remote/data_format.h"
32 #include "tuplefactory.h"
33 
/*
 * Identify the attribute where data conversion fails.
 *
 * An instance of this struct is handed to conversion_error_callback() as its
 * argument. Either "rel" is set (plain foreign table scan) or "ss" is used
 * (foreign join); the callback picks the lookup strategy based on whether
 * "rel" is non-NULL.
 */
typedef struct ConversionLocation
{
	Relation rel;		  /* foreign table's relcache entry. */
	AttrNumber cur_attno; /* attribute number being processed, or 0 */

	/*
	 * In case of foreign join push down, fdw_scan_tlist is used to identify
	 * the Var node corresponding to the error location and
	 * ss->ps.state gives access to the RTEs of corresponding relation
	 * to get the relation name and attribute name.
	 *
	 * In that case cur_attno is used as a 1-based position into
	 * fdw_scan_tlist rather than as an attribute number of "rel".
	 */
	ScanState *ss;
} ConversionLocation;
50 
/*
 * State needed to turn rows of a PGresult into heap tuples.
 */
typedef struct TupleFactory
{
	/* Scratch context that value conversion runs in */
	MemoryContext temp_mctx;
	/* Descriptor of the tuples that are produced */
	TupleDesc tupdesc;
	/* Per-attribute value array with tupdesc->natts entries */
	Datum *values;
	/* Per-attribute null flags with tupdesc->natts entries */
	bool *nulls;
	/*
	 * Integer list of attribute numbers, one per result column, in the order
	 * the columns appear in the PGresult
	 */
	List *retrieved_attrs;
	/* Input conversion functions, I/O parameters and typmods per attribute */
	AttConvInMetadata *attconv;
	/* Location reported by the error context callback */
	ConversionLocation errpos;
	/* Error context callback; callback is NULL when none should be installed */
	ErrorContextCallback errcallback;
	/* If true, temp_mctx is reset after every tuple that is made */
	bool per_tuple_mctx_reset;
} TupleFactory;
63 
64 /*
65  * Callback function which is called when error occurs during column value
66  * conversion.  Print names of column and relation.
67  */
68 static void
conversion_error_callback(void * arg)69 conversion_error_callback(void *arg)
70 {
71 	const char *attname = NULL;
72 	const char *relname = NULL;
73 	bool is_wholerow = false;
74 	ConversionLocation *errpos = (ConversionLocation *) arg;
75 
76 	if (errpos->rel)
77 	{
78 		/* error occurred in a scan against a foreign table */
79 		TupleDesc tupdesc = RelationGetDescr(errpos->rel);
80 		Form_pg_attribute attr = TupleDescAttr(tupdesc, errpos->cur_attno - 1);
81 
82 		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
83 			attname = NameStr(attr->attname);
84 		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
85 			attname = "ctid";
86 
87 		relname = RelationGetRelationName(errpos->rel);
88 	}
89 	else
90 	{
91 		/* error occurred in a scan against a foreign join */
92 		ScanState *ss = errpos->ss;
93 		ForeignScan *fsplan;
94 		EState *estate = ss->ps.state;
95 		TargetEntry *tle;
96 
97 		if (IsA(ss->ps.plan, ForeignScan))
98 			fsplan = (ForeignScan *) ss->ps.plan;
99 		else if (IsA(ss->ps.plan, CustomScan))
100 		{
101 			CustomScan *csplan = (CustomScan *) ss->ps.plan;
102 
103 			fsplan = linitial(csplan->custom_private);
104 		}
105 		else
106 			elog(ERROR, "unknown scan node type %u in error callback", nodeTag(ss->ps.plan));
107 
108 		tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, errpos->cur_attno - 1);
109 
110 		/*
111 		 * Target list can have Vars and expressions.  For Vars, we can get
112 		 * its relation, however for expressions we can't.  Thus for
113 		 * expressions, just show generic context message.
114 		 */
115 		if (IsA(tle->expr, Var))
116 		{
117 			RangeTblEntry *rte;
118 			Var *var = (Var *) tle->expr;
119 
120 			rte = rt_fetch(var->varno, estate->es_range_table);
121 
122 			if (var->varattno == 0)
123 				is_wholerow = true;
124 			else
125 				attname = get_attname(rte->relid, var->varattno, false);
126 
127 			relname = get_rel_name(rte->relid);
128 		}
129 		else
130 			errcontext("processing expression at position %d in select list", errpos->cur_attno);
131 	}
132 
133 	if (relname)
134 	{
135 		if (is_wholerow)
136 			errcontext("whole-row reference to foreign table \"%s\"", relname);
137 		else if (attname)
138 			errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
139 	}
140 }
141 
142 static TupleFactory *
tuplefactory_create_common(TupleDesc tupdesc,List * retrieved_attrs,bool force_text)143 tuplefactory_create_common(TupleDesc tupdesc, List *retrieved_attrs, bool force_text)
144 {
145 	TupleFactory *tf = palloc0(sizeof(TupleFactory));
146 
147 	tf->temp_mctx = AllocSetContextCreate(CurrentMemoryContext,
148 										  "tuple factory temporary data",
149 										  ALLOCSET_DEFAULT_SIZES);
150 
151 	tf->tupdesc = tupdesc;
152 	tf->retrieved_attrs = retrieved_attrs;
153 	tf->attconv = data_format_create_att_conv_in_metadata(tf->tupdesc, force_text);
154 	tf->values = (Datum *) palloc0(tf->tupdesc->natts * sizeof(Datum));
155 	tf->nulls = (bool *) palloc(tf->tupdesc->natts * sizeof(bool));
156 
157 	/* Initialize to nulls for any columns not present in result */
158 	memset(tf->nulls, true, tf->tupdesc->natts * sizeof(bool));
159 
160 	return tf;
161 }
162 
163 TupleFactory *
tuplefactory_create_for_tupdesc(TupleDesc tupdesc,bool force_text)164 tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool force_text)
165 {
166 	List *retrieved_attrs = NIL;
167 	int i;
168 
169 	for (i = 0; i < tupdesc->natts; i++)
170 	{
171 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
172 			retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
173 	}
174 
175 	return tuplefactory_create_common(tupdesc, retrieved_attrs, force_text);
176 }
177 
178 static TupleFactory *
tuplefactory_create(Relation rel,ScanState * ss,List * retrieved_attrs)179 tuplefactory_create(Relation rel, ScanState *ss, List *retrieved_attrs)
180 {
181 	TupleFactory *tf;
182 	TupleDesc tupdesc;
183 
184 	Assert(!(rel && ss) && (rel || ss));
185 
186 	if (NULL != rel)
187 		tupdesc = RelationGetDescr(rel);
188 	else
189 		tupdesc = ss->ss_ScanTupleSlot->tts_tupleDescriptor;
190 
191 	tf =
192 		tuplefactory_create_common(tupdesc, retrieved_attrs, !ts_guc_enable_connection_binary_data);
193 	tf->errpos.rel = rel;
194 	tf->errpos.cur_attno = 0;
195 	tf->errpos.ss = ss;
196 	tf->errcallback.callback = conversion_error_callback;
197 	tf->errcallback.arg = (void *) &tf->errpos;
198 	tf->errcallback.previous = error_context_stack;
199 	tf->per_tuple_mctx_reset = true;
200 
201 	return tf;
202 }
203 
204 TupleFactory *
tuplefactory_create_for_rel(Relation rel,List * retrieved_attrs)205 tuplefactory_create_for_rel(Relation rel, List *retrieved_attrs)
206 {
207 	return tuplefactory_create(rel, NULL, retrieved_attrs);
208 }
209 
210 TupleFactory *
tuplefactory_create_for_scan(ScanState * ss,List * retrieved_attrs)211 tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs)
212 {
213 	return tuplefactory_create(NULL, ss, retrieved_attrs);
214 }
215 
216 bool
tuplefactory_is_binary(TupleFactory * tf)217 tuplefactory_is_binary(TupleFactory *tf)
218 {
219 	return tf->attconv->binary;
220 }
221 
222 void
tuplefactory_set_per_tuple_mctx_reset(TupleFactory * tf,bool reset)223 tuplefactory_set_per_tuple_mctx_reset(TupleFactory *tf, bool reset)
224 {
225 	tf->per_tuple_mctx_reset = reset;
226 }
227 
228 void
tuplefactory_reset_mctx(TupleFactory * tf)229 tuplefactory_reset_mctx(TupleFactory *tf)
230 {
231 	MemoryContextReset(tf->temp_mctx);
232 }
233 
234 HeapTuple
tuplefactory_make_tuple(TupleFactory * tf,PGresult * res,int row,int format)235 tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format)
236 {
237 	HeapTuple tuple;
238 	ItemPointer ctid = NULL;
239 	MemoryContext oldcontext;
240 	ListCell *lc;
241 	int j;
242 	StringInfo buf;
243 
244 	Assert(row < PQntuples(res));
245 
246 	/*
247 	 * Do the following work in a temp context that we reset after each tuple.
248 	 * This cleans up not only the data we have direct access to, but any
249 	 * cruft the I/O functions might leak.
250 	 */
251 	oldcontext = MemoryContextSwitchTo(tf->temp_mctx);
252 	buf = makeStringInfo();
253 
254 	/* Install error callback */
255 	if (tf->errcallback.callback != NULL)
256 	{
257 		tf->errcallback.previous = error_context_stack;
258 		error_context_stack = &tf->errcallback;
259 	}
260 
261 	/*
262 	 * i indexes columns in the relation, j indexes columns in the PGresult.
263 	 */
264 	j = 0;
265 	foreach (lc, tf->retrieved_attrs)
266 	{
267 		int i = lfirst_int(lc);
268 		char *valstr;
269 
270 		resetStringInfo(buf);
271 
272 		buf->len = PQgetlength(res, row, j);
273 		/* we assume that value is NULL is length is 0 */
274 		if (buf->len == 0)
275 			valstr = NULL;
276 		else
277 		{
278 			valstr = PQgetvalue(res, row, j);
279 			buf->data = valstr;
280 		}
281 
282 		/*
283 		 * convert value to internal representation
284 		 *
285 		 * Note: we ignore system columns other than ctid and oid in result
286 		 */
287 		tf->errpos.cur_attno = i;
288 
289 		if (i > 0)
290 		{
291 			/* ordinary column */
292 			Assert(i <= tf->tupdesc->natts);
293 			tf->nulls[i - 1] = (valstr == NULL);
294 
295 			if (format == FORMAT_TEXT)
296 			{
297 				Assert(!tf->attconv->binary);
298 				/* Apply the input function even to nulls, to support domains */
299 				tf->values[i - 1] = InputFunctionCall(&tf->attconv->conv_funcs[i - 1],
300 													  valstr,
301 													  tf->attconv->ioparams[i - 1],
302 													  tf->attconv->typmods[i - 1]);
303 			}
304 			else
305 			{
306 				Assert(tf->attconv->binary);
307 				if (valstr != NULL)
308 					tf->values[i - 1] = ReceiveFunctionCall(&tf->attconv->conv_funcs[i - 1],
309 															buf,
310 															tf->attconv->ioparams[i - 1],
311 															tf->attconv->typmods[i - 1]);
312 				else
313 					tf->values[i - 1] = PointerGetDatum(NULL);
314 			}
315 		}
316 		else if (i == SelfItemPointerAttributeNumber)
317 		{
318 			/* ctid */
319 			if (valstr != NULL)
320 			{
321 				Datum datum;
322 				if (format == FORMAT_TEXT)
323 					datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
324 				else
325 					datum = DirectFunctionCall1(tidrecv, PointerGetDatum(buf));
326 				ctid = (ItemPointer) DatumGetPointer(datum);
327 			}
328 		}
329 		tf->errpos.cur_attno = 0;
330 		j++;
331 	}
332 
333 	/* Uninstall error context callback. */
334 	if (tf->errcallback.callback != NULL)
335 		error_context_stack = tf->errcallback.previous;
336 
337 	/*
338 	 * Check we got the expected number of columns.  Note: j == 0 and
339 	 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
340 	 */
341 	if (j > 0 && j != PQnfields(res))
342 		elog(ERROR, "remote query result does not match the foreign table");
343 
344 	/*
345 	 * Build the result tuple in caller's memory context.
346 	 */
347 	MemoryContextSwitchTo(oldcontext);
348 
349 	tuple = heap_form_tuple(tf->tupdesc, tf->values, tf->nulls);
350 
351 	/*
352 	 * If we have a CTID to return, install it in both t_self and t_ctid.
353 	 * t_self is the normal place, but if the tuple is converted to a
354 	 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
355 	 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
356 	 */
357 	if (ctid)
358 		tuple->t_self = tuple->t_data->t_ctid = *ctid;
359 
360 	/*
361 	 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
362 	 * heap_form_tuple.  heap_form_tuple actually creates the tuple with
363 	 * DatumTupleFields, not HeapTupleFields, but the executor expects
364 	 * HeapTupleFields and will happily extract system columns on that
365 	 * assumption.  If we don't do this then, for example, the tuple length
366 	 * ends up in the xmin field, which isn't what we want.
367 	 */
368 	HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
369 	HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
370 	HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
371 
372 	/* Clean up */
373 	if (tf->per_tuple_mctx_reset)
374 		MemoryContextReset(tf->temp_mctx);
375 
376 	return tuple;
377 }
378