1 /*-------------------------------------------------------------------------
2  *
3  * copyfrom.c
4  *		COPY <table> FROM file/program/client
5  *
6  * This file contains routines needed to efficiently load tuples into a
7  * table.  That includes looking up the correct partition, firing triggers,
8  * calling the table AM function to insert the data, and updating indexes.
9  * Reading data from the input file or client and parsing it into Datums
10  * is handled in copyfromparse.c.
11  *
12  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  *
16  * IDENTIFICATION
17  *	  src/backend/commands/copyfrom.c
18  *
19  *-------------------------------------------------------------------------
20  */
21 #include "postgres.h"
22 
23 #include <ctype.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26 
27 #include "access/heapam.h"
28 #include "access/htup_details.h"
29 #include "access/tableam.h"
30 #include "access/xact.h"
31 #include "access/xlog.h"
32 #include "catalog/namespace.h"
33 #include "commands/copy.h"
34 #include "commands/copyfrom_internal.h"
35 #include "commands/progress.h"
36 #include "commands/trigger.h"
37 #include "executor/execPartition.h"
38 #include "executor/executor.h"
39 #include "executor/nodeModifyTable.h"
40 #include "executor/tuptable.h"
41 #include "foreign/fdwapi.h"
42 #include "libpq/libpq.h"
43 #include "libpq/pqformat.h"
44 #include "miscadmin.h"
45 #include "optimizer/optimizer.h"
46 #include "pgstat.h"
47 #include "rewrite/rewriteHandler.h"
48 #include "storage/fd.h"
49 #include "tcop/tcopprot.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/portal.h"
53 #include "utils/rel.h"
54 #include "utils/snapmgr.h"
55 
56 /*
57  * No more than this many tuples per CopyMultiInsertBuffer
58  *
59  * Caution: Don't make this too big, as we could end up with this many
60  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
61  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
62  * memory requirements during copies into partitioned tables with a large
63  * number of partitions.
64  */
65 #define MAX_BUFFERED_TUPLES		1000
66 
/*
 * Flush buffers once the stored tuples add up to >= this many bytes, as
 * counted by their input size.
 */
71 #define MAX_BUFFERED_BYTES		65535
72 
73 /* Trim the list of buffers back down to this number after flushing */
74 #define MAX_PARTITION_BUFFERS	32
75 
76 /* Stores multi-insert data related to a single relation in CopyFrom. */
77 typedef struct CopyMultiInsertBuffer
78 {
79 	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
80 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
81 	BulkInsertState bistate;	/* BulkInsertState for this rel */
82 	int			nused;			/* number of 'slots' containing tuples */
83 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
84 												 * stream */
85 } CopyMultiInsertBuffer;
86 
87 /*
88  * Stores one or many CopyMultiInsertBuffers and details about the size and
89  * number of tuples which are stored in them.  This allows multiple buffers to
90  * exist at once when COPYing into a partitioned table.
91  */
92 typedef struct CopyMultiInsertInfo
93 {
94 	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
95 	int			bufferedTuples; /* number of tuples buffered over all buffers */
96 	int			bufferedBytes;	/* number of bytes from all buffered tuples */
97 	CopyFromState cstate;		/* Copy state for this CopyMultiInsertInfo */
98 	EState	   *estate;			/* Executor state used for COPY */
99 	CommandId	mycid;			/* Command Id used for COPY */
100 	int			ti_options;		/* table insert options */
101 } CopyMultiInsertInfo;
102 
103 
104 /* non-export function prototypes */
105 static char *limit_printout_length(const char *str);
106 
107 static void ClosePipeFromProgram(CopyFromState cstate);
108 
109 /*
110  * error context callback for COPY FROM
111  *
112  * The argument for the error context must be CopyFromState.
113  */
114 void
CopyFromErrorCallback(void * arg)115 CopyFromErrorCallback(void *arg)
116 {
117 	CopyFromState cstate = (CopyFromState) arg;
118 	char		curlineno_str[32];
119 
120 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
121 			 cstate->cur_lineno);
122 
123 	if (cstate->opts.binary)
124 	{
125 		/* can't usefully display the data */
126 		if (cstate->cur_attname)
127 			errcontext("COPY %s, line %s, column %s",
128 					   cstate->cur_relname, curlineno_str,
129 					   cstate->cur_attname);
130 		else
131 			errcontext("COPY %s, line %s",
132 					   cstate->cur_relname, curlineno_str);
133 	}
134 	else
135 	{
136 		if (cstate->cur_attname && cstate->cur_attval)
137 		{
138 			/* error is relevant to a particular column */
139 			char	   *attval;
140 
141 			attval = limit_printout_length(cstate->cur_attval);
142 			errcontext("COPY %s, line %s, column %s: \"%s\"",
143 					   cstate->cur_relname, curlineno_str,
144 					   cstate->cur_attname, attval);
145 			pfree(attval);
146 		}
147 		else if (cstate->cur_attname)
148 		{
149 			/* error is relevant to a particular column, value is NULL */
150 			errcontext("COPY %s, line %s, column %s: null input",
151 					   cstate->cur_relname, curlineno_str,
152 					   cstate->cur_attname);
153 		}
154 		else
155 		{
156 			/*
157 			 * Error is relevant to a particular line.
158 			 *
159 			 * If line_buf still contains the correct line, print it.
160 			 */
161 			if (cstate->line_buf_valid)
162 			{
163 				char	   *lineval;
164 
165 				lineval = limit_printout_length(cstate->line_buf.data);
166 				errcontext("COPY %s, line %s: \"%s\"",
167 						   cstate->cur_relname, curlineno_str, lineval);
168 				pfree(lineval);
169 			}
170 			else
171 			{
172 				errcontext("COPY %s, line %s",
173 						   cstate->cur_relname, curlineno_str);
174 			}
175 		}
176 	}
177 }
178 
/*
 * Keep the amount of COPY data shown in a message within reason.
 *
 * Returns a freshly palloc'd string: a plain copy of 'str' when it is at
 * most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix clipped with
 * pg_mbcliplen() and followed by "...".
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			input_len = strlen(str);

	if (input_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Encoding-dependent truncation of the input */
		int			keep = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);
		char	   *clipped = (char *) palloc(keep + 4);

		/* Copy the prefix, then "..." and its terminator (4 bytes) */
		memcpy(clipped, str, keep);
		memcpy(clipped + keep, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back an unmodified copy */
	return pstrdup(str);
}
209 
210 /*
211  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
212  * ResultRelInfo.
213  */
214 static CopyMultiInsertBuffer *
CopyMultiInsertBufferInit(ResultRelInfo * rri)215 CopyMultiInsertBufferInit(ResultRelInfo *rri)
216 {
217 	CopyMultiInsertBuffer *buffer;
218 
219 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
220 	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
221 	buffer->resultRelInfo = rri;
222 	buffer->bistate = GetBulkInsertState();
223 	buffer->nused = 0;
224 
225 	return buffer;
226 }
227 
228 /*
229  * Make a new buffer for this ResultRelInfo.
230  */
231 static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri)232 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
233 							   ResultRelInfo *rri)
234 {
235 	CopyMultiInsertBuffer *buffer;
236 
237 	buffer = CopyMultiInsertBufferInit(rri);
238 
239 	/* Setup back-link so we can easily find this buffer again */
240 	rri->ri_CopyMultiInsertBuffer = buffer;
241 	/* Record that we're tracking this buffer */
242 	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
243 }
244 
245 /*
246  * Initialize an already allocated CopyMultiInsertInfo.
247  *
248  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
249  * for that table.
250  */
251 static void
CopyMultiInsertInfoInit(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri,CopyFromState cstate,EState * estate,CommandId mycid,int ti_options)252 CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
253 						CopyFromState cstate, EState *estate, CommandId mycid,
254 						int ti_options)
255 {
256 	miinfo->multiInsertBuffers = NIL;
257 	miinfo->bufferedTuples = 0;
258 	miinfo->bufferedBytes = 0;
259 	miinfo->cstate = cstate;
260 	miinfo->estate = estate;
261 	miinfo->mycid = mycid;
262 	miinfo->ti_options = ti_options;
263 
264 	/*
265 	 * Only setup the buffer when not dealing with a partitioned table.
266 	 * Buffers for partitioned tables will just be setup when we need to send
267 	 * tuples their way for the first time.
268 	 */
269 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
270 		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
271 }
272 
273 /*
274  * Returns true if the buffers are full
275  */
276 static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo * miinfo)277 CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
278 {
279 	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
280 		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
281 		return true;
282 	return false;
283 }
284 
285 /*
286  * Returns true if we have no buffered tuples
287  */
288 static inline bool
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo * miinfo)289 CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
290 {
291 	return miinfo->bufferedTuples == 0;
292 }
293 
294 /*
295  * Write the tuples stored in 'buffer' out to the table.
296  */
297 static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo * miinfo,CopyMultiInsertBuffer * buffer)298 CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
299 						   CopyMultiInsertBuffer *buffer)
300 {
301 	MemoryContext oldcontext;
302 	int			i;
303 	uint64		save_cur_lineno;
304 	CopyFromState cstate = miinfo->cstate;
305 	EState	   *estate = miinfo->estate;
306 	CommandId	mycid = miinfo->mycid;
307 	int			ti_options = miinfo->ti_options;
308 	bool		line_buf_valid = cstate->line_buf_valid;
309 	int			nused = buffer->nused;
310 	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
311 	TupleTableSlot **slots = buffer->slots;
312 
313 	/*
314 	 * Print error context information correctly, if one of the operations
315 	 * below fail.
316 	 */
317 	cstate->line_buf_valid = false;
318 	save_cur_lineno = cstate->cur_lineno;
319 
320 	/*
321 	 * table_multi_insert may leak memory, so switch to short-lived memory
322 	 * context before calling it.
323 	 */
324 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
325 	table_multi_insert(resultRelInfo->ri_RelationDesc,
326 					   slots,
327 					   nused,
328 					   mycid,
329 					   ti_options,
330 					   buffer->bistate);
331 	MemoryContextSwitchTo(oldcontext);
332 
333 	for (i = 0; i < nused; i++)
334 	{
335 		/*
336 		 * If there are any indexes, update them for all the inserted tuples,
337 		 * and run AFTER ROW INSERT triggers.
338 		 */
339 		if (resultRelInfo->ri_NumIndices > 0)
340 		{
341 			List	   *recheckIndexes;
342 
343 			cstate->cur_lineno = buffer->linenos[i];
344 			recheckIndexes =
345 				ExecInsertIndexTuples(resultRelInfo,
346 									  buffer->slots[i], estate, false, false,
347 									  NULL, NIL);
348 			ExecARInsertTriggers(estate, resultRelInfo,
349 								 slots[i], recheckIndexes,
350 								 cstate->transition_capture);
351 			list_free(recheckIndexes);
352 		}
353 
354 		/*
355 		 * There's no indexes, but see if we need to run AFTER ROW INSERT
356 		 * triggers anyway.
357 		 */
358 		else if (resultRelInfo->ri_TrigDesc != NULL &&
359 				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
360 				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
361 		{
362 			cstate->cur_lineno = buffer->linenos[i];
363 			ExecARInsertTriggers(estate, resultRelInfo,
364 								 slots[i], NIL, cstate->transition_capture);
365 		}
366 
367 		ExecClearTuple(slots[i]);
368 	}
369 
370 	/* Mark that all slots are free */
371 	buffer->nused = 0;
372 
373 	/* reset cur_lineno and line_buf_valid to what they were */
374 	cstate->line_buf_valid = line_buf_valid;
375 	cstate->cur_lineno = save_cur_lineno;
376 }
377 
378 /*
379  * Drop used slots and free member for this buffer.
380  *
381  * The buffer must be flushed before cleanup.
382  */
383 static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo * miinfo,CopyMultiInsertBuffer * buffer)384 CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
385 							 CopyMultiInsertBuffer *buffer)
386 {
387 	int			i;
388 
389 	/* Ensure buffer was flushed */
390 	Assert(buffer->nused == 0);
391 
392 	/* Remove back-link to ourself */
393 	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
394 
395 	FreeBulkInsertState(buffer->bistate);
396 
397 	/* Since we only create slots on demand, just drop the non-null ones. */
398 	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
399 		ExecDropSingleTupleTableSlot(buffer->slots[i]);
400 
401 	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
402 							 miinfo->ti_options);
403 
404 	pfree(buffer);
405 }
406 
407 /*
408  * Write out all stored tuples in all buffers out to the tables.
409  *
410  * Once flushed we also trim the tracked buffers list down to size by removing
411  * the buffers created earliest first.
412  *
413  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
414  * used.  When cleaning up old buffers we'll never remove the one for
415  * 'curr_rri'.
416  */
417 static inline void
CopyMultiInsertInfoFlush(CopyMultiInsertInfo * miinfo,ResultRelInfo * curr_rri)418 CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
419 {
420 	ListCell   *lc;
421 
422 	foreach(lc, miinfo->multiInsertBuffers)
423 	{
424 		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
425 
426 		CopyMultiInsertBufferFlush(miinfo, buffer);
427 	}
428 
429 	miinfo->bufferedTuples = 0;
430 	miinfo->bufferedBytes = 0;
431 
432 	/*
433 	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
434 	 * remove buffers starting with the ones we created first.  It seems less
435 	 * likely that these older ones will be needed than the ones that were
436 	 * just created.
437 	 */
438 	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
439 	{
440 		CopyMultiInsertBuffer *buffer;
441 
442 		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
443 
444 		/*
445 		 * We never want to remove the buffer that's currently being used, so
446 		 * if we happen to find that then move it to the end of the list.
447 		 */
448 		if (buffer->resultRelInfo == curr_rri)
449 		{
450 			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
451 			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
452 			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
453 		}
454 
455 		CopyMultiInsertBufferCleanup(miinfo, buffer);
456 		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
457 	}
458 }
459 
460 /*
461  * Cleanup allocated buffers and free memory
462  */
463 static inline void
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo * miinfo)464 CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
465 {
466 	ListCell   *lc;
467 
468 	foreach(lc, miinfo->multiInsertBuffers)
469 		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
470 
471 	list_free(miinfo->multiInsertBuffers);
472 }
473 
474 /*
475  * Get the next TupleTableSlot that the next tuple should be stored in.
476  *
477  * Callers must ensure that the buffer is not full.
478  *
479  * Note: 'miinfo' is unused but has been included for consistency with the
480  * other functions in this area.
481  */
482 static inline TupleTableSlot *
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri)483 CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
484 								ResultRelInfo *rri)
485 {
486 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
487 	int			nused = buffer->nused;
488 
489 	Assert(buffer != NULL);
490 	Assert(nused < MAX_BUFFERED_TUPLES);
491 
492 	if (buffer->slots[nused] == NULL)
493 		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
494 	return buffer->slots[nused];
495 }
496 
497 /*
498  * Record the previously reserved TupleTableSlot that was reserved by
499  * CopyMultiInsertInfoNextFreeSlot as being consumed.
500  */
501 static inline void
CopyMultiInsertInfoStore(CopyMultiInsertInfo * miinfo,ResultRelInfo * rri,TupleTableSlot * slot,int tuplen,uint64 lineno)502 CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
503 						 TupleTableSlot *slot, int tuplen, uint64 lineno)
504 {
505 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
506 
507 	Assert(buffer != NULL);
508 	Assert(slot == buffer->slots[buffer->nused]);
509 
510 	/* Store the line number so we can properly report any errors later */
511 	buffer->linenos[buffer->nused] = lineno;
512 
513 	/* Record this slot as being used */
514 	buffer->nused++;
515 
516 	/* Update how many tuples are stored and their size */
517 	miinfo->bufferedTuples++;
518 	miinfo->bufferedBytes += tuplen;
519 }
520 
521 /*
522  * Copy FROM file to relation.
523  */
524 uint64
CopyFrom(CopyFromState cstate)525 CopyFrom(CopyFromState cstate)
526 {
527 	ResultRelInfo *resultRelInfo;
528 	ResultRelInfo *target_resultRelInfo;
529 	ResultRelInfo *prevResultRelInfo = NULL;
530 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
531 	ModifyTableState *mtstate;
532 	ExprContext *econtext;
533 	TupleTableSlot *singleslot = NULL;
534 	MemoryContext oldcontext = CurrentMemoryContext;
535 
536 	PartitionTupleRouting *proute = NULL;
537 	ErrorContextCallback errcallback;
538 	CommandId	mycid = GetCurrentCommandId(true);
539 	int			ti_options = 0; /* start with default options for insert */
540 	BulkInsertState bistate = NULL;
541 	CopyInsertMethod insertMethod;
542 	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
543 	int64		processed = 0;
544 	int64		excluded = 0;
545 	bool		has_before_insert_row_trig;
546 	bool		has_instead_insert_row_trig;
547 	bool		leafpart_use_multi_insert = false;
548 
549 	Assert(cstate->rel);
550 	Assert(list_length(cstate->range_table) == 1);
551 
552 	/*
553 	 * The target must be a plain, foreign, or partitioned relation, or have
554 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
555 	 * allowed on views, so we only hint about them in the view case.)
556 	 */
557 	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
558 		cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
559 		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
560 		!(cstate->rel->trigdesc &&
561 		  cstate->rel->trigdesc->trig_insert_instead_row))
562 	{
563 		if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
564 			ereport(ERROR,
565 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
566 					 errmsg("cannot copy to view \"%s\"",
567 							RelationGetRelationName(cstate->rel)),
568 					 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
569 		else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
570 			ereport(ERROR,
571 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
572 					 errmsg("cannot copy to materialized view \"%s\"",
573 							RelationGetRelationName(cstate->rel))));
574 		else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
575 			ereport(ERROR,
576 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
577 					 errmsg("cannot copy to sequence \"%s\"",
578 							RelationGetRelationName(cstate->rel))));
579 		else
580 			ereport(ERROR,
581 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
582 					 errmsg("cannot copy to non-table relation \"%s\"",
583 							RelationGetRelationName(cstate->rel))));
584 	}
585 
586 	/*
587 	 * If the target file is new-in-transaction, we assume that checking FSM
588 	 * for free space is a waste of time.  This could possibly be wrong, but
589 	 * it's unlikely.
590 	 */
591 	if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
592 		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
593 		 cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
594 		ti_options |= TABLE_INSERT_SKIP_FSM;
595 
596 	/*
597 	 * Optimize if new relfilenode was created in this subxact or one of its
598 	 * committed children and we won't see those rows later as part of an
599 	 * earlier scan or command. The subxact test ensures that if this subxact
600 	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
601 	 * that the stronger test of exactly which subtransaction created it is
602 	 * crucial for correctness of this optimization. The test for an earlier
603 	 * scan or command tolerates false negatives. FREEZE causes other sessions
604 	 * to see rows they would not see under MVCC, and a false negative merely
605 	 * spreads that anomaly to the current session.
606 	 */
607 	if (cstate->opts.freeze)
608 	{
609 		/*
610 		 * We currently disallow COPY FREEZE on partitioned tables.  The
611 		 * reason for this is that we've simply not yet opened the partitions
612 		 * to determine if the optimization can be applied to them.  We could
613 		 * go and open them all here, but doing so may be quite a costly
614 		 * overhead for small copies.  In any case, we may just end up routing
615 		 * tuples to a small number of partitions.  It seems better just to
616 		 * raise an ERROR for partitioned tables.
617 		 */
618 		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
619 		{
620 			ereport(ERROR,
621 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
622 					 errmsg("cannot perform COPY FREEZE on a partitioned table")));
623 		}
624 
625 		/*
626 		 * Tolerate one registration for the benefit of FirstXactSnapshot.
627 		 * Scan-bearing queries generally create at least two registrations,
628 		 * though relying on that is fragile, as is ignoring ActiveSnapshot.
629 		 * Clear CatalogSnapshot to avoid counting its registration.  We'll
630 		 * still detect ongoing catalog scans, each of which separately
631 		 * registers the snapshot it uses.
632 		 */
633 		InvalidateCatalogSnapshot();
634 		if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
635 			ereport(ERROR,
636 					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
637 					 errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
638 
639 		if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
640 			cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
641 			ereport(ERROR,
642 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
643 					 errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
644 
645 		ti_options |= TABLE_INSERT_FROZEN;
646 	}
647 
648 	/*
649 	 * We need a ResultRelInfo so we can use the regular executor's
650 	 * index-entry-making machinery.  (There used to be a huge amount of code
651 	 * here that basically duplicated execUtils.c ...)
652 	 */
653 	ExecInitRangeTable(estate, cstate->range_table);
654 	resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
655 	ExecInitResultRelation(estate, resultRelInfo, 1);
656 
657 	/* Verify the named relation is a valid target for INSERT */
658 	CheckValidResultRel(resultRelInfo, CMD_INSERT);
659 
660 	ExecOpenIndices(resultRelInfo, false);
661 
662 	/*
663 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
664 	 * foreign-table result relation(s).
665 	 */
666 	mtstate = makeNode(ModifyTableState);
667 	mtstate->ps.plan = NULL;
668 	mtstate->ps.state = estate;
669 	mtstate->operation = CMD_INSERT;
670 	mtstate->mt_nrels = 1;
671 	mtstate->resultRelInfo = resultRelInfo;
672 	mtstate->rootResultRelInfo = resultRelInfo;
673 
674 	if (resultRelInfo->ri_FdwRoutine != NULL &&
675 		resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
676 		resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
677 														 resultRelInfo);
678 
679 	/* Prepare to catch AFTER triggers. */
680 	AfterTriggerBeginQuery();
681 
682 	/*
683 	 * If there are any triggers with transition tables on the named relation,
684 	 * we need to be prepared to capture transition tuples.
685 	 *
686 	 * Because partition tuple routing would like to know about whether
687 	 * transition capture is active, we also set it in mtstate, which is
688 	 * passed to ExecFindPartition() below.
689 	 */
690 	cstate->transition_capture = mtstate->mt_transition_capture =
691 		MakeTransitionCaptureState(cstate->rel->trigdesc,
692 								   RelationGetRelid(cstate->rel),
693 								   CMD_INSERT);
694 
695 	/*
696 	 * If the named relation is a partitioned table, initialize state for
697 	 * CopyFrom tuple routing.
698 	 */
699 	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
700 		proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
701 
702 	if (cstate->whereClause)
703 		cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
704 										&mtstate->ps);
705 
706 	/*
707 	 * It's generally more efficient to prepare a bunch of tuples for
708 	 * insertion, and insert them in one table_multi_insert() call, than call
709 	 * table_tuple_insert() separately for every tuple. However, there are a
710 	 * number of reasons why we might not be able to do this.  These are
711 	 * explained below.
712 	 */
713 	if (resultRelInfo->ri_TrigDesc != NULL &&
714 		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
715 		 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
716 	{
717 		/*
718 		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
719 		 * triggers on the table. Such triggers might query the table we're
720 		 * inserting into and act differently if the tuples that have already
721 		 * been processed and prepared for insertion are not there.
722 		 */
723 		insertMethod = CIM_SINGLE;
724 	}
725 	else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
726 			 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
727 	{
728 		/*
729 		 * For partitioned tables we can't support multi-inserts when there
730 		 * are any statement level insert triggers. It might be possible to
731 		 * allow partitioned tables with such triggers in the future, but for
732 		 * now, CopyMultiInsertInfoFlush expects that any before row insert
733 		 * and statement level insert triggers are on the same relation.
734 		 */
735 		insertMethod = CIM_SINGLE;
736 	}
737 	else if (resultRelInfo->ri_FdwRoutine != NULL ||
738 			 cstate->volatile_defexprs)
739 	{
740 		/*
741 		 * Can't support multi-inserts to foreign tables or if there are any
742 		 * volatile default expressions in the table.  Similarly to the
743 		 * trigger case above, such expressions may query the table we're
744 		 * inserting into.
745 		 *
746 		 * Note: It does not matter if any partitions have any volatile
747 		 * default expressions as we use the defaults from the target of the
748 		 * COPY command.
749 		 */
750 		insertMethod = CIM_SINGLE;
751 	}
752 	else if (contain_volatile_functions(cstate->whereClause))
753 	{
754 		/*
755 		 * Can't support multi-inserts if there are any volatile function
756 		 * expressions in WHERE clause.  Similarly to the trigger case above,
757 		 * such expressions may query the table we're inserting into.
758 		 */
759 		insertMethod = CIM_SINGLE;
760 	}
761 	else
762 	{
763 		/*
764 		 * For partitioned tables, we may still be able to perform bulk
765 		 * inserts.  However, the possibility of this depends on which types
766 		 * of triggers exist on the partition.  We must disable bulk inserts
767 		 * if the partition is a foreign table or it has any before row insert
768 		 * or insert instead triggers (same as we checked above for the parent
769 		 * table).  Since the partition's resultRelInfos are initialized only
770 		 * when we actually need to insert the first tuple into them, we must
771 		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
772 		 * flag that we must later determine if we can use bulk-inserts for
773 		 * the partition being inserted into.
774 		 */
775 		if (proute)
776 			insertMethod = CIM_MULTI_CONDITIONAL;
777 		else
778 			insertMethod = CIM_MULTI;
779 
780 		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
781 								estate, mycid, ti_options);
782 	}
783 
784 	/*
785 	 * If not using batch mode (which allocates slots as needed) set up a
786 	 * tuple slot too. When inserting into a partitioned table, we also need
787 	 * one, even if we might batch insert, to read the tuple in the root
788 	 * partition's form.
789 	 */
790 	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
791 	{
792 		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
793 									   &estate->es_tupleTable);
794 		bistate = GetBulkInsertState();
795 	}
796 
797 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
798 								  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
799 
800 	has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
801 								   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
802 
803 	/*
804 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
805 	 * should do this for COPY, since it's not really an "INSERT" statement as
806 	 * such. However, executing these triggers maintains consistency with the
807 	 * EACH ROW triggers that we already fire on COPY.
808 	 */
809 	ExecBSInsertTriggers(estate, resultRelInfo);
810 
811 	econtext = GetPerTupleExprContext(estate);
812 
813 	/* Set up callback to identify error line number */
814 	errcallback.callback = CopyFromErrorCallback;
815 	errcallback.arg = (void *) cstate;
816 	errcallback.previous = error_context_stack;
817 	error_context_stack = &errcallback;
818 
819 	for (;;)
820 	{
821 		TupleTableSlot *myslot;
822 		bool		skip_tuple;
823 
824 		CHECK_FOR_INTERRUPTS();
825 
826 		/*
827 		 * Reset the per-tuple exprcontext. We do this after every tuple, to
828 		 * clean-up after expression evaluations etc.
829 		 */
830 		ResetPerTupleExprContext(estate);
831 
832 		/* select slot to (initially) load row into */
833 		if (insertMethod == CIM_SINGLE || proute)
834 		{
835 			myslot = singleslot;
836 			Assert(myslot != NULL);
837 		}
838 		else
839 		{
840 			Assert(resultRelInfo == target_resultRelInfo);
841 			Assert(insertMethod == CIM_MULTI);
842 
843 			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
844 													 resultRelInfo);
845 		}
846 
847 		/*
848 		 * Switch to per-tuple context before calling NextCopyFrom, which does
849 		 * evaluate default expressions etc. and requires per-tuple context.
850 		 */
851 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
852 
853 		ExecClearTuple(myslot);
854 
855 		/* Directly store the values/nulls array in the slot */
856 		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
857 			break;
858 
859 		ExecStoreVirtualTuple(myslot);
860 
861 		/*
862 		 * Constraints and where clause might reference the tableoid column,
863 		 * so (re-)initialize tts_tableOid before evaluating them.
864 		 */
865 		myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
866 
867 		/* Triggers and stuff need to be invoked in query context. */
868 		MemoryContextSwitchTo(oldcontext);
869 
870 		if (cstate->whereClause)
871 		{
872 			econtext->ecxt_scantuple = myslot;
873 			/* Skip items that don't match COPY's WHERE clause */
874 			if (!ExecQual(cstate->qualexpr, econtext))
875 			{
876 				/*
877 				 * Report that this tuple was filtered out by the WHERE
878 				 * clause.
879 				 */
880 				pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
881 											 ++excluded);
882 				continue;
883 			}
884 		}
885 
886 		/* Determine the partition to insert the tuple into */
887 		if (proute)
888 		{
889 			TupleConversionMap *map;
890 
891 			/*
892 			 * Attempt to find a partition suitable for this tuple.
893 			 * ExecFindPartition() will raise an error if none can be found or
894 			 * if the found partition is not suitable for INSERTs.
895 			 */
896 			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
897 											  proute, myslot, estate);
898 
899 			if (prevResultRelInfo != resultRelInfo)
900 			{
901 				/* Determine which triggers exist on this partition */
902 				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
903 											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
904 
905 				has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
906 											   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
907 
908 				/*
909 				 * Disable multi-inserts when the partition has BEFORE/INSTEAD
910 				 * OF triggers, or if the partition is a foreign partition.
911 				 */
912 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
913 					!has_before_insert_row_trig &&
914 					!has_instead_insert_row_trig &&
915 					resultRelInfo->ri_FdwRoutine == NULL;
916 
917 				/* Set the multi-insert buffer to use for this partition. */
918 				if (leafpart_use_multi_insert)
919 				{
920 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
921 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
922 													   resultRelInfo);
923 				}
924 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
925 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
926 				{
927 					/*
928 					 * Flush pending inserts if this partition can't use
929 					 * batching, so rows are visible to triggers etc.
930 					 */
931 					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
932 				}
933 
934 				if (bistate != NULL)
935 					ReleaseBulkInsertStatePin(bistate);
936 				prevResultRelInfo = resultRelInfo;
937 			}
938 
939 			/*
940 			 * If we're capturing transition tuples, we might need to convert
941 			 * from the partition rowtype to root rowtype. But if there are no
942 			 * BEFORE triggers on the partition that could change the tuple,
943 			 * we can just remember the original unconverted tuple to avoid a
944 			 * needless round trip conversion.
945 			 */
946 			if (cstate->transition_capture != NULL)
947 				cstate->transition_capture->tcs_original_insert_tuple =
948 					!has_before_insert_row_trig ? myslot : NULL;
949 
950 			/*
951 			 * We might need to convert from the root rowtype to the partition
952 			 * rowtype.
953 			 */
954 			map = resultRelInfo->ri_RootToPartitionMap;
955 			if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
956 			{
957 				/* non batch insert */
958 				if (map != NULL)
959 				{
960 					TupleTableSlot *new_slot;
961 
962 					new_slot = resultRelInfo->ri_PartitionTupleSlot;
963 					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
964 				}
965 			}
966 			else
967 			{
968 				/*
969 				 * Prepare to queue up tuple for later batch insert into
970 				 * current partition.
971 				 */
972 				TupleTableSlot *batchslot;
973 
974 				/* no other path available for partitioned table */
975 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
976 
977 				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
978 															resultRelInfo);
979 
980 				if (map != NULL)
981 					myslot = execute_attr_map_slot(map->attrMap, myslot,
982 												   batchslot);
983 				else
984 				{
985 					/*
986 					 * This looks more expensive than it is (Believe me, I
987 					 * optimized it away. Twice.). The input is in virtual
988 					 * form, and we'll materialize the slot below - for most
989 					 * slot types the copy performs the work materialization
990 					 * would later require anyway.
991 					 */
992 					ExecCopySlot(batchslot, myslot);
993 					myslot = batchslot;
994 				}
995 			}
996 
997 			/* ensure that triggers etc see the right relation  */
998 			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
999 		}
1000 
1001 		skip_tuple = false;
1002 
1003 		/* BEFORE ROW INSERT Triggers */
1004 		if (has_before_insert_row_trig)
1005 		{
1006 			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1007 				skip_tuple = true;	/* "do nothing" */
1008 		}
1009 
1010 		if (!skip_tuple)
1011 		{
1012 			/*
1013 			 * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1014 			 * tuple.  Otherwise, proceed with inserting the tuple into the
1015 			 * table or foreign table.
1016 			 */
1017 			if (has_instead_insert_row_trig)
1018 			{
1019 				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1020 			}
1021 			else
1022 			{
1023 				/* Compute stored generated columns */
1024 				if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1025 					resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1026 					ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1027 											   CMD_INSERT);
1028 
1029 				/*
1030 				 * If the target is a plain table, check the constraints of
1031 				 * the tuple.
1032 				 */
1033 				if (resultRelInfo->ri_FdwRoutine == NULL &&
1034 					resultRelInfo->ri_RelationDesc->rd_att->constr)
1035 					ExecConstraints(resultRelInfo, myslot, estate);
1036 
1037 				/*
1038 				 * Also check the tuple against the partition constraint, if
1039 				 * there is one; except that if we got here via tuple-routing,
1040 				 * we don't need to if there's no BR trigger defined on the
1041 				 * partition.
1042 				 */
1043 				if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1044 					(proute == NULL || has_before_insert_row_trig))
1045 					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1046 
1047 				/* Store the slot in the multi-insert buffer, when enabled. */
1048 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1049 				{
1050 					/*
1051 					 * The slot previously might point into the per-tuple
1052 					 * context. For batching it needs to be longer lived.
1053 					 */
1054 					ExecMaterializeSlot(myslot);
1055 
1056 					/* Add this tuple to the tuple buffer */
1057 					CopyMultiInsertInfoStore(&multiInsertInfo,
1058 											 resultRelInfo, myslot,
1059 											 cstate->line_buf.len,
1060 											 cstate->cur_lineno);
1061 
1062 					/*
1063 					 * If enough inserts have queued up, then flush all
1064 					 * buffers out to their tables.
1065 					 */
1066 					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1067 						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
1068 				}
1069 				else
1070 				{
1071 					List	   *recheckIndexes = NIL;
1072 
1073 					/* OK, store the tuple */
1074 					if (resultRelInfo->ri_FdwRoutine != NULL)
1075 					{
1076 						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1077 																				 resultRelInfo,
1078 																				 myslot,
1079 																				 NULL);
1080 
1081 						if (myslot == NULL) /* "do nothing" */
1082 							continue;	/* next tuple please */
1083 
1084 						/*
1085 						 * AFTER ROW Triggers might reference the tableoid
1086 						 * column, so (re-)initialize tts_tableOid before
1087 						 * evaluating them.
1088 						 */
1089 						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1090 					}
1091 					else
1092 					{
1093 						/* OK, store the tuple and create index entries for it */
1094 						table_tuple_insert(resultRelInfo->ri_RelationDesc,
1095 										   myslot, mycid, ti_options, bistate);
1096 
1097 						if (resultRelInfo->ri_NumIndices > 0)
1098 							recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1099 																   myslot,
1100 																   estate,
1101 																   false,
1102 																   false,
1103 																   NULL,
1104 																   NIL);
1105 					}
1106 
1107 					/* AFTER ROW INSERT Triggers */
1108 					ExecARInsertTriggers(estate, resultRelInfo, myslot,
1109 										 recheckIndexes, cstate->transition_capture);
1110 
1111 					list_free(recheckIndexes);
1112 				}
1113 			}
1114 
1115 			/*
1116 			 * We count only tuples not suppressed by a BEFORE INSERT trigger
1117 			 * or FDW; this is the same definition used by nodeModifyTable.c
1118 			 * for counting tuples inserted by an INSERT command.  Update
1119 			 * progress of the COPY command as well.
1120 			 */
1121 			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1122 										 ++processed);
1123 		}
1124 	}
1125 
1126 	/* Flush any remaining buffered tuples */
1127 	if (insertMethod != CIM_SINGLE)
1128 	{
1129 		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1130 			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
1131 	}
1132 
1133 	/* Done, clean up */
1134 	error_context_stack = errcallback.previous;
1135 
1136 	if (bistate != NULL)
1137 		FreeBulkInsertState(bistate);
1138 
1139 	MemoryContextSwitchTo(oldcontext);
1140 
1141 	/* Execute AFTER STATEMENT insertion triggers */
1142 	ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1143 
1144 	/* Handle queued AFTER triggers */
1145 	AfterTriggerEndQuery(estate);
1146 
1147 	ExecResetTupleTable(estate->es_tupleTable, false);
1148 
1149 	/* Allow the FDW to shut down */
1150 	if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1151 		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1152 		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1153 															  target_resultRelInfo);
1154 
1155 	/* Tear down the multi-insert buffer data */
1156 	if (insertMethod != CIM_SINGLE)
1157 		CopyMultiInsertInfoCleanup(&multiInsertInfo);
1158 
1159 	/* Close all the partitioned tables, leaf partitions, and their indices */
1160 	if (proute)
1161 		ExecCleanupTupleRouting(mtstate, proute);
1162 
1163 	/* Close the result relations, including any trigger target relations */
1164 	ExecCloseResultRelations(estate);
1165 	ExecCloseRangeTableRelations(estate);
1166 
1167 	FreeExecutorState(estate);
1168 
1169 	return processed;
1170 }
1171 
1172 /*
1173  * Setup to read tuples from a file for COPY FROM.
1174  *
1175  * 'rel': Used as a template for the tuples
1176  * 'whereClause': WHERE clause from the COPY FROM command
1177  * 'filename': Name of server-local file to read, NULL for STDIN
1178  * 'is_program': true if 'filename' is program to execute
1179  * 'data_source_cb': callback that provides the input data
1180  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1181  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1182  *
1183  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1184  */
1185 CopyFromState
BeginCopyFrom(ParseState * pstate,Relation rel,Node * whereClause,const char * filename,bool is_program,copy_data_source_cb data_source_cb,List * attnamelist,List * options)1186 BeginCopyFrom(ParseState *pstate,
1187 			  Relation rel,
1188 			  Node *whereClause,
1189 			  const char *filename,
1190 			  bool is_program,
1191 			  copy_data_source_cb data_source_cb,
1192 			  List *attnamelist,
1193 			  List *options)
1194 {
1195 	CopyFromState cstate;
1196 	bool		pipe = (filename == NULL);
1197 	TupleDesc	tupDesc;
1198 	AttrNumber	num_phys_attrs,
1199 				num_defaults;
1200 	FmgrInfo   *in_functions;
1201 	Oid		   *typioparams;
1202 	int			attnum;
1203 	Oid			in_func_oid;
1204 	int		   *defmap;
1205 	ExprState **defexprs;
1206 	MemoryContext oldcontext;
1207 	bool		volatile_defexprs;
1208 	const int	progress_cols[] = {
1209 		PROGRESS_COPY_COMMAND,
1210 		PROGRESS_COPY_TYPE,
1211 		PROGRESS_COPY_BYTES_TOTAL
1212 	};
1213 	int64		progress_vals[] = {
1214 		PROGRESS_COPY_COMMAND_FROM,
1215 		0,
1216 		0
1217 	};
1218 
1219 	/* Allocate workspace and zero all fields */
1220 	cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1221 
1222 	/*
1223 	 * We allocate everything used by a cstate in a new memory context. This
1224 	 * avoids memory leaks during repeated use of COPY in a query.
1225 	 */
1226 	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1227 												"COPY",
1228 												ALLOCSET_DEFAULT_SIZES);
1229 
1230 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1231 
1232 	/* Extract options from the statement node tree */
1233 	ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1234 
1235 	/* Process the target relation */
1236 	cstate->rel = rel;
1237 
1238 	tupDesc = RelationGetDescr(cstate->rel);
1239 
1240 	/* process commmon options or initialization */
1241 
1242 	/* Generate or convert list of attributes to process */
1243 	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1244 
1245 	num_phys_attrs = tupDesc->natts;
1246 
1247 	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1248 	cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1249 	if (cstate->opts.force_notnull)
1250 	{
1251 		List	   *attnums;
1252 		ListCell   *cur;
1253 
1254 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1255 
1256 		foreach(cur, attnums)
1257 		{
1258 			int			attnum = lfirst_int(cur);
1259 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1260 
1261 			if (!list_member_int(cstate->attnumlist, attnum))
1262 				ereport(ERROR,
1263 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1264 						 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1265 								NameStr(attr->attname))));
1266 			cstate->opts.force_notnull_flags[attnum - 1] = true;
1267 		}
1268 	}
1269 
1270 	/* Convert FORCE_NULL name list to per-column flags, check validity */
1271 	cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1272 	if (cstate->opts.force_null)
1273 	{
1274 		List	   *attnums;
1275 		ListCell   *cur;
1276 
1277 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1278 
1279 		foreach(cur, attnums)
1280 		{
1281 			int			attnum = lfirst_int(cur);
1282 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1283 
1284 			if (!list_member_int(cstate->attnumlist, attnum))
1285 				ereport(ERROR,
1286 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1287 						 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1288 								NameStr(attr->attname))));
1289 			cstate->opts.force_null_flags[attnum - 1] = true;
1290 		}
1291 	}
1292 
1293 	/* Convert convert_selectively name list to per-column flags */
1294 	if (cstate->opts.convert_selectively)
1295 	{
1296 		List	   *attnums;
1297 		ListCell   *cur;
1298 
1299 		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1300 
1301 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1302 
1303 		foreach(cur, attnums)
1304 		{
1305 			int			attnum = lfirst_int(cur);
1306 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1307 
1308 			if (!list_member_int(cstate->attnumlist, attnum))
1309 				ereport(ERROR,
1310 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1311 						 errmsg_internal("selected column \"%s\" not referenced by COPY",
1312 										 NameStr(attr->attname))));
1313 			cstate->convert_select_flags[attnum - 1] = true;
1314 		}
1315 	}
1316 
1317 	/* Use client encoding when ENCODING option is not specified. */
1318 	if (cstate->opts.file_encoding < 0)
1319 		cstate->file_encoding = pg_get_client_encoding();
1320 	else
1321 		cstate->file_encoding = cstate->opts.file_encoding;
1322 
1323 	/*
1324 	 * Look up encoding conversion function.
1325 	 */
1326 	if (cstate->file_encoding == GetDatabaseEncoding() ||
1327 		cstate->file_encoding == PG_SQL_ASCII ||
1328 		GetDatabaseEncoding() == PG_SQL_ASCII)
1329 	{
1330 		cstate->need_transcoding = false;
1331 	}
1332 	else
1333 	{
1334 		cstate->need_transcoding = true;
1335 		cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1336 															GetDatabaseEncoding());
1337 	}
1338 
1339 	cstate->copy_src = COPY_FILE;	/* default */
1340 
1341 	cstate->whereClause = whereClause;
1342 
1343 	MemoryContextSwitchTo(oldcontext);
1344 
1345 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1346 
1347 	/* Initialize state variables */
1348 	cstate->eol_type = EOL_UNKNOWN;
1349 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
1350 	cstate->cur_lineno = 0;
1351 	cstate->cur_attname = NULL;
1352 	cstate->cur_attval = NULL;
1353 
1354 	/*
1355 	 * Allocate buffers for the input pipeline.
1356 	 *
1357 	 * attribute_buf and raw_buf are used in both text and binary modes, but
1358 	 * input_buf and line_buf only in text mode.
1359 	 */
1360 	cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1361 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
1362 	cstate->raw_reached_eof = false;
1363 
1364 	if (!cstate->opts.binary)
1365 	{
1366 		/*
1367 		 * If encoding conversion is needed, we need another buffer to hold
1368 		 * the converted input data.  Otherwise, we can just point input_buf
1369 		 * to the same buffer as raw_buf.
1370 		 */
1371 		if (cstate->need_transcoding)
1372 		{
1373 			cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1374 			cstate->input_buf_index = cstate->input_buf_len = 0;
1375 		}
1376 		else
1377 			cstate->input_buf = cstate->raw_buf;
1378 		cstate->input_reached_eof = false;
1379 
1380 		initStringInfo(&cstate->line_buf);
1381 	}
1382 
1383 	initStringInfo(&cstate->attribute_buf);
1384 
1385 	/* Assign range table, we'll need it in CopyFrom. */
1386 	if (pstate)
1387 		cstate->range_table = pstate->p_rtable;
1388 
1389 	tupDesc = RelationGetDescr(cstate->rel);
1390 	num_phys_attrs = tupDesc->natts;
1391 	num_defaults = 0;
1392 	volatile_defexprs = false;
1393 
1394 	/*
1395 	 * Pick up the required catalog information for each attribute in the
1396 	 * relation, including the input function, the element type (to pass to
1397 	 * the input function), and info about defaults and constraints. (Which
1398 	 * input function we use depends on text/binary format choice.)
1399 	 */
1400 	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1401 	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1402 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1403 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1404 
1405 	for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1406 	{
1407 		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1408 
1409 		/* We don't need info for dropped attributes */
1410 		if (att->attisdropped)
1411 			continue;
1412 
1413 		/* Fetch the input function and typioparam info */
1414 		if (cstate->opts.binary)
1415 			getTypeBinaryInputInfo(att->atttypid,
1416 								   &in_func_oid, &typioparams[attnum - 1]);
1417 		else
1418 			getTypeInputInfo(att->atttypid,
1419 							 &in_func_oid, &typioparams[attnum - 1]);
1420 		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1421 
1422 		/* Get default info if needed */
1423 		if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
1424 		{
1425 			/* attribute is NOT to be copied from input */
1426 			/* use default value if one exists */
1427 			Expr	   *defexpr = (Expr *) build_column_default(cstate->rel,
1428 																attnum);
1429 
1430 			if (defexpr != NULL)
1431 			{
1432 				/* Run the expression through planner */
1433 				defexpr = expression_planner(defexpr);
1434 
1435 				/* Initialize executable expression in copycontext */
1436 				defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1437 				defmap[num_defaults] = attnum - 1;
1438 				num_defaults++;
1439 
1440 				/*
1441 				 * If a default expression looks at the table being loaded,
1442 				 * then it could give the wrong answer when using
1443 				 * multi-insert. Since database access can be dynamic this is
1444 				 * hard to test for exactly, so we use the much wider test of
1445 				 * whether the default expression is volatile. We allow for
1446 				 * the special case of when the default expression is the
1447 				 * nextval() of a sequence which in this specific case is
1448 				 * known to be safe for use with the multi-insert
1449 				 * optimization. Hence we use this special case function
1450 				 * checker rather than the standard check for
1451 				 * contain_volatile_functions().
1452 				 */
1453 				if (!volatile_defexprs)
1454 					volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1455 			}
1456 		}
1457 	}
1458 
1459 
1460 	/* initialize progress */
1461 	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1462 								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1463 	cstate->bytes_processed = 0;
1464 
1465 	/* We keep those variables in cstate. */
1466 	cstate->in_functions = in_functions;
1467 	cstate->typioparams = typioparams;
1468 	cstate->defmap = defmap;
1469 	cstate->defexprs = defexprs;
1470 	cstate->volatile_defexprs = volatile_defexprs;
1471 	cstate->num_defaults = num_defaults;
1472 	cstate->is_program = is_program;
1473 
1474 	if (data_source_cb)
1475 	{
1476 		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1477 		cstate->copy_src = COPY_CALLBACK;
1478 		cstate->data_source_cb = data_source_cb;
1479 	}
1480 	else if (pipe)
1481 	{
1482 		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1483 		Assert(!is_program);	/* the grammar does not allow this */
1484 		if (whereToSendOutput == DestRemote)
1485 			ReceiveCopyBegin(cstate);
1486 		else
1487 			cstate->copy_file = stdin;
1488 	}
1489 	else
1490 	{
1491 		cstate->filename = pstrdup(filename);
1492 
1493 		if (cstate->is_program)
1494 		{
1495 			progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1496 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1497 			if (cstate->copy_file == NULL)
1498 				ereport(ERROR,
1499 						(errcode_for_file_access(),
1500 						 errmsg("could not execute command \"%s\": %m",
1501 								cstate->filename)));
1502 		}
1503 		else
1504 		{
1505 			struct stat st;
1506 
1507 			progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1508 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1509 			if (cstate->copy_file == NULL)
1510 			{
1511 				/* copy errno because ereport subfunctions might change it */
1512 				int			save_errno = errno;
1513 
1514 				ereport(ERROR,
1515 						(errcode_for_file_access(),
1516 						 errmsg("could not open file \"%s\" for reading: %m",
1517 								cstate->filename),
1518 						 (save_errno == ENOENT || save_errno == EACCES) ?
1519 						 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1520 								 "You may want a client-side facility such as psql's \\copy.") : 0));
1521 			}
1522 
1523 			if (fstat(fileno(cstate->copy_file), &st))
1524 				ereport(ERROR,
1525 						(errcode_for_file_access(),
1526 						 errmsg("could not stat file \"%s\": %m",
1527 								cstate->filename)));
1528 
1529 			if (S_ISDIR(st.st_mode))
1530 				ereport(ERROR,
1531 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1532 						 errmsg("\"%s\" is a directory", cstate->filename)));
1533 
1534 			progress_vals[2] = st.st_size;
1535 		}
1536 	}
1537 
1538 	pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1539 
1540 	if (cstate->opts.binary)
1541 	{
1542 		/* Read and verify binary header */
1543 		ReceiveCopyBinaryHeader(cstate);
1544 	}
1545 
1546 	/* create workspace for CopyReadAttributes results */
1547 	if (!cstate->opts.binary)
1548 	{
1549 		AttrNumber	attr_count = list_length(cstate->attnumlist);
1550 
1551 		cstate->max_fields = attr_count;
1552 		cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1553 	}
1554 
1555 	MemoryContextSwitchTo(oldcontext);
1556 
1557 	return cstate;
1558 }
1559 
1560 /*
1561  * Clean up storage and release resources for COPY FROM.
1562  */
1563 void
EndCopyFrom(CopyFromState cstate)1564 EndCopyFrom(CopyFromState cstate)
1565 {
1566 	/* No COPY FROM related resources except memory. */
1567 	if (cstate->is_program)
1568 	{
1569 		ClosePipeFromProgram(cstate);
1570 	}
1571 	else
1572 	{
1573 		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1574 			ereport(ERROR,
1575 					(errcode_for_file_access(),
1576 					 errmsg("could not close file \"%s\": %m",
1577 							cstate->filename)));
1578 	}
1579 
1580 	pgstat_progress_end_command();
1581 
1582 	MemoryContextDelete(cstate->copycontext);
1583 	pfree(cstate);
1584 }
1585 
1586 /*
1587  * Closes the pipe from an external program, checking the pclose() return code.
1588  */
1589 static void
ClosePipeFromProgram(CopyFromState cstate)1590 ClosePipeFromProgram(CopyFromState cstate)
1591 {
1592 	int			pclose_rc;
1593 
1594 	Assert(cstate->is_program);
1595 
1596 	pclose_rc = ClosePipeStream(cstate->copy_file);
1597 	if (pclose_rc == -1)
1598 		ereport(ERROR,
1599 				(errcode_for_file_access(),
1600 				 errmsg("could not close pipe to external command: %m")));
1601 	else if (pclose_rc != 0)
1602 	{
1603 		/*
1604 		 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1605 		 * expectable for the called program to fail with SIGPIPE, and we
1606 		 * should not report that as an error.  Otherwise, SIGPIPE indicates a
1607 		 * problem.
1608 		 */
1609 		if (!cstate->raw_reached_eof &&
1610 			wait_result_is_signal(pclose_rc, SIGPIPE))
1611 			return;
1612 
1613 		ereport(ERROR,
1614 				(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1615 				 errmsg("program \"%s\" failed",
1616 						cstate->filename),
1617 				 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1618 	}
1619 }
1620