1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6
7 /*
8 * This file contains source code that was copied and/or modified from
9 * the PostgreSQL database, which is licensed under the open-source
10 * PostgreSQL License. Please see the NOTICE at the top level
11 * directory for a copy of the PostgreSQL License.
12 */
13 #include <postgres.h>
14 #include <access/htup_details.h>
15 #include <access/sysattr.h>
16 #include <catalog/pg_type.h>
17 #include <libpq-fe.h>
18 #include <miscadmin.h>
19 #include <optimizer/restrictinfo.h>
20 #include <parser/parsetree.h>
21 #include <utils/builtins.h>
22 #include <utils/float.h>
23 #include <utils/guc.h>
24 #include <utils/memutils.h>
25 #include <utils/rel.h>
26 #include <utils/syscache.h>
27
28 #include <guc.h>
29 #include "utils.h"
30 #include "compat/compat.h"
31 #include "remote/data_format.h"
32 #include "tuplefactory.h"
33
34 /*
35 * Identify the attribute where data conversion fails.
36 */
37 typedef struct ConversionLocation
38 {
39 Relation rel; /* foreign table's relcache entry. */
40 AttrNumber cur_attno; /* attribute number being processed, or 0 */
41
42 /*
43 * In case of foreign join push down, fdw_scan_tlist is used to identify
44 * the Var node corresponding to the error location and
45 * ss->ps.state gives access to the RTEs of corresponding relation
46 * to get the relation name and attribute name.
47 */
48 ScanState *ss;
49 } ConversionLocation;
50
51 typedef struct TupleFactory
52 {
53 MemoryContext temp_mctx;
54 TupleDesc tupdesc;
55 Datum *values;
56 bool *nulls;
57 List *retrieved_attrs;
58 AttConvInMetadata *attconv;
59 ConversionLocation errpos;
60 ErrorContextCallback errcallback;
61 bool per_tuple_mctx_reset;
62 } TupleFactory;
63
64 /*
65 * Callback function which is called when error occurs during column value
66 * conversion. Print names of column and relation.
67 */
68 static void
conversion_error_callback(void * arg)69 conversion_error_callback(void *arg)
70 {
71 const char *attname = NULL;
72 const char *relname = NULL;
73 bool is_wholerow = false;
74 ConversionLocation *errpos = (ConversionLocation *) arg;
75
76 if (errpos->rel)
77 {
78 /* error occurred in a scan against a foreign table */
79 TupleDesc tupdesc = RelationGetDescr(errpos->rel);
80 Form_pg_attribute attr = TupleDescAttr(tupdesc, errpos->cur_attno - 1);
81
82 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
83 attname = NameStr(attr->attname);
84 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
85 attname = "ctid";
86
87 relname = RelationGetRelationName(errpos->rel);
88 }
89 else
90 {
91 /* error occurred in a scan against a foreign join */
92 ScanState *ss = errpos->ss;
93 ForeignScan *fsplan;
94 EState *estate = ss->ps.state;
95 TargetEntry *tle;
96
97 if (IsA(ss->ps.plan, ForeignScan))
98 fsplan = (ForeignScan *) ss->ps.plan;
99 else if (IsA(ss->ps.plan, CustomScan))
100 {
101 CustomScan *csplan = (CustomScan *) ss->ps.plan;
102
103 fsplan = linitial(csplan->custom_private);
104 }
105 else
106 elog(ERROR, "unknown scan node type %u in error callback", nodeTag(ss->ps.plan));
107
108 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, errpos->cur_attno - 1);
109
110 /*
111 * Target list can have Vars and expressions. For Vars, we can get
112 * its relation, however for expressions we can't. Thus for
113 * expressions, just show generic context message.
114 */
115 if (IsA(tle->expr, Var))
116 {
117 RangeTblEntry *rte;
118 Var *var = (Var *) tle->expr;
119
120 rte = rt_fetch(var->varno, estate->es_range_table);
121
122 if (var->varattno == 0)
123 is_wholerow = true;
124 else
125 attname = get_attname(rte->relid, var->varattno, false);
126
127 relname = get_rel_name(rte->relid);
128 }
129 else
130 errcontext("processing expression at position %d in select list", errpos->cur_attno);
131 }
132
133 if (relname)
134 {
135 if (is_wholerow)
136 errcontext("whole-row reference to foreign table \"%s\"", relname);
137 else if (attname)
138 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
139 }
140 }
141
142 static TupleFactory *
tuplefactory_create_common(TupleDesc tupdesc,List * retrieved_attrs,bool force_text)143 tuplefactory_create_common(TupleDesc tupdesc, List *retrieved_attrs, bool force_text)
144 {
145 TupleFactory *tf = palloc0(sizeof(TupleFactory));
146
147 tf->temp_mctx = AllocSetContextCreate(CurrentMemoryContext,
148 "tuple factory temporary data",
149 ALLOCSET_DEFAULT_SIZES);
150
151 tf->tupdesc = tupdesc;
152 tf->retrieved_attrs = retrieved_attrs;
153 tf->attconv = data_format_create_att_conv_in_metadata(tf->tupdesc, force_text);
154 tf->values = (Datum *) palloc0(tf->tupdesc->natts * sizeof(Datum));
155 tf->nulls = (bool *) palloc(tf->tupdesc->natts * sizeof(bool));
156
157 /* Initialize to nulls for any columns not present in result */
158 memset(tf->nulls, true, tf->tupdesc->natts * sizeof(bool));
159
160 return tf;
161 }
162
163 TupleFactory *
tuplefactory_create_for_tupdesc(TupleDesc tupdesc,bool force_text)164 tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool force_text)
165 {
166 List *retrieved_attrs = NIL;
167 int i;
168
169 for (i = 0; i < tupdesc->natts; i++)
170 {
171 if (!TupleDescAttr(tupdesc, i)->attisdropped)
172 retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
173 }
174
175 return tuplefactory_create_common(tupdesc, retrieved_attrs, force_text);
176 }
177
178 static TupleFactory *
tuplefactory_create(Relation rel,ScanState * ss,List * retrieved_attrs)179 tuplefactory_create(Relation rel, ScanState *ss, List *retrieved_attrs)
180 {
181 TupleFactory *tf;
182 TupleDesc tupdesc;
183
184 Assert(!(rel && ss) && (rel || ss));
185
186 if (NULL != rel)
187 tupdesc = RelationGetDescr(rel);
188 else
189 tupdesc = ss->ss_ScanTupleSlot->tts_tupleDescriptor;
190
191 tf =
192 tuplefactory_create_common(tupdesc, retrieved_attrs, !ts_guc_enable_connection_binary_data);
193 tf->errpos.rel = rel;
194 tf->errpos.cur_attno = 0;
195 tf->errpos.ss = ss;
196 tf->errcallback.callback = conversion_error_callback;
197 tf->errcallback.arg = (void *) &tf->errpos;
198 tf->errcallback.previous = error_context_stack;
199 tf->per_tuple_mctx_reset = true;
200
201 return tf;
202 }
203
204 TupleFactory *
tuplefactory_create_for_rel(Relation rel,List * retrieved_attrs)205 tuplefactory_create_for_rel(Relation rel, List *retrieved_attrs)
206 {
207 return tuplefactory_create(rel, NULL, retrieved_attrs);
208 }
209
210 TupleFactory *
tuplefactory_create_for_scan(ScanState * ss,List * retrieved_attrs)211 tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs)
212 {
213 return tuplefactory_create(NULL, ss, retrieved_attrs);
214 }
215
216 bool
tuplefactory_is_binary(TupleFactory * tf)217 tuplefactory_is_binary(TupleFactory *tf)
218 {
219 return tf->attconv->binary;
220 }
221
222 void
tuplefactory_set_per_tuple_mctx_reset(TupleFactory * tf,bool reset)223 tuplefactory_set_per_tuple_mctx_reset(TupleFactory *tf, bool reset)
224 {
225 tf->per_tuple_mctx_reset = reset;
226 }
227
228 void
tuplefactory_reset_mctx(TupleFactory * tf)229 tuplefactory_reset_mctx(TupleFactory *tf)
230 {
231 MemoryContextReset(tf->temp_mctx);
232 }
233
234 HeapTuple
tuplefactory_make_tuple(TupleFactory * tf,PGresult * res,int row,int format)235 tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format)
236 {
237 HeapTuple tuple;
238 ItemPointer ctid = NULL;
239 MemoryContext oldcontext;
240 ListCell *lc;
241 int j;
242 StringInfo buf;
243
244 Assert(row < PQntuples(res));
245
246 /*
247 * Do the following work in a temp context that we reset after each tuple.
248 * This cleans up not only the data we have direct access to, but any
249 * cruft the I/O functions might leak.
250 */
251 oldcontext = MemoryContextSwitchTo(tf->temp_mctx);
252 buf = makeStringInfo();
253
254 /* Install error callback */
255 if (tf->errcallback.callback != NULL)
256 {
257 tf->errcallback.previous = error_context_stack;
258 error_context_stack = &tf->errcallback;
259 }
260
261 /*
262 * i indexes columns in the relation, j indexes columns in the PGresult.
263 */
264 j = 0;
265 foreach (lc, tf->retrieved_attrs)
266 {
267 int i = lfirst_int(lc);
268 char *valstr;
269
270 resetStringInfo(buf);
271
272 buf->len = PQgetlength(res, row, j);
273 /* we assume that value is NULL is length is 0 */
274 if (buf->len == 0)
275 valstr = NULL;
276 else
277 {
278 valstr = PQgetvalue(res, row, j);
279 buf->data = valstr;
280 }
281
282 /*
283 * convert value to internal representation
284 *
285 * Note: we ignore system columns other than ctid and oid in result
286 */
287 tf->errpos.cur_attno = i;
288
289 if (i > 0)
290 {
291 /* ordinary column */
292 Assert(i <= tf->tupdesc->natts);
293 tf->nulls[i - 1] = (valstr == NULL);
294
295 if (format == FORMAT_TEXT)
296 {
297 Assert(!tf->attconv->binary);
298 /* Apply the input function even to nulls, to support domains */
299 tf->values[i - 1] = InputFunctionCall(&tf->attconv->conv_funcs[i - 1],
300 valstr,
301 tf->attconv->ioparams[i - 1],
302 tf->attconv->typmods[i - 1]);
303 }
304 else
305 {
306 Assert(tf->attconv->binary);
307 if (valstr != NULL)
308 tf->values[i - 1] = ReceiveFunctionCall(&tf->attconv->conv_funcs[i - 1],
309 buf,
310 tf->attconv->ioparams[i - 1],
311 tf->attconv->typmods[i - 1]);
312 else
313 tf->values[i - 1] = PointerGetDatum(NULL);
314 }
315 }
316 else if (i == SelfItemPointerAttributeNumber)
317 {
318 /* ctid */
319 if (valstr != NULL)
320 {
321 Datum datum;
322 if (format == FORMAT_TEXT)
323 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
324 else
325 datum = DirectFunctionCall1(tidrecv, PointerGetDatum(buf));
326 ctid = (ItemPointer) DatumGetPointer(datum);
327 }
328 }
329 tf->errpos.cur_attno = 0;
330 j++;
331 }
332
333 /* Uninstall error context callback. */
334 if (tf->errcallback.callback != NULL)
335 error_context_stack = tf->errcallback.previous;
336
337 /*
338 * Check we got the expected number of columns. Note: j == 0 and
339 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
340 */
341 if (j > 0 && j != PQnfields(res))
342 elog(ERROR, "remote query result does not match the foreign table");
343
344 /*
345 * Build the result tuple in caller's memory context.
346 */
347 MemoryContextSwitchTo(oldcontext);
348
349 tuple = heap_form_tuple(tf->tupdesc, tf->values, tf->nulls);
350
351 /*
352 * If we have a CTID to return, install it in both t_self and t_ctid.
353 * t_self is the normal place, but if the tuple is converted to a
354 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
355 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
356 */
357 if (ctid)
358 tuple->t_self = tuple->t_data->t_ctid = *ctid;
359
360 /*
361 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
362 * heap_form_tuple. heap_form_tuple actually creates the tuple with
363 * DatumTupleFields, not HeapTupleFields, but the executor expects
364 * HeapTupleFields and will happily extract system columns on that
365 * assumption. If we don't do this then, for example, the tuple length
366 * ends up in the xmin field, which isn't what we want.
367 */
368 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
369 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
370 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
371
372 /* Clean up */
373 if (tf->per_tuple_mctx_reset)
374 MemoryContextReset(tf->temp_mctx);
375
376 return tuple;
377 }
378