1 /*-------------------------------------------------------------------------
2 *
3 * copyfrom.c
4 * COPY <table> FROM file/program/client
5 *
6 * This file contains routines needed to efficiently load tuples into a
7 * table. That includes looking up the correct partition, firing triggers,
8 * calling the table AM function to insert the data, and updating indexes.
9 * Reading data from the input file or client and parsing it into Datums
10 * is handled in copyfromparse.c.
11 *
12 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
13 * Portions Copyright (c) 1994, Regents of the University of California
14 *
15 *
16 * IDENTIFICATION
17 * src/backend/commands/copyfrom.c
18 *
19 *-------------------------------------------------------------------------
20 */
21 #include "postgres.h"
22
23 #include <ctype.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26
27 #include "access/heapam.h"
28 #include "access/htup_details.h"
29 #include "access/tableam.h"
30 #include "access/xact.h"
31 #include "access/xlog.h"
32 #include "catalog/namespace.h"
33 #include "commands/copy.h"
34 #include "commands/copyfrom_internal.h"
35 #include "commands/progress.h"
36 #include "commands/trigger.h"
37 #include "executor/execPartition.h"
38 #include "executor/executor.h"
39 #include "executor/nodeModifyTable.h"
40 #include "executor/tuptable.h"
41 #include "foreign/fdwapi.h"
42 #include "libpq/libpq.h"
43 #include "libpq/pqformat.h"
44 #include "miscadmin.h"
45 #include "optimizer/optimizer.h"
46 #include "pgstat.h"
47 #include "rewrite/rewriteHandler.h"
48 #include "storage/fd.h"
49 #include "tcop/tcopprot.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/portal.h"
53 #include "utils/rel.h"
54 #include "utils/snapmgr.h"
55
/*
 * No more than this many tuples per CopyMultiInsertBuffer
 *
 * This also sizes the fixed 'slots' and 'linenos' arrays embedded in every
 * CopyMultiInsertBuffer, and CopyMultiInsertInfoIsFull() compares the total
 * number of tuples buffered across all buffers against it.
 *
 * Caution: Don't make this too big, as we could end up with this many
 * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
 * multiInsertBuffers list.  Increasing this can cause quadratic growth in
 * memory requirements during copies into partitioned tables with a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES 1000

/*
 * Flush buffers if there are >= this many bytes, as counted by the input
 * size, of tuples stored.  The count is the sum of the 'tuplen' values passed
 * to CopyMultiInsertInfoStore(), across all buffers.
 */
#define MAX_BUFFERED_BYTES 65535

/*
 * Trim the list of buffers back down to this number after flushing.
 * CopyMultiInsertInfoFlush() removes the oldest buffers first.
 */
#define MAX_PARTITION_BUFFERS 32
75
/*
 * Stores multi-insert data related to a single relation in CopyFrom.
 *
 * Entries 0 .. nused-1 of 'slots' and 'linenos' describe the tuples that are
 * currently queued for this relation.  Slots are created lazily (see
 * CopyMultiInsertInfoNextFreeSlot) and are kept for reuse after a flush, so
 * an entry at or beyond 'nused' holds either a cleared slot or NULL.
 */
typedef struct CopyMultiInsertBuffer
{
	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
	ResultRelInfo *resultRelInfo;	/* target relation of the queued tuples */
	BulkInsertState bistate;	/* BulkInsertState for this rel */
	int			nused;			/* number of 'slots' containing tuples */
	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
												 * stream, for error context */
} CopyMultiInsertBuffer;
86
/*
 * Stores one or many CopyMultiInsertBuffers and details about the size and
 * number of tuples which are stored in them.  This allows multiple buffers to
 * exist at once when COPYing into a partitioned table.
 *
 * 'bufferedTuples' and 'bufferedBytes' are totals over every buffer in
 * 'multiInsertBuffers'; they are reset by CopyMultiInsertInfoFlush().
 */
typedef struct CopyMultiInsertInfo
{
	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
	int			bufferedTuples; /* number of tuples buffered over all buffers */
	int			bufferedBytes;	/* number of bytes from all buffered tuples */
	CopyFromState cstate;		/* Copy state for this CopyMultiInsertInfo */
	EState	   *estate;			/* Executor state used for COPY */
	CommandId	mycid;			/* Command Id used for COPY */
	int			ti_options;		/* table insert options */
} CopyMultiInsertInfo;
102
103
/*
 * Non-export (file-local) function prototypes.  limit_printout_length() is
 * used by CopyFromErrorCallback() ahead of its definition.
 */
static char *limit_printout_length(const char *str);

static void ClosePipeFromProgram(CopyFromState cstate);
108
109 /*
110 * error context callback for COPY FROM
111 *
112 * The argument for the error context must be CopyFromState.
113 */
114 void
CopyFromErrorCallback(void * arg)115 CopyFromErrorCallback(void *arg)
116 {
117 CopyFromState cstate = (CopyFromState) arg;
118 char curlineno_str[32];
119
120 snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
121 cstate->cur_lineno);
122
123 if (cstate->opts.binary)
124 {
125 /* can't usefully display the data */
126 if (cstate->cur_attname)
127 errcontext("COPY %s, line %s, column %s",
128 cstate->cur_relname, curlineno_str,
129 cstate->cur_attname);
130 else
131 errcontext("COPY %s, line %s",
132 cstate->cur_relname, curlineno_str);
133 }
134 else
135 {
136 if (cstate->cur_attname && cstate->cur_attval)
137 {
138 /* error is relevant to a particular column */
139 char *attval;
140
141 attval = limit_printout_length(cstate->cur_attval);
142 errcontext("COPY %s, line %s, column %s: \"%s\"",
143 cstate->cur_relname, curlineno_str,
144 cstate->cur_attname, attval);
145 pfree(attval);
146 }
147 else if (cstate->cur_attname)
148 {
149 /* error is relevant to a particular column, value is NULL */
150 errcontext("COPY %s, line %s, column %s: null input",
151 cstate->cur_relname, curlineno_str,
152 cstate->cur_attname);
153 }
154 else
155 {
156 /*
157 * Error is relevant to a particular line.
158 *
159 * If line_buf still contains the correct line, print it.
160 */
161 if (cstate->line_buf_valid)
162 {
163 char *lineval;
164
165 lineval = limit_printout_length(cstate->line_buf.data);
166 errcontext("COPY %s, line %s: \"%s\"",
167 cstate->cur_relname, curlineno_str, lineval);
168 pfree(lineval);
169 }
170 else
171 {
172 errcontext("COPY %s, line %s",
173 cstate->cur_relname, curlineno_str);
174 }
175 }
176 }
177 }
178
/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * A string of at most MAX_COPY_DATA_DISPLAY bytes is returned as a pstrdup'd
 * copy.  A longer one is clipped by pg_mbcliplen() (so no multibyte character
 * is split) to at most MAX_COPY_DATA_DISPLAY bytes and then suffixed with
 * "...".  Either way the result is freshly palloc'd.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			keep_len;
	char	   *clipped;

	/* Short enough to print as-is */
	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Encoding-aware truncation point */
		keep_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Copy the kept prefix, then "..." plus its terminating NUL */
		clipped = (char *) palloc(keep_len + 4);
		memcpy(clipped, str, keep_len);
		memcpy(clipped + keep_len, "...", sizeof("..."));
		return clipped;
	}

	return pstrdup(str);
}
209
210 /*
211 * Allocate memory and initialize a new CopyMultiInsertBuffer for this
212 * ResultRelInfo.
213 */
214 static CopyMultiInsertBuffer *
CopyMultiInsertBufferInit(ResultRelInfo * rri)215 CopyMultiInsertBufferInit(ResultRelInfo *rri)
216 {
217 CopyMultiInsertBuffer *buffer;
218
219 buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
220 memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
221 buffer->resultRelInfo = rri;
222 buffer->bistate = GetBulkInsertState();
223 buffer->nused = 0;
224
225 return buffer;
226 }
227
228 /*
229 * Make a new buffer for this ResultRelInfo.
230 */
231 static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri)232 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
233 ResultRelInfo *rri)
234 {
235 CopyMultiInsertBuffer *buffer;
236
237 buffer = CopyMultiInsertBufferInit(rri);
238
239 /* Setup back-link so we can easily find this buffer again */
240 rri->ri_CopyMultiInsertBuffer = buffer;
241 /* Record that we're tracking this buffer */
242 miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
243 }
244
245 /*
246 * Initialize an already allocated CopyMultiInsertInfo.
247 *
248 * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
249 * for that table.
250 */
251 static void
CopyMultiInsertInfoInit(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri,CopyFromState cstate,EState * estate,CommandId mycid,int ti_options)252 CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
253 CopyFromState cstate, EState *estate, CommandId mycid,
254 int ti_options)
255 {
256 miinfo->multiInsertBuffers = NIL;
257 miinfo->bufferedTuples = 0;
258 miinfo->bufferedBytes = 0;
259 miinfo->cstate = cstate;
260 miinfo->estate = estate;
261 miinfo->mycid = mycid;
262 miinfo->ti_options = ti_options;
263
264 /*
265 * Only setup the buffer when not dealing with a partitioned table.
266 * Buffers for partitioned tables will just be setup when we need to send
267 * tuples their way for the first time.
268 */
269 if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
270 CopyMultiInsertInfoSetupBuffer(miinfo, rri);
271 }
272
273 /*
274 * Returns true if the buffers are full
275 */
276 static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo * miinfo)277 CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
278 {
279 if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
280 miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
281 return true;
282 return false;
283 }
284
285 /*
286 * Returns true if we have no buffered tuples
287 */
288 static inline bool
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo * miinfo)289 CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
290 {
291 return miinfo->bufferedTuples == 0;
292 }
293
/*
 * Write the tuples stored in 'buffer' out to the table.
 *
 * All 'nused' queued slots are inserted with a single table_multi_insert()
 * call.  Then, for each tuple, index entries are inserted when the relation
 * has indexes, and ExecARInsertTriggers() is called: always when there are
 * indexes, otherwise only when the relation has AFTER ROW INSERT triggers or
 * needs NEW TABLE transition capture.  Each slot is then cleared and the
 * buffer is marked empty.  The slots themselves are kept for reuse.
 *
 * While the per-tuple work runs, cstate->cur_lineno is set to the line
 * number recorded for that tuple so error context points at the right input
 * line.  cur_lineno and line_buf_valid are restored before returning.
 *
 * Note: miinfo->bufferedTuples/bufferedBytes are not touched here; resetting
 * them is the caller's job.
 */
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
						   CopyMultiInsertBuffer *buffer)
{
	MemoryContext oldcontext;
	int			i;
	uint64		save_cur_lineno;
	CopyFromState cstate = miinfo->cstate;
	EState	   *estate = miinfo->estate;
	CommandId	mycid = miinfo->mycid;
	int			ti_options = miinfo->ti_options;
	bool		line_buf_valid = cstate->line_buf_valid;
	int			nused = buffer->nused;
	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
	TupleTableSlot **slots = buffer->slots;

	/*
	 * Print error context information correctly if one of the operations
	 * below fails.  line_buf need not contain the line of the tuple being
	 * flushed, so keep the error callback from printing it.  Both fields
	 * are restored at the end.
	 */
	cstate->line_buf_valid = false;
	save_cur_lineno = cstate->cur_lineno;

	/*
	 * table_multi_insert may leak memory, so switch to short-lived memory
	 * context before calling it.
	 */
	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	table_multi_insert(resultRelInfo->ri_RelationDesc,
					   slots,
					   nused,
					   mycid,
					   ti_options,
					   buffer->bistate);
	MemoryContextSwitchTo(oldcontext);

	for (i = 0; i < nused; i++)
	{
		/*
		 * If there are any indexes, update them for all the inserted tuples,
		 * and run AFTER ROW INSERT triggers.
		 */
		if (resultRelInfo->ri_NumIndices > 0)
		{
			List	   *recheckIndexes;

			/* Attribute any error to this tuple's input line */
			cstate->cur_lineno = buffer->linenos[i];
			recheckIndexes =
				ExecInsertIndexTuples(resultRelInfo,
									  buffer->slots[i], estate, false, false,
									  NULL, NIL);
			ExecARInsertTriggers(estate, resultRelInfo,
								 slots[i], recheckIndexes,
								 cstate->transition_capture);
			list_free(recheckIndexes);
		}

		/*
		 * There are no indexes, but see if we need to run AFTER ROW INSERT
		 * triggers (or capture NEW TABLE transition tuples) anyway.
		 */
		else if (resultRelInfo->ri_TrigDesc != NULL &&
				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
		{
			/* Attribute any error to this tuple's input line */
			cstate->cur_lineno = buffer->linenos[i];
			ExecARInsertTriggers(estate, resultRelInfo,
								 slots[i], NIL, cstate->transition_capture);
		}

		/* Empty the slot; it stays allocated for reuse */
		ExecClearTuple(slots[i]);
	}

	/* Mark that all slots are free */
	buffer->nused = 0;

	/* reset cur_lineno and line_buf_valid to what they were */
	cstate->line_buf_valid = line_buf_valid;
	cstate->cur_lineno = save_cur_lineno;
}
377
378 /*
379 * Drop used slots and free member for this buffer.
380 *
381 * The buffer must be flushed before cleanup.
382 */
383 static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo * miinfo,CopyMultiInsertBuffer * buffer)384 CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
385 CopyMultiInsertBuffer *buffer)
386 {
387 int i;
388
389 /* Ensure buffer was flushed */
390 Assert(buffer->nused == 0);
391
392 /* Remove back-link to ourself */
393 buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
394
395 FreeBulkInsertState(buffer->bistate);
396
397 /* Since we only create slots on demand, just drop the non-null ones. */
398 for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
399 ExecDropSingleTupleTableSlot(buffer->slots[i]);
400
401 table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
402 miinfo->ti_options);
403
404 pfree(buffer);
405 }
406
407 /*
408 * Write out all stored tuples in all buffers out to the tables.
409 *
410 * Once flushed we also trim the tracked buffers list down to size by removing
411 * the buffers created earliest first.
412 *
413 * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
414 * used. When cleaning up old buffers we'll never remove the one for
415 * 'curr_rri'.
416 */
417 static inline void
CopyMultiInsertInfoFlush(CopyMultiInsertInfo * miinfo,ResultRelInfo * curr_rri)418 CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
419 {
420 ListCell *lc;
421
422 foreach(lc, miinfo->multiInsertBuffers)
423 {
424 CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
425
426 CopyMultiInsertBufferFlush(miinfo, buffer);
427 }
428
429 miinfo->bufferedTuples = 0;
430 miinfo->bufferedBytes = 0;
431
432 /*
433 * Trim the list of tracked buffers down if it exceeds the limit. Here we
434 * remove buffers starting with the ones we created first. It seems less
435 * likely that these older ones will be needed than the ones that were
436 * just created.
437 */
438 while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
439 {
440 CopyMultiInsertBuffer *buffer;
441
442 buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
443
444 /*
445 * We never want to remove the buffer that's currently being used, so
446 * if we happen to find that then move it to the end of the list.
447 */
448 if (buffer->resultRelInfo == curr_rri)
449 {
450 miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
451 miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
452 buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
453 }
454
455 CopyMultiInsertBufferCleanup(miinfo, buffer);
456 miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
457 }
458 }
459
460 /*
461 * Cleanup allocated buffers and free memory
462 */
463 static inline void
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo * miinfo)464 CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
465 {
466 ListCell *lc;
467
468 foreach(lc, miinfo->multiInsertBuffers)
469 CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
470
471 list_free(miinfo->multiInsertBuffers);
472 }
473
474 /*
475 * Get the next TupleTableSlot that the next tuple should be stored in.
476 *
477 * Callers must ensure that the buffer is not full.
478 *
479 * Note: 'miinfo' is unused but has been included for consistency with the
480 * other functions in this area.
481 */
482 static inline TupleTableSlot *
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri)483 CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
484 ResultRelInfo *rri)
485 {
486 CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
487 int nused = buffer->nused;
488
489 Assert(buffer != NULL);
490 Assert(nused < MAX_BUFFERED_TUPLES);
491
492 if (buffer->slots[nused] == NULL)
493 buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
494 return buffer->slots[nused];
495 }
496
497 /*
498 * Record the previously reserved TupleTableSlot that was reserved by
499 * CopyMultiInsertInfoNextFreeSlot as being consumed.
500 */
501 static inline void
CopyMultiInsertInfoStore(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri,TupleTableSlot * slot,int tuplen,uint64 lineno)502 CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
503 TupleTableSlot *slot, int tuplen, uint64 lineno)
504 {
505 CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
506
507 Assert(buffer != NULL);
508 Assert(slot == buffer->slots[buffer->nused]);
509
510 /* Store the line number so we can properly report any errors later */
511 buffer->linenos[buffer->nused] = lineno;
512
513 /* Record this slot as being used */
514 buffer->nused++;
515
516 /* Update how many tuples are stored and their size */
517 miinfo->bufferedTuples++;
518 miinfo->bufferedBytes += tuplen;
519 }
520
521 /*
522 * Copy FROM file to relation.
523 */
524 uint64
CopyFrom(CopyFromState cstate)525 CopyFrom(CopyFromState cstate)
526 {
527 ResultRelInfo *resultRelInfo;
528 ResultRelInfo *target_resultRelInfo;
529 ResultRelInfo *prevResultRelInfo = NULL;
530 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
531 ModifyTableState *mtstate;
532 ExprContext *econtext;
533 TupleTableSlot *singleslot = NULL;
534 MemoryContext oldcontext = CurrentMemoryContext;
535
536 PartitionTupleRouting *proute = NULL;
537 ErrorContextCallback errcallback;
538 CommandId mycid = GetCurrentCommandId(true);
539 int ti_options = 0; /* start with default options for insert */
540 BulkInsertState bistate = NULL;
541 CopyInsertMethod insertMethod;
542 CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
543 int64 processed = 0;
544 int64 excluded = 0;
545 bool has_before_insert_row_trig;
546 bool has_instead_insert_row_trig;
547 bool leafpart_use_multi_insert = false;
548
549 Assert(cstate->rel);
550 Assert(list_length(cstate->range_table) == 1);
551
552 /*
553 * The target must be a plain, foreign, or partitioned relation, or have
554 * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
555 * allowed on views, so we only hint about them in the view case.)
556 */
557 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
558 cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
559 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
560 !(cstate->rel->trigdesc &&
561 cstate->rel->trigdesc->trig_insert_instead_row))
562 {
563 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
564 ereport(ERROR,
565 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
566 errmsg("cannot copy to view \"%s\"",
567 RelationGetRelationName(cstate->rel)),
568 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
569 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
570 ereport(ERROR,
571 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
572 errmsg("cannot copy to materialized view \"%s\"",
573 RelationGetRelationName(cstate->rel))));
574 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
575 ereport(ERROR,
576 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
577 errmsg("cannot copy to sequence \"%s\"",
578 RelationGetRelationName(cstate->rel))));
579 else
580 ereport(ERROR,
581 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
582 errmsg("cannot copy to non-table relation \"%s\"",
583 RelationGetRelationName(cstate->rel))));
584 }
585
586 /*
587 * If the target file is new-in-transaction, we assume that checking FSM
588 * for free space is a waste of time. This could possibly be wrong, but
589 * it's unlikely.
590 */
591 if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
592 (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
593 cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
594 ti_options |= TABLE_INSERT_SKIP_FSM;
595
596 /*
597 * Optimize if new relfilenode was created in this subxact or one of its
598 * committed children and we won't see those rows later as part of an
599 * earlier scan or command. The subxact test ensures that if this subxact
600 * aborts then the frozen rows won't be visible after xact cleanup. Note
601 * that the stronger test of exactly which subtransaction created it is
602 * crucial for correctness of this optimization. The test for an earlier
603 * scan or command tolerates false negatives. FREEZE causes other sessions
604 * to see rows they would not see under MVCC, and a false negative merely
605 * spreads that anomaly to the current session.
606 */
607 if (cstate->opts.freeze)
608 {
609 /*
610 * We currently disallow COPY FREEZE on partitioned tables. The
611 * reason for this is that we've simply not yet opened the partitions
612 * to determine if the optimization can be applied to them. We could
613 * go and open them all here, but doing so may be quite a costly
614 * overhead for small copies. In any case, we may just end up routing
615 * tuples to a small number of partitions. It seems better just to
616 * raise an ERROR for partitioned tables.
617 */
618 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
619 {
620 ereport(ERROR,
621 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
622 errmsg("cannot perform COPY FREEZE on a partitioned table")));
623 }
624
625 /*
626 * Tolerate one registration for the benefit of FirstXactSnapshot.
627 * Scan-bearing queries generally create at least two registrations,
628 * though relying on that is fragile, as is ignoring ActiveSnapshot.
629 * Clear CatalogSnapshot to avoid counting its registration. We'll
630 * still detect ongoing catalog scans, each of which separately
631 * registers the snapshot it uses.
632 */
633 InvalidateCatalogSnapshot();
634 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
635 ereport(ERROR,
636 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
637 errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
638
639 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
640 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
641 ereport(ERROR,
642 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
643 errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
644
645 ti_options |= TABLE_INSERT_FROZEN;
646 }
647
648 /*
649 * We need a ResultRelInfo so we can use the regular executor's
650 * index-entry-making machinery. (There used to be a huge amount of code
651 * here that basically duplicated execUtils.c ...)
652 */
653 ExecInitRangeTable(estate, cstate->range_table);
654 resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
655 ExecInitResultRelation(estate, resultRelInfo, 1);
656
657 /* Verify the named relation is a valid target for INSERT */
658 CheckValidResultRel(resultRelInfo, CMD_INSERT);
659
660 ExecOpenIndices(resultRelInfo, false);
661
662 /*
663 * Set up a ModifyTableState so we can let FDW(s) init themselves for
664 * foreign-table result relation(s).
665 */
666 mtstate = makeNode(ModifyTableState);
667 mtstate->ps.plan = NULL;
668 mtstate->ps.state = estate;
669 mtstate->operation = CMD_INSERT;
670 mtstate->mt_nrels = 1;
671 mtstate->resultRelInfo = resultRelInfo;
672 mtstate->rootResultRelInfo = resultRelInfo;
673
674 if (resultRelInfo->ri_FdwRoutine != NULL &&
675 resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
676 resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
677 resultRelInfo);
678
679 /* Prepare to catch AFTER triggers. */
680 AfterTriggerBeginQuery();
681
682 /*
683 * If there are any triggers with transition tables on the named relation,
684 * we need to be prepared to capture transition tuples.
685 *
686 * Because partition tuple routing would like to know about whether
687 * transition capture is active, we also set it in mtstate, which is
688 * passed to ExecFindPartition() below.
689 */
690 cstate->transition_capture = mtstate->mt_transition_capture =
691 MakeTransitionCaptureState(cstate->rel->trigdesc,
692 RelationGetRelid(cstate->rel),
693 CMD_INSERT);
694
695 /*
696 * If the named relation is a partitioned table, initialize state for
697 * CopyFrom tuple routing.
698 */
699 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
700 proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
701
702 if (cstate->whereClause)
703 cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
704 &mtstate->ps);
705
706 /*
707 * It's generally more efficient to prepare a bunch of tuples for
708 * insertion, and insert them in one table_multi_insert() call, than call
709 * table_tuple_insert() separately for every tuple. However, there are a
710 * number of reasons why we might not be able to do this. These are
711 * explained below.
712 */
713 if (resultRelInfo->ri_TrigDesc != NULL &&
714 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
715 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
716 {
717 /*
718 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
719 * triggers on the table. Such triggers might query the table we're
720 * inserting into and act differently if the tuples that have already
721 * been processed and prepared for insertion are not there.
722 */
723 insertMethod = CIM_SINGLE;
724 }
725 else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
726 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
727 {
728 /*
729 * For partitioned tables we can't support multi-inserts when there
730 * are any statement level insert triggers. It might be possible to
731 * allow partitioned tables with such triggers in the future, but for
732 * now, CopyMultiInsertInfoFlush expects that any before row insert
733 * and statement level insert triggers are on the same relation.
734 */
735 insertMethod = CIM_SINGLE;
736 }
737 else if (resultRelInfo->ri_FdwRoutine != NULL ||
738 cstate->volatile_defexprs)
739 {
740 /*
741 * Can't support multi-inserts to foreign tables or if there are any
742 * volatile default expressions in the table. Similarly to the
743 * trigger case above, such expressions may query the table we're
744 * inserting into.
745 *
746 * Note: It does not matter if any partitions have any volatile
747 * default expressions as we use the defaults from the target of the
748 * COPY command.
749 */
750 insertMethod = CIM_SINGLE;
751 }
752 else if (contain_volatile_functions(cstate->whereClause))
753 {
754 /*
755 * Can't support multi-inserts if there are any volatile function
756 * expressions in WHERE clause. Similarly to the trigger case above,
757 * such expressions may query the table we're inserting into.
758 */
759 insertMethod = CIM_SINGLE;
760 }
761 else
762 {
763 /*
764 * For partitioned tables, we may still be able to perform bulk
765 * inserts. However, the possibility of this depends on which types
766 * of triggers exist on the partition. We must disable bulk inserts
767 * if the partition is a foreign table or it has any before row insert
768 * or insert instead triggers (same as we checked above for the parent
769 * table). Since the partition's resultRelInfos are initialized only
770 * when we actually need to insert the first tuple into them, we must
771 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
772 * flag that we must later determine if we can use bulk-inserts for
773 * the partition being inserted into.
774 */
775 if (proute)
776 insertMethod = CIM_MULTI_CONDITIONAL;
777 else
778 insertMethod = CIM_MULTI;
779
780 CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
781 estate, mycid, ti_options);
782 }
783
784 /*
785 * If not using batch mode (which allocates slots as needed) set up a
786 * tuple slot too. When inserting into a partitioned table, we also need
787 * one, even if we might batch insert, to read the tuple in the root
788 * partition's form.
789 */
790 if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
791 {
792 singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
793 &estate->es_tupleTable);
794 bistate = GetBulkInsertState();
795 }
796
797 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
798 resultRelInfo->ri_TrigDesc->trig_insert_before_row);
799
800 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
801 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
802
803 /*
804 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
805 * should do this for COPY, since it's not really an "INSERT" statement as
806 * such. However, executing these triggers maintains consistency with the
807 * EACH ROW triggers that we already fire on COPY.
808 */
809 ExecBSInsertTriggers(estate, resultRelInfo);
810
811 econtext = GetPerTupleExprContext(estate);
812
813 /* Set up callback to identify error line number */
814 errcallback.callback = CopyFromErrorCallback;
815 errcallback.arg = (void *) cstate;
816 errcallback.previous = error_context_stack;
817 error_context_stack = &errcallback;
818
819 for (;;)
820 {
821 TupleTableSlot *myslot;
822 bool skip_tuple;
823
824 CHECK_FOR_INTERRUPTS();
825
826 /*
827 * Reset the per-tuple exprcontext. We do this after every tuple, to
828 * clean-up after expression evaluations etc.
829 */
830 ResetPerTupleExprContext(estate);
831
832 /* select slot to (initially) load row into */
833 if (insertMethod == CIM_SINGLE || proute)
834 {
835 myslot = singleslot;
836 Assert(myslot != NULL);
837 }
838 else
839 {
840 Assert(resultRelInfo == target_resultRelInfo);
841 Assert(insertMethod == CIM_MULTI);
842
843 myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
844 resultRelInfo);
845 }
846
847 /*
848 * Switch to per-tuple context before calling NextCopyFrom, which does
849 * evaluate default expressions etc. and requires per-tuple context.
850 */
851 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
852
853 ExecClearTuple(myslot);
854
855 /* Directly store the values/nulls array in the slot */
856 if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
857 break;
858
859 ExecStoreVirtualTuple(myslot);
860
861 /*
862 * Constraints and where clause might reference the tableoid column,
863 * so (re-)initialize tts_tableOid before evaluating them.
864 */
865 myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
866
867 /* Triggers and stuff need to be invoked in query context. */
868 MemoryContextSwitchTo(oldcontext);
869
870 if (cstate->whereClause)
871 {
872 econtext->ecxt_scantuple = myslot;
873 /* Skip items that don't match COPY's WHERE clause */
874 if (!ExecQual(cstate->qualexpr, econtext))
875 {
876 /*
877 * Report that this tuple was filtered out by the WHERE
878 * clause.
879 */
880 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
881 ++excluded);
882 continue;
883 }
884 }
885
886 /* Determine the partition to insert the tuple into */
887 if (proute)
888 {
889 TupleConversionMap *map;
890
891 /*
892 * Attempt to find a partition suitable for this tuple.
893 * ExecFindPartition() will raise an error if none can be found or
894 * if the found partition is not suitable for INSERTs.
895 */
896 resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
897 proute, myslot, estate);
898
899 if (prevResultRelInfo != resultRelInfo)
900 {
901 /* Determine which triggers exist on this partition */
902 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
903 resultRelInfo->ri_TrigDesc->trig_insert_before_row);
904
905 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
906 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
907
908 /*
909 * Disable multi-inserts when the partition has BEFORE/INSTEAD
910 * OF triggers, or if the partition is a foreign partition.
911 */
912 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
913 !has_before_insert_row_trig &&
914 !has_instead_insert_row_trig &&
915 resultRelInfo->ri_FdwRoutine == NULL;
916
917 /* Set the multi-insert buffer to use for this partition. */
918 if (leafpart_use_multi_insert)
919 {
920 if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
921 CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
922 resultRelInfo);
923 }
924 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
925 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
926 {
927 /*
928 * Flush pending inserts if this partition can't use
929 * batching, so rows are visible to triggers etc.
930 */
931 CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
932 }
933
934 if (bistate != NULL)
935 ReleaseBulkInsertStatePin(bistate);
936 prevResultRelInfo = resultRelInfo;
937 }
938
939 /*
940 * If we're capturing transition tuples, we might need to convert
941 * from the partition rowtype to root rowtype. But if there are no
942 * BEFORE triggers on the partition that could change the tuple,
943 * we can just remember the original unconverted tuple to avoid a
944 * needless round trip conversion.
945 */
946 if (cstate->transition_capture != NULL)
947 cstate->transition_capture->tcs_original_insert_tuple =
948 !has_before_insert_row_trig ? myslot : NULL;
949
950 /*
951 * We might need to convert from the root rowtype to the partition
952 * rowtype.
953 */
954 map = resultRelInfo->ri_RootToPartitionMap;
955 if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
956 {
957 /* non batch insert */
958 if (map != NULL)
959 {
960 TupleTableSlot *new_slot;
961
962 new_slot = resultRelInfo->ri_PartitionTupleSlot;
963 myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
964 }
965 }
966 else
967 {
968 /*
969 * Prepare to queue up tuple for later batch insert into
970 * current partition.
971 */
972 TupleTableSlot *batchslot;
973
974 /* no other path available for partitioned table */
975 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
976
977 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
978 resultRelInfo);
979
980 if (map != NULL)
981 myslot = execute_attr_map_slot(map->attrMap, myslot,
982 batchslot);
983 else
984 {
985 /*
986 * This looks more expensive than it is (Believe me, I
987 * optimized it away. Twice.). The input is in virtual
988 * form, and we'll materialize the slot below - for most
989 * slot types the copy performs the work materialization
990 * would later require anyway.
991 */
992 ExecCopySlot(batchslot, myslot);
993 myslot = batchslot;
994 }
995 }
996
997 /* ensure that triggers etc see the right relation */
998 myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
999 }
1000
1001 skip_tuple = false;
1002
1003 /* BEFORE ROW INSERT Triggers */
1004 if (has_before_insert_row_trig)
1005 {
1006 if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1007 skip_tuple = true; /* "do nothing" */
1008 }
1009
1010 if (!skip_tuple)
1011 {
1012 /*
1013 * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1014 * tuple. Otherwise, proceed with inserting the tuple into the
1015 * table or foreign table.
1016 */
1017 if (has_instead_insert_row_trig)
1018 {
1019 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1020 }
1021 else
1022 {
1023 /* Compute stored generated columns */
1024 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1025 resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1026 ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1027 CMD_INSERT);
1028
1029 /*
1030 * If the target is a plain table, check the constraints of
1031 * the tuple.
1032 */
1033 if (resultRelInfo->ri_FdwRoutine == NULL &&
1034 resultRelInfo->ri_RelationDesc->rd_att->constr)
1035 ExecConstraints(resultRelInfo, myslot, estate);
1036
1037 /*
1038 * Also check the tuple against the partition constraint, if
1039 * there is one; except that if we got here via tuple-routing,
1040 * we don't need to if there's no BR trigger defined on the
1041 * partition.
1042 */
1043 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1044 (proute == NULL || has_before_insert_row_trig))
1045 ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1046
1047 /* Store the slot in the multi-insert buffer, when enabled. */
1048 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1049 {
1050 /*
1051 * The slot previously might point into the per-tuple
1052 * context. For batching it needs to be longer lived.
1053 */
1054 ExecMaterializeSlot(myslot);
1055
1056 /* Add this tuple to the tuple buffer */
1057 CopyMultiInsertInfoStore(&multiInsertInfo,
1058 resultRelInfo, myslot,
1059 cstate->line_buf.len,
1060 cstate->cur_lineno);
1061
1062 /*
1063 * If enough inserts have queued up, then flush all
1064 * buffers out to their tables.
1065 */
1066 if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1067 CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
1068 }
1069 else
1070 {
1071 List *recheckIndexes = NIL;
1072
1073 /* OK, store the tuple */
1074 if (resultRelInfo->ri_FdwRoutine != NULL)
1075 {
1076 myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1077 resultRelInfo,
1078 myslot,
1079 NULL);
1080
1081 if (myslot == NULL) /* "do nothing" */
1082 continue; /* next tuple please */
1083
1084 /*
1085 * AFTER ROW Triggers might reference the tableoid
1086 * column, so (re-)initialize tts_tableOid before
1087 * evaluating them.
1088 */
1089 myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1090 }
1091 else
1092 {
1093 /* OK, store the tuple and create index entries for it */
1094 table_tuple_insert(resultRelInfo->ri_RelationDesc,
1095 myslot, mycid, ti_options, bistate);
1096
1097 if (resultRelInfo->ri_NumIndices > 0)
1098 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1099 myslot,
1100 estate,
1101 false,
1102 false,
1103 NULL,
1104 NIL);
1105 }
1106
1107 /* AFTER ROW INSERT Triggers */
1108 ExecARInsertTriggers(estate, resultRelInfo, myslot,
1109 recheckIndexes, cstate->transition_capture);
1110
1111 list_free(recheckIndexes);
1112 }
1113 }
1114
1115 /*
1116 * We count only tuples not suppressed by a BEFORE INSERT trigger
1117 * or FDW; this is the same definition used by nodeModifyTable.c
1118 * for counting tuples inserted by an INSERT command. Update
1119 * progress of the COPY command as well.
1120 */
1121 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1122 ++processed);
1123 }
1124 }
1125
1126 /* Flush any remaining buffered tuples */
1127 if (insertMethod != CIM_SINGLE)
1128 {
1129 if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1130 CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
1131 }
1132
1133 /* Done, clean up */
1134 error_context_stack = errcallback.previous;
1135
1136 if (bistate != NULL)
1137 FreeBulkInsertState(bistate);
1138
1139 MemoryContextSwitchTo(oldcontext);
1140
1141 /* Execute AFTER STATEMENT insertion triggers */
1142 ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1143
1144 /* Handle queued AFTER triggers */
1145 AfterTriggerEndQuery(estate);
1146
1147 ExecResetTupleTable(estate->es_tupleTable, false);
1148
1149 /* Allow the FDW to shut down */
1150 if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1151 target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1152 target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1153 target_resultRelInfo);
1154
1155 /* Tear down the multi-insert buffer data */
1156 if (insertMethod != CIM_SINGLE)
1157 CopyMultiInsertInfoCleanup(&multiInsertInfo);
1158
1159 /* Close all the partitioned tables, leaf partitions, and their indices */
1160 if (proute)
1161 ExecCleanupTupleRouting(mtstate, proute);
1162
1163 /* Close the result relations, including any trigger target relations */
1164 ExecCloseResultRelations(estate);
1165 ExecCloseRangeTableRelations(estate);
1166
1167 FreeExecutorState(estate);
1168
1169 return processed;
1170 }
1171
1172 /*
1173 * Setup to read tuples from a file for COPY FROM.
1174 *
1175 * 'rel': Used as a template for the tuples
1176 * 'whereClause': WHERE clause from the COPY FROM command
1177 * 'filename': Name of server-local file to read, NULL for STDIN
1178 * 'is_program': true if 'filename' is program to execute
1179 * 'data_source_cb': callback that provides the input data
1180 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1181 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1182 *
1183 * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1184 */
1185 CopyFromState
BeginCopyFrom(ParseState * pstate,Relation rel,Node * whereClause,const char * filename,bool is_program,copy_data_source_cb data_source_cb,List * attnamelist,List * options)1186 BeginCopyFrom(ParseState *pstate,
1187 Relation rel,
1188 Node *whereClause,
1189 const char *filename,
1190 bool is_program,
1191 copy_data_source_cb data_source_cb,
1192 List *attnamelist,
1193 List *options)
1194 {
1195 CopyFromState cstate;
1196 bool pipe = (filename == NULL);
1197 TupleDesc tupDesc;
1198 AttrNumber num_phys_attrs,
1199 num_defaults;
1200 FmgrInfo *in_functions;
1201 Oid *typioparams;
1202 int attnum;
1203 Oid in_func_oid;
1204 int *defmap;
1205 ExprState **defexprs;
1206 MemoryContext oldcontext;
1207 bool volatile_defexprs;
1208 const int progress_cols[] = {
1209 PROGRESS_COPY_COMMAND,
1210 PROGRESS_COPY_TYPE,
1211 PROGRESS_COPY_BYTES_TOTAL
1212 };
1213 int64 progress_vals[] = {
1214 PROGRESS_COPY_COMMAND_FROM,
1215 0,
1216 0
1217 };
1218
1219 /* Allocate workspace and zero all fields */
1220 cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1221
1222 /*
1223 * We allocate everything used by a cstate in a new memory context. This
1224 * avoids memory leaks during repeated use of COPY in a query.
1225 */
1226 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1227 "COPY",
1228 ALLOCSET_DEFAULT_SIZES);
1229
1230 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1231
1232 /* Extract options from the statement node tree */
1233 ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1234
1235 /* Process the target relation */
1236 cstate->rel = rel;
1237
1238 tupDesc = RelationGetDescr(cstate->rel);
1239
1240 /* process commmon options or initialization */
1241
1242 /* Generate or convert list of attributes to process */
1243 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1244
1245 num_phys_attrs = tupDesc->natts;
1246
1247 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1248 cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1249 if (cstate->opts.force_notnull)
1250 {
1251 List *attnums;
1252 ListCell *cur;
1253
1254 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1255
1256 foreach(cur, attnums)
1257 {
1258 int attnum = lfirst_int(cur);
1259 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1260
1261 if (!list_member_int(cstate->attnumlist, attnum))
1262 ereport(ERROR,
1263 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1264 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1265 NameStr(attr->attname))));
1266 cstate->opts.force_notnull_flags[attnum - 1] = true;
1267 }
1268 }
1269
1270 /* Convert FORCE_NULL name list to per-column flags, check validity */
1271 cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1272 if (cstate->opts.force_null)
1273 {
1274 List *attnums;
1275 ListCell *cur;
1276
1277 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1278
1279 foreach(cur, attnums)
1280 {
1281 int attnum = lfirst_int(cur);
1282 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1283
1284 if (!list_member_int(cstate->attnumlist, attnum))
1285 ereport(ERROR,
1286 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1287 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1288 NameStr(attr->attname))));
1289 cstate->opts.force_null_flags[attnum - 1] = true;
1290 }
1291 }
1292
1293 /* Convert convert_selectively name list to per-column flags */
1294 if (cstate->opts.convert_selectively)
1295 {
1296 List *attnums;
1297 ListCell *cur;
1298
1299 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1300
1301 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1302
1303 foreach(cur, attnums)
1304 {
1305 int attnum = lfirst_int(cur);
1306 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1307
1308 if (!list_member_int(cstate->attnumlist, attnum))
1309 ereport(ERROR,
1310 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1311 errmsg_internal("selected column \"%s\" not referenced by COPY",
1312 NameStr(attr->attname))));
1313 cstate->convert_select_flags[attnum - 1] = true;
1314 }
1315 }
1316
1317 /* Use client encoding when ENCODING option is not specified. */
1318 if (cstate->opts.file_encoding < 0)
1319 cstate->file_encoding = pg_get_client_encoding();
1320 else
1321 cstate->file_encoding = cstate->opts.file_encoding;
1322
1323 /*
1324 * Look up encoding conversion function.
1325 */
1326 if (cstate->file_encoding == GetDatabaseEncoding() ||
1327 cstate->file_encoding == PG_SQL_ASCII ||
1328 GetDatabaseEncoding() == PG_SQL_ASCII)
1329 {
1330 cstate->need_transcoding = false;
1331 }
1332 else
1333 {
1334 cstate->need_transcoding = true;
1335 cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1336 GetDatabaseEncoding());
1337 }
1338
1339 cstate->copy_src = COPY_FILE; /* default */
1340
1341 cstate->whereClause = whereClause;
1342
1343 MemoryContextSwitchTo(oldcontext);
1344
1345 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1346
1347 /* Initialize state variables */
1348 cstate->eol_type = EOL_UNKNOWN;
1349 cstate->cur_relname = RelationGetRelationName(cstate->rel);
1350 cstate->cur_lineno = 0;
1351 cstate->cur_attname = NULL;
1352 cstate->cur_attval = NULL;
1353
1354 /*
1355 * Allocate buffers for the input pipeline.
1356 *
1357 * attribute_buf and raw_buf are used in both text and binary modes, but
1358 * input_buf and line_buf only in text mode.
1359 */
1360 cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1361 cstate->raw_buf_index = cstate->raw_buf_len = 0;
1362 cstate->raw_reached_eof = false;
1363
1364 if (!cstate->opts.binary)
1365 {
1366 /*
1367 * If encoding conversion is needed, we need another buffer to hold
1368 * the converted input data. Otherwise, we can just point input_buf
1369 * to the same buffer as raw_buf.
1370 */
1371 if (cstate->need_transcoding)
1372 {
1373 cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1374 cstate->input_buf_index = cstate->input_buf_len = 0;
1375 }
1376 else
1377 cstate->input_buf = cstate->raw_buf;
1378 cstate->input_reached_eof = false;
1379
1380 initStringInfo(&cstate->line_buf);
1381 }
1382
1383 initStringInfo(&cstate->attribute_buf);
1384
1385 /* Assign range table, we'll need it in CopyFrom. */
1386 if (pstate)
1387 cstate->range_table = pstate->p_rtable;
1388
1389 tupDesc = RelationGetDescr(cstate->rel);
1390 num_phys_attrs = tupDesc->natts;
1391 num_defaults = 0;
1392 volatile_defexprs = false;
1393
1394 /*
1395 * Pick up the required catalog information for each attribute in the
1396 * relation, including the input function, the element type (to pass to
1397 * the input function), and info about defaults and constraints. (Which
1398 * input function we use depends on text/binary format choice.)
1399 */
1400 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1401 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1402 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1403 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1404
1405 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1406 {
1407 Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1408
1409 /* We don't need info for dropped attributes */
1410 if (att->attisdropped)
1411 continue;
1412
1413 /* Fetch the input function and typioparam info */
1414 if (cstate->opts.binary)
1415 getTypeBinaryInputInfo(att->atttypid,
1416 &in_func_oid, &typioparams[attnum - 1]);
1417 else
1418 getTypeInputInfo(att->atttypid,
1419 &in_func_oid, &typioparams[attnum - 1]);
1420 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1421
1422 /* Get default info if needed */
1423 if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
1424 {
1425 /* attribute is NOT to be copied from input */
1426 /* use default value if one exists */
1427 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1428 attnum);
1429
1430 if (defexpr != NULL)
1431 {
1432 /* Run the expression through planner */
1433 defexpr = expression_planner(defexpr);
1434
1435 /* Initialize executable expression in copycontext */
1436 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1437 defmap[num_defaults] = attnum - 1;
1438 num_defaults++;
1439
1440 /*
1441 * If a default expression looks at the table being loaded,
1442 * then it could give the wrong answer when using
1443 * multi-insert. Since database access can be dynamic this is
1444 * hard to test for exactly, so we use the much wider test of
1445 * whether the default expression is volatile. We allow for
1446 * the special case of when the default expression is the
1447 * nextval() of a sequence which in this specific case is
1448 * known to be safe for use with the multi-insert
1449 * optimization. Hence we use this special case function
1450 * checker rather than the standard check for
1451 * contain_volatile_functions().
1452 */
1453 if (!volatile_defexprs)
1454 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1455 }
1456 }
1457 }
1458
1459
1460 /* initialize progress */
1461 pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1462 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1463 cstate->bytes_processed = 0;
1464
1465 /* We keep those variables in cstate. */
1466 cstate->in_functions = in_functions;
1467 cstate->typioparams = typioparams;
1468 cstate->defmap = defmap;
1469 cstate->defexprs = defexprs;
1470 cstate->volatile_defexprs = volatile_defexprs;
1471 cstate->num_defaults = num_defaults;
1472 cstate->is_program = is_program;
1473
1474 if (data_source_cb)
1475 {
1476 progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1477 cstate->copy_src = COPY_CALLBACK;
1478 cstate->data_source_cb = data_source_cb;
1479 }
1480 else if (pipe)
1481 {
1482 progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1483 Assert(!is_program); /* the grammar does not allow this */
1484 if (whereToSendOutput == DestRemote)
1485 ReceiveCopyBegin(cstate);
1486 else
1487 cstate->copy_file = stdin;
1488 }
1489 else
1490 {
1491 cstate->filename = pstrdup(filename);
1492
1493 if (cstate->is_program)
1494 {
1495 progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1496 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1497 if (cstate->copy_file == NULL)
1498 ereport(ERROR,
1499 (errcode_for_file_access(),
1500 errmsg("could not execute command \"%s\": %m",
1501 cstate->filename)));
1502 }
1503 else
1504 {
1505 struct stat st;
1506
1507 progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1508 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1509 if (cstate->copy_file == NULL)
1510 {
1511 /* copy errno because ereport subfunctions might change it */
1512 int save_errno = errno;
1513
1514 ereport(ERROR,
1515 (errcode_for_file_access(),
1516 errmsg("could not open file \"%s\" for reading: %m",
1517 cstate->filename),
1518 (save_errno == ENOENT || save_errno == EACCES) ?
1519 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1520 "You may want a client-side facility such as psql's \\copy.") : 0));
1521 }
1522
1523 if (fstat(fileno(cstate->copy_file), &st))
1524 ereport(ERROR,
1525 (errcode_for_file_access(),
1526 errmsg("could not stat file \"%s\": %m",
1527 cstate->filename)));
1528
1529 if (S_ISDIR(st.st_mode))
1530 ereport(ERROR,
1531 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1532 errmsg("\"%s\" is a directory", cstate->filename)));
1533
1534 progress_vals[2] = st.st_size;
1535 }
1536 }
1537
1538 pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1539
1540 if (cstate->opts.binary)
1541 {
1542 /* Read and verify binary header */
1543 ReceiveCopyBinaryHeader(cstate);
1544 }
1545
1546 /* create workspace for CopyReadAttributes results */
1547 if (!cstate->opts.binary)
1548 {
1549 AttrNumber attr_count = list_length(cstate->attnumlist);
1550
1551 cstate->max_fields = attr_count;
1552 cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1553 }
1554
1555 MemoryContextSwitchTo(oldcontext);
1556
1557 return cstate;
1558 }
1559
1560 /*
1561 * Clean up storage and release resources for COPY FROM.
1562 */
1563 void
EndCopyFrom(CopyFromState cstate)1564 EndCopyFrom(CopyFromState cstate)
1565 {
1566 /* No COPY FROM related resources except memory. */
1567 if (cstate->is_program)
1568 {
1569 ClosePipeFromProgram(cstate);
1570 }
1571 else
1572 {
1573 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1574 ereport(ERROR,
1575 (errcode_for_file_access(),
1576 errmsg("could not close file \"%s\": %m",
1577 cstate->filename)));
1578 }
1579
1580 pgstat_progress_end_command();
1581
1582 MemoryContextDelete(cstate->copycontext);
1583 pfree(cstate);
1584 }
1585
1586 /*
1587 * Closes the pipe from an external program, checking the pclose() return code.
1588 */
1589 static void
ClosePipeFromProgram(CopyFromState cstate)1590 ClosePipeFromProgram(CopyFromState cstate)
1591 {
1592 int pclose_rc;
1593
1594 Assert(cstate->is_program);
1595
1596 pclose_rc = ClosePipeStream(cstate->copy_file);
1597 if (pclose_rc == -1)
1598 ereport(ERROR,
1599 (errcode_for_file_access(),
1600 errmsg("could not close pipe to external command: %m")));
1601 else if (pclose_rc != 0)
1602 {
1603 /*
1604 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1605 * expectable for the called program to fail with SIGPIPE, and we
1606 * should not report that as an error. Otherwise, SIGPIPE indicates a
1607 * problem.
1608 */
1609 if (!cstate->raw_reached_eof &&
1610 wait_result_is_signal(pclose_rc, SIGPIPE))
1611 return;
1612
1613 ereport(ERROR,
1614 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1615 errmsg("program \"%s\" failed",
1616 cstate->filename),
1617 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1618 }
1619 }
1620