1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  *		Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 
21 #include "access/heapam.h"
22 #include "access/htup_details.h"
23 #include "access/sysattr.h"
24 #include "access/xact.h"
25 #include "access/xlog.h"
26 #include "catalog/dependency.h"
27 #include "catalog/pg_authid.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/execPartition.h"
33 #include "executor/executor.h"
34 #include "foreign/fdwapi.h"
35 #include "libpq/libpq.h"
36 #include "libpq/pqformat.h"
37 #include "mb/pg_wchar.h"
38 #include "miscadmin.h"
39 #include "optimizer/clauses.h"
40 #include "optimizer/planner.h"
41 #include "nodes/makefuncs.h"
42 #include "parser/parse_relation.h"
43 #include "port/pg_bswap.h"
44 #include "rewrite/rewriteHandler.h"
45 #include "storage/fd.h"
46 #include "tcop/tcopprot.h"
47 #include "utils/builtins.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/portal.h"
51 #include "utils/rel.h"
52 #include "utils/rls.h"
53 #include "utils/snapmgr.h"
54 
55 
/* true if c is one of the octal digits '0' through '7' */
#define ISOCTAL(c) ((c) >= '0' && (c) <= '7')
/* numeric value of the digit c; only meaningful when ISOCTAL(c) holds */
#define OCTVALUE(c) ((c) - '0')
58 
/*
 * Kind of endpoint that the lowest-level COPY send/receive routines talk to.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* server-side file, or a piped program */
	COPY_OLD_FE = 1,			/* client, 2.0 frontend/backend protocol */
	COPY_NEW_FE = 2,			/* client, 3.0 frontend/backend protocol */
	COPY_CALLBACK = 3			/* callback function (see data_source_cb) */
} CopyDest;
70 
/*
 * Line-terminator style of the COPY input.
 */
typedef enum EolType
{
	EOL_UNKNOWN = 0,			/* terminator type not known */
	EOL_NL = 1,					/* "\n" */
	EOL_CR = 2,					/* "\r" */
	EOL_CRNL = 3				/* "\r\n" */
} EolType;
81 
82 /*
83  * This struct contains all the state variables used throughout a COPY
84  * operation. For simplicity, we use the same struct for all variants of COPY,
85  * even though some fields are used in only some cases.
86  *
87  * Multi-byte encodings: all supported client-side encodings encode multi-byte
88  * characters by having the first byte's high bit set. Subsequent bytes of the
89  * character can have the high bit not set. When scanning data in such an
90  * encoding to look for a match to a single-byte (ie ASCII) character, we must
91  * use the full pg_encoding_mblen() machinery to skip over multibyte
92  * characters, else we might find a false match to a trailing byte. In
93  * supported server encodings, there is no possibility of a false match, and
94  * it's faster to make useless comparisons to trailing bytes than it is to
95  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
96  * when we have to do it the hard way.
97  */
98 typedef struct CopyStateData
99 {
100 	/* low-level state data */
101 	CopyDest	copy_dest;		/* type of copy source/destination */
102 	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
103 	StringInfo	fe_msgbuf;		/* used for all dests during COPY TO, only for
104 								 * dest == COPY_NEW_FE in COPY FROM */
105 	bool		is_copy_from;	/* COPY TO, or COPY FROM? */
106 	bool		reached_eof;	/* true if we read to end of copy data (not
107 								 * all copy_dest types maintain this) */
108 	EolType		eol_type;		/* EOL type of input */
109 	int			file_encoding;	/* file or remote side's character encoding */
110 	bool		need_transcoding;	/* file encoding diff from server? */
111 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
112 
113 	/* parameters from the COPY command */
114 	Relation	rel;			/* relation to copy to or from */
115 	QueryDesc  *queryDesc;		/* executable query to copy from */
116 	List	   *attnumlist;		/* integer list of attnums to copy */
117 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
118 	bool		is_program;		/* is 'filename' a program to popen? */
119 	copy_data_source_cb data_source_cb; /* function for reading data */
120 	bool		binary;			/* binary format? */
121 	bool		oids;			/* include OIDs? */
122 	bool		freeze;			/* freeze rows on loading? */
123 	bool		csv_mode;		/* Comma Separated Value format? */
124 	bool		header_line;	/* CSV header line? */
125 	char	   *null_print;		/* NULL marker string (server encoding!) */
126 	int			null_print_len; /* length of same */
127 	char	   *null_print_client;	/* same converted to file encoding */
128 	char	   *delim;			/* column delimiter (must be 1 byte) */
129 	char	   *quote;			/* CSV quote char (must be 1 byte) */
130 	char	   *escape;			/* CSV escape char (must be 1 byte) */
131 	List	   *force_quote;	/* list of column names */
132 	bool		force_quote_all;	/* FORCE_QUOTE *? */
133 	bool	   *force_quote_flags;	/* per-column CSV FQ flags */
134 	List	   *force_notnull;	/* list of column names */
135 	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
136 	List	   *force_null;		/* list of column names */
137 	bool	   *force_null_flags;	/* per-column CSV FN flags */
138 	bool		convert_selectively;	/* do selective binary conversion? */
139 	List	   *convert_select; /* list of column names (can be NIL) */
140 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
141 
142 	/* these are just for error messages, see CopyFromErrorCallback */
143 	const char *cur_relname;	/* table name for error messages */
144 	uint64		cur_lineno;		/* line number for error messages */
145 	const char *cur_attname;	/* current att for error messages */
146 	const char *cur_attval;		/* current att value for error messages */
147 
148 	/*
149 	 * Working state for COPY TO/FROM
150 	 */
151 	MemoryContext copycontext;	/* per-copy execution context */
152 
153 	/*
154 	 * Working state for COPY TO
155 	 */
156 	FmgrInfo   *out_functions;	/* lookup info for output functions */
157 	MemoryContext rowcontext;	/* per-row evaluation context */
158 
159 	/*
160 	 * Working state for COPY FROM
161 	 */
162 	AttrNumber	num_defaults;
163 	bool		file_has_oids;
164 	FmgrInfo	oid_in_function;
165 	Oid			oid_typioparam;
166 	FmgrInfo   *in_functions;	/* array of input functions for each attrs */
167 	Oid		   *typioparams;	/* array of element types for in_functions */
168 	int		   *defmap;			/* array of default att numbers */
169 	ExprState **defexprs;		/* array of default att expressions */
170 	bool		volatile_defexprs;	/* is any of defexprs volatile? */
171 	List	   *range_table;
172 
173 	/* Tuple-routing support info */
174 	PartitionTupleRouting *partition_tuple_routing;
175 
176 	TransitionCaptureState *transition_capture;
177 
178 	/*
179 	 * These variables are used to reduce overhead in textual COPY FROM.
180 	 *
181 	 * attribute_buf holds the separated, de-escaped text for each field of
182 	 * the current line.  The CopyReadAttributes functions return arrays of
183 	 * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
184 	 * the buffer on each cycle.
185 	 */
186 	StringInfoData attribute_buf;
187 
188 	/* field raw data pointers found by COPY FROM */
189 
190 	int			max_fields;
191 	char	  **raw_fields;
192 
193 	/*
194 	 * Similarly, line_buf holds the whole input line being processed. The
195 	 * input cycle is first to read the whole line into line_buf, convert it
196 	 * to server encoding there, and then extract the individual attribute
197 	 * fields into attribute_buf.  line_buf is preserved unmodified so that we
198 	 * can display it in error messages if appropriate.
199 	 */
200 	StringInfoData line_buf;
201 	bool		line_buf_converted; /* converted to server encoding? */
202 	bool		line_buf_valid; /* contains the row being processed? */
203 
204 	/*
205 	 * Finally, raw_buf holds raw data read from the data source (file or
206 	 * client connection).  CopyReadLine parses this data sufficiently to
207 	 * locate line boundaries, then transfers the data to line_buf and
208 	 * converts it.  Note: we guarantee that there is a \0 at
209 	 * raw_buf[raw_buf_len].
210 	 */
211 #define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
212 	char	   *raw_buf;
213 	int			raw_buf_index;	/* next byte to process */
214 	int			raw_buf_len;	/* total # of bytes stored */
215 } CopyStateData;
216 
217 /* DestReceiver for COPY (query) TO */
218 typedef struct
219 {
220 	DestReceiver pub;			/* publicly-known function pointers */
221 	CopyState	cstate;			/* CopyStateData for the command */
222 	uint64		processed;		/* # of tuples processed */
223 } DR_copy;
224 
225 
/*
 * Helper macros for the code that scans raw_buf and fills line_buf.  They
 * are macros, not functions, for two reasons.  They must be able to
 * "continue", "break" or "goto" in the caller's loop.  They also avoid
 * function-call overhead inside the tight COPY loop.  They refer directly
 * to the caller's local variables (raw_buf_ptr, prev_raw_ptr, copy_buf_len,
 * hit_eof, need_data, result) and to cstate.
 *
 * The customary "do { ... } while (0)" wrapper cannot be used: a continue
 * or break inside it would act on that dummy loop instead of the caller's
 * loop.  Each macro is therefore wrapped as "if (1) { ... } else ((void) 0)".
 * The trailing else keeps the macro's "if" from capturing an "else" that
 * follows the macro in the caller.  It also avoids compiler warnings about
 * empty statements.
 */

/*
 * If the byte at raw_buf_ptr + extralen is not yet in the buffer and more
 * input can still arrive, rewind to the character fetched at the top of the
 * loop, ask for more data, and restart the loop.  The character is left in
 * the buffer, so it will be looked at again after the refill.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		need_data = true; \
		raw_buf_ptr = prev_raw_ptr; /* rewind the fetch */ \
		continue; \
	} \
} else ((void) 0)

/*
 * If the byte at raw_buf_ptr + extralen is not in the buffer and the input
 * has ended, consume whatever remains (a partial trailing character is
 * simply taken as data), set result, and leave the loop.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		/* e.g. a backslash right before EOF counts as an ordinary data byte */ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; \
		result = true; \
		break; \
	} \
} else ((void) 0)

/*
 * Move the bytes already accepted (raw_buf_index .. raw_buf_ptr) from
 * raw_buf into line_buf.  This must be done before refilling raw_buf, so
 * that there is room in it.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)

/*
 * Throw away any read-ahead past the character fetched at the top of the
 * loop, then jump to the caller's not_end_of_copy label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
290 
/*
 * The 11 signature bytes of binary-format COPY data:
 * "PGCOPY", '\n', 0xFF, '\r', '\n', '\0'.
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
292 
293 
294 /* non-export function prototypes */
295 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
296 		  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
297 		  List *options);
298 static void EndCopy(CopyState cstate);
299 static void ClosePipeToProgram(CopyState cstate);
300 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
301 			Oid queryRelId, const char *filename, bool is_program,
302 			List *attnamelist, List *options);
303 static void EndCopyTo(CopyState cstate);
304 static uint64 DoCopyTo(CopyState cstate);
305 static uint64 CopyTo(CopyState cstate);
306 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
307 			 Datum *values, bool *nulls);
308 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
309 					CommandId mycid, int hi_options,
310 					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
311 					BulkInsertState bistate,
312 					int nBufferedTuples, HeapTuple *bufferedTuples,
313 					uint64 firstBufferedLineNo);
314 static bool CopyReadLine(CopyState cstate);
315 static bool CopyReadLineText(CopyState cstate);
316 static int	CopyReadAttributesText(CopyState cstate);
317 static int	CopyReadAttributesCSV(CopyState cstate);
318 static Datum CopyReadBinaryAttribute(CopyState cstate,
319 						int column_no, FmgrInfo *flinfo,
320 						Oid typioparam, int32 typmod,
321 						bool *isnull);
322 static void CopyAttributeOutText(CopyState cstate, char *string);
323 static void CopyAttributeOutCSV(CopyState cstate, char *string,
324 					bool use_quote, bool single_attr);
325 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
326 			   List *attnamelist);
327 static char *limit_printout_length(const char *str);
328 
329 /* Low-level communications functions */
330 static void SendCopyBegin(CopyState cstate);
331 static void ReceiveCopyBegin(CopyState cstate);
332 static void SendCopyEnd(CopyState cstate);
333 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
334 static void CopySendString(CopyState cstate, const char *str);
335 static void CopySendChar(CopyState cstate, char c);
336 static void CopySendEndOfRow(CopyState cstate);
337 static int CopyGetData(CopyState cstate, void *databuf,
338 			int minread, int maxread);
339 static void CopySendInt32(CopyState cstate, int32 val);
340 static bool CopyGetInt32(CopyState cstate, int32 *val);
341 static void CopySendInt16(CopyState cstate, int16 val);
342 static bool CopyGetInt16(CopyState cstate, int16 *val);
343 
344 
345 /*
346  * Send copy start/stop messages for frontend copies.  These have changed
347  * in past protocol redesigns.
348  */
349 static void
SendCopyBegin(CopyState cstate)350 SendCopyBegin(CopyState cstate)
351 {
352 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
353 	{
354 		/* new way */
355 		StringInfoData buf;
356 		int			natts = list_length(cstate->attnumlist);
357 		int16		format = (cstate->binary ? 1 : 0);
358 		int			i;
359 
360 		pq_beginmessage(&buf, 'H');
361 		pq_sendbyte(&buf, format);	/* overall format */
362 		pq_sendint16(&buf, natts);
363 		for (i = 0; i < natts; i++)
364 			pq_sendint16(&buf, format); /* per-column formats */
365 		pq_endmessage(&buf);
366 		cstate->copy_dest = COPY_NEW_FE;
367 	}
368 	else
369 	{
370 		/* old way */
371 		if (cstate->binary)
372 			ereport(ERROR,
373 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374 					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
375 		pq_putemptymessage('H');
376 		/* grottiness needed for old COPY OUT protocol */
377 		pq_startcopyout();
378 		cstate->copy_dest = COPY_OLD_FE;
379 	}
380 }
381 
382 static void
ReceiveCopyBegin(CopyState cstate)383 ReceiveCopyBegin(CopyState cstate)
384 {
385 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
386 	{
387 		/* new way */
388 		StringInfoData buf;
389 		int			natts = list_length(cstate->attnumlist);
390 		int16		format = (cstate->binary ? 1 : 0);
391 		int			i;
392 
393 		pq_beginmessage(&buf, 'G');
394 		pq_sendbyte(&buf, format);	/* overall format */
395 		pq_sendint16(&buf, natts);
396 		for (i = 0; i < natts; i++)
397 			pq_sendint16(&buf, format); /* per-column formats */
398 		pq_endmessage(&buf);
399 		cstate->copy_dest = COPY_NEW_FE;
400 		cstate->fe_msgbuf = makeStringInfo();
401 	}
402 	else
403 	{
404 		/* old way */
405 		if (cstate->binary)
406 			ereport(ERROR,
407 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
409 		pq_putemptymessage('G');
410 		/* any error in old protocol will make us lose sync */
411 		pq_startmsgread();
412 		cstate->copy_dest = COPY_OLD_FE;
413 	}
414 	/* We *must* flush here to ensure FE knows it can send. */
415 	pq_flush();
416 }
417 
418 static void
SendCopyEnd(CopyState cstate)419 SendCopyEnd(CopyState cstate)
420 {
421 	if (cstate->copy_dest == COPY_NEW_FE)
422 	{
423 		/* Shouldn't have any unsent data */
424 		Assert(cstate->fe_msgbuf->len == 0);
425 		/* Send Copy Done message */
426 		pq_putemptymessage('c');
427 	}
428 	else
429 	{
430 		CopySendData(cstate, "\\.", 2);
431 		/* Need to flush out the trailer (this also appends a newline) */
432 		CopySendEndOfRow(cstate);
433 		pq_endcopyout(false);
434 	}
435 }
436 
437 /*----------
438  * CopySendData sends output data to the destination (file or frontend)
439  * CopySendString does the same for null-terminated strings
440  * CopySendChar does the same for single characters
441  * CopySendEndOfRow does the appropriate thing at end of each data row
442  *	(data is not actually flushed except by CopySendEndOfRow)
443  *
444  * NB: no data conversion is applied by these functions
445  *----------
446  */
447 static void
CopySendData(CopyState cstate,const void * databuf,int datasize)448 CopySendData(CopyState cstate, const void *databuf, int datasize)
449 {
450 	appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
451 }
452 
453 static void
CopySendString(CopyState cstate,const char * str)454 CopySendString(CopyState cstate, const char *str)
455 {
456 	appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
457 }
458 
459 static void
CopySendChar(CopyState cstate,char c)460 CopySendChar(CopyState cstate, char c)
461 {
462 	appendStringInfoCharMacro(cstate->fe_msgbuf, c);
463 }
464 
465 static void
CopySendEndOfRow(CopyState cstate)466 CopySendEndOfRow(CopyState cstate)
467 {
468 	StringInfo	fe_msgbuf = cstate->fe_msgbuf;
469 
470 	switch (cstate->copy_dest)
471 	{
472 		case COPY_FILE:
473 			if (!cstate->binary)
474 			{
475 				/* Default line termination depends on platform */
476 #ifndef WIN32
477 				CopySendChar(cstate, '\n');
478 #else
479 				CopySendString(cstate, "\r\n");
480 #endif
481 			}
482 
483 			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
484 					   cstate->copy_file) != 1 ||
485 				ferror(cstate->copy_file))
486 			{
487 				if (cstate->is_program)
488 				{
489 					if (errno == EPIPE)
490 					{
491 						/*
492 						 * The pipe will be closed automatically on error at
493 						 * the end of transaction, but we might get a better
494 						 * error message from the subprocess' exit code than
495 						 * just "Broken Pipe"
496 						 */
497 						ClosePipeToProgram(cstate);
498 
499 						/*
500 						 * If ClosePipeToProgram() didn't throw an error, the
501 						 * program terminated normally, but closed the pipe
502 						 * first. Restore errno, and throw an error.
503 						 */
504 						errno = EPIPE;
505 					}
506 					ereport(ERROR,
507 							(errcode_for_file_access(),
508 							 errmsg("could not write to COPY program: %m")));
509 				}
510 				else
511 					ereport(ERROR,
512 							(errcode_for_file_access(),
513 							 errmsg("could not write to COPY file: %m")));
514 			}
515 			break;
516 		case COPY_OLD_FE:
517 			/* The FE/BE protocol uses \n as newline for all platforms */
518 			if (!cstate->binary)
519 				CopySendChar(cstate, '\n');
520 
521 			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
522 			{
523 				/* no hope of recovering connection sync, so FATAL */
524 				ereport(FATAL,
525 						(errcode(ERRCODE_CONNECTION_FAILURE),
526 						 errmsg("connection lost during COPY to stdout")));
527 			}
528 			break;
529 		case COPY_NEW_FE:
530 			/* The FE/BE protocol uses \n as newline for all platforms */
531 			if (!cstate->binary)
532 				CopySendChar(cstate, '\n');
533 
534 			/* Dump the accumulated row as one CopyData message */
535 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
536 			break;
537 		case COPY_CALLBACK:
538 			Assert(false);		/* Not yet supported. */
539 			break;
540 	}
541 
542 	resetStringInfo(fe_msgbuf);
543 }
544 
545 /*
546  * CopyGetData reads data from the source (file or frontend)
547  *
548  * We attempt to read at least minread, and at most maxread, bytes from
549  * the source.  The actual number of bytes read is returned; if this is
550  * less than minread, EOF was detected.
551  *
552  * Note: when copying from the frontend, we expect a proper EOF mark per
553  * protocol; if the frontend simply drops the connection, we raise error.
554  * It seems unwise to allow the COPY IN to complete normally in that case.
555  *
556  * NB: no data conversion is applied here.
557  */
558 static int
CopyGetData(CopyState cstate,void * databuf,int minread,int maxread)559 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
560 {
561 	int			bytesread = 0;
562 
563 	switch (cstate->copy_dest)
564 	{
565 		case COPY_FILE:
566 			bytesread = fread(databuf, 1, maxread, cstate->copy_file);
567 			if (ferror(cstate->copy_file))
568 				ereport(ERROR,
569 						(errcode_for_file_access(),
570 						 errmsg("could not read from COPY file: %m")));
571 			if (bytesread == 0)
572 				cstate->reached_eof = true;
573 			break;
574 		case COPY_OLD_FE:
575 
576 			/*
577 			 * We cannot read more than minread bytes (which in practice is 1)
578 			 * because old protocol doesn't have any clear way of separating
579 			 * the COPY stream from following data.  This is slow, but not any
580 			 * slower than the code path was originally, and we don't care
581 			 * much anymore about the performance of old protocol.
582 			 */
583 			if (pq_getbytes((char *) databuf, minread))
584 			{
585 				/* Only a \. terminator is legal EOF in old protocol */
586 				ereport(ERROR,
587 						(errcode(ERRCODE_CONNECTION_FAILURE),
588 						 errmsg("unexpected EOF on client connection with an open transaction")));
589 			}
590 			bytesread = minread;
591 			break;
592 		case COPY_NEW_FE:
593 			while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
594 			{
595 				int			avail;
596 
597 				while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
598 				{
599 					/* Try to receive another message */
600 					int			mtype;
601 
602 			readmessage:
603 					HOLD_CANCEL_INTERRUPTS();
604 					pq_startmsgread();
605 					mtype = pq_getbyte();
606 					if (mtype == EOF)
607 						ereport(ERROR,
608 								(errcode(ERRCODE_CONNECTION_FAILURE),
609 								 errmsg("unexpected EOF on client connection with an open transaction")));
610 					if (pq_getmessage(cstate->fe_msgbuf, 0))
611 						ereport(ERROR,
612 								(errcode(ERRCODE_CONNECTION_FAILURE),
613 								 errmsg("unexpected EOF on client connection with an open transaction")));
614 					RESUME_CANCEL_INTERRUPTS();
615 					switch (mtype)
616 					{
617 						case 'd':	/* CopyData */
618 							break;
619 						case 'c':	/* CopyDone */
620 							/* COPY IN correctly terminated by frontend */
621 							cstate->reached_eof = true;
622 							return bytesread;
623 						case 'f':	/* CopyFail */
624 							ereport(ERROR,
625 									(errcode(ERRCODE_QUERY_CANCELED),
626 									 errmsg("COPY from stdin failed: %s",
627 											pq_getmsgstring(cstate->fe_msgbuf))));
628 							break;
629 						case 'H':	/* Flush */
630 						case 'S':	/* Sync */
631 
632 							/*
633 							 * Ignore Flush/Sync for the convenience of client
634 							 * libraries (such as libpq) that may send those
635 							 * without noticing that the command they just
636 							 * sent was COPY.
637 							 */
638 							goto readmessage;
639 						default:
640 							ereport(ERROR,
641 									(errcode(ERRCODE_PROTOCOL_VIOLATION),
642 									 errmsg("unexpected message type 0x%02X during COPY from stdin",
643 											mtype)));
644 							break;
645 					}
646 				}
647 				avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
648 				if (avail > maxread)
649 					avail = maxread;
650 				pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
651 				databuf = (void *) ((char *) databuf + avail);
652 				maxread -= avail;
653 				bytesread += avail;
654 			}
655 			break;
656 		case COPY_CALLBACK:
657 			bytesread = cstate->data_source_cb(databuf, minread, maxread);
658 			break;
659 	}
660 
661 	return bytesread;
662 }
663 
664 
665 /*
666  * These functions do apply some data conversion
667  */
668 
669 /*
670  * CopySendInt32 sends an int32 in network byte order
671  */
672 static void
CopySendInt32(CopyState cstate,int32 val)673 CopySendInt32(CopyState cstate, int32 val)
674 {
675 	uint32		buf;
676 
677 	buf = pg_hton32((uint32) val);
678 	CopySendData(cstate, &buf, sizeof(buf));
679 }
680 
681 /*
682  * CopyGetInt32 reads an int32 that appears in network byte order
683  *
684  * Returns true if OK, false if EOF
685  */
686 static bool
CopyGetInt32(CopyState cstate,int32 * val)687 CopyGetInt32(CopyState cstate, int32 *val)
688 {
689 	uint32		buf;
690 
691 	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
692 	{
693 		*val = 0;				/* suppress compiler warning */
694 		return false;
695 	}
696 	*val = (int32) pg_ntoh32(buf);
697 	return true;
698 }
699 
700 /*
701  * CopySendInt16 sends an int16 in network byte order
702  */
703 static void
CopySendInt16(CopyState cstate,int16 val)704 CopySendInt16(CopyState cstate, int16 val)
705 {
706 	uint16		buf;
707 
708 	buf = pg_hton16((uint16) val);
709 	CopySendData(cstate, &buf, sizeof(buf));
710 }
711 
712 /*
713  * CopyGetInt16 reads an int16 that appears in network byte order
714  */
715 static bool
CopyGetInt16(CopyState cstate,int16 * val)716 CopyGetInt16(CopyState cstate, int16 *val)
717 {
718 	uint16		buf;
719 
720 	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
721 	{
722 		*val = 0;				/* suppress compiler warning */
723 		return false;
724 	}
725 	*val = (int16) pg_ntoh16(buf);
726 	return true;
727 }
728 
729 
730 /*
731  * CopyLoadRawBuf loads some more data into raw_buf
732  *
733  * Returns true if able to obtain at least one more byte, else false.
734  *
735  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
736  * down to the start of the buffer and then we load more data after that.
737  * This case is used only when a frontend multibyte character crosses a
738  * bufferload boundary.
739  */
740 static bool
CopyLoadRawBuf(CopyState cstate)741 CopyLoadRawBuf(CopyState cstate)
742 {
743 	int			nbytes;
744 	int			inbytes;
745 
746 	if (cstate->raw_buf_index < cstate->raw_buf_len)
747 	{
748 		/* Copy down the unprocessed data */
749 		nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
750 		memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
751 				nbytes);
752 	}
753 	else
754 		nbytes = 0;				/* no data need be saved */
755 
756 	inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
757 						  1, RAW_BUF_SIZE - nbytes);
758 	nbytes += inbytes;
759 	cstate->raw_buf[nbytes] = '\0';
760 	cstate->raw_buf_index = 0;
761 	cstate->raw_buf_len = nbytes;
762 	return (inbytes > 0);
763 }
764 
765 
766 /*
767  *	 DoCopy executes the SQL COPY statement
768  *
769  * Either unload or reload contents of table <relation>, depending on <from>.
770  * (<from> = true means we are inserting into the table.)  In the "TO" case
771  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
772  * or DELETE query.
773  *
774  * If <pipe> is false, transfer is between the table and the file named
775  * <filename>.  Otherwise, transfer is between the table and our regular
776  * input/output stream. The latter could be either stdin/stdout or a
777  * socket, depending on whether we're running under Postmaster control.
778  *
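 * Do not allow a Postgres user to read from or write to a server file, or
 * to run a server-side program, without membership in the matching role:
 * 'pg_read_server_files' for COPY FROM a file, 'pg_write_server_files' for
 * COPY TO a file, or 'pg_execute_server_program' for COPY to or from a
 * program.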
781  *
782  * Do not allow the copy if user doesn't have proper permission to access
783  * the table or the specifically requested columns.
784  */
785 void
DoCopy(ParseState * pstate,const CopyStmt * stmt,int stmt_location,int stmt_len,uint64 * processed)786 DoCopy(ParseState *pstate, const CopyStmt *stmt,
787 	   int stmt_location, int stmt_len,
788 	   uint64 *processed)
789 {
790 	CopyState	cstate;
791 	bool		is_from = stmt->is_from;
792 	bool		pipe = (stmt->filename == NULL);
793 	Relation	rel;
794 	Oid			relid;
795 	RawStmt    *query = NULL;
796 
797 	/*
798 	 * Disallow COPY to/from file or program except to users with the
799 	 * appropriate role.
800 	 */
801 	if (!pipe)
802 	{
803 		if (stmt->is_program)
804 		{
805 			if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
806 				ereport(ERROR,
807 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
808 						 errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
809 						 errhint("Anyone can COPY to stdout or from stdin. "
810 								 "psql's \\copy command also works for anyone.")));
811 		}
812 		else
813 		{
814 			if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
815 				ereport(ERROR,
816 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
817 						 errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
818 						 errhint("Anyone can COPY to stdout or from stdin. "
819 								 "psql's \\copy command also works for anyone.")));
820 
821 			if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
822 				ereport(ERROR,
823 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
824 						 errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
825 						 errhint("Anyone can COPY to stdout or from stdin. "
826 								 "psql's \\copy command also works for anyone.")));
827 		}
828 	}
829 
830 	if (stmt->relation)
831 	{
832 		TupleDesc	tupDesc;
833 		List	   *attnums;
834 		ListCell   *cur;
835 		RangeTblEntry *rte;
836 
837 		Assert(!stmt->query);
838 
839 		/* Open and lock the relation, using the appropriate lock type. */
840 		rel = heap_openrv(stmt->relation,
841 						  (is_from ? RowExclusiveLock : AccessShareLock));
842 
843 		relid = RelationGetRelid(rel);
844 
845 		rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
846 		rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
847 
848 		tupDesc = RelationGetDescr(rel);
849 		attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
850 		foreach(cur, attnums)
851 		{
852 			int			attno = lfirst_int(cur) -
853 			FirstLowInvalidHeapAttributeNumber;
854 
855 			if (is_from)
856 				rte->insertedCols = bms_add_member(rte->insertedCols, attno);
857 			else
858 				rte->selectedCols = bms_add_member(rte->selectedCols, attno);
859 		}
860 		ExecCheckRTPerms(pstate->p_rtable, true);
861 
862 		/*
863 		 * Permission check for row security policies.
864 		 *
865 		 * check_enable_rls will ereport(ERROR) if the user has requested
866 		 * something invalid and will otherwise indicate if we should enable
867 		 * RLS (returns RLS_ENABLED) or not for this COPY statement.
868 		 *
869 		 * If the relation has a row security policy and we are to apply it
870 		 * then perform a "query" copy and allow the normal query processing
871 		 * to handle the policies.
872 		 *
873 		 * If RLS is not enabled for this, then just fall through to the
874 		 * normal non-filtering relation handling.
875 		 */
876 		if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
877 		{
878 			SelectStmt *select;
879 			ColumnRef  *cr;
880 			ResTarget  *target;
881 			RangeVar   *from;
882 			List	   *targetList = NIL;
883 
884 			if (is_from)
885 				ereport(ERROR,
886 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
887 						 errmsg("COPY FROM not supported with row-level security"),
888 						 errhint("Use INSERT statements instead.")));
889 
890 			/*
891 			 * Build target list
892 			 *
893 			 * If no columns are specified in the attribute list of the COPY
894 			 * command, then the target list is 'all' columns. Therefore, '*'
895 			 * should be used as the target list for the resulting SELECT
896 			 * statement.
897 			 *
898 			 * In the case that columns are specified in the attribute list,
899 			 * create a ColumnRef and ResTarget for each column and add them
900 			 * to the target list for the resulting SELECT statement.
901 			 */
902 			if (!stmt->attlist)
903 			{
904 				cr = makeNode(ColumnRef);
905 				cr->fields = list_make1(makeNode(A_Star));
906 				cr->location = -1;
907 
908 				target = makeNode(ResTarget);
909 				target->name = NULL;
910 				target->indirection = NIL;
911 				target->val = (Node *) cr;
912 				target->location = -1;
913 
914 				targetList = list_make1(target);
915 			}
916 			else
917 			{
918 				ListCell   *lc;
919 
920 				foreach(lc, stmt->attlist)
921 				{
922 					/*
923 					 * Build the ColumnRef for each column.  The ColumnRef
924 					 * 'fields' property is a String 'Value' node (see
925 					 * nodes/value.h) that corresponds to the column name
926 					 * respectively.
927 					 */
928 					cr = makeNode(ColumnRef);
929 					cr->fields = list_make1(lfirst(lc));
930 					cr->location = -1;
931 
932 					/* Build the ResTarget and add the ColumnRef to it. */
933 					target = makeNode(ResTarget);
934 					target->name = NULL;
935 					target->indirection = NIL;
936 					target->val = (Node *) cr;
937 					target->location = -1;
938 
939 					/* Add each column to the SELECT statement's target list */
940 					targetList = lappend(targetList, target);
941 				}
942 			}
943 
944 			/*
945 			 * Build RangeVar for from clause, fully qualified based on the
946 			 * relation which we have opened and locked.
947 			 */
948 			from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
949 								pstrdup(RelationGetRelationName(rel)),
950 								-1);
951 
952 			/* Build query */
953 			select = makeNode(SelectStmt);
954 			select->targetList = targetList;
955 			select->fromClause = list_make1(from);
956 
957 			query = makeNode(RawStmt);
958 			query->stmt = (Node *) select;
959 			query->stmt_location = stmt_location;
960 			query->stmt_len = stmt_len;
961 
962 			/*
963 			 * Close the relation for now, but keep the lock on it to prevent
964 			 * changes between now and when we start the query-based COPY.
965 			 *
966 			 * We'll reopen it later as part of the query-based COPY.
967 			 */
968 			heap_close(rel, NoLock);
969 			rel = NULL;
970 		}
971 	}
972 	else
973 	{
974 		Assert(stmt->query);
975 
976 		query = makeNode(RawStmt);
977 		query->stmt = stmt->query;
978 		query->stmt_location = stmt_location;
979 		query->stmt_len = stmt_len;
980 
981 		relid = InvalidOid;
982 		rel = NULL;
983 	}
984 
985 	if (is_from)
986 	{
987 		Assert(rel);
988 
989 		/* check read-only transaction and parallel mode */
990 		if (XactReadOnly && !rel->rd_islocaltemp)
991 			PreventCommandIfReadOnly("COPY FROM");
992 		PreventCommandIfParallelMode("COPY FROM");
993 
994 		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
995 							   NULL, stmt->attlist, stmt->options);
996 		*processed = CopyFrom(cstate);	/* copy from file to database */
997 		EndCopyFrom(cstate);
998 	}
999 	else
1000 	{
1001 		cstate = BeginCopyTo(pstate, rel, query, relid,
1002 							 stmt->filename, stmt->is_program,
1003 							 stmt->attlist, stmt->options);
1004 		*processed = DoCopyTo(cstate);	/* copy from database to file */
1005 		EndCopyTo(cstate);
1006 	}
1007 
1008 	/*
1009 	 * Close the relation. If reading, we can release the AccessShareLock we
1010 	 * got; if writing, we should hold the lock until end of transaction to
1011 	 * ensure that updates will be committed before lock is released.
1012 	 */
1013 	if (rel != NULL)
1014 		heap_close(rel, (is_from ? NoLock : AccessShareLock));
1015 }
1016 
1017 /*
1018  * Process the statement option list for COPY.
1019  *
1020  * Scan the options list (a list of DefElem) and transpose the information
1021  * into cstate, applying appropriate error checking.
1022  *
1023  * cstate is assumed to be filled with zeroes initially.
1024  *
1025  * This is exported so that external users of the COPY API can sanity-check
1026  * a list of options.  In that usage, cstate should be passed as NULL
1027  * (since external users don't know sizeof(CopyStateData)) and the collected
1028  * data is just leaked until CurrentMemoryContext is reset.
1029  *
1030  * Note that additional checking, such as whether column names listed in FORCE
1031  * QUOTE actually exist, has to be applied later.  This just checks for
1032  * self-consistency of the options list.
1033  */
1034 void
ProcessCopyOptions(ParseState * pstate,CopyState cstate,bool is_from,List * options)1035 ProcessCopyOptions(ParseState *pstate,
1036 				   CopyState cstate,
1037 				   bool is_from,
1038 				   List *options)
1039 {
1040 	bool		format_specified = false;
1041 	ListCell   *option;
1042 
1043 	/* Support external use for option sanity checking */
1044 	if (cstate == NULL)
1045 		cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1046 
1047 	cstate->is_copy_from = is_from;
1048 
1049 	cstate->file_encoding = -1;
1050 
1051 	/* Extract options from the statement node tree */
1052 	foreach(option, options)
1053 	{
1054 		DefElem    *defel = lfirst_node(DefElem, option);
1055 
1056 		if (strcmp(defel->defname, "format") == 0)
1057 		{
1058 			char	   *fmt = defGetString(defel);
1059 
1060 			if (format_specified)
1061 				ereport(ERROR,
1062 						(errcode(ERRCODE_SYNTAX_ERROR),
1063 						 errmsg("conflicting or redundant options"),
1064 						 parser_errposition(pstate, defel->location)));
1065 			format_specified = true;
1066 			if (strcmp(fmt, "text") == 0)
1067 				 /* default format */ ;
1068 			else if (strcmp(fmt, "csv") == 0)
1069 				cstate->csv_mode = true;
1070 			else if (strcmp(fmt, "binary") == 0)
1071 				cstate->binary = true;
1072 			else
1073 				ereport(ERROR,
1074 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1075 						 errmsg("COPY format \"%s\" not recognized", fmt),
1076 						 parser_errposition(pstate, defel->location)));
1077 		}
1078 		else if (strcmp(defel->defname, "oids") == 0)
1079 		{
1080 			if (cstate->oids)
1081 				ereport(ERROR,
1082 						(errcode(ERRCODE_SYNTAX_ERROR),
1083 						 errmsg("conflicting or redundant options"),
1084 						 parser_errposition(pstate, defel->location)));
1085 			cstate->oids = defGetBoolean(defel);
1086 		}
1087 		else if (strcmp(defel->defname, "freeze") == 0)
1088 		{
1089 			if (cstate->freeze)
1090 				ereport(ERROR,
1091 						(errcode(ERRCODE_SYNTAX_ERROR),
1092 						 errmsg("conflicting or redundant options"),
1093 						 parser_errposition(pstate, defel->location)));
1094 			cstate->freeze = defGetBoolean(defel);
1095 		}
1096 		else if (strcmp(defel->defname, "delimiter") == 0)
1097 		{
1098 			if (cstate->delim)
1099 				ereport(ERROR,
1100 						(errcode(ERRCODE_SYNTAX_ERROR),
1101 						 errmsg("conflicting or redundant options"),
1102 						 parser_errposition(pstate, defel->location)));
1103 			cstate->delim = defGetString(defel);
1104 		}
1105 		else if (strcmp(defel->defname, "null") == 0)
1106 		{
1107 			if (cstate->null_print)
1108 				ereport(ERROR,
1109 						(errcode(ERRCODE_SYNTAX_ERROR),
1110 						 errmsg("conflicting or redundant options"),
1111 						 parser_errposition(pstate, defel->location)));
1112 			cstate->null_print = defGetString(defel);
1113 		}
1114 		else if (strcmp(defel->defname, "header") == 0)
1115 		{
1116 			if (cstate->header_line)
1117 				ereport(ERROR,
1118 						(errcode(ERRCODE_SYNTAX_ERROR),
1119 						 errmsg("conflicting or redundant options"),
1120 						 parser_errposition(pstate, defel->location)));
1121 			cstate->header_line = defGetBoolean(defel);
1122 		}
1123 		else if (strcmp(defel->defname, "quote") == 0)
1124 		{
1125 			if (cstate->quote)
1126 				ereport(ERROR,
1127 						(errcode(ERRCODE_SYNTAX_ERROR),
1128 						 errmsg("conflicting or redundant options"),
1129 						 parser_errposition(pstate, defel->location)));
1130 			cstate->quote = defGetString(defel);
1131 		}
1132 		else if (strcmp(defel->defname, "escape") == 0)
1133 		{
1134 			if (cstate->escape)
1135 				ereport(ERROR,
1136 						(errcode(ERRCODE_SYNTAX_ERROR),
1137 						 errmsg("conflicting or redundant options"),
1138 						 parser_errposition(pstate, defel->location)));
1139 			cstate->escape = defGetString(defel);
1140 		}
1141 		else if (strcmp(defel->defname, "force_quote") == 0)
1142 		{
1143 			if (cstate->force_quote || cstate->force_quote_all)
1144 				ereport(ERROR,
1145 						(errcode(ERRCODE_SYNTAX_ERROR),
1146 						 errmsg("conflicting or redundant options"),
1147 						 parser_errposition(pstate, defel->location)));
1148 			if (defel->arg && IsA(defel->arg, A_Star))
1149 				cstate->force_quote_all = true;
1150 			else if (defel->arg && IsA(defel->arg, List))
1151 				cstate->force_quote = castNode(List, defel->arg);
1152 			else
1153 				ereport(ERROR,
1154 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1155 						 errmsg("argument to option \"%s\" must be a list of column names",
1156 								defel->defname),
1157 						 parser_errposition(pstate, defel->location)));
1158 		}
1159 		else if (strcmp(defel->defname, "force_not_null") == 0)
1160 		{
1161 			if (cstate->force_notnull)
1162 				ereport(ERROR,
1163 						(errcode(ERRCODE_SYNTAX_ERROR),
1164 						 errmsg("conflicting or redundant options"),
1165 						 parser_errposition(pstate, defel->location)));
1166 			if (defel->arg && IsA(defel->arg, List))
1167 				cstate->force_notnull = castNode(List, defel->arg);
1168 			else
1169 				ereport(ERROR,
1170 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1171 						 errmsg("argument to option \"%s\" must be a list of column names",
1172 								defel->defname),
1173 						 parser_errposition(pstate, defel->location)));
1174 		}
1175 		else if (strcmp(defel->defname, "force_null") == 0)
1176 		{
1177 			if (cstate->force_null)
1178 				ereport(ERROR,
1179 						(errcode(ERRCODE_SYNTAX_ERROR),
1180 						 errmsg("conflicting or redundant options")));
1181 			if (defel->arg && IsA(defel->arg, List))
1182 				cstate->force_null = castNode(List, defel->arg);
1183 			else
1184 				ereport(ERROR,
1185 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1186 						 errmsg("argument to option \"%s\" must be a list of column names",
1187 								defel->defname),
1188 						 parser_errposition(pstate, defel->location)));
1189 		}
1190 		else if (strcmp(defel->defname, "convert_selectively") == 0)
1191 		{
1192 			/*
1193 			 * Undocumented, not-accessible-from-SQL option: convert only the
1194 			 * named columns to binary form, storing the rest as NULLs. It's
1195 			 * allowed for the column list to be NIL.
1196 			 */
1197 			if (cstate->convert_selectively)
1198 				ereport(ERROR,
1199 						(errcode(ERRCODE_SYNTAX_ERROR),
1200 						 errmsg("conflicting or redundant options"),
1201 						 parser_errposition(pstate, defel->location)));
1202 			cstate->convert_selectively = true;
1203 			if (defel->arg == NULL || IsA(defel->arg, List))
1204 				cstate->convert_select = castNode(List, defel->arg);
1205 			else
1206 				ereport(ERROR,
1207 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1208 						 errmsg("argument to option \"%s\" must be a list of column names",
1209 								defel->defname),
1210 						 parser_errposition(pstate, defel->location)));
1211 		}
1212 		else if (strcmp(defel->defname, "encoding") == 0)
1213 		{
1214 			if (cstate->file_encoding >= 0)
1215 				ereport(ERROR,
1216 						(errcode(ERRCODE_SYNTAX_ERROR),
1217 						 errmsg("conflicting or redundant options"),
1218 						 parser_errposition(pstate, defel->location)));
1219 			cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1220 			if (cstate->file_encoding < 0)
1221 				ereport(ERROR,
1222 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1223 						 errmsg("argument to option \"%s\" must be a valid encoding name",
1224 								defel->defname),
1225 						 parser_errposition(pstate, defel->location)));
1226 		}
1227 		else
1228 			ereport(ERROR,
1229 					(errcode(ERRCODE_SYNTAX_ERROR),
1230 					 errmsg("option \"%s\" not recognized",
1231 							defel->defname),
1232 					 parser_errposition(pstate, defel->location)));
1233 	}
1234 
1235 	/*
1236 	 * Check for incompatible options (must do these two before inserting
1237 	 * defaults)
1238 	 */
1239 	if (cstate->binary && cstate->delim)
1240 		ereport(ERROR,
1241 				(errcode(ERRCODE_SYNTAX_ERROR),
1242 				 errmsg("cannot specify DELIMITER in BINARY mode")));
1243 
1244 	if (cstate->binary && cstate->null_print)
1245 		ereport(ERROR,
1246 				(errcode(ERRCODE_SYNTAX_ERROR),
1247 				 errmsg("cannot specify NULL in BINARY mode")));
1248 
1249 	/* Set defaults for omitted options */
1250 	if (!cstate->delim)
1251 		cstate->delim = cstate->csv_mode ? "," : "\t";
1252 
1253 	if (!cstate->null_print)
1254 		cstate->null_print = cstate->csv_mode ? "" : "\\N";
1255 	cstate->null_print_len = strlen(cstate->null_print);
1256 
1257 	if (cstate->csv_mode)
1258 	{
1259 		if (!cstate->quote)
1260 			cstate->quote = "\"";
1261 		if (!cstate->escape)
1262 			cstate->escape = cstate->quote;
1263 	}
1264 
1265 	/* Only single-byte delimiter strings are supported. */
1266 	if (strlen(cstate->delim) != 1)
1267 		ereport(ERROR,
1268 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1269 				 errmsg("COPY delimiter must be a single one-byte character")));
1270 
1271 	/* Disallow end-of-line characters */
1272 	if (strchr(cstate->delim, '\r') != NULL ||
1273 		strchr(cstate->delim, '\n') != NULL)
1274 		ereport(ERROR,
1275 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1276 				 errmsg("COPY delimiter cannot be newline or carriage return")));
1277 
1278 	if (strchr(cstate->null_print, '\r') != NULL ||
1279 		strchr(cstate->null_print, '\n') != NULL)
1280 		ereport(ERROR,
1281 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1282 				 errmsg("COPY null representation cannot use newline or carriage return")));
1283 
1284 	/*
1285 	 * Disallow unsafe delimiter characters in non-CSV mode.  We can't allow
1286 	 * backslash because it would be ambiguous.  We can't allow the other
1287 	 * cases because data characters matching the delimiter must be
1288 	 * backslashed, and certain backslash combinations are interpreted
1289 	 * non-literally by COPY IN.  Disallowing all lower case ASCII letters is
1290 	 * more than strictly necessary, but seems best for consistency and
1291 	 * future-proofing.  Likewise we disallow all digits though only octal
1292 	 * digits are actually dangerous.
1293 	 */
1294 	if (!cstate->csv_mode &&
1295 		strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1296 			   cstate->delim[0]) != NULL)
1297 		ereport(ERROR,
1298 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1299 				 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1300 
1301 	/* Check header */
1302 	if (!cstate->csv_mode && cstate->header_line)
1303 		ereport(ERROR,
1304 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1305 				 errmsg("COPY HEADER available only in CSV mode")));
1306 
1307 	/* Check quote */
1308 	if (!cstate->csv_mode && cstate->quote != NULL)
1309 		ereport(ERROR,
1310 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1311 				 errmsg("COPY quote available only in CSV mode")));
1312 
1313 	if (cstate->csv_mode && strlen(cstate->quote) != 1)
1314 		ereport(ERROR,
1315 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1316 				 errmsg("COPY quote must be a single one-byte character")));
1317 
1318 	if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1319 		ereport(ERROR,
1320 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1321 				 errmsg("COPY delimiter and quote must be different")));
1322 
1323 	/* Check escape */
1324 	if (!cstate->csv_mode && cstate->escape != NULL)
1325 		ereport(ERROR,
1326 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327 				 errmsg("COPY escape available only in CSV mode")));
1328 
1329 	if (cstate->csv_mode && strlen(cstate->escape) != 1)
1330 		ereport(ERROR,
1331 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1332 				 errmsg("COPY escape must be a single one-byte character")));
1333 
1334 	/* Check force_quote */
1335 	if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1336 		ereport(ERROR,
1337 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338 				 errmsg("COPY force quote available only in CSV mode")));
1339 	if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1340 		ereport(ERROR,
1341 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1342 				 errmsg("COPY force quote only available using COPY TO")));
1343 
1344 	/* Check force_notnull */
1345 	if (!cstate->csv_mode && cstate->force_notnull != NIL)
1346 		ereport(ERROR,
1347 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1348 				 errmsg("COPY force not null available only in CSV mode")));
1349 	if (cstate->force_notnull != NIL && !is_from)
1350 		ereport(ERROR,
1351 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1352 				 errmsg("COPY force not null only available using COPY FROM")));
1353 
1354 	/* Check force_null */
1355 	if (!cstate->csv_mode && cstate->force_null != NIL)
1356 		ereport(ERROR,
1357 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1358 				 errmsg("COPY force null available only in CSV mode")));
1359 
1360 	if (cstate->force_null != NIL && !is_from)
1361 		ereport(ERROR,
1362 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1363 				 errmsg("COPY force null only available using COPY FROM")));
1364 
1365 	/* Don't allow the delimiter to appear in the null string. */
1366 	if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1367 		ereport(ERROR,
1368 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1369 				 errmsg("COPY delimiter must not appear in the NULL specification")));
1370 
1371 	/* Don't allow the CSV quote char to appear in the null string. */
1372 	if (cstate->csv_mode &&
1373 		strchr(cstate->null_print, cstate->quote[0]) != NULL)
1374 		ereport(ERROR,
1375 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1376 				 errmsg("CSV quote character must not appear in the NULL specification")));
1377 }
1378 
1379 /*
1380  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1381  *
1382  * Iff <binary>, unload or reload in the binary format, as opposed to the
1383  * more wasteful but more robust and portable text format.
1384  *
1385  * Iff <oids>, unload or reload the format that includes OID information.
1386  * On input, we accept OIDs whether or not the table has an OID column,
1387  * but silently drop them if it does not.  On output, we report an error
1388  * if the user asks for OIDs in a table that has none (not providing an
1389  * OID column might seem friendlier, but could seriously confuse programs).
1390  *
1391  * If in the text format, delimit columns with delimiter <delim> and print
1392  * NULL values as <null_print>.
1393  */
1394 static CopyState
BeginCopy(ParseState * pstate,bool is_from,Relation rel,RawStmt * raw_query,Oid queryRelId,List * attnamelist,List * options)1395 BeginCopy(ParseState *pstate,
1396 		  bool is_from,
1397 		  Relation rel,
1398 		  RawStmt *raw_query,
1399 		  Oid queryRelId,
1400 		  List *attnamelist,
1401 		  List *options)
1402 {
1403 	CopyState	cstate;
1404 	TupleDesc	tupDesc;
1405 	int			num_phys_attrs;
1406 	MemoryContext oldcontext;
1407 
1408 	/* Allocate workspace and zero all fields */
1409 	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1410 
1411 	/*
1412 	 * We allocate everything used by a cstate in a new memory context. This
1413 	 * avoids memory leaks during repeated use of COPY in a query.
1414 	 */
1415 	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1416 												"COPY",
1417 												ALLOCSET_DEFAULT_SIZES);
1418 
1419 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1420 
1421 	/* Extract options from the statement node tree */
1422 	ProcessCopyOptions(pstate, cstate, is_from, options);
1423 
1424 	/* Process the source/target relation or query */
1425 	if (rel)
1426 	{
1427 		Assert(!raw_query);
1428 
1429 		cstate->rel = rel;
1430 
1431 		tupDesc = RelationGetDescr(cstate->rel);
1432 
1433 		/* Don't allow COPY w/ OIDs to or from a table without them */
1434 		if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1435 			ereport(ERROR,
1436 					(errcode(ERRCODE_UNDEFINED_COLUMN),
1437 					 errmsg("table \"%s\" does not have OIDs",
1438 							RelationGetRelationName(cstate->rel))));
1439 	}
1440 	else
1441 	{
1442 		List	   *rewritten;
1443 		Query	   *query;
1444 		PlannedStmt *plan;
1445 		DestReceiver *dest;
1446 
1447 		Assert(!is_from);
1448 		cstate->rel = NULL;
1449 
1450 		/* Don't allow COPY w/ OIDs from a query */
1451 		if (cstate->oids)
1452 			ereport(ERROR,
1453 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1454 					 errmsg("COPY (query) WITH OIDS is not supported")));
1455 
1456 		/*
1457 		 * Run parse analysis and rewrite.  Note this also acquires sufficient
1458 		 * locks on the source table(s).
1459 		 *
1460 		 * Because the parser and planner tend to scribble on their input, we
1461 		 * make a preliminary copy of the source querytree.  This prevents
1462 		 * problems in the case that the COPY is in a portal or plpgsql
1463 		 * function and is executed repeatedly.  (See also the same hack in
1464 		 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
1465 		 */
1466 		rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1467 										   pstate->p_sourcetext, NULL, 0,
1468 										   NULL);
1469 
1470 		/* check that we got back something we can work with */
1471 		if (rewritten == NIL)
1472 		{
1473 			ereport(ERROR,
1474 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1475 					 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1476 		}
1477 		else if (list_length(rewritten) > 1)
1478 		{
1479 			ListCell   *lc;
1480 
1481 			/* examine queries to determine which error message to issue */
1482 			foreach(lc, rewritten)
1483 			{
1484 				Query	   *q = lfirst_node(Query, lc);
1485 
1486 				if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1487 					ereport(ERROR,
1488 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1489 							 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1490 				if (q->querySource == QSRC_NON_INSTEAD_RULE)
1491 					ereport(ERROR,
1492 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1493 							 errmsg("DO ALSO rules are not supported for the COPY")));
1494 			}
1495 
1496 			ereport(ERROR,
1497 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1498 					 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1499 		}
1500 
1501 		query = linitial_node(Query, rewritten);
1502 
1503 		/* The grammar allows SELECT INTO, but we don't support that */
1504 		if (query->utilityStmt != NULL &&
1505 			IsA(query->utilityStmt, CreateTableAsStmt))
1506 			ereport(ERROR,
1507 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1508 					 errmsg("COPY (SELECT INTO) is not supported")));
1509 
1510 		Assert(query->utilityStmt == NULL);
1511 
1512 		/*
1513 		 * Similarly the grammar doesn't enforce the presence of a RETURNING
1514 		 * clause, but this is required here.
1515 		 */
1516 		if (query->commandType != CMD_SELECT &&
1517 			query->returningList == NIL)
1518 		{
1519 			Assert(query->commandType == CMD_INSERT ||
1520 				   query->commandType == CMD_UPDATE ||
1521 				   query->commandType == CMD_DELETE);
1522 
1523 			ereport(ERROR,
1524 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1525 					 errmsg("COPY query must have a RETURNING clause")));
1526 		}
1527 
1528 		/* plan the query */
1529 		plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1530 
1531 		/*
1532 		 * With row level security and a user using "COPY relation TO", we
1533 		 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1534 		 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1535 		 * in any RLS clauses.
1536 		 *
1537 		 * When this happens, we are passed in the relid of the originally
1538 		 * found relation (which we have locked).  As the planner will look up
1539 		 * the relation again, we double-check here to make sure it found the
1540 		 * same one that we have locked.
1541 		 */
1542 		if (queryRelId != InvalidOid)
1543 		{
1544 			/*
1545 			 * Note that with RLS involved there may be multiple relations,
1546 			 * and while the one we need is almost certainly first, we don't
1547 			 * make any guarantees of that in the planner, so check the whole
1548 			 * list and make sure we find the original relation.
1549 			 */
1550 			if (!list_member_oid(plan->relationOids, queryRelId))
1551 				ereport(ERROR,
1552 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1553 						 errmsg("relation referenced by COPY statement has changed")));
1554 		}
1555 
1556 		/*
1557 		 * Use a snapshot with an updated command ID to ensure this query sees
1558 		 * results of any previously executed queries.
1559 		 */
1560 		PushCopiedSnapshot(GetActiveSnapshot());
1561 		UpdateActiveSnapshotCommandId();
1562 
1563 		/* Create dest receiver for COPY OUT */
1564 		dest = CreateDestReceiver(DestCopyOut);
1565 		((DR_copy *) dest)->cstate = cstate;
1566 
1567 		/* Create a QueryDesc requesting no output */
1568 		cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1569 											GetActiveSnapshot(),
1570 											InvalidSnapshot,
1571 											dest, NULL, NULL, 0);
1572 
1573 		/*
1574 		 * Call ExecutorStart to prepare the plan for execution.
1575 		 *
1576 		 * ExecutorStart computes a result tupdesc for us
1577 		 */
1578 		ExecutorStart(cstate->queryDesc, 0);
1579 
1580 		tupDesc = cstate->queryDesc->tupDesc;
1581 	}
1582 
1583 	/* Generate or convert list of attributes to process */
1584 	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1585 
1586 	num_phys_attrs = tupDesc->natts;
1587 
1588 	/* Convert FORCE_QUOTE name list to per-column flags, check validity */
1589 	cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1590 	if (cstate->force_quote_all)
1591 	{
1592 		int			i;
1593 
1594 		for (i = 0; i < num_phys_attrs; i++)
1595 			cstate->force_quote_flags[i] = true;
1596 	}
1597 	else if (cstate->force_quote)
1598 	{
1599 		List	   *attnums;
1600 		ListCell   *cur;
1601 
1602 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1603 
1604 		foreach(cur, attnums)
1605 		{
1606 			int			attnum = lfirst_int(cur);
1607 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1608 
1609 			if (!list_member_int(cstate->attnumlist, attnum))
1610 				ereport(ERROR,
1611 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1612 						 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1613 								NameStr(attr->attname))));
1614 			cstate->force_quote_flags[attnum - 1] = true;
1615 		}
1616 	}
1617 
1618 	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1619 	cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1620 	if (cstate->force_notnull)
1621 	{
1622 		List	   *attnums;
1623 		ListCell   *cur;
1624 
1625 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1626 
1627 		foreach(cur, attnums)
1628 		{
1629 			int			attnum = lfirst_int(cur);
1630 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1631 
1632 			if (!list_member_int(cstate->attnumlist, attnum))
1633 				ereport(ERROR,
1634 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1635 						 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1636 								NameStr(attr->attname))));
1637 			cstate->force_notnull_flags[attnum - 1] = true;
1638 		}
1639 	}
1640 
1641 	/* Convert FORCE_NULL name list to per-column flags, check validity */
1642 	cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1643 	if (cstate->force_null)
1644 	{
1645 		List	   *attnums;
1646 		ListCell   *cur;
1647 
1648 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1649 
1650 		foreach(cur, attnums)
1651 		{
1652 			int			attnum = lfirst_int(cur);
1653 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1654 
1655 			if (!list_member_int(cstate->attnumlist, attnum))
1656 				ereport(ERROR,
1657 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1658 						 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1659 								NameStr(attr->attname))));
1660 			cstate->force_null_flags[attnum - 1] = true;
1661 		}
1662 	}
1663 
1664 	/* Convert convert_selectively name list to per-column flags */
1665 	if (cstate->convert_selectively)
1666 	{
1667 		List	   *attnums;
1668 		ListCell   *cur;
1669 
1670 		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1671 
1672 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1673 
1674 		foreach(cur, attnums)
1675 		{
1676 			int			attnum = lfirst_int(cur);
1677 			Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1678 
1679 			if (!list_member_int(cstate->attnumlist, attnum))
1680 				ereport(ERROR,
1681 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1682 						 errmsg_internal("selected column \"%s\" not referenced by COPY",
1683 										 NameStr(attr->attname))));
1684 			cstate->convert_select_flags[attnum - 1] = true;
1685 		}
1686 	}
1687 
1688 	/* Use client encoding when ENCODING option is not specified. */
1689 	if (cstate->file_encoding < 0)
1690 		cstate->file_encoding = pg_get_client_encoding();
1691 
1692 	/*
1693 	 * Set up encoding conversion info.  Even if the file and server encodings
1694 	 * are the same, we must apply pg_any_to_server() to validate data in
1695 	 * multibyte encodings.
1696 	 */
1697 	cstate->need_transcoding =
1698 		(cstate->file_encoding != GetDatabaseEncoding() ||
1699 		 pg_database_encoding_max_length() > 1);
1700 	/* See Multibyte encoding comment above */
1701 	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1702 
1703 	cstate->copy_dest = COPY_FILE;	/* default */
1704 
1705 	MemoryContextSwitchTo(oldcontext);
1706 
1707 	return cstate;
1708 }
1709 
1710 /*
1711  * Closes the pipe to an external program, checking the pclose() return code.
1712  */
1713 static void
ClosePipeToProgram(CopyState cstate)1714 ClosePipeToProgram(CopyState cstate)
1715 {
1716 	int			pclose_rc;
1717 
1718 	Assert(cstate->is_program);
1719 
1720 	pclose_rc = ClosePipeStream(cstate->copy_file);
1721 	if (pclose_rc == -1)
1722 		ereport(ERROR,
1723 				(errcode_for_file_access(),
1724 				 errmsg("could not close pipe to external command: %m")));
1725 	else if (pclose_rc != 0)
1726 	{
1727 		/*
1728 		 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1729 		 * expectable for the called program to fail with SIGPIPE, and we
1730 		 * should not report that as an error.  Otherwise, SIGPIPE indicates a
1731 		 * problem.
1732 		 */
1733 		if (cstate->is_copy_from && !cstate->reached_eof &&
1734 			wait_result_is_signal(pclose_rc, SIGPIPE))
1735 			return;
1736 
1737 		ereport(ERROR,
1738 				(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1739 				 errmsg("program \"%s\" failed",
1740 						cstate->filename),
1741 				 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1742 	}
1743 }
1744 
1745 /*
1746  * Release resources allocated in a cstate for COPY TO/FROM.
1747  */
1748 static void
EndCopy(CopyState cstate)1749 EndCopy(CopyState cstate)
1750 {
1751 	if (cstate->is_program)
1752 	{
1753 		ClosePipeToProgram(cstate);
1754 	}
1755 	else
1756 	{
1757 		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1758 			ereport(ERROR,
1759 					(errcode_for_file_access(),
1760 					 errmsg("could not close file \"%s\": %m",
1761 							cstate->filename)));
1762 	}
1763 
1764 	MemoryContextDelete(cstate->copycontext);
1765 	pfree(cstate);
1766 }
1767 
1768 /*
1769  * Setup CopyState to read tuples from a table or a query for COPY TO.
1770  */
1771 static CopyState
BeginCopyTo(ParseState * pstate,Relation rel,RawStmt * query,Oid queryRelId,const char * filename,bool is_program,List * attnamelist,List * options)1772 BeginCopyTo(ParseState *pstate,
1773 			Relation rel,
1774 			RawStmt *query,
1775 			Oid queryRelId,
1776 			const char *filename,
1777 			bool is_program,
1778 			List *attnamelist,
1779 			List *options)
1780 {
1781 	CopyState	cstate;
1782 	bool		pipe = (filename == NULL);
1783 	MemoryContext oldcontext;
1784 
1785 	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1786 	{
1787 		if (rel->rd_rel->relkind == RELKIND_VIEW)
1788 			ereport(ERROR,
1789 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1790 					 errmsg("cannot copy from view \"%s\"",
1791 							RelationGetRelationName(rel)),
1792 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1793 		else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1794 			ereport(ERROR,
1795 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1796 					 errmsg("cannot copy from materialized view \"%s\"",
1797 							RelationGetRelationName(rel)),
1798 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1799 		else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1800 			ereport(ERROR,
1801 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1802 					 errmsg("cannot copy from foreign table \"%s\"",
1803 							RelationGetRelationName(rel)),
1804 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1805 		else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1806 			ereport(ERROR,
1807 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1808 					 errmsg("cannot copy from sequence \"%s\"",
1809 							RelationGetRelationName(rel))));
1810 		else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1811 			ereport(ERROR,
1812 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1813 					 errmsg("cannot copy from partitioned table \"%s\"",
1814 							RelationGetRelationName(rel)),
1815 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1816 		else
1817 			ereport(ERROR,
1818 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1819 					 errmsg("cannot copy from non-table relation \"%s\"",
1820 							RelationGetRelationName(rel))));
1821 	}
1822 
1823 	cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1824 					   options);
1825 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1826 
1827 	if (pipe)
1828 	{
1829 		Assert(!is_program);	/* the grammar does not allow this */
1830 		if (whereToSendOutput != DestRemote)
1831 			cstate->copy_file = stdout;
1832 	}
1833 	else
1834 	{
1835 		cstate->filename = pstrdup(filename);
1836 		cstate->is_program = is_program;
1837 
1838 		if (is_program)
1839 		{
1840 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1841 			if (cstate->copy_file == NULL)
1842 				ereport(ERROR,
1843 						(errcode_for_file_access(),
1844 						 errmsg("could not execute command \"%s\": %m",
1845 								cstate->filename)));
1846 		}
1847 		else
1848 		{
1849 			mode_t		oumask; /* Pre-existing umask value */
1850 			struct stat st;
1851 
1852 			/*
1853 			 * Prevent write to relative path ... too easy to shoot oneself in
1854 			 * the foot by overwriting a database file ...
1855 			 */
1856 			if (!is_absolute_path(filename))
1857 				ereport(ERROR,
1858 						(errcode(ERRCODE_INVALID_NAME),
1859 						 errmsg("relative path not allowed for COPY to file")));
1860 
1861 			oumask = umask(S_IWGRP | S_IWOTH);
1862 			PG_TRY();
1863 			{
1864 				cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1865 			}
1866 			PG_CATCH();
1867 			{
1868 				umask(oumask);
1869 				PG_RE_THROW();
1870 			}
1871 			PG_END_TRY();
1872 			umask(oumask);
1873 			if (cstate->copy_file == NULL)
1874 			{
1875 				/* copy errno because ereport subfunctions might change it */
1876 				int			save_errno = errno;
1877 
1878 				ereport(ERROR,
1879 						(errcode_for_file_access(),
1880 						 errmsg("could not open file \"%s\" for writing: %m",
1881 								cstate->filename),
1882 						 (save_errno == ENOENT || save_errno == EACCES) ?
1883 						 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1884 								 "You may want a client-side facility such as psql's \\copy.") : 0));
1885 			}
1886 
1887 			if (fstat(fileno(cstate->copy_file), &st))
1888 				ereport(ERROR,
1889 						(errcode_for_file_access(),
1890 						 errmsg("could not stat file \"%s\": %m",
1891 								cstate->filename)));
1892 
1893 			if (S_ISDIR(st.st_mode))
1894 				ereport(ERROR,
1895 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1896 						 errmsg("\"%s\" is a directory", cstate->filename)));
1897 		}
1898 	}
1899 
1900 	MemoryContextSwitchTo(oldcontext);
1901 
1902 	return cstate;
1903 }
1904 
1905 /*
1906  * This intermediate routine exists mainly to localize the effects of setjmp
1907  * so we don't need to plaster a lot of variables with "volatile".
1908  */
1909 static uint64
DoCopyTo(CopyState cstate)1910 DoCopyTo(CopyState cstate)
1911 {
1912 	bool		pipe = (cstate->filename == NULL);
1913 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
1914 	uint64		processed;
1915 
1916 	PG_TRY();
1917 	{
1918 		if (fe_copy)
1919 			SendCopyBegin(cstate);
1920 
1921 		processed = CopyTo(cstate);
1922 
1923 		if (fe_copy)
1924 			SendCopyEnd(cstate);
1925 	}
1926 	PG_CATCH();
1927 	{
1928 		/*
1929 		 * Make sure we turn off old-style COPY OUT mode upon error. It is
1930 		 * okay to do this in all cases, since it does nothing if the mode is
1931 		 * not on.
1932 		 */
1933 		pq_endcopyout(true);
1934 		PG_RE_THROW();
1935 	}
1936 	PG_END_TRY();
1937 
1938 	return processed;
1939 }
1940 
1941 /*
1942  * Clean up storage and release resources for COPY TO.
1943  */
1944 static void
EndCopyTo(CopyState cstate)1945 EndCopyTo(CopyState cstate)
1946 {
1947 	if (cstate->queryDesc != NULL)
1948 	{
1949 		/* Close down the query and free resources. */
1950 		ExecutorFinish(cstate->queryDesc);
1951 		ExecutorEnd(cstate->queryDesc);
1952 		FreeQueryDesc(cstate->queryDesc);
1953 		PopActiveSnapshot();
1954 	}
1955 
1956 	/* Clean up storage */
1957 	EndCopy(cstate);
1958 }
1959 
1960 /*
1961  * Copy from relation or query TO file.
1962  */
1963 static uint64
CopyTo(CopyState cstate)1964 CopyTo(CopyState cstate)
1965 {
1966 	TupleDesc	tupDesc;
1967 	int			num_phys_attrs;
1968 	ListCell   *cur;
1969 	uint64		processed;
1970 
1971 	if (cstate->rel)
1972 		tupDesc = RelationGetDescr(cstate->rel);
1973 	else
1974 		tupDesc = cstate->queryDesc->tupDesc;
1975 	num_phys_attrs = tupDesc->natts;
1976 	cstate->null_print_client = cstate->null_print; /* default */
1977 
1978 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1979 	cstate->fe_msgbuf = makeStringInfo();
1980 
1981 	/* Get info about the columns we need to process. */
1982 	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1983 	foreach(cur, cstate->attnumlist)
1984 	{
1985 		int			attnum = lfirst_int(cur);
1986 		Oid			out_func_oid;
1987 		bool		isvarlena;
1988 		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1989 
1990 		if (cstate->binary)
1991 			getTypeBinaryOutputInfo(attr->atttypid,
1992 									&out_func_oid,
1993 									&isvarlena);
1994 		else
1995 			getTypeOutputInfo(attr->atttypid,
1996 							  &out_func_oid,
1997 							  &isvarlena);
1998 		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1999 	}
2000 
2001 	/*
2002 	 * Create a temporary memory context that we can reset once per row to
2003 	 * recover palloc'd memory.  This avoids any problems with leaks inside
2004 	 * datatype output routines, and should be faster than retail pfree's
2005 	 * anyway.  (We don't need a whole econtext as CopyFrom does.)
2006 	 */
2007 	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
2008 											   "COPY TO",
2009 											   ALLOCSET_DEFAULT_SIZES);
2010 
2011 	if (cstate->binary)
2012 	{
2013 		/* Generate header for a binary copy */
2014 		int32		tmp;
2015 
2016 		/* Signature */
2017 		CopySendData(cstate, BinarySignature, 11);
2018 		/* Flags field */
2019 		tmp = 0;
2020 		if (cstate->oids)
2021 			tmp |= (1 << 16);
2022 		CopySendInt32(cstate, tmp);
2023 		/* No header extension */
2024 		tmp = 0;
2025 		CopySendInt32(cstate, tmp);
2026 	}
2027 	else
2028 	{
2029 		/*
2030 		 * For non-binary copy, we need to convert null_print to file
2031 		 * encoding, because it will be sent directly with CopySendString.
2032 		 */
2033 		if (cstate->need_transcoding)
2034 			cstate->null_print_client = pg_server_to_any(cstate->null_print,
2035 														 cstate->null_print_len,
2036 														 cstate->file_encoding);
2037 
2038 		/* if a header has been requested send the line */
2039 		if (cstate->header_line)
2040 		{
2041 			bool		hdr_delim = false;
2042 
2043 			foreach(cur, cstate->attnumlist)
2044 			{
2045 				int			attnum = lfirst_int(cur);
2046 				char	   *colname;
2047 
2048 				if (hdr_delim)
2049 					CopySendChar(cstate, cstate->delim[0]);
2050 				hdr_delim = true;
2051 
2052 				colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2053 
2054 				CopyAttributeOutCSV(cstate, colname, false,
2055 									list_length(cstate->attnumlist) == 1);
2056 			}
2057 
2058 			CopySendEndOfRow(cstate);
2059 		}
2060 	}
2061 
2062 	if (cstate->rel)
2063 	{
2064 		Datum	   *values;
2065 		bool	   *nulls;
2066 		HeapScanDesc scandesc;
2067 		HeapTuple	tuple;
2068 
2069 		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2070 		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2071 
2072 		scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2073 
2074 		processed = 0;
2075 		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2076 		{
2077 			CHECK_FOR_INTERRUPTS();
2078 
2079 			/* Deconstruct the tuple ... faster than repeated heap_getattr */
2080 			heap_deform_tuple(tuple, tupDesc, values, nulls);
2081 
2082 			/* Format and send the data */
2083 			CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2084 			processed++;
2085 		}
2086 
2087 		heap_endscan(scandesc);
2088 
2089 		pfree(values);
2090 		pfree(nulls);
2091 	}
2092 	else
2093 	{
2094 		/* run the plan --- the dest receiver will send tuples */
2095 		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2096 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2097 	}
2098 
2099 	if (cstate->binary)
2100 	{
2101 		/* Generate trailer for a binary copy */
2102 		CopySendInt16(cstate, -1);
2103 		/* Need to flush out the trailer */
2104 		CopySendEndOfRow(cstate);
2105 	}
2106 
2107 	MemoryContextDelete(cstate->rowcontext);
2108 
2109 	return processed;
2110 }
2111 
2112 /*
2113  * Emit one row during CopyTo().
2114  */
2115 static void
CopyOneRowTo(CopyState cstate,Oid tupleOid,Datum * values,bool * nulls)2116 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2117 {
2118 	bool		need_delim = false;
2119 	FmgrInfo   *out_functions = cstate->out_functions;
2120 	MemoryContext oldcontext;
2121 	ListCell   *cur;
2122 	char	   *string;
2123 
2124 	MemoryContextReset(cstate->rowcontext);
2125 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2126 
2127 	if (cstate->binary)
2128 	{
2129 		/* Binary per-tuple header */
2130 		CopySendInt16(cstate, list_length(cstate->attnumlist));
2131 		/* Send OID if wanted --- note attnumlist doesn't include it */
2132 		if (cstate->oids)
2133 		{
2134 			/* Hack --- assume Oid is same size as int32 */
2135 			CopySendInt32(cstate, sizeof(int32));
2136 			CopySendInt32(cstate, tupleOid);
2137 		}
2138 	}
2139 	else
2140 	{
2141 		/* Text format has no per-tuple header, but send OID if wanted */
2142 		/* Assume digits don't need any quoting or encoding conversion */
2143 		if (cstate->oids)
2144 		{
2145 			string = DatumGetCString(DirectFunctionCall1(oidout,
2146 														 ObjectIdGetDatum(tupleOid)));
2147 			CopySendString(cstate, string);
2148 			need_delim = true;
2149 		}
2150 	}
2151 
2152 	foreach(cur, cstate->attnumlist)
2153 	{
2154 		int			attnum = lfirst_int(cur);
2155 		Datum		value = values[attnum - 1];
2156 		bool		isnull = nulls[attnum - 1];
2157 
2158 		if (!cstate->binary)
2159 		{
2160 			if (need_delim)
2161 				CopySendChar(cstate, cstate->delim[0]);
2162 			need_delim = true;
2163 		}
2164 
2165 		if (isnull)
2166 		{
2167 			if (!cstate->binary)
2168 				CopySendString(cstate, cstate->null_print_client);
2169 			else
2170 				CopySendInt32(cstate, -1);
2171 		}
2172 		else
2173 		{
2174 			if (!cstate->binary)
2175 			{
2176 				string = OutputFunctionCall(&out_functions[attnum - 1],
2177 											value);
2178 				if (cstate->csv_mode)
2179 					CopyAttributeOutCSV(cstate, string,
2180 										cstate->force_quote_flags[attnum - 1],
2181 										list_length(cstate->attnumlist) == 1);
2182 				else
2183 					CopyAttributeOutText(cstate, string);
2184 			}
2185 			else
2186 			{
2187 				bytea	   *outputbytes;
2188 
2189 				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2190 											   value);
2191 				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2192 				CopySendData(cstate, VARDATA(outputbytes),
2193 							 VARSIZE(outputbytes) - VARHDRSZ);
2194 			}
2195 		}
2196 	}
2197 
2198 	CopySendEndOfRow(cstate);
2199 
2200 	MemoryContextSwitchTo(oldcontext);
2201 }
2202 
2203 
2204 /*
2205  * error context callback for COPY FROM
2206  *
2207  * The argument for the error context must be CopyState.
2208  */
2209 void
CopyFromErrorCallback(void * arg)2210 CopyFromErrorCallback(void *arg)
2211 {
2212 	CopyState	cstate = (CopyState) arg;
2213 	char		curlineno_str[32];
2214 
2215 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2216 			 cstate->cur_lineno);
2217 
2218 	if (cstate->binary)
2219 	{
2220 		/* can't usefully display the data */
2221 		if (cstate->cur_attname)
2222 			errcontext("COPY %s, line %s, column %s",
2223 					   cstate->cur_relname, curlineno_str,
2224 					   cstate->cur_attname);
2225 		else
2226 			errcontext("COPY %s, line %s",
2227 					   cstate->cur_relname, curlineno_str);
2228 	}
2229 	else
2230 	{
2231 		if (cstate->cur_attname && cstate->cur_attval)
2232 		{
2233 			/* error is relevant to a particular column */
2234 			char	   *attval;
2235 
2236 			attval = limit_printout_length(cstate->cur_attval);
2237 			errcontext("COPY %s, line %s, column %s: \"%s\"",
2238 					   cstate->cur_relname, curlineno_str,
2239 					   cstate->cur_attname, attval);
2240 			pfree(attval);
2241 		}
2242 		else if (cstate->cur_attname)
2243 		{
2244 			/* error is relevant to a particular column, value is NULL */
2245 			errcontext("COPY %s, line %s, column %s: null input",
2246 					   cstate->cur_relname, curlineno_str,
2247 					   cstate->cur_attname);
2248 		}
2249 		else
2250 		{
2251 			/*
2252 			 * Error is relevant to a particular line.
2253 			 *
2254 			 * If line_buf still contains the correct line, and it's already
2255 			 * transcoded, print it. If it's still in a foreign encoding, it's
2256 			 * quite likely that the error is precisely a failure to do
2257 			 * encoding conversion (ie, bad data). We dare not try to convert
2258 			 * it, and at present there's no way to regurgitate it without
2259 			 * conversion. So we have to punt and just report the line number.
2260 			 */
2261 			if (cstate->line_buf_valid &&
2262 				(cstate->line_buf_converted || !cstate->need_transcoding))
2263 			{
2264 				char	   *lineval;
2265 
2266 				lineval = limit_printout_length(cstate->line_buf.data);
2267 				errcontext("COPY %s, line %s: \"%s\"",
2268 						   cstate->cur_relname, curlineno_str, lineval);
2269 				pfree(lineval);
2270 			}
2271 			else
2272 			{
2273 				errcontext("COPY %s, line %s",
2274 						   cstate->cur_relname, curlineno_str);
2275 			}
2276 		}
2277 	}
2278 }
2279 
2280 /*
2281  * Make sure we don't print an unreasonable amount of COPY data in a message.
2282  *
2283  * It would seem a lot easier to just use the sprintf "precision" limit to
2284  * truncate the string.  However, some versions of glibc have a bug/misfeature
2285  * that vsnprintf will always fail (return -1) if it is asked to truncate
2286  * a string that contains invalid byte sequences for the current encoding.
2287  * So, do our own truncation.  We return a pstrdup'd copy of the input.
2288  */
2289 static char *
limit_printout_length(const char * str)2290 limit_printout_length(const char *str)
2291 {
2292 #define MAX_COPY_DATA_DISPLAY 100
2293 
2294 	int			slen = strlen(str);
2295 	int			len;
2296 	char	   *res;
2297 
2298 	/* Fast path if definitely okay */
2299 	if (slen <= MAX_COPY_DATA_DISPLAY)
2300 		return pstrdup(str);
2301 
2302 	/* Apply encoding-dependent truncation */
2303 	len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
2304 
2305 	/*
2306 	 * Truncate, and add "..." to show we truncated the input.
2307 	 */
2308 	res = (char *) palloc(len + 4);
2309 	memcpy(res, str, len);
2310 	strcpy(res + len, "...");
2311 
2312 	return res;
2313 }
2314 
2315 /*
2316  * Copy FROM file to relation.
2317  */
2318 uint64
CopyFrom(CopyState cstate)2319 CopyFrom(CopyState cstate)
2320 {
2321 	HeapTuple	tuple;
2322 	TupleDesc	tupDesc;
2323 	Datum	   *values;
2324 	bool	   *nulls;
2325 	ResultRelInfo *resultRelInfo;
2326 	ResultRelInfo *saved_resultRelInfo = NULL;
2327 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
2328 	ModifyTableState *mtstate;
2329 	ExprContext *econtext;
2330 	TupleTableSlot *myslot;
2331 	MemoryContext oldcontext = CurrentMemoryContext;
2332 
2333 	ErrorContextCallback errcallback;
2334 	CommandId	mycid = GetCurrentCommandId(true);
2335 	int			hi_options = 0; /* start with default heap_insert options */
2336 	BulkInsertState bistate;
2337 	uint64		processed = 0;
2338 	bool		useHeapMultiInsert;
2339 	int			nBufferedTuples = 0;
2340 	int			prev_leaf_part_index = -1;
2341 
2342 #define MAX_BUFFERED_TUPLES 1000
2343 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
2344 	Size		bufferedTuplesSize = 0;
2345 	uint64		firstBufferedLineNo = 0;
2346 
2347 	Assert(cstate->rel);
2348 
2349 	/*
2350 	 * The target must be a plain, foreign, or partitioned relation, or have
2351 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
2352 	 * allowed on views, so we only hint about them in the view case.)
2353 	 */
2354 	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2355 		cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2356 		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2357 		!(cstate->rel->trigdesc &&
2358 		  cstate->rel->trigdesc->trig_insert_instead_row))
2359 	{
2360 		if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2361 			ereport(ERROR,
2362 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2363 					 errmsg("cannot copy to view \"%s\"",
2364 							RelationGetRelationName(cstate->rel)),
2365 					 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2366 		else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2367 			ereport(ERROR,
2368 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2369 					 errmsg("cannot copy to materialized view \"%s\"",
2370 							RelationGetRelationName(cstate->rel))));
2371 		else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2372 			ereport(ERROR,
2373 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2374 					 errmsg("cannot copy to sequence \"%s\"",
2375 							RelationGetRelationName(cstate->rel))));
2376 		else
2377 			ereport(ERROR,
2378 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2379 					 errmsg("cannot copy to non-table relation \"%s\"",
2380 							RelationGetRelationName(cstate->rel))));
2381 	}
2382 
2383 	tupDesc = RelationGetDescr(cstate->rel);
2384 
2385 	/*----------
2386 	 * Check to see if we can avoid writing WAL
2387 	 *
2388 	 * If archive logging/streaming is not enabled *and* either
2389 	 *	- table was created in same transaction as this COPY
2390 	 *	- data is being written to relfilenode created in this transaction
2391 	 * then we can skip writing WAL.  It's safe because if the transaction
2392 	 * doesn't commit, we'll discard the table (or the new relfilenode file).
2393 	 * If it does commit, we'll have done the heap_sync at the bottom of this
2394 	 * routine first.
2395 	 *
2396 	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2397 	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2398 	 * can be cleared before the end of the transaction. The exact case is
2399 	 * when a relation sets a new relfilenode twice in same transaction, yet
2400 	 * the second one fails in an aborted subtransaction, e.g.
2401 	 *
2402 	 * BEGIN;
2403 	 * TRUNCATE t;
2404 	 * SAVEPOINT save;
2405 	 * TRUNCATE t;
2406 	 * ROLLBACK TO save;
2407 	 * COPY ...
2408 	 *
2409 	 * Also, if the target file is new-in-transaction, we assume that checking
2410 	 * FSM for free space is a waste of time, even if we must use WAL because
2411 	 * of archiving.  This could possibly be wrong, but it's unlikely.
2412 	 *
2413 	 * The comments for heap_insert and RelationGetBufferForTuple specify that
2414 	 * skipping WAL logging is only safe if we ensure that our tuples do not
2415 	 * go into pages containing tuples from any other transactions --- but this
2416 	 * must be the case if we have a new table or new relfilenode, so we need
2417 	 * no additional work to enforce that.
2418 	 *
2419 	 * We currently don't support this optimization if the COPY target is a
2420 	 * partitioned table as we currently only lazily initialize partition
2421 	 * information when routing the first tuple to the partition.  We cannot
2422 	 * know at this stage if we can perform this optimization.  It should be
2423 	 * possible to improve on this, but it does mean maintaining heap insert
2424 	 * option flags per partition and setting them when we first open the
2425 	 * partition.
2426 	 *
2427 	 * This optimization is not supported for relation types which do not
2428 	 * have any physical storage, with foreign tables and views using
2429 	 * INSTEAD OF triggers entering in this category.  Partitioned tables
2430 	 * are not supported as per the description above.
2431 	 *----------
2432 	 */
2433 	/* createSubid is creation check, newRelfilenodeSubid is truncation check */
2434 	if (cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2435 		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2436 		cstate->rel->rd_rel->relkind != RELKIND_VIEW &&
2437 		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2438 		 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
2439 	{
2440 		hi_options |= HEAP_INSERT_SKIP_FSM;
2441 		if (!XLogIsNeeded())
2442 			hi_options |= HEAP_INSERT_SKIP_WAL;
2443 	}
2444 
2445 	/*
2446 	 * Optimize if new relfilenode was created in this subxact or one of its
2447 	 * committed children and we won't see those rows later as part of an
2448 	 * earlier scan or command. The subxact test ensures that if this subxact
2449 	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
2450 	 * that the stronger test of exactly which subtransaction created it is
2451 	 * crucial for correctness of this optimization. The test for an earlier
2452 	 * scan or command tolerates false negatives. FREEZE causes other sessions
2453 	 * to see rows they would not see under MVCC, and a false negative merely
2454 	 * spreads that anomaly to the current session.
2455 	 */
2456 	if (cstate->freeze)
2457 	{
2458 		/*
2459 		 * We currently disallow COPY FREEZE on partitioned tables.  The
2460 		 * reason for this is that we've simply not yet opened the partitions
2461 		 * to determine if the optimization can be applied to them.  We could
2462 		 * go and open them all here, but doing so may be quite a costly
2463 		 * overhead for small copies.  In any case, we may just end up routing
2464 		 * tuples to a small number of partitions.  It seems better just to
2465 		 * raise an ERROR for partitioned tables.
2466 		 */
2467 		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2468 		{
2469 			ereport(ERROR,
2470 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2471 					errmsg("cannot perform FREEZE on a partitioned table")));
2472 		}
2473 
2474 		/*
2475 		 * Tolerate one registration for the benefit of FirstXactSnapshot.
2476 		 * Scan-bearing queries generally create at least two registrations,
2477 		 * though relying on that is fragile, as is ignoring ActiveSnapshot.
2478 		 * Clear CatalogSnapshot to avoid counting its registration.  We'll
2479 		 * still detect ongoing catalog scans, each of which separately
2480 		 * registers the snapshot it uses.
2481 		 */
2482 		InvalidateCatalogSnapshot();
2483 		if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2484 			ereport(ERROR,
2485 					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2486 					 errmsg("cannot perform FREEZE because of prior transaction activity")));
2487 
2488 		if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2489 			cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2490 			ereport(ERROR,
2491 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2492 					 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2493 
2494 		hi_options |= HEAP_INSERT_FROZEN;
2495 	}
2496 
2497 	/*
2498 	 * We need a ResultRelInfo so we can use the regular executor's
2499 	 * index-entry-making machinery.  (There used to be a huge amount of code
2500 	 * here that basically duplicated execUtils.c ...)
2501 	 */
2502 	resultRelInfo = makeNode(ResultRelInfo);
2503 	InitResultRelInfo(resultRelInfo,
2504 					  cstate->rel,
2505 					  1,		/* dummy rangetable index */
2506 					  NULL,
2507 					  0);
2508 
2509 	/* Verify the named relation is a valid target for INSERT */
2510 	CheckValidResultRel(resultRelInfo, CMD_INSERT);
2511 
2512 	ExecOpenIndices(resultRelInfo, false);
2513 
2514 	estate->es_result_relations = resultRelInfo;
2515 	estate->es_num_result_relations = 1;
2516 	estate->es_result_relation_info = resultRelInfo;
2517 	estate->es_range_table = cstate->range_table;
2518 
2519 	/* Set up a tuple slot too */
2520 	myslot = ExecInitExtraTupleSlot(estate, tupDesc);
2521 	/* Triggers might need a slot as well */
2522 	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
2523 
2524 	/*
2525 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
2526 	 * foreign-table result relation(s).
2527 	 */
2528 	mtstate = makeNode(ModifyTableState);
2529 	mtstate->ps.plan = NULL;
2530 	mtstate->ps.state = estate;
2531 	mtstate->operation = CMD_INSERT;
2532 	mtstate->resultRelInfo = estate->es_result_relations;
2533 	mtstate->rootResultRelInfo = estate->es_result_relations;
2534 
2535 	if (resultRelInfo->ri_FdwRoutine != NULL &&
2536 		resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
2537 		resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
2538 														 resultRelInfo);
2539 
2540 	/* Prepare to catch AFTER triggers. */
2541 	AfterTriggerBeginQuery();
2542 
2543 	/*
2544 	 * If there are any triggers with transition tables on the named relation,
2545 	 * we need to be prepared to capture transition tuples.
2546 	 */
2547 	cstate->transition_capture =
2548 		MakeTransitionCaptureState(cstate->rel->trigdesc,
2549 								   RelationGetRelid(cstate->rel),
2550 								   CMD_INSERT);
2551 
2552 	/*
2553 	 * If the named relation is a partitioned table, initialize state for
2554 	 * CopyFrom tuple routing.
2555 	 */
2556 	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2557 	{
2558 		PartitionTupleRouting *proute;
2559 
2560 		proute = cstate->partition_tuple_routing =
2561 			ExecSetupPartitionTupleRouting(NULL, resultRelInfo);
2562 
2563 		/*
2564 		 * If we are capturing transition tuples, they may need to be
2565 		 * converted from partition format back to partitioned table format
2566 		 * (this is only ever necessary if a BEFORE trigger modifies the
2567 		 * tuple).
2568 		 */
2569 		if (cstate->transition_capture != NULL)
2570 			ExecSetupChildParentMapForLeaf(proute);
2571 	}
2572 
2573 	/*
2574 	 * It's more efficient to prepare a bunch of tuples for insertion, and
2575 	 * insert them in one heap_multi_insert() call, than call heap_insert()
2576 	 * separately for every tuple. However, we can't do that if there are
2577 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2578 	 * expressions. Such triggers or expressions might query the table we're
2579 	 * inserting to, and act differently if the tuples that have already been
2580 	 * processed and prepared for insertion are not there.  We also can't do
2581 	 * it if the table is foreign or partitioned.
2582 	 */
2583 	if ((resultRelInfo->ri_TrigDesc != NULL &&
2584 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2585 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2586 		resultRelInfo->ri_FdwRoutine != NULL ||
2587 		cstate->partition_tuple_routing != NULL ||
2588 		cstate->volatile_defexprs)
2589 	{
2590 		useHeapMultiInsert = false;
2591 	}
2592 	else
2593 	{
2594 		useHeapMultiInsert = true;
2595 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2596 	}
2597 
2598 	/*
2599 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2600 	 * should do this for COPY, since it's not really an "INSERT" statement as
2601 	 * such. However, executing these triggers maintains consistency with the
2602 	 * EACH ROW triggers that we already fire on COPY.
2603 	 */
2604 	ExecBSInsertTriggers(estate, resultRelInfo);
2605 
2606 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2607 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2608 
2609 	bistate = GetBulkInsertState();
2610 	econtext = GetPerTupleExprContext(estate);
2611 
2612 	/* Set up callback to identify error line number */
2613 	errcallback.callback = CopyFromErrorCallback;
2614 	errcallback.arg = (void *) cstate;
2615 	errcallback.previous = error_context_stack;
2616 	error_context_stack = &errcallback;
2617 
2618 	for (;;)
2619 	{
2620 		TupleTableSlot *slot;
2621 		bool		skip_tuple;
2622 		Oid			loaded_oid = InvalidOid;
2623 
2624 		CHECK_FOR_INTERRUPTS();
2625 
2626 		if (nBufferedTuples == 0)
2627 		{
2628 			/*
2629 			 * Reset the per-tuple exprcontext. We can only do this if the
2630 			 * tuple buffer is empty. (Calling the context the per-tuple
2631 			 * memory context is a bit of a misnomer now.)
2632 			 */
2633 			ResetPerTupleExprContext(estate);
2634 		}
2635 
2636 		/* Switch into its memory context */
2637 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2638 
2639 		if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2640 			break;
2641 
2642 		/* And now we can form the input tuple. */
2643 		tuple = heap_form_tuple(tupDesc, values, nulls);
2644 
2645 		if (loaded_oid != InvalidOid)
2646 			HeapTupleSetOid(tuple, loaded_oid);
2647 
2648 		/*
2649 		 * Constraints might reference the tableoid column, so initialize
2650 		 * t_tableOid before evaluating them.
2651 		 */
2652 		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2653 
2654 		/* Triggers and stuff need to be invoked in query context. */
2655 		MemoryContextSwitchTo(oldcontext);
2656 
2657 		/* Place tuple in tuple slot --- but slot shouldn't free it */
2658 		slot = myslot;
2659 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2660 
2661 		/* Determine the partition to heap_insert the tuple into */
2662 		if (cstate->partition_tuple_routing)
2663 		{
2664 			int			leaf_part_index;
2665 			PartitionTupleRouting *proute = cstate->partition_tuple_routing;
2666 
2667 			/*
2668 			 * Away we go ... If we end up not finding a partition after all,
2669 			 * ExecFindPartition() does not return and errors out instead.
2670 			 * Otherwise, the returned value is to be used as an index into
2671 			 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2672 			 * will get us the ResultRelInfo and TupleConversionMap for the
2673 			 * partition, respectively.
2674 			 */
2675 			leaf_part_index = ExecFindPartition(resultRelInfo,
2676 												proute->partition_dispatch_info,
2677 												slot,
2678 												estate);
2679 			Assert(leaf_part_index >= 0 &&
2680 				   leaf_part_index < proute->num_partitions);
2681 
2682 			/*
2683 			 * If this tuple is mapped to a partition that is not same as the
2684 			 * previous one, we'd better make the bulk insert mechanism gets a
2685 			 * new buffer.
2686 			 */
2687 			if (prev_leaf_part_index != leaf_part_index)
2688 			{
2689 				ReleaseBulkInsertStatePin(bistate);
2690 				prev_leaf_part_index = leaf_part_index;
2691 			}
2692 
2693 			/*
2694 			 * Save the old ResultRelInfo and switch to the one corresponding
2695 			 * to the selected partition.
2696 			 */
2697 			saved_resultRelInfo = resultRelInfo;
2698 			resultRelInfo = proute->partitions[leaf_part_index];
2699 			if (resultRelInfo == NULL)
2700 			{
2701 				resultRelInfo = ExecInitPartitionInfo(mtstate,
2702 													  saved_resultRelInfo,
2703 													  proute, estate,
2704 													  leaf_part_index);
2705 				Assert(resultRelInfo != NULL);
2706 			}
2707 
2708 			/*
2709 			 * For ExecInsertIndexTuples() to work on the partition's indexes
2710 			 */
2711 			estate->es_result_relation_info = resultRelInfo;
2712 
2713 			/*
2714 			 * If we're capturing transition tuples, we might need to convert
2715 			 * from the partition rowtype to parent rowtype.
2716 			 */
2717 			if (cstate->transition_capture != NULL)
2718 			{
2719 				if (resultRelInfo->ri_TrigDesc &&
2720 					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2721 				{
2722 					/*
2723 					 * If there are any BEFORE triggers on the partition,
2724 					 * we'll have to be ready to convert their result back to
2725 					 * tuplestore format.
2726 					 */
2727 					cstate->transition_capture->tcs_original_insert_tuple = NULL;
2728 					cstate->transition_capture->tcs_map =
2729 						TupConvMapForLeaf(proute, saved_resultRelInfo,
2730 										  leaf_part_index);
2731 				}
2732 				else
2733 				{
2734 					/*
2735 					 * Otherwise, just remember the original unconverted
2736 					 * tuple, to avoid a needless round trip conversion.
2737 					 */
2738 					cstate->transition_capture->tcs_original_insert_tuple = tuple;
2739 					cstate->transition_capture->tcs_map = NULL;
2740 				}
2741 			}
2742 
2743 			/*
2744 			 * We might need to convert from the parent rowtype to the
2745 			 * partition rowtype.
2746 			 */
2747 			tuple = ConvertPartitionTupleSlot(proute->parent_child_tupconv_maps[leaf_part_index],
2748 											  tuple,
2749 											  proute->partition_tuple_slot,
2750 											  &slot);
2751 
2752 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2753 		}
2754 
2755 		skip_tuple = false;
2756 
2757 		/* BEFORE ROW INSERT Triggers */
2758 		if (resultRelInfo->ri_TrigDesc &&
2759 			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2760 		{
2761 			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2762 
2763 			if (slot == NULL)	/* "do nothing" */
2764 				skip_tuple = true;
2765 			else				/* trigger might have changed tuple */
2766 				tuple = ExecMaterializeSlot(slot);
2767 		}
2768 
2769 		if (!skip_tuple)
2770 		{
2771 			if (resultRelInfo->ri_TrigDesc &&
2772 				resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2773 			{
2774 				/* Pass the data to the INSTEAD ROW INSERT trigger */
2775 				ExecIRInsertTriggers(estate, resultRelInfo, slot);
2776 			}
2777 			else
2778 			{
2779 				/*
2780 				 * If the target is a plain table, check the constraints of
2781 				 * the tuple.
2782 				 */
2783 				if (resultRelInfo->ri_FdwRoutine == NULL &&
2784 					resultRelInfo->ri_RelationDesc->rd_att->constr)
2785 					ExecConstraints(resultRelInfo, slot, estate);
2786 
2787 				/*
2788 				 * Also check the tuple against the partition constraint, if
2789 				 * there is one; except that if we got here via tuple-routing,
2790 				 * we don't need to if there's no BR trigger defined on the
2791 				 * partition.
2792 				 */
2793 				if (resultRelInfo->ri_PartitionCheck &&
2794 					(saved_resultRelInfo == NULL ||
2795 					 (resultRelInfo->ri_TrigDesc &&
2796 					  resultRelInfo->ri_TrigDesc->trig_insert_before_row)))
2797 					ExecPartitionCheck(resultRelInfo, slot, estate, true);
2798 
2799 				if (useHeapMultiInsert)
2800 				{
2801 					/* Add this tuple to the tuple buffer */
2802 					if (nBufferedTuples == 0)
2803 						firstBufferedLineNo = cstate->cur_lineno;
2804 					bufferedTuples[nBufferedTuples++] = tuple;
2805 					bufferedTuplesSize += tuple->t_len;
2806 
2807 					/*
2808 					 * If the buffer filled up, flush it.  Also flush if the
2809 					 * total size of all the tuples in the buffer becomes
2810 					 * large, to avoid using large amounts of memory for the
2811 					 * buffer when the tuples are exceptionally wide.
2812 					 */
2813 					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2814 						bufferedTuplesSize > 65535)
2815 					{
2816 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2817 											resultRelInfo, myslot, bistate,
2818 											nBufferedTuples, bufferedTuples,
2819 											firstBufferedLineNo);
2820 						nBufferedTuples = 0;
2821 						bufferedTuplesSize = 0;
2822 					}
2823 				}
2824 				else
2825 				{
2826 					List	   *recheckIndexes = NIL;
2827 
2828 					/* OK, store the tuple */
2829 					if (resultRelInfo->ri_FdwRoutine != NULL)
2830 					{
2831 						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
2832 																			   resultRelInfo,
2833 																			   slot,
2834 																			   NULL);
2835 
2836 						if (slot == NULL)	/* "do nothing" */
2837 							goto next_tuple;
2838 
2839 						/* FDW might have changed tuple */
2840 						tuple = ExecMaterializeSlot(slot);
2841 
2842 						/*
2843 						 * AFTER ROW Triggers might reference the tableoid
2844 						 * column, so initialize t_tableOid before evaluating
2845 						 * them.
2846 						 */
2847 						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2848 					}
2849 					else
2850 						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
2851 									mycid, hi_options, bistate);
2852 
2853 					/* And create index entries for it */
2854 					if (resultRelInfo->ri_NumIndices > 0)
2855 						recheckIndexes = ExecInsertIndexTuples(slot,
2856 															   &(tuple->t_self),
2857 															   estate,
2858 															   false,
2859 															   NULL,
2860 															   NIL);
2861 
2862 					/* AFTER ROW INSERT Triggers */
2863 					ExecARInsertTriggers(estate, resultRelInfo, tuple,
2864 										 recheckIndexes, cstate->transition_capture);
2865 
2866 					list_free(recheckIndexes);
2867 				}
2868 			}
2869 
2870 			/*
2871 			 * We count only tuples not suppressed by a BEFORE INSERT trigger
2872 			 * or FDW; this is the same definition used by nodeModifyTable.c
2873 			 * for counting tuples inserted by an INSERT command.
2874 			 */
2875 			processed++;
2876 		}
2877 
2878 next_tuple:
2879 		/* Restore the saved ResultRelInfo */
2880 		if (saved_resultRelInfo)
2881 		{
2882 			resultRelInfo = saved_resultRelInfo;
2883 			estate->es_result_relation_info = resultRelInfo;
2884 		}
2885 	}
2886 
2887 	/* Flush any remaining buffered tuples */
2888 	if (nBufferedTuples > 0)
2889 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2890 							resultRelInfo, myslot, bistate,
2891 							nBufferedTuples, bufferedTuples,
2892 							firstBufferedLineNo);
2893 
2894 	/* Done, clean up */
2895 	error_context_stack = errcallback.previous;
2896 
2897 	FreeBulkInsertState(bistate);
2898 
2899 	MemoryContextSwitchTo(oldcontext);
2900 
2901 	/*
2902 	 * In the old protocol, tell pqcomm that we can process normal protocol
2903 	 * messages again.
2904 	 */
2905 	if (cstate->copy_dest == COPY_OLD_FE)
2906 		pq_endmsgread();
2907 
2908 	/* Execute AFTER STATEMENT insertion triggers */
2909 	ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2910 
2911 	/* Handle queued AFTER triggers */
2912 	AfterTriggerEndQuery(estate);
2913 
2914 	pfree(values);
2915 	pfree(nulls);
2916 
2917 	ExecResetTupleTable(estate->es_tupleTable, false);
2918 
2919 	/* Allow the FDW to shut down */
2920 	if (resultRelInfo->ri_FdwRoutine != NULL &&
2921 		resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
2922 		resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
2923 													   resultRelInfo);
2924 
2925 	ExecCloseIndices(resultRelInfo);
2926 
2927 	/* Close all the partitioned tables, leaf partitions, and their indices */
2928 	if (cstate->partition_tuple_routing)
2929 		ExecCleanupTupleRouting(mtstate, cstate->partition_tuple_routing);
2930 
2931 	/* Close any trigger target relations */
2932 	ExecCleanUpTriggerState(estate);
2933 
2934 	FreeExecutorState(estate);
2935 
2936 	/*
2937 	 * If we skipped writing WAL, then we need to sync the heap (but not
2938 	 * indexes since those use WAL anyway)
2939 	 */
2940 	if (hi_options & HEAP_INSERT_SKIP_WAL)
2941 		heap_sync(cstate->rel);
2942 
2943 	return processed;
2944 }
2945 
2946 /*
2947  * A subroutine of CopyFrom, to write the current batch of buffered heap
2948  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2949  * triggers.
2950  */
2951 static void
CopyFromInsertBatch(CopyState cstate,EState * estate,CommandId mycid,int hi_options,ResultRelInfo * resultRelInfo,TupleTableSlot * myslot,BulkInsertState bistate,int nBufferedTuples,HeapTuple * bufferedTuples,uint64 firstBufferedLineNo)2952 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2953 					int hi_options, ResultRelInfo *resultRelInfo,
2954 					TupleTableSlot *myslot, BulkInsertState bistate,
2955 					int nBufferedTuples, HeapTuple *bufferedTuples,
2956 					uint64 firstBufferedLineNo)
2957 {
2958 	MemoryContext oldcontext;
2959 	int			i;
2960 	uint64		save_cur_lineno;
2961 
2962 	/*
2963 	 * Print error context information correctly, if one of the operations
2964 	 * below fail.
2965 	 */
2966 	cstate->line_buf_valid = false;
2967 	save_cur_lineno = cstate->cur_lineno;
2968 
2969 	/*
2970 	 * heap_multi_insert leaks memory, so switch to short-lived memory context
2971 	 * before calling it.
2972 	 */
2973 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2974 	heap_multi_insert(cstate->rel,
2975 					  bufferedTuples,
2976 					  nBufferedTuples,
2977 					  mycid,
2978 					  hi_options,
2979 					  bistate);
2980 	MemoryContextSwitchTo(oldcontext);
2981 
2982 	/*
2983 	 * If there are any indexes, update them for all the inserted tuples, and
2984 	 * run AFTER ROW INSERT triggers.
2985 	 */
2986 	if (resultRelInfo->ri_NumIndices > 0)
2987 	{
2988 		for (i = 0; i < nBufferedTuples; i++)
2989 		{
2990 			List	   *recheckIndexes;
2991 
2992 			cstate->cur_lineno = firstBufferedLineNo + i;
2993 			ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2994 			recheckIndexes =
2995 				ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2996 									  estate, false, NULL, NIL);
2997 			ExecARInsertTriggers(estate, resultRelInfo,
2998 								 bufferedTuples[i],
2999 								 recheckIndexes, cstate->transition_capture);
3000 			list_free(recheckIndexes);
3001 		}
3002 	}
3003 
3004 	/*
3005 	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
3006 	 * anyway.
3007 	 */
3008 	else if (resultRelInfo->ri_TrigDesc != NULL &&
3009 			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
3010 			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
3011 	{
3012 		for (i = 0; i < nBufferedTuples; i++)
3013 		{
3014 			cstate->cur_lineno = firstBufferedLineNo + i;
3015 			ExecARInsertTriggers(estate, resultRelInfo,
3016 								 bufferedTuples[i],
3017 								 NIL, cstate->transition_capture);
3018 		}
3019 	}
3020 
3021 	/* reset cur_lineno to where we were */
3022 	cstate->cur_lineno = save_cur_lineno;
3023 }
3024 
3025 /*
3026  * Setup to read tuples from a file for COPY FROM.
3027  *
3028  * 'rel': Used as a template for the tuples
3029  * 'filename': Name of server-local file to read
3030  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
3031  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
3032  *
3033  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
3034  */
3035 CopyState
BeginCopyFrom(ParseState * pstate,Relation rel,const char * filename,bool is_program,copy_data_source_cb data_source_cb,List * attnamelist,List * options)3036 BeginCopyFrom(ParseState *pstate,
3037 			  Relation rel,
3038 			  const char *filename,
3039 			  bool is_program,
3040 			  copy_data_source_cb data_source_cb,
3041 			  List *attnamelist,
3042 			  List *options)
3043 {
3044 	CopyState	cstate;
3045 	bool		pipe = (filename == NULL);
3046 	TupleDesc	tupDesc;
3047 	AttrNumber	num_phys_attrs,
3048 				num_defaults;
3049 	FmgrInfo   *in_functions;
3050 	Oid		   *typioparams;
3051 	int			attnum;
3052 	Oid			in_func_oid;
3053 	int		   *defmap;
3054 	ExprState **defexprs;
3055 	MemoryContext oldcontext;
3056 	bool		volatile_defexprs;
3057 
3058 	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
3059 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
3060 
3061 	/* Initialize state variables */
3062 	cstate->reached_eof = false;
3063 	cstate->eol_type = EOL_UNKNOWN;
3064 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
3065 	cstate->cur_lineno = 0;
3066 	cstate->cur_attname = NULL;
3067 	cstate->cur_attval = NULL;
3068 
3069 	/* Set up variables to avoid per-attribute overhead. */
3070 	initStringInfo(&cstate->attribute_buf);
3071 	initStringInfo(&cstate->line_buf);
3072 	cstate->line_buf_converted = false;
3073 	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3074 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
3075 
3076 	/* Assign range table, we'll need it in CopyFrom. */
3077 	if (pstate)
3078 		cstate->range_table = pstate->p_rtable;
3079 
3080 	tupDesc = RelationGetDescr(cstate->rel);
3081 	num_phys_attrs = tupDesc->natts;
3082 	num_defaults = 0;
3083 	volatile_defexprs = false;
3084 
3085 	/*
3086 	 * Pick up the required catalog information for each attribute in the
3087 	 * relation, including the input function, the element type (to pass to
3088 	 * the input function), and info about defaults and constraints. (Which
3089 	 * input function we use depends on text/binary format choice.)
3090 	 */
3091 	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3092 	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3093 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3094 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3095 
3096 	for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3097 	{
3098 		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3099 
3100 		/* We don't need info for dropped attributes */
3101 		if (att->attisdropped)
3102 			continue;
3103 
3104 		/* Fetch the input function and typioparam info */
3105 		if (cstate->binary)
3106 			getTypeBinaryInputInfo(att->atttypid,
3107 								   &in_func_oid, &typioparams[attnum - 1]);
3108 		else
3109 			getTypeInputInfo(att->atttypid,
3110 							 &in_func_oid, &typioparams[attnum - 1]);
3111 		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3112 
3113 		/* Get default info if needed */
3114 		if (!list_member_int(cstate->attnumlist, attnum))
3115 		{
3116 			/* attribute is NOT to be copied from input */
3117 			/* use default value if one exists */
3118 			Expr	   *defexpr = (Expr *) build_column_default(cstate->rel,
3119 																attnum);
3120 
3121 			if (defexpr != NULL)
3122 			{
3123 				/* Run the expression through planner */
3124 				defexpr = expression_planner(defexpr);
3125 
3126 				/* Initialize executable expression in copycontext */
3127 				defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3128 				defmap[num_defaults] = attnum - 1;
3129 				num_defaults++;
3130 
3131 				/*
3132 				 * If a default expression looks at the table being loaded,
3133 				 * then it could give the wrong answer when using
3134 				 * multi-insert. Since database access can be dynamic this is
3135 				 * hard to test for exactly, so we use the much wider test of
3136 				 * whether the default expression is volatile. We allow for
3137 				 * the special case of when the default expression is the
3138 				 * nextval() of a sequence which in this specific case is
3139 				 * known to be safe for use with the multi-insert
3140 				 * optimization. Hence we use this special case function
3141 				 * checker rather than the standard check for
3142 				 * contain_volatile_functions().
3143 				 */
3144 				if (!volatile_defexprs)
3145 					volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3146 			}
3147 		}
3148 	}
3149 
3150 	/* We keep those variables in cstate. */
3151 	cstate->in_functions = in_functions;
3152 	cstate->typioparams = typioparams;
3153 	cstate->defmap = defmap;
3154 	cstate->defexprs = defexprs;
3155 	cstate->volatile_defexprs = volatile_defexprs;
3156 	cstate->num_defaults = num_defaults;
3157 	cstate->is_program = is_program;
3158 
3159 	if (data_source_cb)
3160 	{
3161 		cstate->copy_dest = COPY_CALLBACK;
3162 		cstate->data_source_cb = data_source_cb;
3163 	}
3164 	else if (pipe)
3165 	{
3166 		Assert(!is_program);	/* the grammar does not allow this */
3167 		if (whereToSendOutput == DestRemote)
3168 			ReceiveCopyBegin(cstate);
3169 		else
3170 			cstate->copy_file = stdin;
3171 	}
3172 	else
3173 	{
3174 		cstate->filename = pstrdup(filename);
3175 
3176 		if (cstate->is_program)
3177 		{
3178 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3179 			if (cstate->copy_file == NULL)
3180 				ereport(ERROR,
3181 						(errcode_for_file_access(),
3182 						 errmsg("could not execute command \"%s\": %m",
3183 								cstate->filename)));
3184 		}
3185 		else
3186 		{
3187 			struct stat st;
3188 
3189 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3190 			if (cstate->copy_file == NULL)
3191 			{
3192 				/* copy errno because ereport subfunctions might change it */
3193 				int			save_errno = errno;
3194 
3195 				ereport(ERROR,
3196 						(errcode_for_file_access(),
3197 						 errmsg("could not open file \"%s\" for reading: %m",
3198 								cstate->filename),
3199 						 (save_errno == ENOENT || save_errno == EACCES) ?
3200 						 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3201 								 "You may want a client-side facility such as psql's \\copy.") : 0));
3202 			}
3203 
3204 			if (fstat(fileno(cstate->copy_file), &st))
3205 				ereport(ERROR,
3206 						(errcode_for_file_access(),
3207 						 errmsg("could not stat file \"%s\": %m",
3208 								cstate->filename)));
3209 
3210 			if (S_ISDIR(st.st_mode))
3211 				ereport(ERROR,
3212 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3213 						 errmsg("\"%s\" is a directory", cstate->filename)));
3214 		}
3215 	}
3216 
3217 	if (!cstate->binary)
3218 	{
3219 		/* must rely on user to tell us... */
3220 		cstate->file_has_oids = cstate->oids;
3221 	}
3222 	else
3223 	{
3224 		/* Read and verify binary header */
3225 		char		readSig[11];
3226 		int32		tmp;
3227 
3228 		/* Signature */
3229 		if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3230 			memcmp(readSig, BinarySignature, 11) != 0)
3231 			ereport(ERROR,
3232 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3233 					 errmsg("COPY file signature not recognized")));
3234 		/* Flags field */
3235 		if (!CopyGetInt32(cstate, &tmp))
3236 			ereport(ERROR,
3237 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3238 					 errmsg("invalid COPY file header (missing flags)")));
3239 		cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3240 		tmp &= ~(1 << 16);
3241 		if ((tmp >> 16) != 0)
3242 			ereport(ERROR,
3243 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3244 					 errmsg("unrecognized critical flags in COPY file header")));
3245 		/* Header extension length */
3246 		if (!CopyGetInt32(cstate, &tmp) ||
3247 			tmp < 0)
3248 			ereport(ERROR,
3249 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3250 					 errmsg("invalid COPY file header (missing length)")));
3251 		/* Skip extension header, if present */
3252 		while (tmp-- > 0)
3253 		{
3254 			if (CopyGetData(cstate, readSig, 1, 1) != 1)
3255 				ereport(ERROR,
3256 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3257 						 errmsg("invalid COPY file header (wrong length)")));
3258 		}
3259 	}
3260 
3261 	if (cstate->file_has_oids && cstate->binary)
3262 	{
3263 		getTypeBinaryInputInfo(OIDOID,
3264 							   &in_func_oid, &cstate->oid_typioparam);
3265 		fmgr_info(in_func_oid, &cstate->oid_in_function);
3266 	}
3267 
3268 	/* create workspace for CopyReadAttributes results */
3269 	if (!cstate->binary)
3270 	{
3271 		AttrNumber	attr_count = list_length(cstate->attnumlist);
3272 		int			nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3273 
3274 		cstate->max_fields = nfields;
3275 		cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3276 	}
3277 
3278 	MemoryContextSwitchTo(oldcontext);
3279 
3280 	return cstate;
3281 }
3282 
3283 /*
3284  * Read raw fields in the next line for COPY FROM in text or csv mode.
3285  * Return false if no more lines.
3286  *
3287  * An internal temporary buffer is returned via 'fields'. It is valid until
3288  * the next call of the function. Since the function returns all raw fields
3289  * in the input file, 'nfields' could be different from the number of columns
3290  * in the relation.
3291  *
3292  * NOTE: force_not_null option are not applied to the returned fields.
3293  */
3294 bool
NextCopyFromRawFields(CopyState cstate,char *** fields,int * nfields)3295 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3296 {
3297 	int			fldct;
3298 	bool		done;
3299 
3300 	/* only available for text or csv input */
3301 	Assert(!cstate->binary);
3302 
3303 	/* on input just throw the header line away */
3304 	if (cstate->cur_lineno == 0 && cstate->header_line)
3305 	{
3306 		cstate->cur_lineno++;
3307 		if (CopyReadLine(cstate))
3308 			return false;		/* done */
3309 	}
3310 
3311 	cstate->cur_lineno++;
3312 
3313 	/* Actually read the line into memory here */
3314 	done = CopyReadLine(cstate);
3315 
3316 	/*
3317 	 * EOF at start of line means we're done.  If we see EOF after some
3318 	 * characters, we act as though it was newline followed by EOF, ie,
3319 	 * process the line and then exit loop on next iteration.
3320 	 */
3321 	if (done && cstate->line_buf.len == 0)
3322 		return false;
3323 
3324 	/* Parse the line into de-escaped field values */
3325 	if (cstate->csv_mode)
3326 		fldct = CopyReadAttributesCSV(cstate);
3327 	else
3328 		fldct = CopyReadAttributesText(cstate);
3329 
3330 	*fields = cstate->raw_fields;
3331 	*nfields = fldct;
3332 	return true;
3333 }
3334 
3335 /*
3336  * Read next tuple from file for COPY FROM. Return false if no more tuples.
3337  *
3338  * 'econtext' is used to evaluate default expression for each columns not
3339  * read from the file. It can be NULL when no default values are used, i.e.
3340  * when all columns are read from the file.
3341  *
3342  * 'values' and 'nulls' arrays must be the same length as columns of the
3343  * relation passed to BeginCopyFrom. This function fills the arrays.
3344  * Oid of the tuple is returned with 'tupleOid' separately.
3345  */
3346 bool
NextCopyFrom(CopyState cstate,ExprContext * econtext,Datum * values,bool * nulls,Oid * tupleOid)3347 NextCopyFrom(CopyState cstate, ExprContext *econtext,
3348 			 Datum *values, bool *nulls, Oid *tupleOid)
3349 {
3350 	TupleDesc	tupDesc;
3351 	AttrNumber	num_phys_attrs,
3352 				attr_count,
3353 				num_defaults = cstate->num_defaults;
3354 	FmgrInfo   *in_functions = cstate->in_functions;
3355 	Oid		   *typioparams = cstate->typioparams;
3356 	int			i;
3357 	int			nfields;
3358 	bool		isnull;
3359 	bool		file_has_oids = cstate->file_has_oids;
3360 	int		   *defmap = cstate->defmap;
3361 	ExprState **defexprs = cstate->defexprs;
3362 
3363 	tupDesc = RelationGetDescr(cstate->rel);
3364 	num_phys_attrs = tupDesc->natts;
3365 	attr_count = list_length(cstate->attnumlist);
3366 	nfields = file_has_oids ? (attr_count + 1) : attr_count;
3367 
3368 	/* Initialize all values for row to NULL */
3369 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3370 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3371 
3372 	if (!cstate->binary)
3373 	{
3374 		char	  **field_strings;
3375 		ListCell   *cur;
3376 		int			fldct;
3377 		int			fieldno;
3378 		char	   *string;
3379 
3380 		/* read raw fields in the next line */
3381 		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3382 			return false;
3383 
3384 		/* check for overflowing fields */
3385 		if (nfields > 0 && fldct > nfields)
3386 			ereport(ERROR,
3387 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3388 					 errmsg("extra data after last expected column")));
3389 
3390 		fieldno = 0;
3391 
3392 		/* Read the OID field if present */
3393 		if (file_has_oids)
3394 		{
3395 			if (fieldno >= fldct)
3396 				ereport(ERROR,
3397 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3398 						 errmsg("missing data for OID column")));
3399 			string = field_strings[fieldno++];
3400 
3401 			if (string == NULL)
3402 				ereport(ERROR,
3403 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3404 						 errmsg("null OID in COPY data")));
3405 			else if (cstate->oids && tupleOid != NULL)
3406 			{
3407 				cstate->cur_attname = "oid";
3408 				cstate->cur_attval = string;
3409 				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3410 																 CStringGetDatum(string)));
3411 				if (*tupleOid == InvalidOid)
3412 					ereport(ERROR,
3413 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3414 							 errmsg("invalid OID in COPY data")));
3415 				cstate->cur_attname = NULL;
3416 				cstate->cur_attval = NULL;
3417 			}
3418 		}
3419 
3420 		/* Loop to read the user attributes on the line. */
3421 		foreach(cur, cstate->attnumlist)
3422 		{
3423 			int			attnum = lfirst_int(cur);
3424 			int			m = attnum - 1;
3425 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3426 
3427 			if (fieldno >= fldct)
3428 				ereport(ERROR,
3429 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3430 						 errmsg("missing data for column \"%s\"",
3431 								NameStr(att->attname))));
3432 			string = field_strings[fieldno++];
3433 
3434 			if (cstate->convert_select_flags &&
3435 				!cstate->convert_select_flags[m])
3436 			{
3437 				/* ignore input field, leaving column as NULL */
3438 				continue;
3439 			}
3440 
3441 			if (cstate->csv_mode)
3442 			{
3443 				if (string == NULL &&
3444 					cstate->force_notnull_flags[m])
3445 				{
3446 					/*
3447 					 * FORCE_NOT_NULL option is set and column is NULL -
3448 					 * convert it to the NULL string.
3449 					 */
3450 					string = cstate->null_print;
3451 				}
3452 				else if (string != NULL && cstate->force_null_flags[m]
3453 						 && strcmp(string, cstate->null_print) == 0)
3454 				{
3455 					/*
3456 					 * FORCE_NULL option is set and column matches the NULL
3457 					 * string. It must have been quoted, or otherwise the
3458 					 * string would already have been set to NULL. Convert it
3459 					 * to NULL as specified.
3460 					 */
3461 					string = NULL;
3462 				}
3463 			}
3464 
3465 			cstate->cur_attname = NameStr(att->attname);
3466 			cstate->cur_attval = string;
3467 			values[m] = InputFunctionCall(&in_functions[m],
3468 										  string,
3469 										  typioparams[m],
3470 										  att->atttypmod);
3471 			if (string != NULL)
3472 				nulls[m] = false;
3473 			cstate->cur_attname = NULL;
3474 			cstate->cur_attval = NULL;
3475 		}
3476 
3477 		Assert(fieldno == nfields);
3478 	}
3479 	else
3480 	{
3481 		/* binary */
3482 		int16		fld_count;
3483 		ListCell   *cur;
3484 
3485 		cstate->cur_lineno++;
3486 
3487 		if (!CopyGetInt16(cstate, &fld_count))
3488 		{
3489 			/* EOF detected (end of file, or protocol-level EOF) */
3490 			return false;
3491 		}
3492 
3493 		if (fld_count == -1)
3494 		{
3495 			/*
3496 			 * Received EOF marker.  In a V3-protocol copy, wait for the
3497 			 * protocol-level EOF, and complain if it doesn't come
3498 			 * immediately.  This ensures that we correctly handle CopyFail,
3499 			 * if client chooses to send that now.
3500 			 *
3501 			 * Note that we MUST NOT try to read more data in an old-protocol
3502 			 * copy, since there is no protocol-level EOF marker then.  We
3503 			 * could go either way for copy from file, but choose to throw
3504 			 * error if there's data after the EOF marker, for consistency
3505 			 * with the new-protocol case.
3506 			 */
3507 			char		dummy;
3508 
3509 			if (cstate->copy_dest != COPY_OLD_FE &&
3510 				CopyGetData(cstate, &dummy, 1, 1) > 0)
3511 				ereport(ERROR,
3512 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3513 						 errmsg("received copy data after EOF marker")));
3514 			return false;
3515 		}
3516 
3517 		if (fld_count != attr_count)
3518 			ereport(ERROR,
3519 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3520 					 errmsg("row field count is %d, expected %d",
3521 							(int) fld_count, attr_count)));
3522 
3523 		if (file_has_oids)
3524 		{
3525 			Oid			loaded_oid;
3526 
3527 			cstate->cur_attname = "oid";
3528 			loaded_oid =
3529 				DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3530 														 0,
3531 														 &cstate->oid_in_function,
3532 														 cstate->oid_typioparam,
3533 														 -1,
3534 														 &isnull));
3535 			if (isnull || loaded_oid == InvalidOid)
3536 				ereport(ERROR,
3537 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3538 						 errmsg("invalid OID in COPY data")));
3539 			cstate->cur_attname = NULL;
3540 			if (cstate->oids && tupleOid != NULL)
3541 				*tupleOid = loaded_oid;
3542 		}
3543 
3544 		i = 0;
3545 		foreach(cur, cstate->attnumlist)
3546 		{
3547 			int			attnum = lfirst_int(cur);
3548 			int			m = attnum - 1;
3549 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3550 
3551 			cstate->cur_attname = NameStr(att->attname);
3552 			i++;
3553 			values[m] = CopyReadBinaryAttribute(cstate,
3554 												i,
3555 												&in_functions[m],
3556 												typioparams[m],
3557 												att->atttypmod,
3558 												&nulls[m]);
3559 			cstate->cur_attname = NULL;
3560 		}
3561 	}
3562 
3563 	/*
3564 	 * Now compute and insert any defaults available for the columns not
3565 	 * provided by the input data.  Anything not processed here or above will
3566 	 * remain NULL.
3567 	 */
3568 	for (i = 0; i < num_defaults; i++)
3569 	{
3570 		/*
3571 		 * The caller must supply econtext and have switched into the
3572 		 * per-tuple memory context in it.
3573 		 */
3574 		Assert(econtext != NULL);
3575 		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3576 
3577 		values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3578 										 &nulls[defmap[i]]);
3579 	}
3580 
3581 	return true;
3582 }
3583 
3584 /*
3585  * Clean up storage and release resources for COPY FROM.
3586  */
3587 void
EndCopyFrom(CopyState cstate)3588 EndCopyFrom(CopyState cstate)
3589 {
3590 	/* No COPY FROM related resources except memory. */
3591 
3592 	EndCopy(cstate);
3593 }
3594 
3595 /*
3596  * Read the next input line and stash it in line_buf, with conversion to
3597  * server encoding.
3598  *
3599  * Result is true if read was terminated by EOF, false if terminated
3600  * by newline.  The terminating newline or EOF marker is not included
3601  * in the final value of line_buf.
3602  */
3603 static bool
CopyReadLine(CopyState cstate)3604 CopyReadLine(CopyState cstate)
3605 {
3606 	bool		result;
3607 
3608 	resetStringInfo(&cstate->line_buf);
3609 	cstate->line_buf_valid = true;
3610 
3611 	/* Mark that encoding conversion hasn't occurred yet */
3612 	cstate->line_buf_converted = false;
3613 
3614 	/* Parse data and transfer into line_buf */
3615 	result = CopyReadLineText(cstate);
3616 
3617 	if (result)
3618 	{
3619 		/*
3620 		 * Reached EOF.  In protocol version 3, we should ignore anything
3621 		 * after \. up to the protocol end of copy data.  (XXX maybe better
3622 		 * not to treat \. as special?)
3623 		 */
3624 		if (cstate->copy_dest == COPY_NEW_FE)
3625 		{
3626 			do
3627 			{
3628 				cstate->raw_buf_index = cstate->raw_buf_len;
3629 			} while (CopyLoadRawBuf(cstate));
3630 		}
3631 	}
3632 	else
3633 	{
3634 		/*
3635 		 * If we didn't hit EOF, then we must have transferred the EOL marker
3636 		 * to line_buf along with the data.  Get rid of it.
3637 		 */
3638 		switch (cstate->eol_type)
3639 		{
3640 			case EOL_NL:
3641 				Assert(cstate->line_buf.len >= 1);
3642 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3643 				cstate->line_buf.len--;
3644 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3645 				break;
3646 			case EOL_CR:
3647 				Assert(cstate->line_buf.len >= 1);
3648 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3649 				cstate->line_buf.len--;
3650 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3651 				break;
3652 			case EOL_CRNL:
3653 				Assert(cstate->line_buf.len >= 2);
3654 				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3655 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3656 				cstate->line_buf.len -= 2;
3657 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3658 				break;
3659 			case EOL_UNKNOWN:
3660 				/* shouldn't get here */
3661 				Assert(false);
3662 				break;
3663 		}
3664 	}
3665 
3666 	/* Done reading the line.  Convert it to server encoding. */
3667 	if (cstate->need_transcoding)
3668 	{
3669 		char	   *cvt;
3670 
3671 		cvt = pg_any_to_server(cstate->line_buf.data,
3672 							   cstate->line_buf.len,
3673 							   cstate->file_encoding);
3674 		if (cvt != cstate->line_buf.data)
3675 		{
3676 			/* transfer converted data back to line_buf */
3677 			resetStringInfo(&cstate->line_buf);
3678 			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3679 			pfree(cvt);
3680 		}
3681 	}
3682 
3683 	/* Now it's safe to use the buffer in error messages */
3684 	cstate->line_buf_converted = true;
3685 
3686 	return result;
3687 }
3688 
3689 /*
3690  * CopyReadLineText - inner loop of CopyReadLine for text mode
3691  */
3692 static bool
CopyReadLineText(CopyState cstate)3693 CopyReadLineText(CopyState cstate)
3694 {
3695 	char	   *copy_raw_buf;
3696 	int			raw_buf_ptr;
3697 	int			copy_buf_len;
3698 	bool		need_data = false;
3699 	bool		hit_eof = false;
3700 	bool		result = false;
3701 	char		mblen_str[2];
3702 
3703 	/* CSV variables */
3704 	bool		first_char_in_line = true;
3705 	bool		in_quote = false,
3706 				last_was_esc = false;
3707 	char		quotec = '\0';
3708 	char		escapec = '\0';
3709 
3710 	if (cstate->csv_mode)
3711 	{
3712 		quotec = cstate->quote[0];
3713 		escapec = cstate->escape[0];
3714 		/* ignore special escape processing if it's the same as quotec */
3715 		if (quotec == escapec)
3716 			escapec = '\0';
3717 	}
3718 
3719 	mblen_str[1] = '\0';
3720 
3721 	/*
3722 	 * The objective of this loop is to transfer the entire next input line
3723 	 * into line_buf.  Hence, we only care for detecting newlines (\r and/or
3724 	 * \n) and the end-of-copy marker (\.).
3725 	 *
3726 	 * In CSV mode, \r and \n inside a quoted field are just part of the data
3727 	 * value and are put in line_buf.  We keep just enough state to know if we
3728 	 * are currently in a quoted field or not.
3729 	 *
3730 	 * These four characters, and the CSV escape and quote characters, are
3731 	 * assumed the same in frontend and backend encodings.
3732 	 *
3733 	 * For speed, we try to move data from raw_buf to line_buf in chunks
3734 	 * rather than one character at a time.  raw_buf_ptr points to the next
3735 	 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3736 	 * have been determined to be part of the line, but not yet transferred to
3737 	 * line_buf.
3738 	 *
3739 	 * For a little extra speed within the loop, we copy raw_buf and
3740 	 * raw_buf_len into local variables.
3741 	 */
3742 	copy_raw_buf = cstate->raw_buf;
3743 	raw_buf_ptr = cstate->raw_buf_index;
3744 	copy_buf_len = cstate->raw_buf_len;
3745 
3746 	for (;;)
3747 	{
3748 		int			prev_raw_ptr;
3749 		char		c;
3750 
3751 		/*
3752 		 * Load more data if needed.  Ideally we would just force four bytes
3753 		 * of read-ahead and avoid the many calls to
3754 		 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3755 		 * does not allow us to read too far ahead or we might read into the
3756 		 * next data, so we read-ahead only as far we know we can.  One
3757 		 * optimization would be to read-ahead four byte here if
3758 		 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3759 		 * considering the size of the buffer.
3760 		 */
3761 		if (raw_buf_ptr >= copy_buf_len || need_data)
3762 		{
3763 			REFILL_LINEBUF;
3764 
3765 			/*
3766 			 * Try to read some more data.  This will certainly reset
3767 			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3768 			 */
3769 			if (!CopyLoadRawBuf(cstate))
3770 				hit_eof = true;
3771 			raw_buf_ptr = 0;
3772 			copy_buf_len = cstate->raw_buf_len;
3773 
3774 			/*
3775 			 * If we are completely out of data, break out of the loop,
3776 			 * reporting EOF.
3777 			 */
3778 			if (copy_buf_len <= 0)
3779 			{
3780 				result = true;
3781 				break;
3782 			}
3783 			need_data = false;
3784 		}
3785 
3786 		/* OK to fetch a character */
3787 		prev_raw_ptr = raw_buf_ptr;
3788 		c = copy_raw_buf[raw_buf_ptr++];
3789 
3790 		if (cstate->csv_mode)
3791 		{
3792 			/*
3793 			 * If character is '\\' or '\r', we may need to look ahead below.
3794 			 * Force fetch of the next character if we don't already have it.
3795 			 * We need to do this before changing CSV state, in case one of
3796 			 * these characters is also the quote or escape character.
3797 			 *
3798 			 * Note: old-protocol does not like forced prefetch, but it's OK
3799 			 * here since we cannot validly be at EOF.
3800 			 */
3801 			if (c == '\\' || c == '\r')
3802 			{
3803 				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3804 			}
3805 
3806 			/*
3807 			 * Dealing with quotes and escapes here is mildly tricky. If the
3808 			 * quote char is also the escape char, there's no problem - we
3809 			 * just use the char as a toggle. If they are different, we need
3810 			 * to ensure that we only take account of an escape inside a
3811 			 * quoted field and immediately preceding a quote char, and not
3812 			 * the second in an escape-escape sequence.
3813 			 */
3814 			if (in_quote && c == escapec)
3815 				last_was_esc = !last_was_esc;
3816 			if (c == quotec && !last_was_esc)
3817 				in_quote = !in_quote;
3818 			if (c != escapec)
3819 				last_was_esc = false;
3820 
3821 			/*
3822 			 * Updating the line count for embedded CR and/or LF chars is
3823 			 * necessarily a little fragile - this test is probably about the
3824 			 * best we can do.  (XXX it's arguable whether we should do this
3825 			 * at all --- is cur_lineno a physical or logical count?)
3826 			 */
3827 			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3828 				cstate->cur_lineno++;
3829 		}
3830 
3831 		/* Process \r */
3832 		if (c == '\r' && (!cstate->csv_mode || !in_quote))
3833 		{
3834 			/* Check for \r\n on first line, _and_ handle \r\n. */
3835 			if (cstate->eol_type == EOL_UNKNOWN ||
3836 				cstate->eol_type == EOL_CRNL)
3837 			{
3838 				/*
3839 				 * If need more data, go back to loop top to load it.
3840 				 *
3841 				 * Note that if we are at EOF, c will wind up as '\0' because
3842 				 * of the guaranteed pad of raw_buf.
3843 				 */
3844 				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3845 
3846 				/* get next char */
3847 				c = copy_raw_buf[raw_buf_ptr];
3848 
3849 				if (c == '\n')
3850 				{
3851 					raw_buf_ptr++;	/* eat newline */
3852 					cstate->eol_type = EOL_CRNL;	/* in case not set yet */
3853 				}
3854 				else
3855 				{
3856 					/* found \r, but no \n */
3857 					if (cstate->eol_type == EOL_CRNL)
3858 						ereport(ERROR,
3859 								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3860 								 !cstate->csv_mode ?
3861 								 errmsg("literal carriage return found in data") :
3862 								 errmsg("unquoted carriage return found in data"),
3863 								 !cstate->csv_mode ?
3864 								 errhint("Use \"\\r\" to represent carriage return.") :
3865 								 errhint("Use quoted CSV field to represent carriage return.")));
3866 
3867 					/*
3868 					 * if we got here, it is the first line and we didn't find
3869 					 * \n, so don't consume the peeked character
3870 					 */
3871 					cstate->eol_type = EOL_CR;
3872 				}
3873 			}
3874 			else if (cstate->eol_type == EOL_NL)
3875 				ereport(ERROR,
3876 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3877 						 !cstate->csv_mode ?
3878 						 errmsg("literal carriage return found in data") :
3879 						 errmsg("unquoted carriage return found in data"),
3880 						 !cstate->csv_mode ?
3881 						 errhint("Use \"\\r\" to represent carriage return.") :
3882 						 errhint("Use quoted CSV field to represent carriage return.")));
3883 			/* If reach here, we have found the line terminator */
3884 			break;
3885 		}
3886 
3887 		/* Process \n */
3888 		if (c == '\n' && (!cstate->csv_mode || !in_quote))
3889 		{
3890 			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3891 				ereport(ERROR,
3892 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3893 						 !cstate->csv_mode ?
3894 						 errmsg("literal newline found in data") :
3895 						 errmsg("unquoted newline found in data"),
3896 						 !cstate->csv_mode ?
3897 						 errhint("Use \"\\n\" to represent newline.") :
3898 						 errhint("Use quoted CSV field to represent newline.")));
3899 			cstate->eol_type = EOL_NL;	/* in case not set yet */
3900 			/* If reach here, we have found the line terminator */
3901 			break;
3902 		}
3903 
3904 		/*
3905 		 * In CSV mode, we only recognize \. alone on a line.  This is because
3906 		 * \. is a valid CSV data value.
3907 		 */
3908 		if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3909 		{
3910 			char		c2;
3911 
3912 			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3913 			IF_NEED_REFILL_AND_EOF_BREAK(0);
3914 
3915 			/* -----
3916 			 * get next character
3917 			 * Note: we do not change c so if it isn't \., we can fall
3918 			 * through and continue processing for file encoding.
3919 			 * -----
3920 			 */
3921 			c2 = copy_raw_buf[raw_buf_ptr];
3922 
3923 			if (c2 == '.')
3924 			{
3925 				raw_buf_ptr++;	/* consume the '.' */
3926 
3927 				/*
3928 				 * Note: if we loop back for more data here, it does not
3929 				 * matter that the CSV state change checks are re-executed; we
3930 				 * will come back here with no important state changed.
3931 				 */
3932 				if (cstate->eol_type == EOL_CRNL)
3933 				{
3934 					/* Get the next character */
3935 					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3936 					/* if hit_eof, c2 will become '\0' */
3937 					c2 = copy_raw_buf[raw_buf_ptr++];
3938 
3939 					if (c2 == '\n')
3940 					{
3941 						if (!cstate->csv_mode)
3942 							ereport(ERROR,
3943 									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3944 									 errmsg("end-of-copy marker does not match previous newline style")));
3945 						else
3946 							NO_END_OF_COPY_GOTO;
3947 					}
3948 					else if (c2 != '\r')
3949 					{
3950 						if (!cstate->csv_mode)
3951 							ereport(ERROR,
3952 									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3953 									 errmsg("end-of-copy marker corrupt")));
3954 						else
3955 							NO_END_OF_COPY_GOTO;
3956 					}
3957 				}
3958 
3959 				/* Get the next character */
3960 				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3961 				/* if hit_eof, c2 will become '\0' */
3962 				c2 = copy_raw_buf[raw_buf_ptr++];
3963 
3964 				if (c2 != '\r' && c2 != '\n')
3965 				{
3966 					if (!cstate->csv_mode)
3967 						ereport(ERROR,
3968 								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3969 								 errmsg("end-of-copy marker corrupt")));
3970 					else
3971 						NO_END_OF_COPY_GOTO;
3972 				}
3973 
3974 				if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3975 					(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3976 					(cstate->eol_type == EOL_CR && c2 != '\r'))
3977 				{
3978 					ereport(ERROR,
3979 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3980 							 errmsg("end-of-copy marker does not match previous newline style")));
3981 				}
3982 
3983 				/*
3984 				 * Transfer only the data before the \. into line_buf, then
3985 				 * discard the data and the \. sequence.
3986 				 */
3987 				if (prev_raw_ptr > cstate->raw_buf_index)
3988 					appendBinaryStringInfo(&cstate->line_buf,
3989 										   cstate->raw_buf + cstate->raw_buf_index,
3990 										   prev_raw_ptr - cstate->raw_buf_index);
3991 				cstate->raw_buf_index = raw_buf_ptr;
3992 				result = true;	/* report EOF */
3993 				break;
3994 			}
3995 			else if (!cstate->csv_mode)
3996 			{
3997 				/*
3998 				 * If we are here, it means we found a backslash followed by
3999 				 * something other than a period.  In non-CSV mode, anything
4000 				 * after a backslash is special, so we skip over that second
4001 				 * character too.  If we didn't do that \\. would be
4002 				 * considered an eof-of copy, while in non-CSV mode it is a
4003 				 * literal backslash followed by a period.  In CSV mode,
4004 				 * backslashes are not special, so we want to process the
4005 				 * character after the backslash just like a normal character,
4006 				 * so we don't increment in those cases.
4007 				 *
4008 				 * Set 'c' to skip whole character correctly in multi-byte
4009 				 * encodings.  If we don't have the whole character in the
4010 				 * buffer yet, we might loop back to process it, after all,
4011 				 * but that's OK because multi-byte characters cannot have any
4012 				 * special meaning.
4013 				 */
4014 				raw_buf_ptr++;
4015 				c = c2;
4016 			}
4017 		}
4018 
4019 		/*
4020 		 * This label is for CSV cases where \. appears at the start of a
4021 		 * line, but there is more text after it, meaning it was a data value.
4022 		 * We are more strict for \. in CSV mode because \. could be a data
4023 		 * value, while in non-CSV mode, \. cannot be a data value.
4024 		 */
4025 not_end_of_copy:
4026 
4027 		/*
4028 		 * Process all bytes of a multi-byte character as a group.
4029 		 *
4030 		 * We only support multi-byte sequences where the first byte has the
4031 		 * high-bit set, so as an optimization we can avoid this block
4032 		 * entirely if it is not set.
4033 		 */
4034 		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
4035 		{
4036 			int			mblen;
4037 
4038 			mblen_str[0] = c;
4039 			/* All our encodings only read the first byte to get the length */
4040 			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
4041 			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
4042 			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
4043 			raw_buf_ptr += mblen - 1;
4044 		}
4045 		first_char_in_line = false;
4046 	}							/* end of outer loop */
4047 
4048 	/*
4049 	 * Transfer any still-uncopied data to line_buf.
4050 	 */
4051 	REFILL_LINEBUF;
4052 
4053 	return result;
4054 }
4055 
4056 /*
4057  *	Return decimal value for a hexadecimal digit
4058  */
4059 static int
GetDecimalFromHex(char hex)4060 GetDecimalFromHex(char hex)
4061 {
4062 	if (isdigit((unsigned char) hex))
4063 		return hex - '0';
4064 	else
4065 		return tolower((unsigned char) hex) - 'a' + 10;
4066 }
4067 
4068 /*
4069  * Parse the current line into separate attributes (fields),
4070  * performing de-escaping as needed.
4071  *
4072  * The input is in line_buf.  We use attribute_buf to hold the result
4073  * strings.  cstate->raw_fields[k] is set to point to the k'th attribute
4074  * string, or NULL when the input matches the null marker string.
4075  * This array is expanded as necessary.
4076  *
4077  * (Note that the caller cannot check for nulls since the returned
4078  * string would be the post-de-escaping equivalent, which may look
4079  * the same as some valid data string.)
4080  *
4081  * delim is the column delimiter string (must be just one byte for now).
4082  * null_print is the null marker string.  Note that this is compared to
4083  * the pre-de-escaped input string.
4084  *
4085  * The return value is the number of fields actually read.
4086  */
4087 static int
CopyReadAttributesText(CopyState cstate)4088 CopyReadAttributesText(CopyState cstate)
4089 {
4090 	char		delimc = cstate->delim[0];
4091 	int			fieldno;
4092 	char	   *output_ptr;
4093 	char	   *cur_ptr;
4094 	char	   *line_end_ptr;
4095 
4096 	/*
4097 	 * We need a special case for zero-column tables: check that the input
4098 	 * line is empty, and return.
4099 	 */
4100 	if (cstate->max_fields <= 0)
4101 	{
4102 		if (cstate->line_buf.len != 0)
4103 			ereport(ERROR,
4104 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4105 					 errmsg("extra data after last expected column")));
4106 		return 0;
4107 	}
4108 
4109 	resetStringInfo(&cstate->attribute_buf);
4110 
4111 	/*
4112 	 * The de-escaped attributes will certainly not be longer than the input
4113 	 * data line, so we can just force attribute_buf to be large enough and
4114 	 * then transfer data without any checks for enough space.  We need to do
4115 	 * it this way because enlarging attribute_buf mid-stream would invalidate
4116 	 * pointers already stored into cstate->raw_fields[].
4117 	 */
4118 	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4119 		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4120 	output_ptr = cstate->attribute_buf.data;
4121 
4122 	/* set pointer variables for loop */
4123 	cur_ptr = cstate->line_buf.data;
4124 	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4125 
4126 	/* Outer loop iterates over fields */
4127 	fieldno = 0;
4128 	for (;;)
4129 	{
4130 		bool		found_delim = false;
4131 		char	   *start_ptr;
4132 		char	   *end_ptr;
4133 		int			input_len;
4134 		bool		saw_non_ascii = false;
4135 
4136 		/* Make sure there is enough space for the next value */
4137 		if (fieldno >= cstate->max_fields)
4138 		{
4139 			cstate->max_fields *= 2;
4140 			cstate->raw_fields =
4141 				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4142 		}
4143 
4144 		/* Remember start of field on both input and output sides */
4145 		start_ptr = cur_ptr;
4146 		cstate->raw_fields[fieldno] = output_ptr;
4147 
4148 		/*
4149 		 * Scan data for field.
4150 		 *
4151 		 * Note that in this loop, we are scanning to locate the end of field
4152 		 * and also speculatively performing de-escaping.  Once we find the
4153 		 * end-of-field, we can match the raw field contents against the null
4154 		 * marker string.  Only after that comparison fails do we know that
4155 		 * de-escaping is actually the right thing to do; therefore we *must
4156 		 * not* throw any syntax errors before we've done the null-marker
4157 		 * check.
4158 		 */
4159 		for (;;)
4160 		{
4161 			char		c;
4162 
4163 			end_ptr = cur_ptr;
4164 			if (cur_ptr >= line_end_ptr)
4165 				break;
4166 			c = *cur_ptr++;
4167 			if (c == delimc)
4168 			{
4169 				found_delim = true;
4170 				break;
4171 			}
4172 			if (c == '\\')
4173 			{
4174 				if (cur_ptr >= line_end_ptr)
4175 					break;
4176 				c = *cur_ptr++;
4177 				switch (c)
4178 				{
4179 					case '0':
4180 					case '1':
4181 					case '2':
4182 					case '3':
4183 					case '4':
4184 					case '5':
4185 					case '6':
4186 					case '7':
4187 						{
4188 							/* handle \013 */
4189 							int			val;
4190 
4191 							val = OCTVALUE(c);
4192 							if (cur_ptr < line_end_ptr)
4193 							{
4194 								c = *cur_ptr;
4195 								if (ISOCTAL(c))
4196 								{
4197 									cur_ptr++;
4198 									val = (val << 3) + OCTVALUE(c);
4199 									if (cur_ptr < line_end_ptr)
4200 									{
4201 										c = *cur_ptr;
4202 										if (ISOCTAL(c))
4203 										{
4204 											cur_ptr++;
4205 											val = (val << 3) + OCTVALUE(c);
4206 										}
4207 									}
4208 								}
4209 							}
4210 							c = val & 0377;
4211 							if (c == '\0' || IS_HIGHBIT_SET(c))
4212 								saw_non_ascii = true;
4213 						}
4214 						break;
4215 					case 'x':
4216 						/* Handle \x3F */
4217 						if (cur_ptr < line_end_ptr)
4218 						{
4219 							char		hexchar = *cur_ptr;
4220 
4221 							if (isxdigit((unsigned char) hexchar))
4222 							{
4223 								int			val = GetDecimalFromHex(hexchar);
4224 
4225 								cur_ptr++;
4226 								if (cur_ptr < line_end_ptr)
4227 								{
4228 									hexchar = *cur_ptr;
4229 									if (isxdigit((unsigned char) hexchar))
4230 									{
4231 										cur_ptr++;
4232 										val = (val << 4) + GetDecimalFromHex(hexchar);
4233 									}
4234 								}
4235 								c = val & 0xff;
4236 								if (c == '\0' || IS_HIGHBIT_SET(c))
4237 									saw_non_ascii = true;
4238 							}
4239 						}
4240 						break;
4241 					case 'b':
4242 						c = '\b';
4243 						break;
4244 					case 'f':
4245 						c = '\f';
4246 						break;
4247 					case 'n':
4248 						c = '\n';
4249 						break;
4250 					case 'r':
4251 						c = '\r';
4252 						break;
4253 					case 't':
4254 						c = '\t';
4255 						break;
4256 					case 'v':
4257 						c = '\v';
4258 						break;
4259 
4260 						/*
4261 						 * in all other cases, take the char after '\'
4262 						 * literally
4263 						 */
4264 				}
4265 			}
4266 
4267 			/* Add c to output string */
4268 			*output_ptr++ = c;
4269 		}
4270 
4271 		/* Check whether raw input matched null marker */
4272 		input_len = end_ptr - start_ptr;
4273 		if (input_len == cstate->null_print_len &&
4274 			strncmp(start_ptr, cstate->null_print, input_len) == 0)
4275 			cstate->raw_fields[fieldno] = NULL;
4276 		else
4277 		{
4278 			/*
4279 			 * At this point we know the field is supposed to contain data.
4280 			 *
4281 			 * If we de-escaped any non-7-bit-ASCII chars, make sure the
4282 			 * resulting string is valid data for the db encoding.
4283 			 */
4284 			if (saw_non_ascii)
4285 			{
4286 				char	   *fld = cstate->raw_fields[fieldno];
4287 
4288 				pg_verifymbstr(fld, output_ptr - fld, false);
4289 			}
4290 		}
4291 
4292 		/* Terminate attribute value in output area */
4293 		*output_ptr++ = '\0';
4294 
4295 		fieldno++;
4296 		/* Done if we hit EOL instead of a delim */
4297 		if (!found_delim)
4298 			break;
4299 	}
4300 
4301 	/* Clean up state of attribute_buf */
4302 	output_ptr--;
4303 	Assert(*output_ptr == '\0');
4304 	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4305 
4306 	return fieldno;
4307 }
4308 
4309 /*
4310  * Parse the current line into separate attributes (fields),
4311  * performing de-escaping as needed.  This has exactly the same API as
4312  * CopyReadAttributesText, except we parse the fields according to
4313  * "standard" (i.e. common) CSV usage.
4314  */
4315 static int
CopyReadAttributesCSV(CopyState cstate)4316 CopyReadAttributesCSV(CopyState cstate)
4317 {
4318 	char		delimc = cstate->delim[0];
4319 	char		quotec = cstate->quote[0];
4320 	char		escapec = cstate->escape[0];
4321 	int			fieldno;
4322 	char	   *output_ptr;
4323 	char	   *cur_ptr;
4324 	char	   *line_end_ptr;
4325 
4326 	/*
4327 	 * We need a special case for zero-column tables: check that the input
4328 	 * line is empty, and return.
4329 	 */
4330 	if (cstate->max_fields <= 0)
4331 	{
4332 		if (cstate->line_buf.len != 0)
4333 			ereport(ERROR,
4334 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4335 					 errmsg("extra data after last expected column")));
4336 		return 0;
4337 	}
4338 
4339 	resetStringInfo(&cstate->attribute_buf);
4340 
4341 	/*
4342 	 * The de-escaped attributes will certainly not be longer than the input
4343 	 * data line, so we can just force attribute_buf to be large enough and
4344 	 * then transfer data without any checks for enough space.  We need to do
4345 	 * it this way because enlarging attribute_buf mid-stream would invalidate
4346 	 * pointers already stored into cstate->raw_fields[].
4347 	 */
4348 	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4349 		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4350 	output_ptr = cstate->attribute_buf.data;
4351 
4352 	/* set pointer variables for loop */
4353 	cur_ptr = cstate->line_buf.data;
4354 	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4355 
4356 	/* Outer loop iterates over fields */
4357 	fieldno = 0;
4358 	for (;;)
4359 	{
4360 		bool		found_delim = false;
4361 		bool		saw_quote = false;
4362 		char	   *start_ptr;
4363 		char	   *end_ptr;
4364 		int			input_len;
4365 
4366 		/* Make sure there is enough space for the next value */
4367 		if (fieldno >= cstate->max_fields)
4368 		{
4369 			cstate->max_fields *= 2;
4370 			cstate->raw_fields =
4371 				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4372 		}
4373 
4374 		/* Remember start of field on both input and output sides */
4375 		start_ptr = cur_ptr;
4376 		cstate->raw_fields[fieldno] = output_ptr;
4377 
4378 		/*
4379 		 * Scan data for field,
4380 		 *
4381 		 * The loop starts in "not quote" mode and then toggles between that
4382 		 * and "in quote" mode. The loop exits normally if it is in "not
4383 		 * quote" mode and a delimiter or line end is seen.
4384 		 */
4385 		for (;;)
4386 		{
4387 			char		c;
4388 
4389 			/* Not in quote */
4390 			for (;;)
4391 			{
4392 				end_ptr = cur_ptr;
4393 				if (cur_ptr >= line_end_ptr)
4394 					goto endfield;
4395 				c = *cur_ptr++;
4396 				/* unquoted field delimiter */
4397 				if (c == delimc)
4398 				{
4399 					found_delim = true;
4400 					goto endfield;
4401 				}
4402 				/* start of quoted field (or part of field) */
4403 				if (c == quotec)
4404 				{
4405 					saw_quote = true;
4406 					break;
4407 				}
4408 				/* Add c to output string */
4409 				*output_ptr++ = c;
4410 			}
4411 
4412 			/* In quote */
4413 			for (;;)
4414 			{
4415 				end_ptr = cur_ptr;
4416 				if (cur_ptr >= line_end_ptr)
4417 					ereport(ERROR,
4418 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4419 							 errmsg("unterminated CSV quoted field")));
4420 
4421 				c = *cur_ptr++;
4422 
4423 				/* escape within a quoted field */
4424 				if (c == escapec)
4425 				{
4426 					/*
4427 					 * peek at the next char if available, and escape it if it
4428 					 * is an escape char or a quote char
4429 					 */
4430 					if (cur_ptr < line_end_ptr)
4431 					{
4432 						char		nextc = *cur_ptr;
4433 
4434 						if (nextc == escapec || nextc == quotec)
4435 						{
4436 							*output_ptr++ = nextc;
4437 							cur_ptr++;
4438 							continue;
4439 						}
4440 					}
4441 				}
4442 
4443 				/*
4444 				 * end of quoted field. Must do this test after testing for
4445 				 * escape in case quote char and escape char are the same
4446 				 * (which is the common case).
4447 				 */
4448 				if (c == quotec)
4449 					break;
4450 
4451 				/* Add c to output string */
4452 				*output_ptr++ = c;
4453 			}
4454 		}
4455 endfield:
4456 
4457 		/* Terminate attribute value in output area */
4458 		*output_ptr++ = '\0';
4459 
4460 		/* Check whether raw input matched null marker */
4461 		input_len = end_ptr - start_ptr;
4462 		if (!saw_quote && input_len == cstate->null_print_len &&
4463 			strncmp(start_ptr, cstate->null_print, input_len) == 0)
4464 			cstate->raw_fields[fieldno] = NULL;
4465 
4466 		fieldno++;
4467 		/* Done if we hit EOL instead of a delim */
4468 		if (!found_delim)
4469 			break;
4470 	}
4471 
4472 	/* Clean up state of attribute_buf */
4473 	output_ptr--;
4474 	Assert(*output_ptr == '\0');
4475 	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4476 
4477 	return fieldno;
4478 }
4479 
4480 
4481 /*
4482  * Read a binary attribute
4483  */
4484 static Datum
CopyReadBinaryAttribute(CopyState cstate,int column_no,FmgrInfo * flinfo,Oid typioparam,int32 typmod,bool * isnull)4485 CopyReadBinaryAttribute(CopyState cstate,
4486 						int column_no, FmgrInfo *flinfo,
4487 						Oid typioparam, int32 typmod,
4488 						bool *isnull)
4489 {
4490 	int32		fld_size;
4491 	Datum		result;
4492 
4493 	if (!CopyGetInt32(cstate, &fld_size))
4494 		ereport(ERROR,
4495 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4496 				 errmsg("unexpected EOF in COPY data")));
4497 	if (fld_size == -1)
4498 	{
4499 		*isnull = true;
4500 		return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4501 	}
4502 	if (fld_size < 0)
4503 		ereport(ERROR,
4504 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4505 				 errmsg("invalid field size")));
4506 
4507 	/* reset attribute_buf to empty, and load raw data in it */
4508 	resetStringInfo(&cstate->attribute_buf);
4509 
4510 	enlargeStringInfo(&cstate->attribute_buf, fld_size);
4511 	if (CopyGetData(cstate, cstate->attribute_buf.data,
4512 					fld_size, fld_size) != fld_size)
4513 		ereport(ERROR,
4514 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4515 				 errmsg("unexpected EOF in COPY data")));
4516 
4517 	cstate->attribute_buf.len = fld_size;
4518 	cstate->attribute_buf.data[fld_size] = '\0';
4519 
4520 	/* Call the column type's binary input converter */
4521 	result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4522 								 typioparam, typmod);
4523 
4524 	/* Trouble if it didn't eat the whole buffer */
4525 	if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4526 		ereport(ERROR,
4527 				(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4528 				 errmsg("incorrect binary data format")));
4529 
4530 	*isnull = false;
4531 	return result;
4532 }
4533 
4534 /*
4535  * Send text representation of one attribute, with conversion and escaping
4536  */
/*
 * DUMPSOFAR() emits the pending literal run [start, ptr) through
 * CopySendData(), if that run is non-empty.  It expands in place, so the
 * caller must have variables named "start", "ptr" and "cstate" in scope.
 */
#define DUMPSOFAR() \
	do { \
		if (start < ptr) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)
4542 
4543 static void
CopyAttributeOutText(CopyState cstate,char * string)4544 CopyAttributeOutText(CopyState cstate, char *string)
4545 {
4546 	char	   *ptr;
4547 	char	   *start;
4548 	char		c;
4549 	char		delimc = cstate->delim[0];
4550 
4551 	if (cstate->need_transcoding)
4552 		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4553 	else
4554 		ptr = string;
4555 
4556 	/*
4557 	 * We have to grovel through the string searching for control characters
4558 	 * and instances of the delimiter character.  In most cases, though, these
4559 	 * are infrequent.  To avoid overhead from calling CopySendData once per
4560 	 * character, we dump out all characters between escaped characters in a
4561 	 * single call.  The loop invariant is that the data from "start" to "ptr"
4562 	 * can be sent literally, but hasn't yet been.
4563 	 *
4564 	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4565 	 * in valid backend encodings, extra bytes of a multibyte character never
4566 	 * look like ASCII.  This loop is sufficiently performance-critical that
4567 	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4568 	 * of the normal safe-encoding path.
4569 	 */
4570 	if (cstate->encoding_embeds_ascii)
4571 	{
4572 		start = ptr;
4573 		while ((c = *ptr) != '\0')
4574 		{
4575 			if ((unsigned char) c < (unsigned char) 0x20)
4576 			{
4577 				/*
4578 				 * \r and \n must be escaped, the others are traditional. We
4579 				 * prefer to dump these using the C-like notation, rather than
4580 				 * a backslash and the literal character, because it makes the
4581 				 * dump file a bit more proof against Microsoftish data
4582 				 * mangling.
4583 				 */
4584 				switch (c)
4585 				{
4586 					case '\b':
4587 						c = 'b';
4588 						break;
4589 					case '\f':
4590 						c = 'f';
4591 						break;
4592 					case '\n':
4593 						c = 'n';
4594 						break;
4595 					case '\r':
4596 						c = 'r';
4597 						break;
4598 					case '\t':
4599 						c = 't';
4600 						break;
4601 					case '\v':
4602 						c = 'v';
4603 						break;
4604 					default:
4605 						/* If it's the delimiter, must backslash it */
4606 						if (c == delimc)
4607 							break;
4608 						/* All ASCII control chars are length 1 */
4609 						ptr++;
4610 						continue;	/* fall to end of loop */
4611 				}
4612 				/* if we get here, we need to convert the control char */
4613 				DUMPSOFAR();
4614 				CopySendChar(cstate, '\\');
4615 				CopySendChar(cstate, c);
4616 				start = ++ptr;	/* do not include char in next run */
4617 			}
4618 			else if (c == '\\' || c == delimc)
4619 			{
4620 				DUMPSOFAR();
4621 				CopySendChar(cstate, '\\');
4622 				start = ptr++;	/* we include char in next run */
4623 			}
4624 			else if (IS_HIGHBIT_SET(c))
4625 				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4626 			else
4627 				ptr++;
4628 		}
4629 	}
4630 	else
4631 	{
4632 		start = ptr;
4633 		while ((c = *ptr) != '\0')
4634 		{
4635 			if ((unsigned char) c < (unsigned char) 0x20)
4636 			{
4637 				/*
4638 				 * \r and \n must be escaped, the others are traditional. We
4639 				 * prefer to dump these using the C-like notation, rather than
4640 				 * a backslash and the literal character, because it makes the
4641 				 * dump file a bit more proof against Microsoftish data
4642 				 * mangling.
4643 				 */
4644 				switch (c)
4645 				{
4646 					case '\b':
4647 						c = 'b';
4648 						break;
4649 					case '\f':
4650 						c = 'f';
4651 						break;
4652 					case '\n':
4653 						c = 'n';
4654 						break;
4655 					case '\r':
4656 						c = 'r';
4657 						break;
4658 					case '\t':
4659 						c = 't';
4660 						break;
4661 					case '\v':
4662 						c = 'v';
4663 						break;
4664 					default:
4665 						/* If it's the delimiter, must backslash it */
4666 						if (c == delimc)
4667 							break;
4668 						/* All ASCII control chars are length 1 */
4669 						ptr++;
4670 						continue;	/* fall to end of loop */
4671 				}
4672 				/* if we get here, we need to convert the control char */
4673 				DUMPSOFAR();
4674 				CopySendChar(cstate, '\\');
4675 				CopySendChar(cstate, c);
4676 				start = ++ptr;	/* do not include char in next run */
4677 			}
4678 			else if (c == '\\' || c == delimc)
4679 			{
4680 				DUMPSOFAR();
4681 				CopySendChar(cstate, '\\');
4682 				start = ptr++;	/* we include char in next run */
4683 			}
4684 			else
4685 				ptr++;
4686 		}
4687 	}
4688 
4689 	DUMPSOFAR();
4690 }
4691 
4692 /*
4693  * Send text representation of one attribute, with conversion and
4694  * CSV-style escaping
4695  */
4696 static void
CopyAttributeOutCSV(CopyState cstate,char * string,bool use_quote,bool single_attr)4697 CopyAttributeOutCSV(CopyState cstate, char *string,
4698 					bool use_quote, bool single_attr)
4699 {
4700 	char	   *ptr;
4701 	char	   *start;
4702 	char		c;
4703 	char		delimc = cstate->delim[0];
4704 	char		quotec = cstate->quote[0];
4705 	char		escapec = cstate->escape[0];
4706 
4707 	/* force quoting if it matches null_print (before conversion!) */
4708 	if (!use_quote && strcmp(string, cstate->null_print) == 0)
4709 		use_quote = true;
4710 
4711 	if (cstate->need_transcoding)
4712 		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4713 	else
4714 		ptr = string;
4715 
4716 	/*
4717 	 * Make a preliminary pass to discover if it needs quoting
4718 	 */
4719 	if (!use_quote)
4720 	{
4721 		/*
4722 		 * Because '\.' can be a data value, quote it if it appears alone on a
4723 		 * line so it is not interpreted as the end-of-data marker.
4724 		 */
4725 		if (single_attr && strcmp(ptr, "\\.") == 0)
4726 			use_quote = true;
4727 		else
4728 		{
4729 			char	   *tptr = ptr;
4730 
4731 			while ((c = *tptr) != '\0')
4732 			{
4733 				if (c == delimc || c == quotec || c == '\n' || c == '\r')
4734 				{
4735 					use_quote = true;
4736 					break;
4737 				}
4738 				if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4739 					tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4740 				else
4741 					tptr++;
4742 			}
4743 		}
4744 	}
4745 
4746 	if (use_quote)
4747 	{
4748 		CopySendChar(cstate, quotec);
4749 
4750 		/*
4751 		 * We adopt the same optimization strategy as in CopyAttributeOutText
4752 		 */
4753 		start = ptr;
4754 		while ((c = *ptr) != '\0')
4755 		{
4756 			if (c == quotec || c == escapec)
4757 			{
4758 				DUMPSOFAR();
4759 				CopySendChar(cstate, escapec);
4760 				start = ptr;	/* we include char in next run */
4761 			}
4762 			if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4763 				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4764 			else
4765 				ptr++;
4766 		}
4767 		DUMPSOFAR();
4768 
4769 		CopySendChar(cstate, quotec);
4770 	}
4771 	else
4772 	{
4773 		/* If it doesn't need quoting, we can just dump it as-is */
4774 		CopySendString(cstate, ptr);
4775 	}
4776 }
4777 
4778 /*
4779  * CopyGetAttnums - build an integer list of attnums to be copied
4780  *
4781  * The input attnamelist is either the user-specified column list,
4782  * or NIL if there was none (in which case we want all the non-dropped
4783  * columns).
4784  *
4785  * rel can be NULL ... it's only used for error reports.
4786  */
4787 static List *
CopyGetAttnums(TupleDesc tupDesc,Relation rel,List * attnamelist)4788 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4789 {
4790 	List	   *attnums = NIL;
4791 
4792 	if (attnamelist == NIL)
4793 	{
4794 		/* Generate default column list */
4795 		int			attr_count = tupDesc->natts;
4796 		int			i;
4797 
4798 		for (i = 0; i < attr_count; i++)
4799 		{
4800 			if (TupleDescAttr(tupDesc, i)->attisdropped)
4801 				continue;
4802 			attnums = lappend_int(attnums, i + 1);
4803 		}
4804 	}
4805 	else
4806 	{
4807 		/* Validate the user-supplied list and extract attnums */
4808 		ListCell   *l;
4809 
4810 		foreach(l, attnamelist)
4811 		{
4812 			char	   *name = strVal(lfirst(l));
4813 			int			attnum;
4814 			int			i;
4815 
4816 			/* Lookup column name */
4817 			attnum = InvalidAttrNumber;
4818 			for (i = 0; i < tupDesc->natts; i++)
4819 			{
4820 				Form_pg_attribute att = TupleDescAttr(tupDesc, i);
4821 
4822 				if (att->attisdropped)
4823 					continue;
4824 				if (namestrcmp(&(att->attname), name) == 0)
4825 				{
4826 					attnum = att->attnum;
4827 					break;
4828 				}
4829 			}
4830 			if (attnum == InvalidAttrNumber)
4831 			{
4832 				if (rel != NULL)
4833 					ereport(ERROR,
4834 							(errcode(ERRCODE_UNDEFINED_COLUMN),
4835 							 errmsg("column \"%s\" of relation \"%s\" does not exist",
4836 									name, RelationGetRelationName(rel))));
4837 				else
4838 					ereport(ERROR,
4839 							(errcode(ERRCODE_UNDEFINED_COLUMN),
4840 							 errmsg("column \"%s\" does not exist",
4841 									name)));
4842 			}
4843 			/* Check for duplicates */
4844 			if (list_member_int(attnums, attnum))
4845 				ereport(ERROR,
4846 						(errcode(ERRCODE_DUPLICATE_COLUMN),
4847 						 errmsg("column \"%s\" specified more than once",
4848 								name)));
4849 			attnums = lappend_int(attnums, attnum);
4850 		}
4851 	}
4852 
4853 	return attnums;
4854 }
4855 
4856 
4857 /*
4858  * copy_dest_startup --- executor startup
4859  */
4860 static void
copy_dest_startup(DestReceiver * self,int operation,TupleDesc typeinfo)4861 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4862 {
4863 	/* no-op */
4864 }
4865 
4866 /*
4867  * copy_dest_receive --- receive one tuple
4868  */
4869 static bool
copy_dest_receive(TupleTableSlot * slot,DestReceiver * self)4870 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4871 {
4872 	DR_copy    *myState = (DR_copy *) self;
4873 	CopyState	cstate = myState->cstate;
4874 
4875 	/* Make sure the tuple is fully deconstructed */
4876 	slot_getallattrs(slot);
4877 
4878 	/* And send the data */
4879 	CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4880 	myState->processed++;
4881 
4882 	return true;
4883 }
4884 
4885 /*
4886  * copy_dest_shutdown --- executor end
4887  */
4888 static void
copy_dest_shutdown(DestReceiver * self)4889 copy_dest_shutdown(DestReceiver *self)
4890 {
4891 	/* no-op */
4892 }
4893 
4894 /*
4895  * copy_dest_destroy --- release DestReceiver object
4896  */
4897 static void
copy_dest_destroy(DestReceiver * self)4898 copy_dest_destroy(DestReceiver *self)
4899 {
4900 	pfree(self);
4901 }
4902 
4903 /*
4904  * CreateCopyDestReceiver -- create a suitable DestReceiver object
4905  */
4906 DestReceiver *
CreateCopyDestReceiver(void)4907 CreateCopyDestReceiver(void)
4908 {
4909 	DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));
4910 
4911 	self->pub.receiveSlot = copy_dest_receive;
4912 	self->pub.rStartup = copy_dest_startup;
4913 	self->pub.rShutdown = copy_dest_shutdown;
4914 	self->pub.rDestroy = copy_dest_destroy;
4915 	self->pub.mydest = DestCopyOut;
4916 
4917 	self->cstate = NULL;		/* will be set later */
4918 	self->processed = 0;
4919 
4920 	return (DestReceiver *) self;
4921 }
4922