1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  *		Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22 
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "parser/parse_relation.h"
41 #include "rewrite/rewriteHandler.h"
42 #include "storage/fd.h"
43 #include "tcop/tcopprot.h"
44 #include "utils/builtins.h"
45 #include "utils/lsyscache.h"
46 #include "utils/memutils.h"
47 #include "utils/portal.h"
48 #include "utils/rel.h"
49 #include "utils/rls.h"
50 #include "utils/snapmgr.h"
51 
52 
/* True when character c lies in the range '0' .. '7' (an octal digit). */
#define ISOCTAL(c) ((c) >= '0' && (c) <= '7')
/* Distance of character c from '0'; meaningful when ISOCTAL(c) holds. */
#define OCTVALUE(c) ((c) - '0')
55 
/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level.
 *
 * The low-level routines CopySendEndOfRow and CopyGetData switch on this
 * value to decide how bytes are actually written or read.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file (or a piped program) */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE,				/* to/from frontend (3.0 protocol) */
	COPY_CALLBACK				/* to/from callback function */
} CopyDest;
67 
/*
 *	Represents the end-of-line terminator type of the input
 *
 * NOTE(review): the member meanings noted below are read off the names
 * only; the code that detects and sets the terminator type is not visible
 * in this part of the file -- confirm there.
 */
typedef enum EolType
{
	EOL_UNKNOWN,				/* presumably: not determined yet */
	EOL_NL,						/* presumably: newline only */
	EOL_CR,						/* presumably: carriage return only */
	EOL_CRNL					/* presumably: carriage return + newline */
} EolType;
78 
/*
 * This struct contains all the state variables used throughout a COPY
 * operation. For simplicity, we use the same struct for all variants of COPY,
 * even though some fields are used in only some cases.
 *
 * Multi-byte encodings: all supported client-side encodings encode multi-byte
 * characters by having the first byte's high bit set. Subsequent bytes of the
 * character can have the high bit not set. When scanning data in such an
 * encoding to look for a match to a single-byte (ie ASCII) character, we must
 * use the full pg_encoding_mblen() machinery to skip over multibyte
 * characters, else we might find a false match to a trailing byte. In
 * supported server encodings, there is no possibility of a false match, and
 * it's faster to make useless comparisons to trailing bytes than it is to
 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
 * when we have to do it the hard way.
 */
typedef struct CopyStateData
{
	/* low-level state data */
	CopyDest	copy_dest;		/* type of copy source/destination */
	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
	StringInfo	fe_msgbuf;		/* used for all dests during COPY TO, only for
								 * dest == COPY_NEW_FE in COPY FROM */
	bool		is_copy_from;	/* COPY TO, or COPY FROM? */
	bool		reached_eof;	/* true if we read to end of copy data (not
								 * all copy_dest types maintain this) */
	EolType		eol_type;		/* EOL type of input */
	int			file_encoding;	/* file or remote side's character encoding */
	bool		need_transcoding;	/* file encoding diff from server? */
	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */

	/* parameters from the COPY command */
	Relation	rel;			/* relation to copy to or from */
	QueryDesc  *queryDesc;		/* executable query to copy from */
	List	   *attnumlist;		/* integer list of attnums to copy */
	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
	bool		is_program;		/* is 'filename' a program to popen? */
	copy_data_source_cb data_source_cb; /* function for reading data; called
										 * by CopyGetData when copy_dest ==
										 * COPY_CALLBACK */
	bool		binary;			/* binary format? */
	bool		oids;			/* include OIDs? */
	bool		freeze;			/* freeze rows on loading? */
	bool		csv_mode;		/* Comma Separated Value format? */
	bool		header_line;	/* CSV header line? */
	char	   *null_print;		/* NULL marker string (server encoding!) */
	int			null_print_len; /* length of same */
	char	   *null_print_client;	/* same converted to file encoding */
	char	   *delim;			/* column delimiter (must be 1 byte) */
	char	   *quote;			/* CSV quote char (must be 1 byte) */
	char	   *escape;			/* CSV escape char (must be 1 byte) */
	List	   *force_quote;	/* list of column names */
	bool		force_quote_all;	/* FORCE_QUOTE *? */
	bool	   *force_quote_flags;	/* per-column CSV FQ flags */
	List	   *force_notnull;	/* list of column names */
	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
	List	   *force_null;		/* list of column names */
	bool	   *force_null_flags;	/* per-column CSV FN flags */
	bool		convert_selectively;	/* do selective binary conversion? */
	List	   *convert_select; /* list of column names (can be NIL) */
	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */

	/* these are just for error messages, see CopyFromErrorCallback */
	const char *cur_relname;	/* table name for error messages */
	uint64		cur_lineno;		/* line number for error messages */
	const char *cur_attname;	/* current att for error messages */
	const char *cur_attval;		/* current att value for error messages */

	/*
	 * Working state for COPY TO/FROM
	 */
	MemoryContext copycontext;	/* per-copy execution context */

	/*
	 * Working state for COPY TO
	 */
	FmgrInfo   *out_functions;	/* lookup info for output functions */
	MemoryContext rowcontext;	/* per-row evaluation context */

	/*
	 * Working state for COPY FROM
	 *
	 * NOTE(review): the fields below that carry a "presumably" remark are
	 * not used in the visible part of this file; the remarks are read off
	 * the names and need confirming against the COPY FROM code.
	 */
	AttrNumber	num_defaults;	/* presumably # of entries in defmap and
								 * defexprs */
	bool		file_has_oids;	/* presumably: input data carries OIDs? */
	FmgrInfo	oid_in_function;	/* presumably input function for OIDs */
	Oid			oid_typioparam; /* presumably typioparam for the above */
	FmgrInfo   *in_functions;	/* array of input functions for each attrs */
	Oid		   *typioparams;	/* array of element types for in_functions */
	int		   *defmap;			/* array of default att numbers */
	ExprState **defexprs;		/* array of default att expressions */
	bool		volatile_defexprs;	/* is any of defexprs volatile? */
	List	   *range_table;	/* presumably the range table used while
								 * executing COPY FROM */

	PartitionDispatch *partition_dispatch_info;
	int			num_dispatch;	/* Number of entries in the above array */
	int			num_partitions; /* Number of members in the following arrays */
	ResultRelInfo *partitions;	/* Per partition result relation */
	TupleConversionMap **partition_tupconv_maps;
	TupleTableSlot *partition_tuple_slot;
	TransitionCaptureState *transition_capture;
	TupleConversionMap **transition_tupconv_maps;

	/*
	 * These variables are used to reduce overhead in textual COPY FROM.
	 *
	 * attribute_buf holds the separated, de-escaped text for each field of
	 * the current line.  The CopyReadAttributes functions return arrays of
	 * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
	 * the buffer on each cycle.
	 */
	StringInfoData attribute_buf;

	/* field raw data pointers found by COPY FROM */

	int			max_fields;
	char	  **raw_fields;

	/*
	 * Similarly, line_buf holds the whole input line being processed. The
	 * input cycle is first to read the whole line into line_buf, convert it
	 * to server encoding there, and then extract the individual attribute
	 * fields into attribute_buf.  line_buf is preserved unmodified so that we
	 * can display it in error messages if appropriate.
	 */
	StringInfoData line_buf;
	bool		line_buf_converted; /* converted to server encoding? */
	bool		line_buf_valid; /* contains the row being processed? */

	/*
	 * Finally, raw_buf holds raw data read from the data source (file or
	 * client connection).  CopyReadLine parses this data sufficiently to
	 * locate line boundaries, then transfers the data to line_buf and
	 * converts it.  Note: we guarantee that there is a \0 at
	 * raw_buf[raw_buf_len].  (CopyLoadRawBuf stores that terminator each
	 * time it refills the buffer.)
	 */
#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
	char	   *raw_buf;
	int			raw_buf_index;	/* next byte to process */
	int			raw_buf_len;	/* total # of bytes stored */
} CopyStateData;
217 
/*
 * DestReceiver for COPY (query) TO
 *
 * NOTE(review): pub is the first member, which is the usual arrangement for
 * converting between a DestReceiver pointer and a DR_copy pointer; the
 * receiver callbacks that would do so are outside this view -- confirm.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	CopyState	cstate;			/* CopyStateData for the command */
	uint64		processed;		/* # of tuples processed */
} DR_copy;
225 
226 
227 /*
228  * These macros centralize code used to process line_buf and raw_buf buffers.
229  * They are macros because they often do continue/break control and to avoid
230  * function call overhead in tight COPY loops.
231  *
232  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
233  * prevent the continue/break processing from working.  We end the "if (1)"
234  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
235  * any "else" in the calling code, and to avoid any compiler warnings about
236  * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
237  */
238 
/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 *
 * Must be expanded inside a loop, in a scope that provides the variables
 * raw_buf_ptr, prev_raw_ptr, copy_buf_len, hit_eof and need_data.  When
 * raw_buf_ptr + extralen reaches or passes copy_buf_len and hit_eof is not
 * set, it rewinds raw_buf_ptr to prev_raw_ptr, sets need_data and executes
 * "continue" in the enclosing loop; otherwise it does nothing.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)
253 
/*
 * This consumes the remainder of the buffer and breaks
 *
 * Must be expanded inside a loop (or other construct that "break" can
 * leave), in a scope that provides raw_buf_ptr, copy_buf_len, hit_eof and
 * result.  When raw_buf_ptr + extralen reaches or passes copy_buf_len and
 * hit_eof is set, it advances raw_buf_ptr to copy_buf_len (only if extralen
 * is nonzero), sets result to true and executes "break"; otherwise it does
 * nothing.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)
267 
/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Requires cstate and raw_buf_ptr in scope.  If raw_buf_ptr is beyond
 * cstate->raw_buf_index, the bytes of raw_buf from raw_buf_index up to (but
 * not including) raw_buf_ptr are appended to cstate->line_buf, and
 * raw_buf_index is then advanced to raw_buf_ptr.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							 cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
283 
/*
 * Undo any read-ahead and jump out of the block.
 *
 * Requires raw_buf_ptr and prev_raw_ptr in scope and a label named
 * not_end_of_copy in the enclosing function: raw_buf_ptr is reset to one
 * past prev_raw_ptr before control transfers to that label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
291 
/*
 * 11-byte signature.  The literal spells out exactly 11 characters (the
 * last being an explicit \0), so the array bound of 11 leaves no room for
 * the string literal's implicit terminator and none is stored.
 * NOTE(review): presumably this is the header signature of the binary COPY
 * format; its users are outside this view.
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
293 
294 
295 /* non-export function prototypes */
296 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
297 		  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
298 		  List *options);
299 static void EndCopy(CopyState cstate);
300 static void ClosePipeToProgram(CopyState cstate);
301 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
302 			Oid queryRelId, const char *filename, bool is_program,
303 			List *attnamelist, List *options);
304 static void EndCopyTo(CopyState cstate);
305 static uint64 DoCopyTo(CopyState cstate);
306 static uint64 CopyTo(CopyState cstate);
307 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
308 			 Datum *values, bool *nulls);
309 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
310 					CommandId mycid, int hi_options,
311 					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
312 					BulkInsertState bistate,
313 					int nBufferedTuples, HeapTuple *bufferedTuples,
314 					uint64 firstBufferedLineNo);
315 static bool CopyReadLine(CopyState cstate);
316 static bool CopyReadLineText(CopyState cstate);
317 static int	CopyReadAttributesText(CopyState cstate);
318 static int	CopyReadAttributesCSV(CopyState cstate);
319 static Datum CopyReadBinaryAttribute(CopyState cstate,
320 						int column_no, FmgrInfo *flinfo,
321 						Oid typioparam, int32 typmod,
322 						bool *isnull);
323 static void CopyAttributeOutText(CopyState cstate, char *string);
324 static void CopyAttributeOutCSV(CopyState cstate, char *string,
325 					bool use_quote, bool single_attr);
326 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
327 			   List *attnamelist);
328 static char *limit_printout_length(const char *str);
329 
330 /* Low-level communications functions */
331 static void SendCopyBegin(CopyState cstate);
332 static void ReceiveCopyBegin(CopyState cstate);
333 static void SendCopyEnd(CopyState cstate);
334 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
335 static void CopySendString(CopyState cstate, const char *str);
336 static void CopySendChar(CopyState cstate, char c);
337 static void CopySendEndOfRow(CopyState cstate);
338 static int CopyGetData(CopyState cstate, void *databuf,
339 			int minread, int maxread);
340 static void CopySendInt32(CopyState cstate, int32 val);
341 static bool CopyGetInt32(CopyState cstate, int32 *val);
342 static void CopySendInt16(CopyState cstate, int16 val);
343 static bool CopyGetInt16(CopyState cstate, int16 *val);
344 
345 
346 /*
347  * Send copy start/stop messages for frontend copies.  These have changed
348  * in past protocol redesigns.
349  */
350 static void
SendCopyBegin(CopyState cstate)351 SendCopyBegin(CopyState cstate)
352 {
353 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
354 	{
355 		/* new way */
356 		StringInfoData buf;
357 		int			natts = list_length(cstate->attnumlist);
358 		int16		format = (cstate->binary ? 1 : 0);
359 		int			i;
360 
361 		pq_beginmessage(&buf, 'H');
362 		pq_sendbyte(&buf, format);	/* overall format */
363 		pq_sendint(&buf, natts, 2);
364 		for (i = 0; i < natts; i++)
365 			pq_sendint(&buf, format, 2);	/* per-column formats */
366 		pq_endmessage(&buf);
367 		cstate->copy_dest = COPY_NEW_FE;
368 	}
369 	else
370 	{
371 		/* old way */
372 		if (cstate->binary)
373 			ereport(ERROR,
374 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
375 					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
376 		pq_putemptymessage('H');
377 		/* grottiness needed for old COPY OUT protocol */
378 		pq_startcopyout();
379 		cstate->copy_dest = COPY_OLD_FE;
380 	}
381 }
382 
383 static void
ReceiveCopyBegin(CopyState cstate)384 ReceiveCopyBegin(CopyState cstate)
385 {
386 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
387 	{
388 		/* new way */
389 		StringInfoData buf;
390 		int			natts = list_length(cstate->attnumlist);
391 		int16		format = (cstate->binary ? 1 : 0);
392 		int			i;
393 
394 		pq_beginmessage(&buf, 'G');
395 		pq_sendbyte(&buf, format);	/* overall format */
396 		pq_sendint(&buf, natts, 2);
397 		for (i = 0; i < natts; i++)
398 			pq_sendint(&buf, format, 2);	/* per-column formats */
399 		pq_endmessage(&buf);
400 		cstate->copy_dest = COPY_NEW_FE;
401 		cstate->fe_msgbuf = makeStringInfo();
402 	}
403 	else
404 	{
405 		/* old way */
406 		if (cstate->binary)
407 			ereport(ERROR,
408 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
409 					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
410 		pq_putemptymessage('G');
411 		/* any error in old protocol will make us lose sync */
412 		pq_startmsgread();
413 		cstate->copy_dest = COPY_OLD_FE;
414 	}
415 	/* We *must* flush here to ensure FE knows it can send. */
416 	pq_flush();
417 }
418 
419 static void
SendCopyEnd(CopyState cstate)420 SendCopyEnd(CopyState cstate)
421 {
422 	if (cstate->copy_dest == COPY_NEW_FE)
423 	{
424 		/* Shouldn't have any unsent data */
425 		Assert(cstate->fe_msgbuf->len == 0);
426 		/* Send Copy Done message */
427 		pq_putemptymessage('c');
428 	}
429 	else
430 	{
431 		CopySendData(cstate, "\\.", 2);
432 		/* Need to flush out the trailer (this also appends a newline) */
433 		CopySendEndOfRow(cstate);
434 		pq_endcopyout(false);
435 	}
436 }
437 
438 /*----------
439  * CopySendData sends output data to the destination (file or frontend)
440  * CopySendString does the same for null-terminated strings
441  * CopySendChar does the same for single characters
442  * CopySendEndOfRow does the appropriate thing at end of each data row
443  *	(data is not actually flushed except by CopySendEndOfRow)
444  *
445  * NB: no data conversion is applied by these functions
446  *----------
447  */
448 static void
CopySendData(CopyState cstate,const void * databuf,int datasize)449 CopySendData(CopyState cstate, const void *databuf, int datasize)
450 {
451 	appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
452 }
453 
454 static void
CopySendString(CopyState cstate,const char * str)455 CopySendString(CopyState cstate, const char *str)
456 {
457 	appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
458 }
459 
460 static void
CopySendChar(CopyState cstate,char c)461 CopySendChar(CopyState cstate, char c)
462 {
463 	appendStringInfoCharMacro(cstate->fe_msgbuf, c);
464 }
465 
466 static void
CopySendEndOfRow(CopyState cstate)467 CopySendEndOfRow(CopyState cstate)
468 {
469 	StringInfo	fe_msgbuf = cstate->fe_msgbuf;
470 
471 	switch (cstate->copy_dest)
472 	{
473 		case COPY_FILE:
474 			if (!cstate->binary)
475 			{
476 				/* Default line termination depends on platform */
477 #ifndef WIN32
478 				CopySendChar(cstate, '\n');
479 #else
480 				CopySendString(cstate, "\r\n");
481 #endif
482 			}
483 
484 			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
485 					   cstate->copy_file) != 1 ||
486 				ferror(cstate->copy_file))
487 			{
488 				if (cstate->is_program)
489 				{
490 					if (errno == EPIPE)
491 					{
492 						/*
493 						 * The pipe will be closed automatically on error at
494 						 * the end of transaction, but we might get a better
495 						 * error message from the subprocess' exit code than
496 						 * just "Broken Pipe"
497 						 */
498 						ClosePipeToProgram(cstate);
499 
500 						/*
501 						 * If ClosePipeToProgram() didn't throw an error, the
502 						 * program terminated normally, but closed the pipe
503 						 * first. Restore errno, and throw an error.
504 						 */
505 						errno = EPIPE;
506 					}
507 					ereport(ERROR,
508 							(errcode_for_file_access(),
509 							 errmsg("could not write to COPY program: %m")));
510 				}
511 				else
512 					ereport(ERROR,
513 							(errcode_for_file_access(),
514 							 errmsg("could not write to COPY file: %m")));
515 			}
516 			break;
517 		case COPY_OLD_FE:
518 			/* The FE/BE protocol uses \n as newline for all platforms */
519 			if (!cstate->binary)
520 				CopySendChar(cstate, '\n');
521 
522 			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
523 			{
524 				/* no hope of recovering connection sync, so FATAL */
525 				ereport(FATAL,
526 						(errcode(ERRCODE_CONNECTION_FAILURE),
527 						 errmsg("connection lost during COPY to stdout")));
528 			}
529 			break;
530 		case COPY_NEW_FE:
531 			/* The FE/BE protocol uses \n as newline for all platforms */
532 			if (!cstate->binary)
533 				CopySendChar(cstate, '\n');
534 
535 			/* Dump the accumulated row as one CopyData message */
536 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
537 			break;
538 		case COPY_CALLBACK:
539 			Assert(false);		/* Not yet supported. */
540 			break;
541 	}
542 
543 	resetStringInfo(fe_msgbuf);
544 }
545 
546 /*
547  * CopyGetData reads data from the source (file or frontend)
548  *
549  * We attempt to read at least minread, and at most maxread, bytes from
550  * the source.  The actual number of bytes read is returned; if this is
551  * less than minread, EOF was detected.
552  *
553  * Note: when copying from the frontend, we expect a proper EOF mark per
554  * protocol; if the frontend simply drops the connection, we raise error.
555  * It seems unwise to allow the COPY IN to complete normally in that case.
556  *
557  * NB: no data conversion is applied here.
558  */
559 static int
CopyGetData(CopyState cstate,void * databuf,int minread,int maxread)560 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
561 {
562 	int			bytesread = 0;
563 
564 	switch (cstate->copy_dest)
565 	{
566 		case COPY_FILE:
567 			bytesread = fread(databuf, 1, maxread, cstate->copy_file);
568 			if (ferror(cstate->copy_file))
569 				ereport(ERROR,
570 						(errcode_for_file_access(),
571 						 errmsg("could not read from COPY file: %m")));
572 			if (bytesread == 0)
573 				cstate->reached_eof = true;
574 			break;
575 		case COPY_OLD_FE:
576 
577 			/*
578 			 * We cannot read more than minread bytes (which in practice is 1)
579 			 * because old protocol doesn't have any clear way of separating
580 			 * the COPY stream from following data.  This is slow, but not any
581 			 * slower than the code path was originally, and we don't care
582 			 * much anymore about the performance of old protocol.
583 			 */
584 			if (pq_getbytes((char *) databuf, minread))
585 			{
586 				/* Only a \. terminator is legal EOF in old protocol */
587 				ereport(ERROR,
588 						(errcode(ERRCODE_CONNECTION_FAILURE),
589 						 errmsg("unexpected EOF on client connection with an open transaction")));
590 			}
591 			bytesread = minread;
592 			break;
593 		case COPY_NEW_FE:
594 			while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
595 			{
596 				int			avail;
597 
598 				while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
599 				{
600 					/* Try to receive another message */
601 					int			mtype;
602 
603 			readmessage:
604 					HOLD_CANCEL_INTERRUPTS();
605 					pq_startmsgread();
606 					mtype = pq_getbyte();
607 					if (mtype == EOF)
608 						ereport(ERROR,
609 								(errcode(ERRCODE_CONNECTION_FAILURE),
610 								 errmsg("unexpected EOF on client connection with an open transaction")));
611 					if (pq_getmessage(cstate->fe_msgbuf, 0))
612 						ereport(ERROR,
613 								(errcode(ERRCODE_CONNECTION_FAILURE),
614 								 errmsg("unexpected EOF on client connection with an open transaction")));
615 					RESUME_CANCEL_INTERRUPTS();
616 					switch (mtype)
617 					{
618 						case 'd':	/* CopyData */
619 							break;
620 						case 'c':	/* CopyDone */
621 							/* COPY IN correctly terminated by frontend */
622 							cstate->reached_eof = true;
623 							return bytesread;
624 						case 'f':	/* CopyFail */
625 							ereport(ERROR,
626 									(errcode(ERRCODE_QUERY_CANCELED),
627 									 errmsg("COPY from stdin failed: %s",
628 											pq_getmsgstring(cstate->fe_msgbuf))));
629 							break;
630 						case 'H':	/* Flush */
631 						case 'S':	/* Sync */
632 
633 							/*
634 							 * Ignore Flush/Sync for the convenience of client
635 							 * libraries (such as libpq) that may send those
636 							 * without noticing that the command they just
637 							 * sent was COPY.
638 							 */
639 							goto readmessage;
640 						default:
641 							ereport(ERROR,
642 									(errcode(ERRCODE_PROTOCOL_VIOLATION),
643 									 errmsg("unexpected message type 0x%02X during COPY from stdin",
644 											mtype)));
645 							break;
646 					}
647 				}
648 				avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
649 				if (avail > maxread)
650 					avail = maxread;
651 				pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
652 				databuf = (void *) ((char *) databuf + avail);
653 				maxread -= avail;
654 				bytesread += avail;
655 			}
656 			break;
657 		case COPY_CALLBACK:
658 			bytesread = cstate->data_source_cb(databuf, minread, maxread);
659 			break;
660 	}
661 
662 	return bytesread;
663 }
664 
665 
666 /*
667  * These functions do apply some data conversion
668  */
669 
670 /*
671  * CopySendInt32 sends an int32 in network byte order
672  */
673 static void
CopySendInt32(CopyState cstate,int32 val)674 CopySendInt32(CopyState cstate, int32 val)
675 {
676 	uint32		buf;
677 
678 	buf = htonl((uint32) val);
679 	CopySendData(cstate, &buf, sizeof(buf));
680 }
681 
682 /*
683  * CopyGetInt32 reads an int32 that appears in network byte order
684  *
685  * Returns true if OK, false if EOF
686  */
687 static bool
CopyGetInt32(CopyState cstate,int32 * val)688 CopyGetInt32(CopyState cstate, int32 *val)
689 {
690 	uint32		buf;
691 
692 	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
693 	{
694 		*val = 0;				/* suppress compiler warning */
695 		return false;
696 	}
697 	*val = (int32) ntohl(buf);
698 	return true;
699 }
700 
701 /*
702  * CopySendInt16 sends an int16 in network byte order
703  */
704 static void
CopySendInt16(CopyState cstate,int16 val)705 CopySendInt16(CopyState cstate, int16 val)
706 {
707 	uint16		buf;
708 
709 	buf = htons((uint16) val);
710 	CopySendData(cstate, &buf, sizeof(buf));
711 }
712 
713 /*
714  * CopyGetInt16 reads an int16 that appears in network byte order
715  */
716 static bool
CopyGetInt16(CopyState cstate,int16 * val)717 CopyGetInt16(CopyState cstate, int16 *val)
718 {
719 	uint16		buf;
720 
721 	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
722 	{
723 		*val = 0;				/* suppress compiler warning */
724 		return false;
725 	}
726 	*val = (int16) ntohs(buf);
727 	return true;
728 }
729 
730 
731 /*
732  * CopyLoadRawBuf loads some more data into raw_buf
733  *
734  * Returns TRUE if able to obtain at least one more byte, else FALSE.
735  *
736  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
737  * down to the start of the buffer and then we load more data after that.
738  * This case is used only when a frontend multibyte character crosses a
739  * bufferload boundary.
740  */
741 static bool
CopyLoadRawBuf(CopyState cstate)742 CopyLoadRawBuf(CopyState cstate)
743 {
744 	int			nbytes;
745 	int			inbytes;
746 
747 	if (cstate->raw_buf_index < cstate->raw_buf_len)
748 	{
749 		/* Copy down the unprocessed data */
750 		nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
751 		memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
752 				nbytes);
753 	}
754 	else
755 		nbytes = 0;				/* no data need be saved */
756 
757 	inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
758 						  1, RAW_BUF_SIZE - nbytes);
759 	nbytes += inbytes;
760 	cstate->raw_buf[nbytes] = '\0';
761 	cstate->raw_buf_index = 0;
762 	cstate->raw_buf_len = nbytes;
763 	return (inbytes > 0);
764 }
765 
766 
767 /*
768  *	 DoCopy executes the SQL COPY statement
769  *
 * Either unload or reload contents of the table named by stmt->relation,
 * depending on stmt->is_from.  (is_from = TRUE means we are inserting into
 * the table.)  In the "TO" case we also support copying the output of an
 * arbitrary SELECT, INSERT, UPDATE or DELETE query.
 *
 * If stmt->filename is set, transfer is between the table and the file (or
 * program) named by it.  Otherwise, transfer is between the table and our
 * regular input/output stream. The latter could be either stdin/stdout or a
 * socket, depending on whether we're running under Postmaster control.
779  *
780  * Do not allow a Postgres user without superuser privilege to read from
781  * or write to a file.
782  *
783  * Do not allow the copy if user doesn't have proper permission to access
784  * the table or the specifically requested columns.
785  */
786 void
DoCopy(ParseState * pstate,const CopyStmt * stmt,int stmt_location,int stmt_len,uint64 * processed)787 DoCopy(ParseState *pstate, const CopyStmt *stmt,
788 	   int stmt_location, int stmt_len,
789 	   uint64 *processed)
790 {
791 	CopyState	cstate;
792 	bool		is_from = stmt->is_from;
793 	bool		pipe = (stmt->filename == NULL);
794 	Relation	rel;
795 	Oid			relid;
796 	RawStmt    *query = NULL;
797 
798 	/* Disallow COPY to/from file or program except to superusers. */
799 	if (!pipe && !superuser())
800 	{
801 		if (stmt->is_program)
802 			ereport(ERROR,
803 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
804 					 errmsg("must be superuser to COPY to or from an external program"),
805 					 errhint("Anyone can COPY to stdout or from stdin. "
806 							 "psql's \\copy command also works for anyone.")));
807 		else
808 			ereport(ERROR,
809 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
810 					 errmsg("must be superuser to COPY to or from a file"),
811 					 errhint("Anyone can COPY to stdout or from stdin. "
812 							 "psql's \\copy command also works for anyone.")));
813 	}
814 
815 	if (stmt->relation)
816 	{
817 		TupleDesc	tupDesc;
818 		List	   *attnums;
819 		ListCell   *cur;
820 		RangeTblEntry *rte;
821 
822 		Assert(!stmt->query);
823 
824 		/* Open and lock the relation, using the appropriate lock type. */
825 		rel = heap_openrv(stmt->relation,
826 						  (is_from ? RowExclusiveLock : AccessShareLock));
827 
828 		relid = RelationGetRelid(rel);
829 
830 		rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
831 		rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
832 
833 		tupDesc = RelationGetDescr(rel);
834 		attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
835 		foreach(cur, attnums)
836 		{
837 			int			attno = lfirst_int(cur) -
838 			FirstLowInvalidHeapAttributeNumber;
839 
840 			if (is_from)
841 				rte->insertedCols = bms_add_member(rte->insertedCols, attno);
842 			else
843 				rte->selectedCols = bms_add_member(rte->selectedCols, attno);
844 		}
845 		ExecCheckRTPerms(pstate->p_rtable, true);
846 
847 		/*
848 		 * Permission check for row security policies.
849 		 *
850 		 * check_enable_rls will ereport(ERROR) if the user has requested
851 		 * something invalid and will otherwise indicate if we should enable
852 		 * RLS (returns RLS_ENABLED) or not for this COPY statement.
853 		 *
854 		 * If the relation has a row security policy and we are to apply it
855 		 * then perform a "query" copy and allow the normal query processing
856 		 * to handle the policies.
857 		 *
858 		 * If RLS is not enabled for this, then just fall through to the
859 		 * normal non-filtering relation handling.
860 		 */
861 		if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
862 		{
863 			SelectStmt *select;
864 			ColumnRef  *cr;
865 			ResTarget  *target;
866 			RangeVar   *from;
867 			List	   *targetList = NIL;
868 
869 			if (is_from)
870 				ereport(ERROR,
871 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 						 errmsg("COPY FROM not supported with row-level security"),
873 						 errhint("Use INSERT statements instead.")));
874 
875 			/*
876 			 * Build target list
877 			 *
878 			 * If no columns are specified in the attribute list of the COPY
879 			 * command, then the target list is 'all' columns. Therefore, '*'
880 			 * should be used as the target list for the resulting SELECT
881 			 * statement.
882 			 *
883 			 * In the case that columns are specified in the attribute list,
884 			 * create a ColumnRef and ResTarget for each column and add them
885 			 * to the target list for the resulting SELECT statement.
886 			 */
887 			if (!stmt->attlist)
888 			{
889 				cr = makeNode(ColumnRef);
890 				cr->fields = list_make1(makeNode(A_Star));
891 				cr->location = -1;
892 
893 				target = makeNode(ResTarget);
894 				target->name = NULL;
895 				target->indirection = NIL;
896 				target->val = (Node *) cr;
897 				target->location = -1;
898 
899 				targetList = list_make1(target);
900 			}
901 			else
902 			{
903 				ListCell   *lc;
904 
905 				foreach(lc, stmt->attlist)
906 				{
907 					/*
908 					 * Build the ColumnRef for each column.  The ColumnRef
909 					 * 'fields' property is a String 'Value' node (see
910 					 * nodes/value.h) that corresponds to the column name
911 					 * respectively.
912 					 */
913 					cr = makeNode(ColumnRef);
914 					cr->fields = list_make1(lfirst(lc));
915 					cr->location = -1;
916 
917 					/* Build the ResTarget and add the ColumnRef to it. */
918 					target = makeNode(ResTarget);
919 					target->name = NULL;
920 					target->indirection = NIL;
921 					target->val = (Node *) cr;
922 					target->location = -1;
923 
924 					/* Add each column to the SELECT statement's target list */
925 					targetList = lappend(targetList, target);
926 				}
927 			}
928 
929 			/*
930 			 * Build RangeVar for from clause, fully qualified based on the
931 			 * relation which we have opened and locked.
932 			 */
933 			from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
934 								pstrdup(RelationGetRelationName(rel)),
935 								-1);
936 
937 			/* Build query */
938 			select = makeNode(SelectStmt);
939 			select->targetList = targetList;
940 			select->fromClause = list_make1(from);
941 
942 			query = makeNode(RawStmt);
943 			query->stmt = (Node *) select;
944 			query->stmt_location = stmt_location;
945 			query->stmt_len = stmt_len;
946 
947 			/*
948 			 * Close the relation for now, but keep the lock on it to prevent
949 			 * changes between now and when we start the query-based COPY.
950 			 *
951 			 * We'll reopen it later as part of the query-based COPY.
952 			 */
953 			heap_close(rel, NoLock);
954 			rel = NULL;
955 		}
956 	}
957 	else
958 	{
959 		Assert(stmt->query);
960 
961 		query = makeNode(RawStmt);
962 		query->stmt = stmt->query;
963 		query->stmt_location = stmt_location;
964 		query->stmt_len = stmt_len;
965 
966 		relid = InvalidOid;
967 		rel = NULL;
968 	}
969 
970 	if (is_from)
971 	{
972 		Assert(rel);
973 
974 		/* check read-only transaction and parallel mode */
975 		if (XactReadOnly && !rel->rd_islocaltemp)
976 			PreventCommandIfReadOnly("COPY FROM");
977 		PreventCommandIfParallelMode("COPY FROM");
978 
979 		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
980 							   NULL, stmt->attlist, stmt->options);
981 		*processed = CopyFrom(cstate);	/* copy from file to database */
982 		EndCopyFrom(cstate);
983 	}
984 	else
985 	{
986 		cstate = BeginCopyTo(pstate, rel, query, relid,
987 							 stmt->filename, stmt->is_program,
988 							 stmt->attlist, stmt->options);
989 		*processed = DoCopyTo(cstate);	/* copy from database to file */
990 		EndCopyTo(cstate);
991 	}
992 
993 	/*
994 	 * Close the relation. If reading, we can release the AccessShareLock we
995 	 * got; if writing, we should hold the lock until end of transaction to
996 	 * ensure that updates will be committed before lock is released.
997 	 */
998 	if (rel != NULL)
999 		heap_close(rel, (is_from ? NoLock : AccessShareLock));
1000 }
1001 
1002 /*
1003  * Process the statement option list for COPY.
1004  *
1005  * Scan the options list (a list of DefElem) and transpose the information
1006  * into cstate, applying appropriate error checking.
1007  *
1008  * cstate is assumed to be filled with zeroes initially.
1009  *
1010  * This is exported so that external users of the COPY API can sanity-check
1011  * a list of options.  In that usage, cstate should be passed as NULL
1012  * (since external users don't know sizeof(CopyStateData)) and the collected
1013  * data is just leaked until CurrentMemoryContext is reset.
1014  *
1015  * Note that additional checking, such as whether column names listed in FORCE
1016  * QUOTE actually exist, has to be applied later.  This just checks for
1017  * self-consistency of the options list.
1018  */
1019 void
ProcessCopyOptions(ParseState * pstate,CopyState cstate,bool is_from,List * options)1020 ProcessCopyOptions(ParseState *pstate,
1021 				   CopyState cstate,
1022 				   bool is_from,
1023 				   List *options)
1024 {
1025 	bool		format_specified = false;
1026 	ListCell   *option;
1027 
1028 	/* Support external use for option sanity checking */
1029 	if (cstate == NULL)
1030 		cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1031 
1032 	cstate->is_copy_from = is_from;
1033 
1034 	cstate->file_encoding = -1;
1035 
1036 	/* Extract options from the statement node tree */
1037 	foreach(option, options)
1038 	{
1039 		DefElem    *defel = lfirst_node(DefElem, option);
1040 
1041 		if (strcmp(defel->defname, "format") == 0)
1042 		{
1043 			char	   *fmt = defGetString(defel);
1044 
1045 			if (format_specified)
1046 				ereport(ERROR,
1047 						(errcode(ERRCODE_SYNTAX_ERROR),
1048 						 errmsg("conflicting or redundant options"),
1049 						 parser_errposition(pstate, defel->location)));
1050 			format_specified = true;
1051 			if (strcmp(fmt, "text") == 0)
1052 				 /* default format */ ;
1053 			else if (strcmp(fmt, "csv") == 0)
1054 				cstate->csv_mode = true;
1055 			else if (strcmp(fmt, "binary") == 0)
1056 				cstate->binary = true;
1057 			else
1058 				ereport(ERROR,
1059 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1060 						 errmsg("COPY format \"%s\" not recognized", fmt),
1061 						 parser_errposition(pstate, defel->location)));
1062 		}
1063 		else if (strcmp(defel->defname, "oids") == 0)
1064 		{
1065 			if (cstate->oids)
1066 				ereport(ERROR,
1067 						(errcode(ERRCODE_SYNTAX_ERROR),
1068 						 errmsg("conflicting or redundant options"),
1069 						 parser_errposition(pstate, defel->location)));
1070 			cstate->oids = defGetBoolean(defel);
1071 		}
1072 		else if (strcmp(defel->defname, "freeze") == 0)
1073 		{
1074 			if (cstate->freeze)
1075 				ereport(ERROR,
1076 						(errcode(ERRCODE_SYNTAX_ERROR),
1077 						 errmsg("conflicting or redundant options"),
1078 						 parser_errposition(pstate, defel->location)));
1079 			cstate->freeze = defGetBoolean(defel);
1080 		}
1081 		else if (strcmp(defel->defname, "delimiter") == 0)
1082 		{
1083 			if (cstate->delim)
1084 				ereport(ERROR,
1085 						(errcode(ERRCODE_SYNTAX_ERROR),
1086 						 errmsg("conflicting or redundant options"),
1087 						 parser_errposition(pstate, defel->location)));
1088 			cstate->delim = defGetString(defel);
1089 		}
1090 		else if (strcmp(defel->defname, "null") == 0)
1091 		{
1092 			if (cstate->null_print)
1093 				ereport(ERROR,
1094 						(errcode(ERRCODE_SYNTAX_ERROR),
1095 						 errmsg("conflicting or redundant options"),
1096 						 parser_errposition(pstate, defel->location)));
1097 			cstate->null_print = defGetString(defel);
1098 		}
1099 		else if (strcmp(defel->defname, "header") == 0)
1100 		{
1101 			if (cstate->header_line)
1102 				ereport(ERROR,
1103 						(errcode(ERRCODE_SYNTAX_ERROR),
1104 						 errmsg("conflicting or redundant options"),
1105 						 parser_errposition(pstate, defel->location)));
1106 			cstate->header_line = defGetBoolean(defel);
1107 		}
1108 		else if (strcmp(defel->defname, "quote") == 0)
1109 		{
1110 			if (cstate->quote)
1111 				ereport(ERROR,
1112 						(errcode(ERRCODE_SYNTAX_ERROR),
1113 						 errmsg("conflicting or redundant options"),
1114 						 parser_errposition(pstate, defel->location)));
1115 			cstate->quote = defGetString(defel);
1116 		}
1117 		else if (strcmp(defel->defname, "escape") == 0)
1118 		{
1119 			if (cstate->escape)
1120 				ereport(ERROR,
1121 						(errcode(ERRCODE_SYNTAX_ERROR),
1122 						 errmsg("conflicting or redundant options"),
1123 						 parser_errposition(pstate, defel->location)));
1124 			cstate->escape = defGetString(defel);
1125 		}
1126 		else if (strcmp(defel->defname, "force_quote") == 0)
1127 		{
1128 			if (cstate->force_quote || cstate->force_quote_all)
1129 				ereport(ERROR,
1130 						(errcode(ERRCODE_SYNTAX_ERROR),
1131 						 errmsg("conflicting or redundant options"),
1132 						 parser_errposition(pstate, defel->location)));
1133 			if (defel->arg && IsA(defel->arg, A_Star))
1134 				cstate->force_quote_all = true;
1135 			else if (defel->arg && IsA(defel->arg, List))
1136 				cstate->force_quote = castNode(List, defel->arg);
1137 			else
1138 				ereport(ERROR,
1139 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1140 						 errmsg("argument to option \"%s\" must be a list of column names",
1141 								defel->defname),
1142 						 parser_errposition(pstate, defel->location)));
1143 		}
1144 		else if (strcmp(defel->defname, "force_not_null") == 0)
1145 		{
1146 			if (cstate->force_notnull)
1147 				ereport(ERROR,
1148 						(errcode(ERRCODE_SYNTAX_ERROR),
1149 						 errmsg("conflicting or redundant options"),
1150 						 parser_errposition(pstate, defel->location)));
1151 			if (defel->arg && IsA(defel->arg, List))
1152 				cstate->force_notnull = castNode(List, defel->arg);
1153 			else
1154 				ereport(ERROR,
1155 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1156 						 errmsg("argument to option \"%s\" must be a list of column names",
1157 								defel->defname),
1158 						 parser_errposition(pstate, defel->location)));
1159 		}
1160 		else if (strcmp(defel->defname, "force_null") == 0)
1161 		{
1162 			if (cstate->force_null)
1163 				ereport(ERROR,
1164 						(errcode(ERRCODE_SYNTAX_ERROR),
1165 						 errmsg("conflicting or redundant options")));
1166 			if (defel->arg && IsA(defel->arg, List))
1167 				cstate->force_null = castNode(List, defel->arg);
1168 			else
1169 				ereport(ERROR,
1170 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1171 						 errmsg("argument to option \"%s\" must be a list of column names",
1172 								defel->defname),
1173 						 parser_errposition(pstate, defel->location)));
1174 		}
1175 		else if (strcmp(defel->defname, "convert_selectively") == 0)
1176 		{
1177 			/*
1178 			 * Undocumented, not-accessible-from-SQL option: convert only the
1179 			 * named columns to binary form, storing the rest as NULLs. It's
1180 			 * allowed for the column list to be NIL.
1181 			 */
1182 			if (cstate->convert_selectively)
1183 				ereport(ERROR,
1184 						(errcode(ERRCODE_SYNTAX_ERROR),
1185 						 errmsg("conflicting or redundant options"),
1186 						 parser_errposition(pstate, defel->location)));
1187 			cstate->convert_selectively = true;
1188 			if (defel->arg == NULL || IsA(defel->arg, List))
1189 				cstate->convert_select = castNode(List, defel->arg);
1190 			else
1191 				ereport(ERROR,
1192 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1193 						 errmsg("argument to option \"%s\" must be a list of column names",
1194 								defel->defname),
1195 						 parser_errposition(pstate, defel->location)));
1196 		}
1197 		else if (strcmp(defel->defname, "encoding") == 0)
1198 		{
1199 			if (cstate->file_encoding >= 0)
1200 				ereport(ERROR,
1201 						(errcode(ERRCODE_SYNTAX_ERROR),
1202 						 errmsg("conflicting or redundant options"),
1203 						 parser_errposition(pstate, defel->location)));
1204 			cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1205 			if (cstate->file_encoding < 0)
1206 				ereport(ERROR,
1207 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1208 						 errmsg("argument to option \"%s\" must be a valid encoding name",
1209 								defel->defname),
1210 						 parser_errposition(pstate, defel->location)));
1211 		}
1212 		else
1213 			ereport(ERROR,
1214 					(errcode(ERRCODE_SYNTAX_ERROR),
1215 					 errmsg("option \"%s\" not recognized",
1216 							defel->defname),
1217 					 parser_errposition(pstate, defel->location)));
1218 	}
1219 
1220 	/*
1221 	 * Check for incompatible options (must do these two before inserting
1222 	 * defaults)
1223 	 */
1224 	if (cstate->binary && cstate->delim)
1225 		ereport(ERROR,
1226 				(errcode(ERRCODE_SYNTAX_ERROR),
1227 				 errmsg("cannot specify DELIMITER in BINARY mode")));
1228 
1229 	if (cstate->binary && cstate->null_print)
1230 		ereport(ERROR,
1231 				(errcode(ERRCODE_SYNTAX_ERROR),
1232 				 errmsg("cannot specify NULL in BINARY mode")));
1233 
1234 	/* Set defaults for omitted options */
1235 	if (!cstate->delim)
1236 		cstate->delim = cstate->csv_mode ? "," : "\t";
1237 
1238 	if (!cstate->null_print)
1239 		cstate->null_print = cstate->csv_mode ? "" : "\\N";
1240 	cstate->null_print_len = strlen(cstate->null_print);
1241 
1242 	if (cstate->csv_mode)
1243 	{
1244 		if (!cstate->quote)
1245 			cstate->quote = "\"";
1246 		if (!cstate->escape)
1247 			cstate->escape = cstate->quote;
1248 	}
1249 
1250 	/* Only single-byte delimiter strings are supported. */
1251 	if (strlen(cstate->delim) != 1)
1252 		ereport(ERROR,
1253 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1254 				 errmsg("COPY delimiter must be a single one-byte character")));
1255 
1256 	/* Disallow end-of-line characters */
1257 	if (strchr(cstate->delim, '\r') != NULL ||
1258 		strchr(cstate->delim, '\n') != NULL)
1259 		ereport(ERROR,
1260 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1261 				 errmsg("COPY delimiter cannot be newline or carriage return")));
1262 
1263 	if (strchr(cstate->null_print, '\r') != NULL ||
1264 		strchr(cstate->null_print, '\n') != NULL)
1265 		ereport(ERROR,
1266 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1267 				 errmsg("COPY null representation cannot use newline or carriage return")));
1268 
1269 	/*
1270 	 * Disallow unsafe delimiter characters in non-CSV mode.  We can't allow
1271 	 * backslash because it would be ambiguous.  We can't allow the other
1272 	 * cases because data characters matching the delimiter must be
1273 	 * backslashed, and certain backslash combinations are interpreted
1274 	 * non-literally by COPY IN.  Disallowing all lower case ASCII letters is
1275 	 * more than strictly necessary, but seems best for consistency and
1276 	 * future-proofing.  Likewise we disallow all digits though only octal
1277 	 * digits are actually dangerous.
1278 	 */
1279 	if (!cstate->csv_mode &&
1280 		strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1281 			   cstate->delim[0]) != NULL)
1282 		ereport(ERROR,
1283 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1284 				 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1285 
1286 	/* Check header */
1287 	if (!cstate->csv_mode && cstate->header_line)
1288 		ereport(ERROR,
1289 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1290 				 errmsg("COPY HEADER available only in CSV mode")));
1291 
1292 	/* Check quote */
1293 	if (!cstate->csv_mode && cstate->quote != NULL)
1294 		ereport(ERROR,
1295 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1296 				 errmsg("COPY quote available only in CSV mode")));
1297 
1298 	if (cstate->csv_mode && strlen(cstate->quote) != 1)
1299 		ereport(ERROR,
1300 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1301 				 errmsg("COPY quote must be a single one-byte character")));
1302 
1303 	if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1304 		ereport(ERROR,
1305 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1306 				 errmsg("COPY delimiter and quote must be different")));
1307 
1308 	/* Check escape */
1309 	if (!cstate->csv_mode && cstate->escape != NULL)
1310 		ereport(ERROR,
1311 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1312 				 errmsg("COPY escape available only in CSV mode")));
1313 
1314 	if (cstate->csv_mode && strlen(cstate->escape) != 1)
1315 		ereport(ERROR,
1316 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1317 				 errmsg("COPY escape must be a single one-byte character")));
1318 
1319 	/* Check force_quote */
1320 	if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1321 		ereport(ERROR,
1322 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1323 				 errmsg("COPY force quote available only in CSV mode")));
1324 	if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1325 		ereport(ERROR,
1326 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327 				 errmsg("COPY force quote only available using COPY TO")));
1328 
1329 	/* Check force_notnull */
1330 	if (!cstate->csv_mode && cstate->force_notnull != NIL)
1331 		ereport(ERROR,
1332 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1333 				 errmsg("COPY force not null available only in CSV mode")));
1334 	if (cstate->force_notnull != NIL && !is_from)
1335 		ereport(ERROR,
1336 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1337 				 errmsg("COPY force not null only available using COPY FROM")));
1338 
1339 	/* Check force_null */
1340 	if (!cstate->csv_mode && cstate->force_null != NIL)
1341 		ereport(ERROR,
1342 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1343 				 errmsg("COPY force null available only in CSV mode")));
1344 
1345 	if (cstate->force_null != NIL && !is_from)
1346 		ereport(ERROR,
1347 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1348 				 errmsg("COPY force null only available using COPY FROM")));
1349 
1350 	/* Don't allow the delimiter to appear in the null string. */
1351 	if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1352 		ereport(ERROR,
1353 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1354 				 errmsg("COPY delimiter must not appear in the NULL specification")));
1355 
1356 	/* Don't allow the CSV quote char to appear in the null string. */
1357 	if (cstate->csv_mode &&
1358 		strchr(cstate->null_print, cstate->quote[0]) != NULL)
1359 		ereport(ERROR,
1360 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1361 				 errmsg("CSV quote character must not appear in the NULL specification")));
1362 }
1363 
1364 /*
1365  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1366  *
1367  * Iff <binary>, unload or reload in the binary format, as opposed to the
1368  * more wasteful but more robust and portable text format.
1369  *
1370  * Iff <oids>, unload or reload the format that includes OID information.
1371  * On input, we accept OIDs whether or not the table has an OID column,
1372  * but silently drop them if it does not.  On output, we report an error
1373  * if the user asks for OIDs in a table that has none (not providing an
1374  * OID column might seem friendlier, but could seriously confuse programs).
1375  *
1376  * If in the text format, delimit columns with delimiter <delim> and print
1377  * NULL values as <null_print>.
1378  */
1379 static CopyState
BeginCopy(ParseState * pstate,bool is_from,Relation rel,RawStmt * raw_query,Oid queryRelId,List * attnamelist,List * options)1380 BeginCopy(ParseState *pstate,
1381 		  bool is_from,
1382 		  Relation rel,
1383 		  RawStmt *raw_query,
1384 		  Oid queryRelId,
1385 		  List *attnamelist,
1386 		  List *options)
1387 {
1388 	CopyState	cstate;
1389 	TupleDesc	tupDesc;
1390 	int			num_phys_attrs;
1391 	MemoryContext oldcontext;
1392 
1393 	/* Allocate workspace and zero all fields */
1394 	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1395 
1396 	/*
1397 	 * We allocate everything used by a cstate in a new memory context. This
1398 	 * avoids memory leaks during repeated use of COPY in a query.
1399 	 */
1400 	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1401 												"COPY",
1402 												ALLOCSET_DEFAULT_SIZES);
1403 
1404 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1405 
1406 	/* Extract options from the statement node tree */
1407 	ProcessCopyOptions(pstate, cstate, is_from, options);
1408 
1409 	/* Process the source/target relation or query */
1410 	if (rel)
1411 	{
1412 		Assert(!raw_query);
1413 
1414 		cstate->rel = rel;
1415 
1416 		tupDesc = RelationGetDescr(cstate->rel);
1417 
1418 		/* Don't allow COPY w/ OIDs to or from a table without them */
1419 		if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1420 			ereport(ERROR,
1421 					(errcode(ERRCODE_UNDEFINED_COLUMN),
1422 					 errmsg("table \"%s\" does not have OIDs",
1423 							RelationGetRelationName(cstate->rel))));
1424 	}
1425 	else
1426 	{
1427 		List	   *rewritten;
1428 		Query	   *query;
1429 		PlannedStmt *plan;
1430 		DestReceiver *dest;
1431 
1432 		Assert(!is_from);
1433 		cstate->rel = NULL;
1434 
1435 		/* Don't allow COPY w/ OIDs from a query */
1436 		if (cstate->oids)
1437 			ereport(ERROR,
1438 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1439 					 errmsg("COPY (query) WITH OIDS is not supported")));
1440 
1441 		/*
1442 		 * Run parse analysis and rewrite.  Note this also acquires sufficient
1443 		 * locks on the source table(s).
1444 		 *
1445 		 * Because the parser and planner tend to scribble on their input, we
1446 		 * make a preliminary copy of the source querytree.  This prevents
1447 		 * problems in the case that the COPY is in a portal or plpgsql
1448 		 * function and is executed repeatedly.  (See also the same hack in
1449 		 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
1450 		 */
1451 		rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1452 										   pstate->p_sourcetext, NULL, 0,
1453 										   NULL);
1454 
1455 		/* check that we got back something we can work with */
1456 		if (rewritten == NIL)
1457 		{
1458 			ereport(ERROR,
1459 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1460 					 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1461 		}
1462 		else if (list_length(rewritten) > 1)
1463 		{
1464 			ListCell   *lc;
1465 
1466 			/* examine queries to determine which error message to issue */
1467 			foreach(lc, rewritten)
1468 			{
1469 				Query	   *q = lfirst_node(Query, lc);
1470 
1471 				if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1472 					ereport(ERROR,
1473 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1474 							 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1475 				if (q->querySource == QSRC_NON_INSTEAD_RULE)
1476 					ereport(ERROR,
1477 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1478 							 errmsg("DO ALSO rules are not supported for the COPY")));
1479 			}
1480 
1481 			ereport(ERROR,
1482 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1483 					 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1484 		}
1485 
1486 		query = linitial_node(Query, rewritten);
1487 
1488 		/* The grammar allows SELECT INTO, but we don't support that */
1489 		if (query->utilityStmt != NULL &&
1490 			IsA(query->utilityStmt, CreateTableAsStmt))
1491 			ereport(ERROR,
1492 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1493 					 errmsg("COPY (SELECT INTO) is not supported")));
1494 
1495 		Assert(query->utilityStmt == NULL);
1496 
1497 		/*
1498 		 * Similarly the grammar doesn't enforce the presence of a RETURNING
1499 		 * clause, but this is required here.
1500 		 */
1501 		if (query->commandType != CMD_SELECT &&
1502 			query->returningList == NIL)
1503 		{
1504 			Assert(query->commandType == CMD_INSERT ||
1505 				   query->commandType == CMD_UPDATE ||
1506 				   query->commandType == CMD_DELETE);
1507 
1508 			ereport(ERROR,
1509 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1510 					 errmsg("COPY query must have a RETURNING clause")));
1511 		}
1512 
1513 		/* plan the query */
1514 		plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1515 
1516 		/*
1517 		 * With row level security and a user using "COPY relation TO", we
1518 		 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1519 		 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1520 		 * in any RLS clauses.
1521 		 *
1522 		 * When this happens, we are passed in the relid of the originally
1523 		 * found relation (which we have locked).  As the planner will look up
1524 		 * the relation again, we double-check here to make sure it found the
1525 		 * same one that we have locked.
1526 		 */
1527 		if (queryRelId != InvalidOid)
1528 		{
1529 			/*
1530 			 * Note that with RLS involved there may be multiple relations,
1531 			 * and while the one we need is almost certainly first, we don't
1532 			 * make any guarantees of that in the planner, so check the whole
1533 			 * list and make sure we find the original relation.
1534 			 */
1535 			if (!list_member_oid(plan->relationOids, queryRelId))
1536 				ereport(ERROR,
1537 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1538 						 errmsg("relation referenced by COPY statement has changed")));
1539 		}
1540 
1541 		/*
1542 		 * Use a snapshot with an updated command ID to ensure this query sees
1543 		 * results of any previously executed queries.
1544 		 */
1545 		PushCopiedSnapshot(GetActiveSnapshot());
1546 		UpdateActiveSnapshotCommandId();
1547 
1548 		/* Create dest receiver for COPY OUT */
1549 		dest = CreateDestReceiver(DestCopyOut);
1550 		((DR_copy *) dest)->cstate = cstate;
1551 
1552 		/* Create a QueryDesc requesting no output */
1553 		cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1554 											GetActiveSnapshot(),
1555 											InvalidSnapshot,
1556 											dest, NULL, NULL, 0);
1557 
1558 		/*
1559 		 * Call ExecutorStart to prepare the plan for execution.
1560 		 *
1561 		 * ExecutorStart computes a result tupdesc for us
1562 		 */
1563 		ExecutorStart(cstate->queryDesc, 0);
1564 
1565 		tupDesc = cstate->queryDesc->tupDesc;
1566 	}
1567 
1568 	/* Generate or convert list of attributes to process */
1569 	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1570 
1571 	num_phys_attrs = tupDesc->natts;
1572 
1573 	/* Convert FORCE_QUOTE name list to per-column flags, check validity */
1574 	cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1575 	if (cstate->force_quote_all)
1576 	{
1577 		int			i;
1578 
1579 		for (i = 0; i < num_phys_attrs; i++)
1580 			cstate->force_quote_flags[i] = true;
1581 	}
1582 	else if (cstate->force_quote)
1583 	{
1584 		List	   *attnums;
1585 		ListCell   *cur;
1586 
1587 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1588 
1589 		foreach(cur, attnums)
1590 		{
1591 			int			attnum = lfirst_int(cur);
1592 
1593 			if (!list_member_int(cstate->attnumlist, attnum))
1594 				ereport(ERROR,
1595 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1596 						 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1597 								NameStr(tupDesc->attrs[attnum - 1]->attname))));
1598 			cstate->force_quote_flags[attnum - 1] = true;
1599 		}
1600 	}
1601 
1602 	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1603 	cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1604 	if (cstate->force_notnull)
1605 	{
1606 		List	   *attnums;
1607 		ListCell   *cur;
1608 
1609 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1610 
1611 		foreach(cur, attnums)
1612 		{
1613 			int			attnum = lfirst_int(cur);
1614 
1615 			if (!list_member_int(cstate->attnumlist, attnum))
1616 				ereport(ERROR,
1617 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1618 						 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1619 								NameStr(tupDesc->attrs[attnum - 1]->attname))));
1620 			cstate->force_notnull_flags[attnum - 1] = true;
1621 		}
1622 	}
1623 
1624 	/* Convert FORCE_NULL name list to per-column flags, check validity */
1625 	cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1626 	if (cstate->force_null)
1627 	{
1628 		List	   *attnums;
1629 		ListCell   *cur;
1630 
1631 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1632 
1633 		foreach(cur, attnums)
1634 		{
1635 			int			attnum = lfirst_int(cur);
1636 
1637 			if (!list_member_int(cstate->attnumlist, attnum))
1638 				ereport(ERROR,
1639 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1640 						 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1641 								NameStr(tupDesc->attrs[attnum - 1]->attname))));
1642 			cstate->force_null_flags[attnum - 1] = true;
1643 		}
1644 	}
1645 
1646 	/* Convert convert_selectively name list to per-column flags */
1647 	if (cstate->convert_selectively)
1648 	{
1649 		List	   *attnums;
1650 		ListCell   *cur;
1651 
1652 		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1653 
1654 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1655 
1656 		foreach(cur, attnums)
1657 		{
1658 			int			attnum = lfirst_int(cur);
1659 
1660 			if (!list_member_int(cstate->attnumlist, attnum))
1661 				ereport(ERROR,
1662 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1663 						 errmsg_internal("selected column \"%s\" not referenced by COPY",
1664 										 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1665 			cstate->convert_select_flags[attnum - 1] = true;
1666 		}
1667 	}
1668 
1669 	/* Use client encoding when ENCODING option is not specified. */
1670 	if (cstate->file_encoding < 0)
1671 		cstate->file_encoding = pg_get_client_encoding();
1672 
1673 	/*
1674 	 * Set up encoding conversion info.  Even if the file and server encodings
1675 	 * are the same, we must apply pg_any_to_server() to validate data in
1676 	 * multibyte encodings.
1677 	 */
1678 	cstate->need_transcoding =
1679 		(cstate->file_encoding != GetDatabaseEncoding() ||
1680 		 pg_database_encoding_max_length() > 1);
1681 	/* See Multibyte encoding comment above */
1682 	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1683 
1684 	cstate->copy_dest = COPY_FILE;	/* default */
1685 
1686 	MemoryContextSwitchTo(oldcontext);
1687 
1688 	return cstate;
1689 }
1690 
1691 /*
1692  * Closes the pipe to an external program, checking the pclose() return code.
1693  */
1694 static void
ClosePipeToProgram(CopyState cstate)1695 ClosePipeToProgram(CopyState cstate)
1696 {
1697 	int			pclose_rc;
1698 
1699 	Assert(cstate->is_program);
1700 
1701 	pclose_rc = ClosePipeStream(cstate->copy_file);
1702 	if (pclose_rc == -1)
1703 		ereport(ERROR,
1704 				(errcode_for_file_access(),
1705 				 errmsg("could not close pipe to external command: %m")));
1706 	else if (pclose_rc != 0)
1707 	{
1708 		/*
1709 		 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1710 		 * expectable for the called program to fail with SIGPIPE, and we
1711 		 * should not report that as an error.  Otherwise, SIGPIPE indicates a
1712 		 * problem.
1713 		 */
1714 		if (cstate->is_copy_from && !cstate->reached_eof &&
1715 			wait_result_is_signal(pclose_rc, SIGPIPE))
1716 			return;
1717 
1718 		ereport(ERROR,
1719 				(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1720 				 errmsg("program \"%s\" failed",
1721 						cstate->filename),
1722 				 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1723 	}
1724 }
1725 
1726 /*
1727  * Release resources allocated in a cstate for COPY TO/FROM.
1728  */
1729 static void
EndCopy(CopyState cstate)1730 EndCopy(CopyState cstate)
1731 {
1732 	if (cstate->is_program)
1733 	{
1734 		ClosePipeToProgram(cstate);
1735 	}
1736 	else
1737 	{
1738 		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1739 			ereport(ERROR,
1740 					(errcode_for_file_access(),
1741 					 errmsg("could not close file \"%s\": %m",
1742 							cstate->filename)));
1743 	}
1744 
1745 	MemoryContextDelete(cstate->copycontext);
1746 	pfree(cstate);
1747 }
1748 
1749 /*
1750  * Setup CopyState to read tuples from a table or a query for COPY TO.
1751  */
1752 static CopyState
BeginCopyTo(ParseState * pstate,Relation rel,RawStmt * query,Oid queryRelId,const char * filename,bool is_program,List * attnamelist,List * options)1753 BeginCopyTo(ParseState *pstate,
1754 			Relation rel,
1755 			RawStmt *query,
1756 			Oid queryRelId,
1757 			const char *filename,
1758 			bool is_program,
1759 			List *attnamelist,
1760 			List *options)
1761 {
1762 	CopyState	cstate;
1763 	bool		pipe = (filename == NULL);
1764 	MemoryContext oldcontext;
1765 
1766 	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1767 	{
1768 		if (rel->rd_rel->relkind == RELKIND_VIEW)
1769 			ereport(ERROR,
1770 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1771 					 errmsg("cannot copy from view \"%s\"",
1772 							RelationGetRelationName(rel)),
1773 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1774 		else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1775 			ereport(ERROR,
1776 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1777 					 errmsg("cannot copy from materialized view \"%s\"",
1778 							RelationGetRelationName(rel)),
1779 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1780 		else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1781 			ereport(ERROR,
1782 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1783 					 errmsg("cannot copy from foreign table \"%s\"",
1784 							RelationGetRelationName(rel)),
1785 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1786 		else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1787 			ereport(ERROR,
1788 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1789 					 errmsg("cannot copy from sequence \"%s\"",
1790 							RelationGetRelationName(rel))));
1791 		else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1792 			ereport(ERROR,
1793 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1794 					 errmsg("cannot copy from partitioned table \"%s\"",
1795 							RelationGetRelationName(rel)),
1796 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1797 		else
1798 			ereport(ERROR,
1799 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1800 					 errmsg("cannot copy from non-table relation \"%s\"",
1801 							RelationGetRelationName(rel))));
1802 	}
1803 
1804 	cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1805 					   options);
1806 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1807 
1808 	if (pipe)
1809 	{
1810 		Assert(!is_program);	/* the grammar does not allow this */
1811 		if (whereToSendOutput != DestRemote)
1812 			cstate->copy_file = stdout;
1813 	}
1814 	else
1815 	{
1816 		cstate->filename = pstrdup(filename);
1817 		cstate->is_program = is_program;
1818 
1819 		if (is_program)
1820 		{
1821 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1822 			if (cstate->copy_file == NULL)
1823 				ereport(ERROR,
1824 						(errcode_for_file_access(),
1825 						 errmsg("could not execute command \"%s\": %m",
1826 								cstate->filename)));
1827 		}
1828 		else
1829 		{
1830 			mode_t		oumask; /* Pre-existing umask value */
1831 			struct stat st;
1832 
1833 			/*
1834 			 * Prevent write to relative path ... too easy to shoot oneself in
1835 			 * the foot by overwriting a database file ...
1836 			 */
1837 			if (!is_absolute_path(filename))
1838 				ereport(ERROR,
1839 						(errcode(ERRCODE_INVALID_NAME),
1840 						 errmsg("relative path not allowed for COPY to file")));
1841 
1842 			oumask = umask(S_IWGRP | S_IWOTH);
1843 			PG_TRY();
1844 			{
1845 				cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1846 			}
1847 			PG_CATCH();
1848 			{
1849 				umask(oumask);
1850 				PG_RE_THROW();
1851 			}
1852 			PG_END_TRY();
1853 			umask(oumask);
1854 			if (cstate->copy_file == NULL)
1855 			{
1856 				/* copy errno because ereport subfunctions might change it */
1857 				int			save_errno = errno;
1858 
1859 				ereport(ERROR,
1860 						(errcode_for_file_access(),
1861 						 errmsg("could not open file \"%s\" for writing: %m",
1862 								cstate->filename),
1863 						 (save_errno == ENOENT || save_errno == EACCES) ?
1864 						 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1865 								 "You may want a client-side facility such as psql's \\copy.") : 0));
1866 			}
1867 
1868 			if (fstat(fileno(cstate->copy_file), &st))
1869 				ereport(ERROR,
1870 						(errcode_for_file_access(),
1871 						 errmsg("could not stat file \"%s\": %m",
1872 								cstate->filename)));
1873 
1874 			if (S_ISDIR(st.st_mode))
1875 				ereport(ERROR,
1876 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1877 						 errmsg("\"%s\" is a directory", cstate->filename)));
1878 		}
1879 	}
1880 
1881 	MemoryContextSwitchTo(oldcontext);
1882 
1883 	return cstate;
1884 }
1885 
1886 /*
1887  * This intermediate routine exists mainly to localize the effects of setjmp
1888  * so we don't need to plaster a lot of variables with "volatile".
1889  */
1890 static uint64
DoCopyTo(CopyState cstate)1891 DoCopyTo(CopyState cstate)
1892 {
1893 	bool		pipe = (cstate->filename == NULL);
1894 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
1895 	uint64		processed;
1896 
1897 	PG_TRY();
1898 	{
1899 		if (fe_copy)
1900 			SendCopyBegin(cstate);
1901 
1902 		processed = CopyTo(cstate);
1903 
1904 		if (fe_copy)
1905 			SendCopyEnd(cstate);
1906 	}
1907 	PG_CATCH();
1908 	{
1909 		/*
1910 		 * Make sure we turn off old-style COPY OUT mode upon error. It is
1911 		 * okay to do this in all cases, since it does nothing if the mode is
1912 		 * not on.
1913 		 */
1914 		pq_endcopyout(true);
1915 		PG_RE_THROW();
1916 	}
1917 	PG_END_TRY();
1918 
1919 	return processed;
1920 }
1921 
1922 /*
1923  * Clean up storage and release resources for COPY TO.
1924  */
1925 static void
EndCopyTo(CopyState cstate)1926 EndCopyTo(CopyState cstate)
1927 {
1928 	if (cstate->queryDesc != NULL)
1929 	{
1930 		/* Close down the query and free resources. */
1931 		ExecutorFinish(cstate->queryDesc);
1932 		ExecutorEnd(cstate->queryDesc);
1933 		FreeQueryDesc(cstate->queryDesc);
1934 		PopActiveSnapshot();
1935 	}
1936 
1937 	/* Clean up storage */
1938 	EndCopy(cstate);
1939 }
1940 
1941 /*
1942  * Copy from relation or query TO file.
1943  */
1944 static uint64
CopyTo(CopyState cstate)1945 CopyTo(CopyState cstate)
1946 {
1947 	TupleDesc	tupDesc;
1948 	int			num_phys_attrs;
1949 	Form_pg_attribute *attr;
1950 	ListCell   *cur;
1951 	uint64		processed;
1952 
1953 	if (cstate->rel)
1954 		tupDesc = RelationGetDescr(cstate->rel);
1955 	else
1956 		tupDesc = cstate->queryDesc->tupDesc;
1957 	attr = tupDesc->attrs;
1958 	num_phys_attrs = tupDesc->natts;
1959 	cstate->null_print_client = cstate->null_print; /* default */
1960 
1961 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1962 	cstate->fe_msgbuf = makeStringInfo();
1963 
1964 	/* Get info about the columns we need to process. */
1965 	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1966 	foreach(cur, cstate->attnumlist)
1967 	{
1968 		int			attnum = lfirst_int(cur);
1969 		Oid			out_func_oid;
1970 		bool		isvarlena;
1971 
1972 		if (cstate->binary)
1973 			getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1974 									&out_func_oid,
1975 									&isvarlena);
1976 		else
1977 			getTypeOutputInfo(attr[attnum - 1]->atttypid,
1978 							  &out_func_oid,
1979 							  &isvarlena);
1980 		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1981 	}
1982 
1983 	/*
1984 	 * Create a temporary memory context that we can reset once per row to
1985 	 * recover palloc'd memory.  This avoids any problems with leaks inside
1986 	 * datatype output routines, and should be faster than retail pfree's
1987 	 * anyway.  (We don't need a whole econtext as CopyFrom does.)
1988 	 */
1989 	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1990 											   "COPY TO",
1991 											   ALLOCSET_DEFAULT_SIZES);
1992 
1993 	if (cstate->binary)
1994 	{
1995 		/* Generate header for a binary copy */
1996 		int32		tmp;
1997 
1998 		/* Signature */
1999 		CopySendData(cstate, BinarySignature, 11);
2000 		/* Flags field */
2001 		tmp = 0;
2002 		if (cstate->oids)
2003 			tmp |= (1 << 16);
2004 		CopySendInt32(cstate, tmp);
2005 		/* No header extension */
2006 		tmp = 0;
2007 		CopySendInt32(cstate, tmp);
2008 	}
2009 	else
2010 	{
2011 		/*
2012 		 * For non-binary copy, we need to convert null_print to file
2013 		 * encoding, because it will be sent directly with CopySendString.
2014 		 */
2015 		if (cstate->need_transcoding)
2016 			cstate->null_print_client = pg_server_to_any(cstate->null_print,
2017 														 cstate->null_print_len,
2018 														 cstate->file_encoding);
2019 
2020 		/* if a header has been requested send the line */
2021 		if (cstate->header_line)
2022 		{
2023 			bool		hdr_delim = false;
2024 
2025 			foreach(cur, cstate->attnumlist)
2026 			{
2027 				int			attnum = lfirst_int(cur);
2028 				char	   *colname;
2029 
2030 				if (hdr_delim)
2031 					CopySendChar(cstate, cstate->delim[0]);
2032 				hdr_delim = true;
2033 
2034 				colname = NameStr(attr[attnum - 1]->attname);
2035 
2036 				CopyAttributeOutCSV(cstate, colname, false,
2037 									list_length(cstate->attnumlist) == 1);
2038 			}
2039 
2040 			CopySendEndOfRow(cstate);
2041 		}
2042 	}
2043 
2044 	if (cstate->rel)
2045 	{
2046 		Datum	   *values;
2047 		bool	   *nulls;
2048 		HeapScanDesc scandesc;
2049 		HeapTuple	tuple;
2050 
2051 		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2052 		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2053 
2054 		scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2055 
2056 		processed = 0;
2057 		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2058 		{
2059 			CHECK_FOR_INTERRUPTS();
2060 
2061 			/* Deconstruct the tuple ... faster than repeated heap_getattr */
2062 			heap_deform_tuple(tuple, tupDesc, values, nulls);
2063 
2064 			/* Format and send the data */
2065 			CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2066 			processed++;
2067 		}
2068 
2069 		heap_endscan(scandesc);
2070 
2071 		pfree(values);
2072 		pfree(nulls);
2073 	}
2074 	else
2075 	{
2076 		/* run the plan --- the dest receiver will send tuples */
2077 		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2078 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2079 	}
2080 
2081 	if (cstate->binary)
2082 	{
2083 		/* Generate trailer for a binary copy */
2084 		CopySendInt16(cstate, -1);
2085 		/* Need to flush out the trailer */
2086 		CopySendEndOfRow(cstate);
2087 	}
2088 
2089 	MemoryContextDelete(cstate->rowcontext);
2090 
2091 	return processed;
2092 }
2093 
2094 /*
2095  * Emit one row during CopyTo().
2096  */
2097 static void
CopyOneRowTo(CopyState cstate,Oid tupleOid,Datum * values,bool * nulls)2098 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2099 {
2100 	bool		need_delim = false;
2101 	FmgrInfo   *out_functions = cstate->out_functions;
2102 	MemoryContext oldcontext;
2103 	ListCell   *cur;
2104 	char	   *string;
2105 
2106 	MemoryContextReset(cstate->rowcontext);
2107 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2108 
2109 	if (cstate->binary)
2110 	{
2111 		/* Binary per-tuple header */
2112 		CopySendInt16(cstate, list_length(cstate->attnumlist));
2113 		/* Send OID if wanted --- note attnumlist doesn't include it */
2114 		if (cstate->oids)
2115 		{
2116 			/* Hack --- assume Oid is same size as int32 */
2117 			CopySendInt32(cstate, sizeof(int32));
2118 			CopySendInt32(cstate, tupleOid);
2119 		}
2120 	}
2121 	else
2122 	{
2123 		/* Text format has no per-tuple header, but send OID if wanted */
2124 		/* Assume digits don't need any quoting or encoding conversion */
2125 		if (cstate->oids)
2126 		{
2127 			string = DatumGetCString(DirectFunctionCall1(oidout,
2128 														 ObjectIdGetDatum(tupleOid)));
2129 			CopySendString(cstate, string);
2130 			need_delim = true;
2131 		}
2132 	}
2133 
2134 	foreach(cur, cstate->attnumlist)
2135 	{
2136 		int			attnum = lfirst_int(cur);
2137 		Datum		value = values[attnum - 1];
2138 		bool		isnull = nulls[attnum - 1];
2139 
2140 		if (!cstate->binary)
2141 		{
2142 			if (need_delim)
2143 				CopySendChar(cstate, cstate->delim[0]);
2144 			need_delim = true;
2145 		}
2146 
2147 		if (isnull)
2148 		{
2149 			if (!cstate->binary)
2150 				CopySendString(cstate, cstate->null_print_client);
2151 			else
2152 				CopySendInt32(cstate, -1);
2153 		}
2154 		else
2155 		{
2156 			if (!cstate->binary)
2157 			{
2158 				string = OutputFunctionCall(&out_functions[attnum - 1],
2159 											value);
2160 				if (cstate->csv_mode)
2161 					CopyAttributeOutCSV(cstate, string,
2162 										cstate->force_quote_flags[attnum - 1],
2163 										list_length(cstate->attnumlist) == 1);
2164 				else
2165 					CopyAttributeOutText(cstate, string);
2166 			}
2167 			else
2168 			{
2169 				bytea	   *outputbytes;
2170 
2171 				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2172 											   value);
2173 				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2174 				CopySendData(cstate, VARDATA(outputbytes),
2175 							 VARSIZE(outputbytes) - VARHDRSZ);
2176 			}
2177 		}
2178 	}
2179 
2180 	CopySendEndOfRow(cstate);
2181 
2182 	MemoryContextSwitchTo(oldcontext);
2183 }
2184 
2185 
2186 /*
2187  * error context callback for COPY FROM
2188  *
2189  * The argument for the error context must be CopyState.
2190  */
2191 void
CopyFromErrorCallback(void * arg)2192 CopyFromErrorCallback(void *arg)
2193 {
2194 	CopyState	cstate = (CopyState) arg;
2195 	char		curlineno_str[32];
2196 
2197 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2198 			 cstate->cur_lineno);
2199 
2200 	if (cstate->binary)
2201 	{
2202 		/* can't usefully display the data */
2203 		if (cstate->cur_attname)
2204 			errcontext("COPY %s, line %s, column %s",
2205 					   cstate->cur_relname, curlineno_str,
2206 					   cstate->cur_attname);
2207 		else
2208 			errcontext("COPY %s, line %s",
2209 					   cstate->cur_relname, curlineno_str);
2210 	}
2211 	else
2212 	{
2213 		if (cstate->cur_attname && cstate->cur_attval)
2214 		{
2215 			/* error is relevant to a particular column */
2216 			char	   *attval;
2217 
2218 			attval = limit_printout_length(cstate->cur_attval);
2219 			errcontext("COPY %s, line %s, column %s: \"%s\"",
2220 					   cstate->cur_relname, curlineno_str,
2221 					   cstate->cur_attname, attval);
2222 			pfree(attval);
2223 		}
2224 		else if (cstate->cur_attname)
2225 		{
2226 			/* error is relevant to a particular column, value is NULL */
2227 			errcontext("COPY %s, line %s, column %s: null input",
2228 					   cstate->cur_relname, curlineno_str,
2229 					   cstate->cur_attname);
2230 		}
2231 		else
2232 		{
2233 			/*
2234 			 * Error is relevant to a particular line.
2235 			 *
2236 			 * If line_buf still contains the correct line, and it's already
2237 			 * transcoded, print it. If it's still in a foreign encoding, it's
2238 			 * quite likely that the error is precisely a failure to do
2239 			 * encoding conversion (ie, bad data). We dare not try to convert
2240 			 * it, and at present there's no way to regurgitate it without
2241 			 * conversion. So we have to punt and just report the line number.
2242 			 */
2243 			if (cstate->line_buf_valid &&
2244 				(cstate->line_buf_converted || !cstate->need_transcoding))
2245 			{
2246 				char	   *lineval;
2247 
2248 				lineval = limit_printout_length(cstate->line_buf.data);
2249 				errcontext("COPY %s, line %s: \"%s\"",
2250 						   cstate->cur_relname, curlineno_str, lineval);
2251 				pfree(lineval);
2252 			}
2253 			else
2254 			{
2255 				errcontext("COPY %s, line %s",
2256 						   cstate->cur_relname, curlineno_str);
2257 			}
2258 		}
2259 	}
2260 }
2261 
/*
 * limit_printout_length
 *		Keep COPY data quoted in a message down to a sensible size.
 *
 * Using the sprintf "precision" limit to truncate would look simpler, but
 * some glibc versions have a bug/misfeature whereby vsnprintf always fails
 * (returns -1) when asked to truncate a string containing byte sequences
 * that are invalid in the current encoding.  So the truncation is done by
 * hand here.
 *
 * The result is always a freshly allocated string: a plain copy when the
 * input is at most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix
 * clipped with pg_mbcliplen() and followed by "...".
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			keep;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Let the encoding decide where the string may be cut */
		keep = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the prefix, the "..." marker and the terminator */
		clipped = (char *) palloc(keep + 4);
		memcpy(clipped, str, keep);
		memcpy(clipped + keep, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back an unmodified copy */
	return pstrdup(str);
}
2296 
2297 /*
2298  * Copy FROM file to relation.
2299  */
2300 uint64
CopyFrom(CopyState cstate)2301 CopyFrom(CopyState cstate)
2302 {
2303 	HeapTuple	tuple;
2304 	TupleDesc	tupDesc;
2305 	Datum	   *values;
2306 	bool	   *nulls;
2307 	ResultRelInfo *resultRelInfo;
2308 	ResultRelInfo *saved_resultRelInfo = NULL;
2309 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
2310 	ExprContext *econtext;
2311 	TupleTableSlot *myslot;
2312 	MemoryContext oldcontext = CurrentMemoryContext;
2313 
2314 	ErrorContextCallback errcallback;
2315 	CommandId	mycid = GetCurrentCommandId(true);
2316 	int			hi_options = 0; /* start with default heap_insert options */
2317 	BulkInsertState bistate;
2318 	uint64		processed = 0;
2319 	bool		useHeapMultiInsert;
2320 	int			nBufferedTuples = 0;
2321 	int			prev_leaf_part_index = -1;
2322 
2323 #define MAX_BUFFERED_TUPLES 1000
2324 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
2325 	Size		bufferedTuplesSize = 0;
2326 	uint64		firstBufferedLineNo = 0;
2327 
2328 	Assert(cstate->rel);
2329 
2330 	/*
2331 	 * The target must be a plain relation or have an INSTEAD OF INSERT row
2332 	 * trigger.  (Currently, such triggers are only allowed on views, so we
2333 	 * only hint about them in the view case.)
2334 	 */
2335 	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2336 		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2337 		!(cstate->rel->trigdesc &&
2338 		  cstate->rel->trigdesc->trig_insert_instead_row))
2339 	{
2340 		if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2341 			ereport(ERROR,
2342 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2343 					 errmsg("cannot copy to view \"%s\"",
2344 							RelationGetRelationName(cstate->rel)),
2345 					 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2346 		else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2347 			ereport(ERROR,
2348 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2349 					 errmsg("cannot copy to materialized view \"%s\"",
2350 							RelationGetRelationName(cstate->rel))));
2351 		else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2352 			ereport(ERROR,
2353 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2354 					 errmsg("cannot copy to foreign table \"%s\"",
2355 							RelationGetRelationName(cstate->rel))));
2356 		else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2357 			ereport(ERROR,
2358 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2359 					 errmsg("cannot copy to sequence \"%s\"",
2360 							RelationGetRelationName(cstate->rel))));
2361 		else
2362 			ereport(ERROR,
2363 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2364 					 errmsg("cannot copy to non-table relation \"%s\"",
2365 							RelationGetRelationName(cstate->rel))));
2366 	}
2367 
2368 	tupDesc = RelationGetDescr(cstate->rel);
2369 
2370 	/*----------
2371 	 * Check to see if we can avoid writing WAL
2372 	 *
2373 	 * If archive logging/streaming is not enabled *and* either
2374 	 *	- table was created in same transaction as this COPY
2375 	 *	- data is being written to relfilenode created in this transaction
2376 	 * then we can skip writing WAL.  It's safe because if the transaction
2377 	 * doesn't commit, we'll discard the table (or the new relfilenode file).
2378 	 * If it does commit, we'll have done the heap_sync at the bottom of this
2379 	 * routine first.
2380 	 *
2381 	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2382 	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2383 	 * can be cleared before the end of the transaction. The exact case is
2384 	 * when a relation sets a new relfilenode twice in same transaction, yet
2385 	 * the second one fails in an aborted subtransaction, e.g.
2386 	 *
2387 	 * BEGIN;
2388 	 * TRUNCATE t;
2389 	 * SAVEPOINT save;
2390 	 * TRUNCATE t;
2391 	 * ROLLBACK TO save;
2392 	 * COPY ...
2393 	 *
2394 	 * Also, if the target file is new-in-transaction, we assume that checking
2395 	 * FSM for free space is a waste of time, even if we must use WAL because
2396 	 * of archiving.  This could possibly be wrong, but it's unlikely.
2397 	 *
2398 	 * The comments for heap_insert and RelationGetBufferForTuple specify that
2399 	 * skipping WAL logging is only safe if we ensure that our tuples do not
2400 	 * go into pages containing tuples from any other transactions --- but this
2401 	 * must be the case if we have a new table or new relfilenode, so we need
2402 	 * no additional work to enforce that.
2403 	 *
2404 	 * We currently don't support this optimization if the COPY target is a
2405 	 * partitioned table as we currently only lazily initialize partition
2406 	 * information when routing the first tuple to the partition.  We cannot
2407 	 * know at this stage if we can perform this optimization.  It should be
2408 	 * possible to improve on this, but it does mean maintaining heap insert
2409 	 * option flags per partition and setting them when we first open the
2410 	 * partition.
2411 	 *
2412 	 * This optimization is not supported for relation types which do not
2413 	 * have any physical storage, with views entering in this category.
2414 	 * Partitioned tables are not supported as per the description above.
2415 	 *----------
2416 	 */
2417 	/* createSubid is creation check, newRelfilenodeSubid is truncation check */
2418 	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2419 		cstate->rel->rd_rel->relkind != RELKIND_VIEW &&
2420 		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2421 		 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
2422 	{
2423 		hi_options |= HEAP_INSERT_SKIP_FSM;
2424 		if (!XLogIsNeeded())
2425 			hi_options |= HEAP_INSERT_SKIP_WAL;
2426 	}
2427 
2428 	/*
2429 	 * Optimize if new relfilenode was created in this subxact or one of its
2430 	 * committed children and we won't see those rows later as part of an
2431 	 * earlier scan or command. The subxact test ensures that if this subxact
2432 	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
2433 	 * that the stronger test of exactly which subtransaction created it is
2434 	 * crucial for correctness of this optimization. The test for an earlier
2435 	 * scan or command tolerates false negatives. FREEZE causes other sessions
2436 	 * to see rows they would not see under MVCC, and a false negative merely
2437 	 * spreads that anomaly to the current session.
2438 	 */
2439 	if (cstate->freeze)
2440 	{
2441 		/*
2442 		 * We currently disallow COPY FREEZE on partitioned tables.  The
2443 		 * reason for this is that we've simply not yet opened the partitions
2444 		 * to determine if the optimization can be applied to them.  We could
2445 		 * go and open them all here, but doing so may be quite a costly
2446 		 * overhead for small copies.  In any case, we may just end up routing
2447 		 * tuples to a small number of partitions.  It seems better just to
2448 		 * raise an ERROR for partitioned tables.
2449 		 */
2450 		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2451 		{
2452 			ereport(ERROR,
2453 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2454 					errmsg("cannot perform FREEZE on a partitioned table")));
2455 		}
2456 
2457 		/*
2458 		 * Tolerate one registration for the benefit of FirstXactSnapshot.
2459 		 * Scan-bearing queries generally create at least two registrations,
2460 		 * though relying on that is fragile, as is ignoring ActiveSnapshot.
2461 		 * Clear CatalogSnapshot to avoid counting its registration.  We'll
2462 		 * still detect ongoing catalog scans, each of which separately
2463 		 * registers the snapshot it uses.
2464 		 */
2465 		InvalidateCatalogSnapshot();
2466 		if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2467 			ereport(ERROR,
2468 					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2469 					 errmsg("cannot perform FREEZE because of prior transaction activity")));
2470 
2471 		if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2472 			cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2473 			ereport(ERROR,
2474 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2475 					 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2476 
2477 		hi_options |= HEAP_INSERT_FROZEN;
2478 	}
2479 
2480 	/*
2481 	 * We need a ResultRelInfo so we can use the regular executor's
2482 	 * index-entry-making machinery.  (There used to be a huge amount of code
2483 	 * here that basically duplicated execUtils.c ...)
2484 	 */
2485 	resultRelInfo = makeNode(ResultRelInfo);
2486 	InitResultRelInfo(resultRelInfo,
2487 					  cstate->rel,
2488 					  1,		/* dummy rangetable index */
2489 					  NULL,
2490 					  0);
2491 
2492 	ExecOpenIndices(resultRelInfo, false);
2493 
2494 	estate->es_result_relations = resultRelInfo;
2495 	estate->es_num_result_relations = 1;
2496 	estate->es_result_relation_info = resultRelInfo;
2497 	estate->es_range_table = cstate->range_table;
2498 
2499 	/* Set up a tuple slot too */
2500 	myslot = ExecInitExtraTupleSlot(estate);
2501 	ExecSetSlotDescriptor(myslot, tupDesc);
2502 	/* Triggers might need a slot as well */
2503 	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2504 
2505 	/* Prepare to catch AFTER triggers. */
2506 	AfterTriggerBeginQuery();
2507 
2508 	/*
2509 	 * If there are any triggers with transition tables on the named relation,
2510 	 * we need to be prepared to capture transition tuples.
2511 	 */
2512 	cstate->transition_capture =
2513 		MakeTransitionCaptureState(cstate->rel->trigdesc,
2514 								   RelationGetRelid(cstate->rel),
2515 								   CMD_INSERT);
2516 
2517 	/*
2518 	 * If the named relation is a partitioned table, initialize state for
2519 	 * CopyFrom tuple routing.
2520 	 */
2521 	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2522 	{
2523 		PartitionDispatch *partition_dispatch_info;
2524 		ResultRelInfo *partitions;
2525 		TupleConversionMap **partition_tupconv_maps;
2526 		TupleTableSlot *partition_tuple_slot;
2527 		int			num_parted,
2528 					num_partitions;
2529 
2530 		ExecSetupPartitionTupleRouting(cstate->rel,
2531 									   1,
2532 									   estate,
2533 									   &partition_dispatch_info,
2534 									   &partitions,
2535 									   &partition_tupconv_maps,
2536 									   &partition_tuple_slot,
2537 									   &num_parted, &num_partitions);
2538 		cstate->partition_dispatch_info = partition_dispatch_info;
2539 		cstate->num_dispatch = num_parted;
2540 		cstate->partitions = partitions;
2541 		cstate->num_partitions = num_partitions;
2542 		cstate->partition_tupconv_maps = partition_tupconv_maps;
2543 		cstate->partition_tuple_slot = partition_tuple_slot;
2544 
2545 		/*
2546 		 * If we are capturing transition tuples, they may need to be
2547 		 * converted from partition format back to partitioned table format
2548 		 * (this is only ever necessary if a BEFORE trigger modifies the
2549 		 * tuple).
2550 		 */
2551 		if (cstate->transition_capture != NULL)
2552 		{
2553 			int			i;
2554 
2555 			cstate->transition_tupconv_maps = (TupleConversionMap **)
2556 				palloc0(sizeof(TupleConversionMap *) * cstate->num_partitions);
2557 			for (i = 0; i < cstate->num_partitions; ++i)
2558 			{
2559 				cstate->transition_tupconv_maps[i] =
2560 					convert_tuples_by_name(RelationGetDescr(cstate->partitions[i].ri_RelationDesc),
2561 										   RelationGetDescr(cstate->rel),
2562 										   gettext_noop("could not convert row type"));
2563 			}
2564 		}
2565 	}
2566 
2567 	/*
2568 	 * It's more efficient to prepare a bunch of tuples for insertion, and
2569 	 * insert them in one heap_multi_insert() call, than call heap_insert()
2570 	 * separately for every tuple. However, we can't do that if there are
2571 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2572 	 * expressions. Such triggers or expressions might query the table we're
2573 	 * inserting to, and act differently if the tuples that have already been
2574 	 * processed and prepared for insertion are not there.  We also can't do
2575 	 * it if the table is partitioned.
2576 	 */
2577 	if ((resultRelInfo->ri_TrigDesc != NULL &&
2578 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2579 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2580 		cstate->partition_dispatch_info != NULL ||
2581 		cstate->volatile_defexprs)
2582 	{
2583 		useHeapMultiInsert = false;
2584 	}
2585 	else
2586 	{
2587 		useHeapMultiInsert = true;
2588 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2589 	}
2590 
2591 	/*
2592 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2593 	 * should do this for COPY, since it's not really an "INSERT" statement as
2594 	 * such. However, executing these triggers maintains consistency with the
2595 	 * EACH ROW triggers that we already fire on COPY.
2596 	 */
2597 	ExecBSInsertTriggers(estate, resultRelInfo);
2598 
2599 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2600 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2601 
2602 	bistate = GetBulkInsertState();
2603 	econtext = GetPerTupleExprContext(estate);
2604 
2605 	/* Set up callback to identify error line number */
2606 	errcallback.callback = CopyFromErrorCallback;
2607 	errcallback.arg = (void *) cstate;
2608 	errcallback.previous = error_context_stack;
2609 	error_context_stack = &errcallback;
2610 
2611 	for (;;)
2612 	{
2613 		TupleTableSlot *slot;
2614 		bool		skip_tuple;
2615 		Oid			loaded_oid = InvalidOid;
2616 
2617 		CHECK_FOR_INTERRUPTS();
2618 
2619 		if (nBufferedTuples == 0)
2620 		{
2621 			/*
2622 			 * Reset the per-tuple exprcontext. We can only do this if the
2623 			 * tuple buffer is empty. (Calling the context the per-tuple
2624 			 * memory context is a bit of a misnomer now.)
2625 			 */
2626 			ResetPerTupleExprContext(estate);
2627 		}
2628 
2629 		/* Switch into its memory context */
2630 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2631 
2632 		if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2633 			break;
2634 
2635 		/* And now we can form the input tuple. */
2636 		tuple = heap_form_tuple(tupDesc, values, nulls);
2637 
2638 		if (loaded_oid != InvalidOid)
2639 			HeapTupleSetOid(tuple, loaded_oid);
2640 
2641 		/*
2642 		 * Constraints might reference the tableoid column, so initialize
2643 		 * t_tableOid before evaluating them.
2644 		 */
2645 		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2646 
2647 		/* Triggers and stuff need to be invoked in query context. */
2648 		MemoryContextSwitchTo(oldcontext);
2649 
2650 		/* Place tuple in tuple slot --- but slot shouldn't free it */
2651 		slot = myslot;
2652 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2653 
2654 		/* Determine the partition to heap_insert the tuple into */
2655 		if (cstate->partition_dispatch_info)
2656 		{
2657 			int			leaf_part_index;
2658 			TupleConversionMap *map;
2659 
2660 			/*
2661 			 * Away we go ... If we end up not finding a partition after all,
2662 			 * ExecFindPartition() does not return and errors out instead.
2663 			 * Otherwise, the returned value is to be used as an index into
2664 			 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2665 			 * will get us the ResultRelInfo and TupleConversionMap for the
2666 			 * partition, respectively.
2667 			 */
2668 			leaf_part_index = ExecFindPartition(resultRelInfo,
2669 												cstate->partition_dispatch_info,
2670 												slot,
2671 												estate);
2672 			Assert(leaf_part_index >= 0 &&
2673 				   leaf_part_index < cstate->num_partitions);
2674 
2675 			/*
2676 			 * If this tuple is mapped to a partition that is not same as the
2677 			 * previous one, we'd better make the bulk insert mechanism gets a
2678 			 * new buffer.
2679 			 */
2680 			if (prev_leaf_part_index != leaf_part_index)
2681 			{
2682 				ReleaseBulkInsertStatePin(bistate);
2683 				prev_leaf_part_index = leaf_part_index;
2684 			}
2685 
2686 			/*
2687 			 * Save the old ResultRelInfo and switch to the one corresponding
2688 			 * to the selected partition.
2689 			 */
2690 			saved_resultRelInfo = resultRelInfo;
2691 			resultRelInfo = cstate->partitions + leaf_part_index;
2692 
2693 			/* We do not yet have a way to insert into a foreign partition */
2694 			if (resultRelInfo->ri_FdwRoutine)
2695 				ereport(ERROR,
2696 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2697 						 errmsg("cannot route inserted tuples to a foreign table")));
2698 
2699 			/*
2700 			 * For ExecInsertIndexTuples() to work on the partition's indexes
2701 			 */
2702 			estate->es_result_relation_info = resultRelInfo;
2703 
2704 			/*
2705 			 * If we're capturing transition tuples, we might need to convert
2706 			 * from the partition rowtype to parent rowtype.
2707 			 */
2708 			if (cstate->transition_capture != NULL)
2709 			{
2710 				if (resultRelInfo->ri_TrigDesc &&
2711 					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2712 				{
2713 					/*
2714 					 * If there are any BEFORE triggers on the partition,
2715 					 * we'll have to be ready to convert their result back to
2716 					 * tuplestore format.
2717 					 */
2718 					cstate->transition_capture->tcs_original_insert_tuple = NULL;
2719 					cstate->transition_capture->tcs_map =
2720 						cstate->transition_tupconv_maps[leaf_part_index];
2721 				}
2722 				else
2723 				{
2724 					/*
2725 					 * Otherwise, just remember the original unconverted
2726 					 * tuple, to avoid a needless round trip conversion.
2727 					 */
2728 					cstate->transition_capture->tcs_original_insert_tuple = tuple;
2729 					cstate->transition_capture->tcs_map = NULL;
2730 				}
2731 			}
2732 
2733 			/*
2734 			 * We might need to convert from the parent rowtype to the
2735 			 * partition rowtype.
2736 			 */
2737 			map = cstate->partition_tupconv_maps[leaf_part_index];
2738 			if (map)
2739 			{
2740 				Relation	partrel = resultRelInfo->ri_RelationDesc;
2741 
2742 				tuple = do_convert_tuple(tuple, map);
2743 
2744 				/*
2745 				 * We must use the partition's tuple descriptor from this
2746 				 * point on.  Use a dedicated slot from this point on until
2747 				 * we're finished dealing with the partition.
2748 				 */
2749 				slot = cstate->partition_tuple_slot;
2750 				Assert(slot != NULL);
2751 				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
2752 				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
2753 			}
2754 
2755 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2756 		}
2757 
2758 		skip_tuple = false;
2759 
2760 		/* BEFORE ROW INSERT Triggers */
2761 		if (resultRelInfo->ri_TrigDesc &&
2762 			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2763 		{
2764 			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2765 
2766 			if (slot == NULL)	/* "do nothing" */
2767 				skip_tuple = true;
2768 			else				/* trigger might have changed tuple */
2769 				tuple = ExecMaterializeSlot(slot);
2770 		}
2771 
2772 		if (!skip_tuple)
2773 		{
2774 			if (resultRelInfo->ri_TrigDesc &&
2775 				resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2776 			{
2777 				/* Pass the data to the INSTEAD ROW INSERT trigger */
2778 				ExecIRInsertTriggers(estate, resultRelInfo, slot);
2779 			}
2780 			else
2781 			{
2782 				/*
2783 				 * We always check the partition constraint, including when
2784 				 * the tuple got here via tuple-routing.  However we don't
2785 				 * need to in the latter case if no BR trigger is defined on
2786 				 * the partition.  Note that a BR trigger might modify the
2787 				 * tuple such that the partition constraint is no longer
2788 				 * satisfied, so we need to check in that case.
2789 				 */
2790 				bool		check_partition_constr =
2791 				(resultRelInfo->ri_PartitionCheck != NIL);
2792 
2793 				if (saved_resultRelInfo != NULL &&
2794 					!(resultRelInfo->ri_TrigDesc &&
2795 					  resultRelInfo->ri_TrigDesc->trig_insert_before_row))
2796 					check_partition_constr = false;
2797 
2798 				/* Check the constraints of the tuple */
2799 				if (resultRelInfo->ri_RelationDesc->rd_att->constr ||
2800 					check_partition_constr)
2801 					ExecConstraints(resultRelInfo, slot, estate);
2802 
2803 				if (useHeapMultiInsert)
2804 				{
2805 					/* Add this tuple to the tuple buffer */
2806 					if (nBufferedTuples == 0)
2807 						firstBufferedLineNo = cstate->cur_lineno;
2808 					bufferedTuples[nBufferedTuples++] = tuple;
2809 					bufferedTuplesSize += tuple->t_len;
2810 
2811 					/*
2812 					 * If the buffer filled up, flush it.  Also flush if the
2813 					 * total size of all the tuples in the buffer becomes
2814 					 * large, to avoid using large amounts of memory for the
2815 					 * buffer when the tuples are exceptionally wide.
2816 					 */
2817 					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2818 						bufferedTuplesSize > 65535)
2819 					{
2820 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2821 											resultRelInfo, myslot, bistate,
2822 											nBufferedTuples, bufferedTuples,
2823 											firstBufferedLineNo);
2824 						nBufferedTuples = 0;
2825 						bufferedTuplesSize = 0;
2826 					}
2827 				}
2828 				else
2829 				{
2830 					List	   *recheckIndexes = NIL;
2831 
2832 					/* OK, store the tuple and create index entries for it */
2833 					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2834 								hi_options, bistate);
2835 
2836 					if (resultRelInfo->ri_NumIndices > 0)
2837 						recheckIndexes = ExecInsertIndexTuples(slot,
2838 															   &(tuple->t_self),
2839 															   estate,
2840 															   false,
2841 															   NULL,
2842 															   NIL);
2843 
2844 					/* AFTER ROW INSERT Triggers */
2845 					ExecARInsertTriggers(estate, resultRelInfo, tuple,
2846 										 recheckIndexes, cstate->transition_capture);
2847 
2848 					list_free(recheckIndexes);
2849 				}
2850 			}
2851 
2852 			/*
2853 			 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2854 			 * this is the same definition used by execMain.c for counting
2855 			 * tuples inserted by an INSERT command.
2856 			 */
2857 			processed++;
2858 		}
2859 
2860 		/* Restore the saved ResultRelInfo */
2861 		if (saved_resultRelInfo)
2862 		{
2863 			resultRelInfo = saved_resultRelInfo;
2864 			estate->es_result_relation_info = resultRelInfo;
2865 		}
2866 	}
2867 
2868 	/* Flush any remaining buffered tuples */
2869 	if (nBufferedTuples > 0)
2870 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2871 							resultRelInfo, myslot, bistate,
2872 							nBufferedTuples, bufferedTuples,
2873 							firstBufferedLineNo);
2874 
2875 	/* Done, clean up */
2876 	error_context_stack = errcallback.previous;
2877 
2878 	FreeBulkInsertState(bistate);
2879 
2880 	MemoryContextSwitchTo(oldcontext);
2881 
2882 	/*
2883 	 * In the old protocol, tell pqcomm that we can process normal protocol
2884 	 * messages again.
2885 	 */
2886 	if (cstate->copy_dest == COPY_OLD_FE)
2887 		pq_endmsgread();
2888 
2889 	/* Execute AFTER STATEMENT insertion triggers */
2890 	ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2891 
2892 	/* Handle queued AFTER triggers */
2893 	AfterTriggerEndQuery(estate);
2894 
2895 	pfree(values);
2896 	pfree(nulls);
2897 
2898 	ExecResetTupleTable(estate->es_tupleTable, false);
2899 
2900 	ExecCloseIndices(resultRelInfo);
2901 
2902 	/* Close all the partitioned tables, leaf partitions, and their indices */
2903 	if (cstate->partition_dispatch_info)
2904 	{
2905 		int			i;
2906 
2907 		/*
2908 		 * Remember cstate->partition_dispatch_info[0] corresponds to the root
2909 		 * partitioned table, which we must not try to close, because it is
2910 		 * the main target table of COPY that will be closed eventually by
2911 		 * DoCopy().  Also, tupslot is NULL for the root partitioned table.
2912 		 */
2913 		for (i = 1; i < cstate->num_dispatch; i++)
2914 		{
2915 			PartitionDispatch pd = cstate->partition_dispatch_info[i];
2916 
2917 			heap_close(pd->reldesc, NoLock);
2918 			ExecDropSingleTupleTableSlot(pd->tupslot);
2919 		}
2920 		for (i = 0; i < cstate->num_partitions; i++)
2921 		{
2922 			ResultRelInfo *resultRelInfo = cstate->partitions + i;
2923 
2924 			ExecCloseIndices(resultRelInfo);
2925 			heap_close(resultRelInfo->ri_RelationDesc, NoLock);
2926 		}
2927 
2928 		/* Release the standalone partition tuple descriptor */
2929 		ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
2930 	}
2931 
2932 	/* Close any trigger target relations */
2933 	ExecCleanUpTriggerState(estate);
2934 
2935 	FreeExecutorState(estate);
2936 
2937 	/*
2938 	 * If we skipped writing WAL, then we need to sync the heap (but not
2939 	 * indexes since those use WAL anyway)
2940 	 */
2941 	if (hi_options & HEAP_INSERT_SKIP_WAL)
2942 		heap_sync(cstate->rel);
2943 
2944 	return processed;
2945 }
2946 
2947 /*
2948  * A subroutine of CopyFrom, to write the current batch of buffered heap
2949  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2950  * triggers.
2951  */
2952 static void
CopyFromInsertBatch(CopyState cstate,EState * estate,CommandId mycid,int hi_options,ResultRelInfo * resultRelInfo,TupleTableSlot * myslot,BulkInsertState bistate,int nBufferedTuples,HeapTuple * bufferedTuples,uint64 firstBufferedLineNo)2953 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2954 					int hi_options, ResultRelInfo *resultRelInfo,
2955 					TupleTableSlot *myslot, BulkInsertState bistate,
2956 					int nBufferedTuples, HeapTuple *bufferedTuples,
2957 					uint64 firstBufferedLineNo)
2958 {
2959 	MemoryContext oldcontext;
2960 	int			i;
2961 	uint64		save_cur_lineno;
2962 
2963 	/*
2964 	 * Print error context information correctly, if one of the operations
2965 	 * below fail.
2966 	 */
2967 	cstate->line_buf_valid = false;
2968 	save_cur_lineno = cstate->cur_lineno;
2969 
2970 	/*
2971 	 * heap_multi_insert leaks memory, so switch to short-lived memory context
2972 	 * before calling it.
2973 	 */
2974 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2975 	heap_multi_insert(cstate->rel,
2976 					  bufferedTuples,
2977 					  nBufferedTuples,
2978 					  mycid,
2979 					  hi_options,
2980 					  bistate);
2981 	MemoryContextSwitchTo(oldcontext);
2982 
2983 	/*
2984 	 * If there are any indexes, update them for all the inserted tuples, and
2985 	 * run AFTER ROW INSERT triggers.
2986 	 */
2987 	if (resultRelInfo->ri_NumIndices > 0)
2988 	{
2989 		for (i = 0; i < nBufferedTuples; i++)
2990 		{
2991 			List	   *recheckIndexes;
2992 
2993 			cstate->cur_lineno = firstBufferedLineNo + i;
2994 			ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2995 			recheckIndexes =
2996 				ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2997 									  estate, false, NULL, NIL);
2998 			ExecARInsertTriggers(estate, resultRelInfo,
2999 								 bufferedTuples[i],
3000 								 recheckIndexes, cstate->transition_capture);
3001 			list_free(recheckIndexes);
3002 		}
3003 	}
3004 
3005 	/*
3006 	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
3007 	 * anyway.
3008 	 */
3009 	else if (resultRelInfo->ri_TrigDesc != NULL &&
3010 			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
3011 			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
3012 	{
3013 		for (i = 0; i < nBufferedTuples; i++)
3014 		{
3015 			cstate->cur_lineno = firstBufferedLineNo + i;
3016 			ExecARInsertTriggers(estate, resultRelInfo,
3017 								 bufferedTuples[i],
3018 								 NIL, cstate->transition_capture);
3019 		}
3020 	}
3021 
3022 	/* reset cur_lineno to where we were */
3023 	cstate->cur_lineno = save_cur_lineno;
3024 }
3025 
/*
 * Setup to read tuples from a file for COPY FROM.
 *
 * 'pstate': Parse state handed on to BeginCopy.  May be NULL; when it is
 *		not, its range table (p_rtable) is saved in the returned CopyState.
 * 'rel': Used as a template for the tuples
 * 'filename': Name of server-local file to read, or the command to execute
 *		when 'is_program' is true.  NULL means there is no file: unless
 *		'data_source_cb' is given, input then comes from the client
 *		connection (whereToSendOutput == DestRemote) or from stdin.
 * 'is_program': If true, 'filename' is opened with OpenPipeStream rather
 *		than AllocateFile.
 * 'data_source_cb': If non-NULL, the copy source is set to COPY_CALLBACK
 *		with this callback, and 'filename' is not looked at.
 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
 *
 * For binary format, the file header (signature, flags, header extension)
 * is read and checked here, before returning.
 *
 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
 */
CopyState
BeginCopyFrom(ParseState *pstate,
			  Relation rel,
			  const char *filename,
			  bool is_program,
			  copy_data_source_cb data_source_cb,
			  List *attnamelist,
			  List *options)
{
	CopyState	cstate;
	bool		pipe = (filename == NULL);
	TupleDesc	tupDesc;
	Form_pg_attribute *attr;
	AttrNumber	num_phys_attrs,
				num_defaults;
	FmgrInfo   *in_functions;
	Oid		   *typioparams;
	int			attnum;
	Oid			in_func_oid;
	int		   *defmap;
	ExprState **defexprs;
	MemoryContext oldcontext;
	bool		volatile_defexprs;

	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);

	/*
	 * Everything set up below is built with cstate->copycontext as the
	 * current memory context; the caller's context is restored just before
	 * returning.
	 */
	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	/* Initialize state variables */
	cstate->reached_eof = false;
	cstate->eol_type = EOL_UNKNOWN;
	cstate->cur_relname = RelationGetRelationName(cstate->rel);
	cstate->cur_lineno = 0;
	cstate->cur_attname = NULL;
	cstate->cur_attval = NULL;

	/* Set up variables to avoid per-attribute overhead. */
	initStringInfo(&cstate->attribute_buf);
	initStringInfo(&cstate->line_buf);
	cstate->line_buf_converted = false;
	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
	cstate->raw_buf_index = cstate->raw_buf_len = 0;

	/* Assign range table, we'll need it in CopyFrom. */
	if (pstate)
		cstate->range_table = pstate->p_rtable;

	tupDesc = RelationGetDescr(cstate->rel);
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;
	num_defaults = 0;
	volatile_defexprs = false;

	/*
	 * Pick up the required catalog information for each attribute in the
	 * relation, including the input function, the element type (to pass to
	 * the input function), and info about defaults and constraints. (Which
	 * input function we use depends on text/binary format choice.)
	 *
	 * in_functions[] and typioparams[] are indexed by attribute number
	 * minus one; entries for dropped attributes are left unset.  defmap[]
	 * and defexprs[] are filled densely, one entry per column that has a
	 * default and is not read from the input (num_defaults entries in all).
	 */
	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	for (attnum = 1; attnum <= num_phys_attrs; attnum++)
	{
		/* We don't need info for dropped attributes */
		if (attr[attnum - 1]->attisdropped)
			continue;

		/* Fetch the input function and typioparam info */
		if (cstate->binary)
			getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
								   &in_func_oid, &typioparams[attnum - 1]);
		else
			getTypeInputInfo(attr[attnum - 1]->atttypid,
							 &in_func_oid, &typioparams[attnum - 1]);
		fmgr_info(in_func_oid, &in_functions[attnum - 1]);

		/* Get default info if needed */
		if (!list_member_int(cstate->attnumlist, attnum))
		{
			/* attribute is NOT to be copied from input */
			/* use default value if one exists */
			Expr	   *defexpr = (Expr *) build_column_default(cstate->rel,
																attnum);

			if (defexpr != NULL)
			{
				/* Run the expression through planner */
				defexpr = expression_planner(defexpr);

				/* Initialize executable expression in copycontext */
				defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
				/* remember which column (0-based) this default belongs to */
				defmap[num_defaults] = attnum - 1;
				num_defaults++;

				/*
				 * If a default expression looks at the table being loaded,
				 * then it could give the wrong answer when using
				 * multi-insert. Since database access can be dynamic this is
				 * hard to test for exactly, so we use the much wider test of
				 * whether the default expression is volatile. We allow for
				 * the special case of when the default expression is the
				 * nextval() of a sequence which in this specific case is
				 * known to be safe for use with the multi-insert
				 * optimization. Hence we use this special case function
				 * checker rather than the standard check for
				 * contain_volatile_functions().
				 */
				if (!volatile_defexprs)
					volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
			}
		}
	}

	/* We keep those variables in cstate. */
	cstate->in_functions = in_functions;
	cstate->typioparams = typioparams;
	cstate->defmap = defmap;
	cstate->defexprs = defexprs;
	cstate->volatile_defexprs = volatile_defexprs;
	cstate->num_defaults = num_defaults;
	cstate->is_program = is_program;

	/*
	 * Select and open the data source.  A callback takes precedence over
	 * everything else; otherwise a NULL filename means the client connection
	 * or stdin, and a non-NULL filename is either a program to run or a
	 * file to open.
	 */
	if (data_source_cb)
	{
		cstate->copy_dest = COPY_CALLBACK;
		cstate->data_source_cb = data_source_cb;
	}
	else if (pipe)
	{
		Assert(!is_program);	/* the grammar does not allow this */
		if (whereToSendOutput == DestRemote)
			ReceiveCopyBegin(cstate);
		else
			cstate->copy_file = stdin;
	}
	else
	{
		/* keep a private copy of the name for later error messages */
		cstate->filename = pstrdup(filename);

		if (cstate->is_program)
		{
			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
			if (cstate->copy_file == NULL)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not execute command \"%s\": %m",
								cstate->filename)));
		}
		else
		{
			struct stat st;

			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
			if (cstate->copy_file == NULL)
			{
				/* copy errno because ereport subfunctions might change it */
				int			save_errno = errno;

				/*
				 * The hint about client-side \copy is attached only for
				 * "no such file" and "permission denied" failures.
				 */
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\" for reading: %m",
								cstate->filename),
						 (save_errno == ENOENT || save_errno == EACCES) ?
						 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
								 "You may want a client-side facility such as psql's \\copy.") : 0));
			}

			if (fstat(fileno(cstate->copy_file), &st))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m",
								cstate->filename)));

			/* the open succeeded, but a directory cannot be a data source */
			if (S_ISDIR(st.st_mode))
				ereport(ERROR,
						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
						 errmsg("\"%s\" is a directory", cstate->filename)));
		}
	}

	if (!cstate->binary)
	{
		/* must rely on user to tell us... */
		cstate->file_has_oids = cstate->oids;
	}
	else
	{
		/* Read and verify binary header */
		char		readSig[11];
		int32		tmp;

		/* Signature */
		if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
			memcmp(readSig, BinarySignature, 11) != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("COPY file signature not recognized")));
		/* Flags field */
		if (!CopyGetInt32(cstate, &tmp))
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("invalid COPY file header (missing flags)")));

		/*
		 * Bit 16 says whether the file carries OIDs.  Once that bit is
		 * masked off, any bit still set in the upper half of the flags word
		 * is a critical flag we do not understand, so reject the file.  The
		 * low 16 bits are not examined.
		 */
		cstate->file_has_oids = (tmp & (1 << 16)) != 0;
		tmp &= ~(1 << 16);
		if ((tmp >> 16) != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("unrecognized critical flags in COPY file header")));
		/* Header extension length */
		if (!CopyGetInt32(cstate, &tmp) ||
			tmp < 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("invalid COPY file header (missing length)")));
		/* Skip extension header, if present */
		while (tmp-- > 0)
		{
			/* readSig is reused as a one-byte scratch buffer here */
			if (CopyGetData(cstate, readSig, 1, 1) != 1)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("invalid COPY file header (wrong length)")));
		}
	}

	/* In binary mode the OID column needs its own binary input function */
	if (cstate->file_has_oids && cstate->binary)
	{
		getTypeBinaryInputInfo(OIDOID,
							   &in_func_oid, &cstate->oid_typioparam);
		fmgr_info(in_func_oid, &cstate->oid_in_function);
	}

	/* create workspace for CopyReadAttributes results */
	if (!cstate->binary)
	{
		AttrNumber	attr_count = list_length(cstate->attnumlist);
		/* one raw field per copied column, plus one if an OID field leads */
		int			nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;

		cstate->max_fields = nfields;
		cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
	}

	MemoryContextSwitchTo(oldcontext);

	return cstate;
}
3283 
3284 /*
3285  * Read raw fields in the next line for COPY FROM in text or csv mode.
3286  * Return false if no more lines.
3287  *
3288  * An internal temporary buffer is returned via 'fields'. It is valid until
3289  * the next call of the function. Since the function returns all raw fields
3290  * in the input file, 'nfields' could be different from the number of columns
3291  * in the relation.
3292  *
3293  * NOTE: force_not_null option are not applied to the returned fields.
3294  */
3295 bool
NextCopyFromRawFields(CopyState cstate,char *** fields,int * nfields)3296 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3297 {
3298 	int			fldct;
3299 	bool		done;
3300 
3301 	/* only available for text or csv input */
3302 	Assert(!cstate->binary);
3303 
3304 	/* on input just throw the header line away */
3305 	if (cstate->cur_lineno == 0 && cstate->header_line)
3306 	{
3307 		cstate->cur_lineno++;
3308 		if (CopyReadLine(cstate))
3309 			return false;		/* done */
3310 	}
3311 
3312 	cstate->cur_lineno++;
3313 
3314 	/* Actually read the line into memory here */
3315 	done = CopyReadLine(cstate);
3316 
3317 	/*
3318 	 * EOF at start of line means we're done.  If we see EOF after some
3319 	 * characters, we act as though it was newline followed by EOF, ie,
3320 	 * process the line and then exit loop on next iteration.
3321 	 */
3322 	if (done && cstate->line_buf.len == 0)
3323 		return false;
3324 
3325 	/* Parse the line into de-escaped field values */
3326 	if (cstate->csv_mode)
3327 		fldct = CopyReadAttributesCSV(cstate);
3328 	else
3329 		fldct = CopyReadAttributesText(cstate);
3330 
3331 	*fields = cstate->raw_fields;
3332 	*nfields = fldct;
3333 	return true;
3334 }
3335 
/*
 * Read next tuple from file for COPY FROM. Return false if no more tuples.
 *
 * In text/csv mode, false is returned when NextCopyFromRawFields reports
 * that there are no more lines.  In binary mode, false is returned when the
 * field count cannot be read or when the end-of-data marker (a field count
 * of -1) is seen.
 *
 * 'econtext' is used to evaluate the default expression for each column not
 * read from the file. It can be NULL when no default values are used, i.e.
 * when all columns are read from the file.
 *
 * 'values' and 'nulls' arrays must be the same length as columns of the
 * relation passed to BeginCopyFrom. This function fills the arrays.
 * Oid of the tuple is returned with 'tupleOid' separately; it is written
 * only when the input carries OIDs, cstate->oids is set and 'tupleOid' is
 * not NULL, and is left untouched otherwise.
 */
bool
NextCopyFrom(CopyState cstate, ExprContext *econtext,
			 Datum *values, bool *nulls, Oid *tupleOid)
{
	TupleDesc	tupDesc;
	Form_pg_attribute *attr;
	AttrNumber	num_phys_attrs,
				attr_count,
				num_defaults = cstate->num_defaults;
	FmgrInfo   *in_functions = cstate->in_functions;
	Oid		   *typioparams = cstate->typioparams;
	int			i;
	int			nfields;
	bool		isnull;
	bool		file_has_oids = cstate->file_has_oids;
	int		   *defmap = cstate->defmap;
	ExprState **defexprs = cstate->defexprs;

	tupDesc = RelationGetDescr(cstate->rel);
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;
	attr_count = list_length(cstate->attnumlist);
	/* expected fields per text line: the copied columns, plus OID if any */
	nfields = file_has_oids ? (attr_count + 1) : attr_count;

	/* Initialize all values for row to NULL */
	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
	MemSet(nulls, true, num_phys_attrs * sizeof(bool));

	if (!cstate->binary)
	{
		char	  **field_strings;
		ListCell   *cur;
		int			fldct;
		int			fieldno;
		char	   *string;

		/* read raw fields in the next line */
		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
			return false;

		/* check for overflowing fields */
		if (nfields > 0 && fldct > nfields)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));

		/* fieldno is the index of the next unconsumed raw field */
		fieldno = 0;

		/* Read the OID field if present */
		if (file_has_oids)
		{
			if (fieldno >= fldct)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("missing data for OID column")));
			string = field_strings[fieldno++];

			/*
			 * A null OID field is always an error.  Otherwise the field is
			 * consumed, but it is only parsed when the caller can receive
			 * the result.
			 */
			if (string == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("null OID in COPY data")));
			else if (cstate->oids && tupleOid != NULL)
			{
				/* set error-context info while oidin runs, then clear it */
				cstate->cur_attname = "oid";
				cstate->cur_attval = string;
				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
																 CStringGetDatum(string)));
				if (*tupleOid == InvalidOid)
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("invalid OID in COPY data")));
				cstate->cur_attname = NULL;
				cstate->cur_attval = NULL;
			}
		}

		/* Loop to read the user attributes on the line. */
		foreach(cur, cstate->attnumlist)
		{
			int			attnum = lfirst_int(cur);
			int			m = attnum - 1;	/* 0-based index into the arrays */

			if (fieldno >= fldct)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("missing data for column \"%s\"",
								NameStr(attr[m]->attname))));
			string = field_strings[fieldno++];

			if (cstate->convert_select_flags &&
				!cstate->convert_select_flags[m])
			{
				/* ignore input field, leaving column as NULL */
				continue;
			}

			if (cstate->csv_mode)
			{
				if (string == NULL &&
					cstate->force_notnull_flags[m])
				{
					/*
					 * FORCE_NOT_NULL option is set and column is NULL -
					 * convert it to the NULL string.
					 */
					string = cstate->null_print;
				}
				else if (string != NULL && cstate->force_null_flags[m]
						 && strcmp(string, cstate->null_print) == 0)
				{
					/*
					 * FORCE_NULL option is set and column matches the NULL
					 * string. It must have been quoted, or otherwise the
					 * string would already have been set to NULL. Convert it
					 * to NULL as specified.
					 */
					string = NULL;
				}
			}

			/*
			 * Run the column's input function with error-context info set.
			 * The function is called even when string is NULL; the column
			 * is marked non-null only if a string was actually present.
			 */
			cstate->cur_attname = NameStr(attr[m]->attname);
			cstate->cur_attval = string;
			values[m] = InputFunctionCall(&in_functions[m],
										  string,
										  typioparams[m],
										  attr[m]->atttypmod);
			if (string != NULL)
				nulls[m] = false;
			cstate->cur_attname = NULL;
			cstate->cur_attval = NULL;
		}

		/* every expected field must have been consumed by now */
		Assert(fieldno == nfields);
	}
	else
	{
		/* binary */
		int16		fld_count;
		ListCell   *cur;

		/* each binary tuple counts as one "line" for error reporting */
		cstate->cur_lineno++;

		if (!CopyGetInt16(cstate, &fld_count))
		{
			/* EOF detected (end of file, or protocol-level EOF) */
			return false;
		}

		if (fld_count == -1)
		{
			/*
			 * Received EOF marker.  In a V3-protocol copy, wait for the
			 * protocol-level EOF, and complain if it doesn't come
			 * immediately.  This ensures that we correctly handle CopyFail,
			 * if client chooses to send that now.
			 *
			 * Note that we MUST NOT try to read more data in an old-protocol
			 * copy, since there is no protocol-level EOF marker then.  We
			 * could go either way for copy from file, but choose to throw
			 * error if there's data after the EOF marker, for consistency
			 * with the new-protocol case.
			 */
			char		dummy;

			if (cstate->copy_dest != COPY_OLD_FE &&
				CopyGetData(cstate, &dummy, 1, 1) > 0)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("received copy data after EOF marker")));
			return false;
		}

		/* the per-row field count is compared without counting any OID */
		if (fld_count != attr_count)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("row field count is %d, expected %d",
							(int) fld_count, attr_count)));

		if (file_has_oids)
		{
			Oid			loaded_oid;

			/*
			 * The OID is always read and validated when the file has one,
			 * but it is handed back only if the caller asked for OIDs.
			 */
			cstate->cur_attname = "oid";
			loaded_oid =
				DatumGetObjectId(CopyReadBinaryAttribute(cstate,
														 0,
														 &cstate->oid_in_function,
														 cstate->oid_typioparam,
														 -1,
														 &isnull));
			if (isnull || loaded_oid == InvalidOid)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("invalid OID in COPY data")));
			cstate->cur_attname = NULL;
			if (cstate->oids && tupleOid != NULL)
				*tupleOid = loaded_oid;
		}

		/*
		 * Read each listed column in order.  i counts the columns read so
		 * far (1-based) and is passed to CopyReadBinaryAttribute as its
		 * second argument; the OID read above passes 0 there.
		 */
		i = 0;
		foreach(cur, cstate->attnumlist)
		{
			int			attnum = lfirst_int(cur);
			int			m = attnum - 1;	/* 0-based index into the arrays */

			cstate->cur_attname = NameStr(attr[m]->attname);
			i++;
			values[m] = CopyReadBinaryAttribute(cstate,
												i,
												&in_functions[m],
												typioparams[m],
												attr[m]->atttypmod,
												&nulls[m]);
			cstate->cur_attname = NULL;
		}
	}

	/*
	 * Now compute and insert any defaults available for the columns not
	 * provided by the input data.  Anything not processed here or above will
	 * remain NULL.
	 */
	for (i = 0; i < num_defaults; i++)
	{
		/*
		 * The caller must supply econtext and have switched into the
		 * per-tuple memory context in it.
		 */
		Assert(econtext != NULL);
		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);

		/* defmap[i] is the 0-based column that defexprs[i] supplies */
		values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
										 &nulls[defmap[i]]);
	}

	return true;
}
3584 
/*
 * Clean up storage and release resources for COPY FROM.
 *
 * 'cstate' is the CopyState obtained from BeginCopyFrom.  All the work is
 * delegated to EndCopy; nothing specific to COPY FROM is done here.
 */
void
EndCopyFrom(CopyState cstate)
{
	/* No COPY FROM related resources except memory. */

	EndCopy(cstate);
}
3595 
3596 /*
3597  * Read the next input line and stash it in line_buf, with conversion to
3598  * server encoding.
3599  *
3600  * Result is true if read was terminated by EOF, false if terminated
3601  * by newline.  The terminating newline or EOF marker is not included
3602  * in the final value of line_buf.
3603  */
3604 static bool
CopyReadLine(CopyState cstate)3605 CopyReadLine(CopyState cstate)
3606 {
3607 	bool		result;
3608 
3609 	resetStringInfo(&cstate->line_buf);
3610 	cstate->line_buf_valid = true;
3611 
3612 	/* Mark that encoding conversion hasn't occurred yet */
3613 	cstate->line_buf_converted = false;
3614 
3615 	/* Parse data and transfer into line_buf */
3616 	result = CopyReadLineText(cstate);
3617 
3618 	if (result)
3619 	{
3620 		/*
3621 		 * Reached EOF.  In protocol version 3, we should ignore anything
3622 		 * after \. up to the protocol end of copy data.  (XXX maybe better
3623 		 * not to treat \. as special?)
3624 		 */
3625 		if (cstate->copy_dest == COPY_NEW_FE)
3626 		{
3627 			do
3628 			{
3629 				cstate->raw_buf_index = cstate->raw_buf_len;
3630 			} while (CopyLoadRawBuf(cstate));
3631 		}
3632 	}
3633 	else
3634 	{
3635 		/*
3636 		 * If we didn't hit EOF, then we must have transferred the EOL marker
3637 		 * to line_buf along with the data.  Get rid of it.
3638 		 */
3639 		switch (cstate->eol_type)
3640 		{
3641 			case EOL_NL:
3642 				Assert(cstate->line_buf.len >= 1);
3643 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3644 				cstate->line_buf.len--;
3645 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3646 				break;
3647 			case EOL_CR:
3648 				Assert(cstate->line_buf.len >= 1);
3649 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3650 				cstate->line_buf.len--;
3651 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3652 				break;
3653 			case EOL_CRNL:
3654 				Assert(cstate->line_buf.len >= 2);
3655 				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3656 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3657 				cstate->line_buf.len -= 2;
3658 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3659 				break;
3660 			case EOL_UNKNOWN:
3661 				/* shouldn't get here */
3662 				Assert(false);
3663 				break;
3664 		}
3665 	}
3666 
3667 	/* Done reading the line.  Convert it to server encoding. */
3668 	if (cstate->need_transcoding)
3669 	{
3670 		char	   *cvt;
3671 
3672 		cvt = pg_any_to_server(cstate->line_buf.data,
3673 							   cstate->line_buf.len,
3674 							   cstate->file_encoding);
3675 		if (cvt != cstate->line_buf.data)
3676 		{
3677 			/* transfer converted data back to line_buf */
3678 			resetStringInfo(&cstate->line_buf);
3679 			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3680 			pfree(cvt);
3681 		}
3682 	}
3683 
3684 	/* Now it's safe to use the buffer in error messages */
3685 	cstate->line_buf_converted = true;
3686 
3687 	return result;
3688 }
3689 
/*
 * CopyReadLineText - inner loop of CopyReadLine for text mode
 *
 * Scans cstate->raw_buf for the end of the next input line (or for the
 * end-of-copy marker \.), loading more raw data through CopyLoadRawBuf()
 * when the buffer runs dry, and transfers the line's bytes to
 * cstate->line_buf.
 *
 * Returns true if the scan stopped because no more input was available or
 * because a valid \. marker was recognized; returns false if it stopped at
 * a line terminator.  As a side effect, cstate->eol_type is set from the
 * first terminator seen, and later terminators that do not match it raise
 * an ERROR.  In CSV mode, cstate->cur_lineno is also advanced for newlines
 * embedded in quoted fields.
 *
 * NOTE(review): REFILL_LINEBUF, IF_NEED_REFILL_AND_NOT_EOF_CONTINUE,
 * IF_NEED_REFILL_AND_EOF_BREAK and NO_END_OF_COPY_GOTO are macros defined
 * elsewhere in this file.  Judging by their names and their use here, they
 * read and modify this function's local variables and can "continue",
 * "break" or "goto not_end_of_copy" out of the current loop iteration.
 * Check the macro definitions before renaming any local or moving the label.
 */
static bool
CopyReadLineText(CopyState cstate)
{
	char	   *copy_raw_buf;
	int			raw_buf_ptr;
	int			copy_buf_len;
	bool		need_data = false;
	bool		hit_eof = false;
	bool		result = false;
	char		mblen_str[2];	/* one-byte scratch string for
								 * pg_encoding_mblen() */

	/* CSV variables */
	bool		first_char_in_line = true;
	bool		in_quote = false,
				last_was_esc = false;
	char		quotec = '\0';
	char		escapec = '\0';

	if (cstate->csv_mode)
	{
		quotec = cstate->quote[0];
		escapec = cstate->escape[0];
		/* ignore special escape processing if it's the same as quotec */
		if (quotec == escapec)
			escapec = '\0';
	}

	mblen_str[1] = '\0';

	/*
	 * The objective of this loop is to transfer the entire next input line
	 * into line_buf.  Hence, we only care for detecting newlines (\r and/or
	 * \n) and the end-of-copy marker (\.).
	 *
	 * In CSV mode, \r and \n inside a quoted field are just part of the data
	 * value and are put in line_buf.  We keep just enough state to know if we
	 * are currently in a quoted field or not.
	 *
	 * These four characters, and the CSV escape and quote characters, are
	 * assumed the same in frontend and backend encodings.
	 *
	 * For speed, we try to move data from raw_buf to line_buf in chunks
	 * rather than one character at a time.  raw_buf_ptr points to the next
	 * character to examine; any characters from raw_buf_index to raw_buf_ptr
	 * have been determined to be part of the line, but not yet transferred to
	 * line_buf.
	 *
	 * For a little extra speed within the loop, we copy raw_buf and
	 * raw_buf_len into local variables.
	 */
	copy_raw_buf = cstate->raw_buf;
	raw_buf_ptr = cstate->raw_buf_index;
	copy_buf_len = cstate->raw_buf_len;

	for (;;)
	{
		int			prev_raw_ptr;
		char		c;

		/*
		 * Load more data if needed.  Ideally we would just force four bytes
		 * of read-ahead and avoid the many calls to
		 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
		 * does not allow us to read too far ahead or we might read into the
		 * next data, so we read-ahead only as far as we know we can.  One
		 * optimization would be to read-ahead four bytes here if
		 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
		 * considering the size of the buffer.
		 */
		if (raw_buf_ptr >= copy_buf_len || need_data)
		{
			REFILL_LINEBUF;

			/*
			 * Try to read some more data.  This will certainly reset
			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
			 */
			if (!CopyLoadRawBuf(cstate))
				hit_eof = true;
			raw_buf_ptr = 0;
			copy_buf_len = cstate->raw_buf_len;

			/*
			 * If we are completely out of data, break out of the loop,
			 * reporting EOF.
			 */
			if (copy_buf_len <= 0)
			{
				result = true;
				break;
			}
			need_data = false;
		}

		/* OK to fetch a character */
		prev_raw_ptr = raw_buf_ptr; /* remembered so the fetch can be undone */
		c = copy_raw_buf[raw_buf_ptr++];

		if (cstate->csv_mode)
		{
			/*
			 * If character is '\\' or '\r', we may need to look ahead below.
			 * Force fetch of the next character if we don't already have it.
			 * We need to do this before changing CSV state, in case one of
			 * these characters is also the quote or escape character.
			 *
			 * Note: old-protocol does not like forced prefetch, but it's OK
			 * here since we cannot validly be at EOF.
			 */
			if (c == '\\' || c == '\r')
			{
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			}

			/*
			 * Dealing with quotes and escapes here is mildly tricky. If the
			 * quote char is also the escape char, there's no problem - we
			 * just use the char as a toggle. If they are different, we need
			 * to ensure that we only take account of an escape inside a
			 * quoted field and immediately preceding a quote char, and not
			 * the second in an escape-escape sequence.
			 */
			if (in_quote && c == escapec)
				last_was_esc = !last_was_esc;
			if (c == quotec && !last_was_esc)
				in_quote = !in_quote;
			if (c != escapec)
				last_was_esc = false;

			/*
			 * Updating the line count for embedded CR and/or LF chars is
			 * necessarily a little fragile - this test is probably about the
			 * best we can do.  (XXX it's arguable whether we should do this
			 * at all --- is cur_lineno a physical or logical count?)
			 */
			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
				cstate->cur_lineno++;
		}

		/* Process \r */
		if (c == '\r' && (!cstate->csv_mode || !in_quote))
		{
			/* Check for \r\n on first line, _and_ handle \r\n. */
			if (cstate->eol_type == EOL_UNKNOWN ||
				cstate->eol_type == EOL_CRNL)
			{
				/*
				 * If need more data, go back to loop top to load it.
				 *
				 * Note that if we are at EOF, c will wind up as '\0' because
				 * of the guaranteed pad of raw_buf.
				 */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);

				/* get next char */
				c = copy_raw_buf[raw_buf_ptr];

				if (c == '\n')
				{
					raw_buf_ptr++;	/* eat newline */
					cstate->eol_type = EOL_CRNL;	/* in case not set yet */
				}
				else
				{
					/* found \r, but no \n */
					if (cstate->eol_type == EOL_CRNL)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 !cstate->csv_mode ?
								 errmsg("literal carriage return found in data") :
								 errmsg("unquoted carriage return found in data"),
								 !cstate->csv_mode ?
								 errhint("Use \"\\r\" to represent carriage return.") :
								 errhint("Use quoted CSV field to represent carriage return.")));

					/*
					 * if we got here, it is the first line and we didn't find
					 * \n, so don't consume the peeked character
					 */
					cstate->eol_type = EOL_CR;
				}
			}
			else if (cstate->eol_type == EOL_NL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal carriage return found in data") :
						 errmsg("unquoted carriage return found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\r\" to represent carriage return.") :
						 errhint("Use quoted CSV field to represent carriage return.")));
			/* If reach here, we have found the line terminator */
			break;
		}

		/* Process \n */
		if (c == '\n' && (!cstate->csv_mode || !in_quote))
		{
			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal newline found in data") :
						 errmsg("unquoted newline found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\n\" to represent newline.") :
						 errhint("Use quoted CSV field to represent newline.")));
			cstate->eol_type = EOL_NL;	/* in case not set yet */
			/* If reach here, we have found the line terminator */
			break;
		}

		/*
		 * In CSV mode, we only recognize \. alone on a line.  This is because
		 * \. is a valid CSV data value.
		 */
		if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
		{
			char		c2;

			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			IF_NEED_REFILL_AND_EOF_BREAK(0);

			/* -----
			 * get next character
			 * Note: we do not change c so if it isn't \., we can fall
			 * through and continue processing for file encoding.
			 * -----
			 */
			c2 = copy_raw_buf[raw_buf_ptr];

			if (c2 == '.')
			{
				raw_buf_ptr++;	/* consume the '.' */

				/*
				 * Note: if we loop back for more data here, it does not
				 * matter that the CSV state change checks are re-executed; we
				 * will come back here with no important state changed.
				 */
				if (cstate->eol_type == EOL_CRNL)
				{
					/* Get the next character */
					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
					/* if hit_eof, c2 will become '\0' */
					c2 = copy_raw_buf[raw_buf_ptr++];

					if (c2 == '\n')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker does not match previous newline style")));
						else
							NO_END_OF_COPY_GOTO;
					}
					else if (c2 != '\r')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker corrupt")));
						else
							NO_END_OF_COPY_GOTO;
					}
				}

				/* Get the next character */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
				/* if hit_eof, c2 will become '\0' */
				c2 = copy_raw_buf[raw_buf_ptr++];

				if (c2 != '\r' && c2 != '\n')
				{
					if (!cstate->csv_mode)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 errmsg("end-of-copy marker corrupt")));
					else
						NO_END_OF_COPY_GOTO;
				}

				/* the marker's terminator must match the established style */
				if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
					(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
					(cstate->eol_type == EOL_CR && c2 != '\r'))
				{
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("end-of-copy marker does not match previous newline style")));
				}

				/*
				 * Transfer only the data before the \. into line_buf, then
				 * discard the data and the \. sequence.
				 */
				if (prev_raw_ptr > cstate->raw_buf_index)
					appendBinaryStringInfo(&cstate->line_buf,
										   cstate->raw_buf + cstate->raw_buf_index,
										   prev_raw_ptr - cstate->raw_buf_index);
				cstate->raw_buf_index = raw_buf_ptr;
				result = true;	/* report EOF */
				break;
			}
			else if (!cstate->csv_mode)
			{
				/*
				 * If we are here, it means we found a backslash followed by
				 * something other than a period.  In non-CSV mode, anything
				 * after a backslash is special, so we skip over that second
				 * character too.  If we didn't do that \\. would be
				 * considered an end-of-copy, while in non-CSV mode it is a
				 * literal backslash followed by a period.  In CSV mode,
				 * backslashes are not special, so we want to process the
				 * character after the backslash just like a normal character,
				 * so we don't increment in those cases.
				 *
				 * Set 'c' to skip whole character correctly in multi-byte
				 * encodings.  If we don't have the whole character in the
				 * buffer yet, we might loop back to process it, after all,
				 * but that's OK because multi-byte characters cannot have any
				 * special meaning.
				 */
				raw_buf_ptr++;
				c = c2;
			}
		}

		/*
		 * This label is for CSV cases where \. appears at the start of a
		 * line, but there is more text after it, meaning it was a data value.
		 * We are more strict for \. in CSV mode because \. could be a data
		 * value, while in non-CSV mode, \. cannot be a data value.
		 */
not_end_of_copy:

		/*
		 * Process all bytes of a multi-byte character as a group.
		 *
		 * We only support multi-byte sequences where the first byte has the
		 * high-bit set, so as an optimization we can avoid this block
		 * entirely if it is not set.
		 */
		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
		{
			int			mblen;

			mblen_str[0] = c;
			/* All our encodings only read the first byte to get the length */
			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
			raw_buf_ptr += mblen - 1;	/* skip the trailing bytes */
		}
		first_char_in_line = false;
	}							/* end of outer loop */

	/*
	 * Transfer any still-uncopied data to line_buf.
	 */
	REFILL_LINEBUF;

	return result;
}
4056 
/*
 * GetDecimalFromHex
 *		Map one hexadecimal digit character to its numeric value.
 *
 * Decimal digits yield 0..9.  Any other character is lower-cased and
 * measured from 'a', so 'a'/'A' yield 10 and 'f'/'F' yield 15.  The
 * argument is not validated here.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char uch = (unsigned char) hex;

	if (!isdigit(uch))
		return tolower(uch) - 'a' + 10;

	return hex - '0';
}
4068 
4069 /*
4070  * Parse the current line into separate attributes (fields),
4071  * performing de-escaping as needed.
4072  *
4073  * The input is in line_buf.  We use attribute_buf to hold the result
4074  * strings.  cstate->raw_fields[k] is set to point to the k'th attribute
4075  * string, or NULL when the input matches the null marker string.
4076  * This array is expanded as necessary.
4077  *
4078  * (Note that the caller cannot check for nulls since the returned
4079  * string would be the post-de-escaping equivalent, which may look
4080  * the same as some valid data string.)
4081  *
4082  * delim is the column delimiter string (must be just one byte for now).
4083  * null_print is the null marker string.  Note that this is compared to
4084  * the pre-de-escaped input string.
4085  *
4086  * The return value is the number of fields actually read.
4087  */
4088 static int
CopyReadAttributesText(CopyState cstate)4089 CopyReadAttributesText(CopyState cstate)
4090 {
4091 	char		delimc = cstate->delim[0];
4092 	int			fieldno;
4093 	char	   *output_ptr;
4094 	char	   *cur_ptr;
4095 	char	   *line_end_ptr;
4096 
4097 	/*
4098 	 * We need a special case for zero-column tables: check that the input
4099 	 * line is empty, and return.
4100 	 */
4101 	if (cstate->max_fields <= 0)
4102 	{
4103 		if (cstate->line_buf.len != 0)
4104 			ereport(ERROR,
4105 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4106 					 errmsg("extra data after last expected column")));
4107 		return 0;
4108 	}
4109 
4110 	resetStringInfo(&cstate->attribute_buf);
4111 
4112 	/*
4113 	 * The de-escaped attributes will certainly not be longer than the input
4114 	 * data line, so we can just force attribute_buf to be large enough and
4115 	 * then transfer data without any checks for enough space.  We need to do
4116 	 * it this way because enlarging attribute_buf mid-stream would invalidate
4117 	 * pointers already stored into cstate->raw_fields[].
4118 	 */
4119 	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4120 		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4121 	output_ptr = cstate->attribute_buf.data;
4122 
4123 	/* set pointer variables for loop */
4124 	cur_ptr = cstate->line_buf.data;
4125 	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4126 
4127 	/* Outer loop iterates over fields */
4128 	fieldno = 0;
4129 	for (;;)
4130 	{
4131 		bool		found_delim = false;
4132 		char	   *start_ptr;
4133 		char	   *end_ptr;
4134 		int			input_len;
4135 		bool		saw_non_ascii = false;
4136 
4137 		/* Make sure there is enough space for the next value */
4138 		if (fieldno >= cstate->max_fields)
4139 		{
4140 			cstate->max_fields *= 2;
4141 			cstate->raw_fields =
4142 				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4143 		}
4144 
4145 		/* Remember start of field on both input and output sides */
4146 		start_ptr = cur_ptr;
4147 		cstate->raw_fields[fieldno] = output_ptr;
4148 
4149 		/*
4150 		 * Scan data for field.
4151 		 *
4152 		 * Note that in this loop, we are scanning to locate the end of field
4153 		 * and also speculatively performing de-escaping.  Once we find the
4154 		 * end-of-field, we can match the raw field contents against the null
4155 		 * marker string.  Only after that comparison fails do we know that
4156 		 * de-escaping is actually the right thing to do; therefore we *must
4157 		 * not* throw any syntax errors before we've done the null-marker
4158 		 * check.
4159 		 */
4160 		for (;;)
4161 		{
4162 			char		c;
4163 
4164 			end_ptr = cur_ptr;
4165 			if (cur_ptr >= line_end_ptr)
4166 				break;
4167 			c = *cur_ptr++;
4168 			if (c == delimc)
4169 			{
4170 				found_delim = true;
4171 				break;
4172 			}
4173 			if (c == '\\')
4174 			{
4175 				if (cur_ptr >= line_end_ptr)
4176 					break;
4177 				c = *cur_ptr++;
4178 				switch (c)
4179 				{
4180 					case '0':
4181 					case '1':
4182 					case '2':
4183 					case '3':
4184 					case '4':
4185 					case '5':
4186 					case '6':
4187 					case '7':
4188 						{
4189 							/* handle \013 */
4190 							int			val;
4191 
4192 							val = OCTVALUE(c);
4193 							if (cur_ptr < line_end_ptr)
4194 							{
4195 								c = *cur_ptr;
4196 								if (ISOCTAL(c))
4197 								{
4198 									cur_ptr++;
4199 									val = (val << 3) + OCTVALUE(c);
4200 									if (cur_ptr < line_end_ptr)
4201 									{
4202 										c = *cur_ptr;
4203 										if (ISOCTAL(c))
4204 										{
4205 											cur_ptr++;
4206 											val = (val << 3) + OCTVALUE(c);
4207 										}
4208 									}
4209 								}
4210 							}
4211 							c = val & 0377;
4212 							if (c == '\0' || IS_HIGHBIT_SET(c))
4213 								saw_non_ascii = true;
4214 						}
4215 						break;
4216 					case 'x':
4217 						/* Handle \x3F */
4218 						if (cur_ptr < line_end_ptr)
4219 						{
4220 							char		hexchar = *cur_ptr;
4221 
4222 							if (isxdigit((unsigned char) hexchar))
4223 							{
4224 								int			val = GetDecimalFromHex(hexchar);
4225 
4226 								cur_ptr++;
4227 								if (cur_ptr < line_end_ptr)
4228 								{
4229 									hexchar = *cur_ptr;
4230 									if (isxdigit((unsigned char) hexchar))
4231 									{
4232 										cur_ptr++;
4233 										val = (val << 4) + GetDecimalFromHex(hexchar);
4234 									}
4235 								}
4236 								c = val & 0xff;
4237 								if (c == '\0' || IS_HIGHBIT_SET(c))
4238 									saw_non_ascii = true;
4239 							}
4240 						}
4241 						break;
4242 					case 'b':
4243 						c = '\b';
4244 						break;
4245 					case 'f':
4246 						c = '\f';
4247 						break;
4248 					case 'n':
4249 						c = '\n';
4250 						break;
4251 					case 'r':
4252 						c = '\r';
4253 						break;
4254 					case 't':
4255 						c = '\t';
4256 						break;
4257 					case 'v':
4258 						c = '\v';
4259 						break;
4260 
4261 						/*
4262 						 * in all other cases, take the char after '\'
4263 						 * literally
4264 						 */
4265 				}
4266 			}
4267 
4268 			/* Add c to output string */
4269 			*output_ptr++ = c;
4270 		}
4271 
4272 		/* Check whether raw input matched null marker */
4273 		input_len = end_ptr - start_ptr;
4274 		if (input_len == cstate->null_print_len &&
4275 			strncmp(start_ptr, cstate->null_print, input_len) == 0)
4276 			cstate->raw_fields[fieldno] = NULL;
4277 		else
4278 		{
4279 			/*
4280 			 * At this point we know the field is supposed to contain data.
4281 			 *
4282 			 * If we de-escaped any non-7-bit-ASCII chars, make sure the
4283 			 * resulting string is valid data for the db encoding.
4284 			 */
4285 			if (saw_non_ascii)
4286 			{
4287 				char	   *fld = cstate->raw_fields[fieldno];
4288 
4289 				pg_verifymbstr(fld, output_ptr - fld, false);
4290 			}
4291 		}
4292 
4293 		/* Terminate attribute value in output area */
4294 		*output_ptr++ = '\0';
4295 
4296 		fieldno++;
4297 		/* Done if we hit EOL instead of a delim */
4298 		if (!found_delim)
4299 			break;
4300 	}
4301 
4302 	/* Clean up state of attribute_buf */
4303 	output_ptr--;
4304 	Assert(*output_ptr == '\0');
4305 	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4306 
4307 	return fieldno;
4308 }
4309 
4310 /*
4311  * Parse the current line into separate attributes (fields),
4312  * performing de-escaping as needed.  This has exactly the same API as
4313  * CopyReadAttributesText, except we parse the fields according to
4314  * "standard" (i.e. common) CSV usage.
4315  */
4316 static int
CopyReadAttributesCSV(CopyState cstate)4317 CopyReadAttributesCSV(CopyState cstate)
4318 {
4319 	char		delimc = cstate->delim[0];
4320 	char		quotec = cstate->quote[0];
4321 	char		escapec = cstate->escape[0];
4322 	int			fieldno;
4323 	char	   *output_ptr;
4324 	char	   *cur_ptr;
4325 	char	   *line_end_ptr;
4326 
4327 	/*
4328 	 * We need a special case for zero-column tables: check that the input
4329 	 * line is empty, and return.
4330 	 */
4331 	if (cstate->max_fields <= 0)
4332 	{
4333 		if (cstate->line_buf.len != 0)
4334 			ereport(ERROR,
4335 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4336 					 errmsg("extra data after last expected column")));
4337 		return 0;
4338 	}
4339 
4340 	resetStringInfo(&cstate->attribute_buf);
4341 
4342 	/*
4343 	 * The de-escaped attributes will certainly not be longer than the input
4344 	 * data line, so we can just force attribute_buf to be large enough and
4345 	 * then transfer data without any checks for enough space.  We need to do
4346 	 * it this way because enlarging attribute_buf mid-stream would invalidate
4347 	 * pointers already stored into cstate->raw_fields[].
4348 	 */
4349 	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4350 		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4351 	output_ptr = cstate->attribute_buf.data;
4352 
4353 	/* set pointer variables for loop */
4354 	cur_ptr = cstate->line_buf.data;
4355 	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4356 
4357 	/* Outer loop iterates over fields */
4358 	fieldno = 0;
4359 	for (;;)
4360 	{
4361 		bool		found_delim = false;
4362 		bool		saw_quote = false;
4363 		char	   *start_ptr;
4364 		char	   *end_ptr;
4365 		int			input_len;
4366 
4367 		/* Make sure there is enough space for the next value */
4368 		if (fieldno >= cstate->max_fields)
4369 		{
4370 			cstate->max_fields *= 2;
4371 			cstate->raw_fields =
4372 				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4373 		}
4374 
4375 		/* Remember start of field on both input and output sides */
4376 		start_ptr = cur_ptr;
4377 		cstate->raw_fields[fieldno] = output_ptr;
4378 
4379 		/*
4380 		 * Scan data for field,
4381 		 *
4382 		 * The loop starts in "not quote" mode and then toggles between that
4383 		 * and "in quote" mode. The loop exits normally if it is in "not
4384 		 * quote" mode and a delimiter or line end is seen.
4385 		 */
4386 		for (;;)
4387 		{
4388 			char		c;
4389 
4390 			/* Not in quote */
4391 			for (;;)
4392 			{
4393 				end_ptr = cur_ptr;
4394 				if (cur_ptr >= line_end_ptr)
4395 					goto endfield;
4396 				c = *cur_ptr++;
4397 				/* unquoted field delimiter */
4398 				if (c == delimc)
4399 				{
4400 					found_delim = true;
4401 					goto endfield;
4402 				}
4403 				/* start of quoted field (or part of field) */
4404 				if (c == quotec)
4405 				{
4406 					saw_quote = true;
4407 					break;
4408 				}
4409 				/* Add c to output string */
4410 				*output_ptr++ = c;
4411 			}
4412 
4413 			/* In quote */
4414 			for (;;)
4415 			{
4416 				end_ptr = cur_ptr;
4417 				if (cur_ptr >= line_end_ptr)
4418 					ereport(ERROR,
4419 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4420 							 errmsg("unterminated CSV quoted field")));
4421 
4422 				c = *cur_ptr++;
4423 
4424 				/* escape within a quoted field */
4425 				if (c == escapec)
4426 				{
4427 					/*
4428 					 * peek at the next char if available, and escape it if it
4429 					 * is an escape char or a quote char
4430 					 */
4431 					if (cur_ptr < line_end_ptr)
4432 					{
4433 						char		nextc = *cur_ptr;
4434 
4435 						if (nextc == escapec || nextc == quotec)
4436 						{
4437 							*output_ptr++ = nextc;
4438 							cur_ptr++;
4439 							continue;
4440 						}
4441 					}
4442 				}
4443 
4444 				/*
4445 				 * end of quoted field. Must do this test after testing for
4446 				 * escape in case quote char and escape char are the same
4447 				 * (which is the common case).
4448 				 */
4449 				if (c == quotec)
4450 					break;
4451 
4452 				/* Add c to output string */
4453 				*output_ptr++ = c;
4454 			}
4455 		}
4456 endfield:
4457 
4458 		/* Terminate attribute value in output area */
4459 		*output_ptr++ = '\0';
4460 
4461 		/* Check whether raw input matched null marker */
4462 		input_len = end_ptr - start_ptr;
4463 		if (!saw_quote && input_len == cstate->null_print_len &&
4464 			strncmp(start_ptr, cstate->null_print, input_len) == 0)
4465 			cstate->raw_fields[fieldno] = NULL;
4466 
4467 		fieldno++;
4468 		/* Done if we hit EOL instead of a delim */
4469 		if (!found_delim)
4470 			break;
4471 	}
4472 
4473 	/* Clean up state of attribute_buf */
4474 	output_ptr--;
4475 	Assert(*output_ptr == '\0');
4476 	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4477 
4478 	return fieldno;
4479 }
4480 
4481 
4482 /*
4483  * Read a binary attribute
4484  */
4485 static Datum
CopyReadBinaryAttribute(CopyState cstate,int column_no,FmgrInfo * flinfo,Oid typioparam,int32 typmod,bool * isnull)4486 CopyReadBinaryAttribute(CopyState cstate,
4487 						int column_no, FmgrInfo *flinfo,
4488 						Oid typioparam, int32 typmod,
4489 						bool *isnull)
4490 {
4491 	int32		fld_size;
4492 	Datum		result;
4493 
4494 	if (!CopyGetInt32(cstate, &fld_size))
4495 		ereport(ERROR,
4496 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4497 				 errmsg("unexpected EOF in COPY data")));
4498 	if (fld_size == -1)
4499 	{
4500 		*isnull = true;
4501 		return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4502 	}
4503 	if (fld_size < 0)
4504 		ereport(ERROR,
4505 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4506 				 errmsg("invalid field size")));
4507 
4508 	/* reset attribute_buf to empty, and load raw data in it */
4509 	resetStringInfo(&cstate->attribute_buf);
4510 
4511 	enlargeStringInfo(&cstate->attribute_buf, fld_size);
4512 	if (CopyGetData(cstate, cstate->attribute_buf.data,
4513 					fld_size, fld_size) != fld_size)
4514 		ereport(ERROR,
4515 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4516 				 errmsg("unexpected EOF in COPY data")));
4517 
4518 	cstate->attribute_buf.len = fld_size;
4519 	cstate->attribute_buf.data[fld_size] = '\0';
4520 
4521 	/* Call the column type's binary input converter */
4522 	result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4523 								 typioparam, typmod);
4524 
4525 	/* Trouble if it didn't eat the whole buffer */
4526 	if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4527 		ereport(ERROR,
4528 				(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4529 				 errmsg("incorrect binary data format")));
4530 
4531 	*isnull = false;
4532 	return result;
4533 }
4534 
/*
 * Send text representation of one attribute, with conversion and escaping
 *
 * DUMPSOFAR() is a helper for the output loops that follow: when the range
 * from "start" up to (but not including) "ptr" is non-empty, it passes those
 * bytes to CopySendData().  It expects "cstate", "start" and "ptr" to be in
 * scope where it is expanded, and it does not modify "start" itself.
 */
#define DUMPSOFAR() \
	do { \
		if (ptr > start) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)
4543 
4544 static void
CopyAttributeOutText(CopyState cstate,char * string)4545 CopyAttributeOutText(CopyState cstate, char *string)
4546 {
4547 	char	   *ptr;
4548 	char	   *start;
4549 	char		c;
4550 	char		delimc = cstate->delim[0];
4551 
4552 	if (cstate->need_transcoding)
4553 		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4554 	else
4555 		ptr = string;
4556 
4557 	/*
4558 	 * We have to grovel through the string searching for control characters
4559 	 * and instances of the delimiter character.  In most cases, though, these
4560 	 * are infrequent.  To avoid overhead from calling CopySendData once per
4561 	 * character, we dump out all characters between escaped characters in a
4562 	 * single call.  The loop invariant is that the data from "start" to "ptr"
4563 	 * can be sent literally, but hasn't yet been.
4564 	 *
4565 	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4566 	 * in valid backend encodings, extra bytes of a multibyte character never
4567 	 * look like ASCII.  This loop is sufficiently performance-critical that
4568 	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4569 	 * of the normal safe-encoding path.
4570 	 */
4571 	if (cstate->encoding_embeds_ascii)
4572 	{
4573 		start = ptr;
4574 		while ((c = *ptr) != '\0')
4575 		{
4576 			if ((unsigned char) c < (unsigned char) 0x20)
4577 			{
4578 				/*
4579 				 * \r and \n must be escaped, the others are traditional. We
4580 				 * prefer to dump these using the C-like notation, rather than
4581 				 * a backslash and the literal character, because it makes the
4582 				 * dump file a bit more proof against Microsoftish data
4583 				 * mangling.
4584 				 */
4585 				switch (c)
4586 				{
4587 					case '\b':
4588 						c = 'b';
4589 						break;
4590 					case '\f':
4591 						c = 'f';
4592 						break;
4593 					case '\n':
4594 						c = 'n';
4595 						break;
4596 					case '\r':
4597 						c = 'r';
4598 						break;
4599 					case '\t':
4600 						c = 't';
4601 						break;
4602 					case '\v':
4603 						c = 'v';
4604 						break;
4605 					default:
4606 						/* If it's the delimiter, must backslash it */
4607 						if (c == delimc)
4608 							break;
4609 						/* All ASCII control chars are length 1 */
4610 						ptr++;
4611 						continue;	/* fall to end of loop */
4612 				}
4613 				/* if we get here, we need to convert the control char */
4614 				DUMPSOFAR();
4615 				CopySendChar(cstate, '\\');
4616 				CopySendChar(cstate, c);
4617 				start = ++ptr;	/* do not include char in next run */
4618 			}
4619 			else if (c == '\\' || c == delimc)
4620 			{
4621 				DUMPSOFAR();
4622 				CopySendChar(cstate, '\\');
4623 				start = ptr++;	/* we include char in next run */
4624 			}
4625 			else if (IS_HIGHBIT_SET(c))
4626 				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4627 			else
4628 				ptr++;
4629 		}
4630 	}
4631 	else
4632 	{
4633 		start = ptr;
4634 		while ((c = *ptr) != '\0')
4635 		{
4636 			if ((unsigned char) c < (unsigned char) 0x20)
4637 			{
4638 				/*
4639 				 * \r and \n must be escaped, the others are traditional. We
4640 				 * prefer to dump these using the C-like notation, rather than
4641 				 * a backslash and the literal character, because it makes the
4642 				 * dump file a bit more proof against Microsoftish data
4643 				 * mangling.
4644 				 */
4645 				switch (c)
4646 				{
4647 					case '\b':
4648 						c = 'b';
4649 						break;
4650 					case '\f':
4651 						c = 'f';
4652 						break;
4653 					case '\n':
4654 						c = 'n';
4655 						break;
4656 					case '\r':
4657 						c = 'r';
4658 						break;
4659 					case '\t':
4660 						c = 't';
4661 						break;
4662 					case '\v':
4663 						c = 'v';
4664 						break;
4665 					default:
4666 						/* If it's the delimiter, must backslash it */
4667 						if (c == delimc)
4668 							break;
4669 						/* All ASCII control chars are length 1 */
4670 						ptr++;
4671 						continue;	/* fall to end of loop */
4672 				}
4673 				/* if we get here, we need to convert the control char */
4674 				DUMPSOFAR();
4675 				CopySendChar(cstate, '\\');
4676 				CopySendChar(cstate, c);
4677 				start = ++ptr;	/* do not include char in next run */
4678 			}
4679 			else if (c == '\\' || c == delimc)
4680 			{
4681 				DUMPSOFAR();
4682 				CopySendChar(cstate, '\\');
4683 				start = ptr++;	/* we include char in next run */
4684 			}
4685 			else
4686 				ptr++;
4687 		}
4688 	}
4689 
4690 	DUMPSOFAR();
4691 }
4692 
4693 /*
4694  * Send text representation of one attribute, with conversion and
4695  * CSV-style escaping
4696  */
4697 static void
CopyAttributeOutCSV(CopyState cstate,char * string,bool use_quote,bool single_attr)4698 CopyAttributeOutCSV(CopyState cstate, char *string,
4699 					bool use_quote, bool single_attr)
4700 {
4701 	char	   *ptr;
4702 	char	   *start;
4703 	char		c;
4704 	char		delimc = cstate->delim[0];
4705 	char		quotec = cstate->quote[0];
4706 	char		escapec = cstate->escape[0];
4707 
4708 	/* force quoting if it matches null_print (before conversion!) */
4709 	if (!use_quote && strcmp(string, cstate->null_print) == 0)
4710 		use_quote = true;
4711 
4712 	if (cstate->need_transcoding)
4713 		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4714 	else
4715 		ptr = string;
4716 
4717 	/*
4718 	 * Make a preliminary pass to discover if it needs quoting
4719 	 */
4720 	if (!use_quote)
4721 	{
4722 		/*
4723 		 * Because '\.' can be a data value, quote it if it appears alone on a
4724 		 * line so it is not interpreted as the end-of-data marker.
4725 		 */
4726 		if (single_attr && strcmp(ptr, "\\.") == 0)
4727 			use_quote = true;
4728 		else
4729 		{
4730 			char	   *tptr = ptr;
4731 
4732 			while ((c = *tptr) != '\0')
4733 			{
4734 				if (c == delimc || c == quotec || c == '\n' || c == '\r')
4735 				{
4736 					use_quote = true;
4737 					break;
4738 				}
4739 				if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4740 					tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4741 				else
4742 					tptr++;
4743 			}
4744 		}
4745 	}
4746 
4747 	if (use_quote)
4748 	{
4749 		CopySendChar(cstate, quotec);
4750 
4751 		/*
4752 		 * We adopt the same optimization strategy as in CopyAttributeOutText
4753 		 */
4754 		start = ptr;
4755 		while ((c = *ptr) != '\0')
4756 		{
4757 			if (c == quotec || c == escapec)
4758 			{
4759 				DUMPSOFAR();
4760 				CopySendChar(cstate, escapec);
4761 				start = ptr;	/* we include char in next run */
4762 			}
4763 			if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4764 				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4765 			else
4766 				ptr++;
4767 		}
4768 		DUMPSOFAR();
4769 
4770 		CopySendChar(cstate, quotec);
4771 	}
4772 	else
4773 	{
4774 		/* If it doesn't need quoting, we can just dump it as-is */
4775 		CopySendString(cstate, ptr);
4776 	}
4777 }
4778 
4779 /*
4780  * CopyGetAttnums - build an integer list of attnums to be copied
4781  *
4782  * The input attnamelist is either the user-specified column list,
4783  * or NIL if there was none (in which case we want all the non-dropped
4784  * columns).
4785  *
4786  * rel can be NULL ... it's only used for error reports.
4787  */
4788 static List *
CopyGetAttnums(TupleDesc tupDesc,Relation rel,List * attnamelist)4789 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4790 {
4791 	List	   *attnums = NIL;
4792 
4793 	if (attnamelist == NIL)
4794 	{
4795 		/* Generate default column list */
4796 		Form_pg_attribute *attr = tupDesc->attrs;
4797 		int			attr_count = tupDesc->natts;
4798 		int			i;
4799 
4800 		for (i = 0; i < attr_count; i++)
4801 		{
4802 			if (attr[i]->attisdropped)
4803 				continue;
4804 			attnums = lappend_int(attnums, i + 1);
4805 		}
4806 	}
4807 	else
4808 	{
4809 		/* Validate the user-supplied list and extract attnums */
4810 		ListCell   *l;
4811 
4812 		foreach(l, attnamelist)
4813 		{
4814 			char	   *name = strVal(lfirst(l));
4815 			int			attnum;
4816 			int			i;
4817 
4818 			/* Lookup column name */
4819 			attnum = InvalidAttrNumber;
4820 			for (i = 0; i < tupDesc->natts; i++)
4821 			{
4822 				if (tupDesc->attrs[i]->attisdropped)
4823 					continue;
4824 				if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4825 				{
4826 					attnum = tupDesc->attrs[i]->attnum;
4827 					break;
4828 				}
4829 			}
4830 			if (attnum == InvalidAttrNumber)
4831 			{
4832 				if (rel != NULL)
4833 					ereport(ERROR,
4834 							(errcode(ERRCODE_UNDEFINED_COLUMN),
4835 							 errmsg("column \"%s\" of relation \"%s\" does not exist",
4836 									name, RelationGetRelationName(rel))));
4837 				else
4838 					ereport(ERROR,
4839 							(errcode(ERRCODE_UNDEFINED_COLUMN),
4840 							 errmsg("column \"%s\" does not exist",
4841 									name)));
4842 			}
4843 			/* Check for duplicates */
4844 			if (list_member_int(attnums, attnum))
4845 				ereport(ERROR,
4846 						(errcode(ERRCODE_DUPLICATE_COLUMN),
4847 						 errmsg("column \"%s\" specified more than once",
4848 								name)));
4849 			attnums = lappend_int(attnums, attnum);
4850 		}
4851 	}
4852 
4853 	return attnums;
4854 }
4855 
4856 
4857 /*
4858  * copy_dest_startup --- executor startup
4859  */
4860 static void
copy_dest_startup(DestReceiver * self,int operation,TupleDesc typeinfo)4861 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4862 {
4863 	/* no-op */
4864 }
4865 
4866 /*
4867  * copy_dest_receive --- receive one tuple
4868  */
4869 static bool
copy_dest_receive(TupleTableSlot * slot,DestReceiver * self)4870 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4871 {
4872 	DR_copy    *myState = (DR_copy *) self;
4873 	CopyState	cstate = myState->cstate;
4874 
4875 	/* Make sure the tuple is fully deconstructed */
4876 	slot_getallattrs(slot);
4877 
4878 	/* And send the data */
4879 	CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4880 	myState->processed++;
4881 
4882 	return true;
4883 }
4884 
4885 /*
4886  * copy_dest_shutdown --- executor end
4887  */
4888 static void
copy_dest_shutdown(DestReceiver * self)4889 copy_dest_shutdown(DestReceiver *self)
4890 {
4891 	/* no-op */
4892 }
4893 
4894 /*
4895  * copy_dest_destroy --- release DestReceiver object
4896  */
4897 static void
copy_dest_destroy(DestReceiver * self)4898 copy_dest_destroy(DestReceiver *self)
4899 {
4900 	pfree(self);
4901 }
4902 
4903 /*
4904  * CreateCopyDestReceiver -- create a suitable DestReceiver object
4905  */
4906 DestReceiver *
CreateCopyDestReceiver(void)4907 CreateCopyDestReceiver(void)
4908 {
4909 	DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));
4910 
4911 	self->pub.receiveSlot = copy_dest_receive;
4912 	self->pub.rStartup = copy_dest_startup;
4913 	self->pub.rShutdown = copy_dest_shutdown;
4914 	self->pub.rDestroy = copy_dest_destroy;
4915 	self->pub.mydest = DestCopyOut;
4916 
4917 	self->cstate = NULL;		/* will be set later */
4918 	self->processed = 0;
4919 
4920 	return (DestReceiver *) self;
4921 }
4922