1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  *		Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22 
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "rewrite/rewriteHandler.h"
41 #include "storage/fd.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/portal.h"
47 #include "utils/rel.h"
48 #include "utils/rls.h"
49 #include "utils/snapmgr.h"
50 
51 
/*
 * Helpers for ASCII octal digits.
 *
 * ISOCTAL(c) is true when c is one of '0'..'7'.  The test is a single
 * unsigned range check so that the argument is evaluated exactly once (an
 * argument with side effects is therefore safe): anything below '0' wraps
 * around to a very large unsigned value and fails the comparison.  The
 * argument is expected to be a char or int.
 *
 * OCTVALUE(c) turns an octal digit character into its numeric value; it
 * does no checking of its own, so test with ISOCTAL first.
 */
#define ISOCTAL(c) (((unsigned int) (c) - (unsigned int) '0') <= 7u)
#define OCTVALUE(c) ((c) - '0')
54 
/*
 * The kinds of data source/destination that the lowest-level COPY I/O
 * routines have to tell apart.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* a file, or a pipe to/from a program */
	COPY_OLD_FE,				/* the frontend, using protocol 2.0 */
	COPY_NEW_FE					/* the frontend, using protocol 3.0 */
} CopyDest;
65 
/*
 * Which end-of-line terminator the input data uses.
 */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;
76 
77 /*
78  * This struct contains all the state variables used throughout a COPY
79  * operation. For simplicity, we use the same struct for all variants of COPY,
80  * even though some fields are used in only some cases.
81  *
82  * Multi-byte encodings: all supported client-side encodings encode multi-byte
83  * characters by having the first byte's high bit set. Subsequent bytes of the
84  * character can have the high bit not set. When scanning data in such an
85  * encoding to look for a match to a single-byte (ie ASCII) character, we must
86  * use the full pg_encoding_mblen() machinery to skip over multibyte
87  * characters, else we might find a false match to a trailing byte. In
88  * supported server encodings, there is no possibility of a false match, and
89  * it's faster to make useless comparisons to trailing bytes than it is to
90  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
91  * when we have to do it the hard way.
92  */
93 typedef struct CopyStateData
94 {
95 	/* low-level state data */
96 	CopyDest	copy_dest;		/* type of copy source/destination */
97 	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
98 	StringInfo	fe_msgbuf;		/* used for all dests during COPY TO, only for
99 								 * dest == COPY_NEW_FE in COPY FROM */
100 	bool		fe_eof;			/* true if detected end of copy data */
101 	EolType		eol_type;		/* EOL type of input */
102 	int			file_encoding;	/* file or remote side's character encoding */
103 	bool		need_transcoding;		/* file encoding diff from server? */
104 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
105 
106 	/* parameters from the COPY command */
107 	Relation	rel;			/* relation to copy to or from */
108 	QueryDesc  *queryDesc;		/* executable query to copy from */
109 	List	   *attnumlist;		/* integer list of attnums to copy */
110 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
111 	bool		is_program;		/* is 'filename' a program to popen? */
112 	bool		binary;			/* binary format? */
113 	bool		oids;			/* include OIDs? */
114 	bool		freeze;			/* freeze rows on loading? */
115 	bool		csv_mode;		/* Comma Separated Value format? */
116 	bool		header_line;	/* CSV header line? */
117 	char	   *null_print;		/* NULL marker string (server encoding!) */
118 	int			null_print_len; /* length of same */
119 	char	   *null_print_client;		/* same converted to file encoding */
120 	char	   *delim;			/* column delimiter (must be 1 byte) */
121 	char	   *quote;			/* CSV quote char (must be 1 byte) */
122 	char	   *escape;			/* CSV escape char (must be 1 byte) */
123 	List	   *force_quote;	/* list of column names */
124 	bool		force_quote_all;	/* FORCE_QUOTE *? */
125 	bool	   *force_quote_flags;		/* per-column CSV FQ flags */
126 	List	   *force_notnull;	/* list of column names */
127 	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
128 	List	   *force_null;		/* list of column names */
129 	bool	   *force_null_flags;		/* per-column CSV FN flags */
130 	bool		convert_selectively;	/* do selective binary conversion? */
131 	List	   *convert_select; /* list of column names (can be NIL) */
132 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
133 
134 	/* these are just for error messages, see CopyFromErrorCallback */
135 	const char *cur_relname;	/* table name for error messages */
136 	uint64		cur_lineno;		/* line number for error messages */
137 	const char *cur_attname;	/* current att for error messages */
138 	const char *cur_attval;		/* current att value for error messages */
139 
140 	/*
141 	 * Working state for COPY TO/FROM
142 	 */
143 	MemoryContext copycontext;	/* per-copy execution context */
144 
145 	/*
146 	 * Working state for COPY TO
147 	 */
148 	FmgrInfo   *out_functions;	/* lookup info for output functions */
149 	MemoryContext rowcontext;	/* per-row evaluation context */
150 
151 	/*
152 	 * Working state for COPY FROM
153 	 */
154 	AttrNumber	num_defaults;
155 	bool		file_has_oids;
156 	FmgrInfo	oid_in_function;
157 	Oid			oid_typioparam;
158 	FmgrInfo   *in_functions;	/* array of input functions for each attrs */
159 	Oid		   *typioparams;	/* array of element types for in_functions */
160 	int		   *defmap;			/* array of default att numbers */
161 	ExprState **defexprs;		/* array of default att expressions */
162 	bool		volatile_defexprs;		/* is any of defexprs volatile? */
163 	List	   *range_table;
164 
165 	/*
166 	 * These variables are used to reduce overhead in textual COPY FROM.
167 	 *
168 	 * attribute_buf holds the separated, de-escaped text for each field of
169 	 * the current line.  The CopyReadAttributes functions return arrays of
170 	 * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
171 	 * the buffer on each cycle.
172 	 */
173 	StringInfoData attribute_buf;
174 
175 	/* field raw data pointers found by COPY FROM */
176 
177 	int			max_fields;
178 	char	  **raw_fields;
179 
180 	/*
181 	 * Similarly, line_buf holds the whole input line being processed. The
182 	 * input cycle is first to read the whole line into line_buf, convert it
183 	 * to server encoding there, and then extract the individual attribute
184 	 * fields into attribute_buf.  line_buf is preserved unmodified so that we
185 	 * can display it in error messages if appropriate.
186 	 */
187 	StringInfoData line_buf;
188 	bool		line_buf_converted;		/* converted to server encoding? */
189 	bool		line_buf_valid; /* contains the row being processed? */
190 
191 	/*
192 	 * Finally, raw_buf holds raw data read from the data source (file or
193 	 * client connection).  CopyReadLine parses this data sufficiently to
194 	 * locate line boundaries, then transfers the data to line_buf and
195 	 * converts it.  Note: we guarantee that there is a \0 at
196 	 * raw_buf[raw_buf_len].
197 	 */
198 #define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
199 	char	   *raw_buf;
200 	int			raw_buf_index;	/* next byte to process */
201 	int			raw_buf_len;	/* total # of bytes stored */
202 } CopyStateData;
203 
204 /* DestReceiver for COPY (query) TO */
205 typedef struct
206 {
207 	DestReceiver pub;			/* publicly-known function pointers */
208 	CopyState	cstate;			/* CopyStateData for the command */
209 	uint64		processed;		/* # of tuples processed */
210 } DR_copy;
211 
212 
213 /*
214  * These macros centralize code used to process line_buf and raw_buf buffers.
215  * They are macros because they often do continue/break control and to avoid
216  * function call overhead in tight COPY loops.
217  *
218  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
219  * prevent the continue/break processing from working.  We end the "if (1)"
220  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
221  * any "else" in the calling code, and to avoid any compiler warnings about
222  * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
223  */
224 
/*
 * If fewer than extralen+1 bytes are left in the buffer and EOF has not
 * been hit, back up to the character fetched at the top of the loop (so it
 * stays in the buffer however much read-ahead was attempted), ask for more
 * data and restart the loop.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (copy_buf_len <= raw_buf_ptr + (extralen) && !hit_eof) \
	{ \
		need_data = true; \
		raw_buf_ptr = prev_raw_ptr; /* back out the fetch */ \
		continue; \
	} \
} else ((void) 0)
239 
/*
 * If fewer than extralen+1 bytes are left and EOF has been hit, swallow
 * whatever remains in the buffer, report success and leave the loop.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (copy_buf_len <= raw_buf_ptr + (extralen) && hit_eof) \
	{ \
		/* a backslash right before EOF is treated as a data char */ \
		result = true; \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* eat the partial character */ \
		break; \
	} \
} else ((void) 0)
253 
/*
 * Move the data approved so far (everything between raw_buf_index and
 * raw_buf_ptr) into line_buf.  This has to be done to be certain raw_buf
 * has some free room.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (cstate->raw_buf_index < raw_buf_ptr) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
269 
/*
 * Discard any read-ahead, leaving raw_buf_ptr just past the character
 * fetched at the top of the loop, and jump to not_end_of_copy.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = 1 + prev_raw_ptr; \
	goto not_end_of_copy; \
} else ((void) 0)
277 
/*
 * Eleven-byte signature for binary COPY data.  Spelled out byte by byte so
 * that the array is exactly filled, with the final \0 being part of the
 * signature rather than an implicit string terminator.
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
279 
280 
281 /* non-export function prototypes */
282 static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
283 		  const char *queryString, const Oid queryRelId, List *attnamelist,
284 		  List *options);
285 static void EndCopy(CopyState cstate);
286 static void ClosePipeToProgram(CopyState cstate);
287 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
288 			const Oid queryRelId, const char *filename, bool is_program,
289 			List *attnamelist, List *options);
290 static void EndCopyTo(CopyState cstate);
291 static uint64 DoCopyTo(CopyState cstate);
292 static uint64 CopyTo(CopyState cstate);
293 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
294 			 Datum *values, bool *nulls);
295 static uint64 CopyFrom(CopyState cstate);
296 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
297 					CommandId mycid, int hi_options,
298 					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
299 					BulkInsertState bistate,
300 					int nBufferedTuples, HeapTuple *bufferedTuples,
301 					uint64 firstBufferedLineNo);
302 static bool CopyReadLine(CopyState cstate);
303 static bool CopyReadLineText(CopyState cstate);
304 static int	CopyReadAttributesText(CopyState cstate);
305 static int	CopyReadAttributesCSV(CopyState cstate);
306 static Datum CopyReadBinaryAttribute(CopyState cstate,
307 						int column_no, FmgrInfo *flinfo,
308 						Oid typioparam, int32 typmod,
309 						bool *isnull);
310 static void CopyAttributeOutText(CopyState cstate, char *string);
311 static void CopyAttributeOutCSV(CopyState cstate, char *string,
312 					bool use_quote, bool single_attr);
313 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
314 			   List *attnamelist);
315 static char *limit_printout_length(const char *str);
316 
317 /* Low-level communications functions */
318 static void SendCopyBegin(CopyState cstate);
319 static void ReceiveCopyBegin(CopyState cstate);
320 static void SendCopyEnd(CopyState cstate);
321 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
322 static void CopySendString(CopyState cstate, const char *str);
323 static void CopySendChar(CopyState cstate, char c);
324 static void CopySendEndOfRow(CopyState cstate);
325 static int CopyGetData(CopyState cstate, void *databuf,
326 			int minread, int maxread);
327 static void CopySendInt32(CopyState cstate, int32 val);
328 static bool CopyGetInt32(CopyState cstate, int32 *val);
329 static void CopySendInt16(CopyState cstate, int16 val);
330 static bool CopyGetInt16(CopyState cstate, int16 *val);
331 
332 
333 /*
334  * Send copy start/stop messages for frontend copies.  These have changed
335  * in past protocol redesigns.
336  */
337 static void
SendCopyBegin(CopyState cstate)338 SendCopyBegin(CopyState cstate)
339 {
340 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
341 	{
342 		/* new way */
343 		StringInfoData buf;
344 		int			natts = list_length(cstate->attnumlist);
345 		int16		format = (cstate->binary ? 1 : 0);
346 		int			i;
347 
348 		pq_beginmessage(&buf, 'H');
349 		pq_sendbyte(&buf, format);		/* overall format */
350 		pq_sendint(&buf, natts, 2);
351 		for (i = 0; i < natts; i++)
352 			pq_sendint(&buf, format, 2);		/* per-column formats */
353 		pq_endmessage(&buf);
354 		cstate->copy_dest = COPY_NEW_FE;
355 	}
356 	else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
357 	{
358 		/* old way */
359 		if (cstate->binary)
360 			ereport(ERROR,
361 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 			errmsg("COPY BINARY is not supported to stdout or from stdin")));
363 		pq_putemptymessage('H');
364 		/* grottiness needed for old COPY OUT protocol */
365 		pq_startcopyout();
366 		cstate->copy_dest = COPY_OLD_FE;
367 	}
368 	else
369 	{
370 		/* very old way */
371 		if (cstate->binary)
372 			ereport(ERROR,
373 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374 			errmsg("COPY BINARY is not supported to stdout or from stdin")));
375 		pq_putemptymessage('B');
376 		/* grottiness needed for old COPY OUT protocol */
377 		pq_startcopyout();
378 		cstate->copy_dest = COPY_OLD_FE;
379 	}
380 }
381 
382 static void
ReceiveCopyBegin(CopyState cstate)383 ReceiveCopyBegin(CopyState cstate)
384 {
385 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
386 	{
387 		/* new way */
388 		StringInfoData buf;
389 		int			natts = list_length(cstate->attnumlist);
390 		int16		format = (cstate->binary ? 1 : 0);
391 		int			i;
392 
393 		pq_beginmessage(&buf, 'G');
394 		pq_sendbyte(&buf, format);		/* overall format */
395 		pq_sendint(&buf, natts, 2);
396 		for (i = 0; i < natts; i++)
397 			pq_sendint(&buf, format, 2);		/* per-column formats */
398 		pq_endmessage(&buf);
399 		cstate->copy_dest = COPY_NEW_FE;
400 		cstate->fe_msgbuf = makeStringInfo();
401 	}
402 	else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
403 	{
404 		/* old way */
405 		if (cstate->binary)
406 			ereport(ERROR,
407 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 			errmsg("COPY BINARY is not supported to stdout or from stdin")));
409 		pq_putemptymessage('G');
410 		/* any error in old protocol will make us lose sync */
411 		pq_startmsgread();
412 		cstate->copy_dest = COPY_OLD_FE;
413 	}
414 	else
415 	{
416 		/* very old way */
417 		if (cstate->binary)
418 			ereport(ERROR,
419 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420 			errmsg("COPY BINARY is not supported to stdout or from stdin")));
421 		pq_putemptymessage('D');
422 		/* any error in old protocol will make us lose sync */
423 		pq_startmsgread();
424 		cstate->copy_dest = COPY_OLD_FE;
425 	}
426 	/* We *must* flush here to ensure FE knows it can send. */
427 	pq_flush();
428 }
429 
430 static void
SendCopyEnd(CopyState cstate)431 SendCopyEnd(CopyState cstate)
432 {
433 	if (cstate->copy_dest == COPY_NEW_FE)
434 	{
435 		/* Shouldn't have any unsent data */
436 		Assert(cstate->fe_msgbuf->len == 0);
437 		/* Send Copy Done message */
438 		pq_putemptymessage('c');
439 	}
440 	else
441 	{
442 		CopySendData(cstate, "\\.", 2);
443 		/* Need to flush out the trailer (this also appends a newline) */
444 		CopySendEndOfRow(cstate);
445 		pq_endcopyout(false);
446 	}
447 }
448 
449 /*----------
450  * CopySendData sends output data to the destination (file or frontend)
451  * CopySendString does the same for null-terminated strings
452  * CopySendChar does the same for single characters
453  * CopySendEndOfRow does the appropriate thing at end of each data row
454  *	(data is not actually flushed except by CopySendEndOfRow)
455  *
456  * NB: no data conversion is applied by these functions
457  *----------
458  */
459 static void
CopySendData(CopyState cstate,const void * databuf,int datasize)460 CopySendData(CopyState cstate, const void *databuf, int datasize)
461 {
462 	appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
463 }
464 
465 static void
CopySendString(CopyState cstate,const char * str)466 CopySendString(CopyState cstate, const char *str)
467 {
468 	appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
469 }
470 
471 static void
CopySendChar(CopyState cstate,char c)472 CopySendChar(CopyState cstate, char c)
473 {
474 	appendStringInfoCharMacro(cstate->fe_msgbuf, c);
475 }
476 
477 static void
CopySendEndOfRow(CopyState cstate)478 CopySendEndOfRow(CopyState cstate)
479 {
480 	StringInfo	fe_msgbuf = cstate->fe_msgbuf;
481 
482 	switch (cstate->copy_dest)
483 	{
484 		case COPY_FILE:
485 			if (!cstate->binary)
486 			{
487 				/* Default line termination depends on platform */
488 #ifndef WIN32
489 				CopySendChar(cstate, '\n');
490 #else
491 				CopySendString(cstate, "\r\n");
492 #endif
493 			}
494 
495 			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
496 					   cstate->copy_file) != 1 ||
497 				ferror(cstate->copy_file))
498 			{
499 				if (cstate->is_program)
500 				{
501 					if (errno == EPIPE)
502 					{
503 						/*
504 						 * The pipe will be closed automatically on error at
505 						 * the end of transaction, but we might get a better
506 						 * error message from the subprocess' exit code than
507 						 * just "Broken Pipe"
508 						 */
509 						ClosePipeToProgram(cstate);
510 
511 						/*
512 						 * If ClosePipeToProgram() didn't throw an error, the
513 						 * program terminated normally, but closed the pipe
514 						 * first. Restore errno, and throw an error.
515 						 */
516 						errno = EPIPE;
517 					}
518 					ereport(ERROR,
519 							(errcode_for_file_access(),
520 							 errmsg("could not write to COPY program: %m")));
521 				}
522 				else
523 					ereport(ERROR,
524 							(errcode_for_file_access(),
525 							 errmsg("could not write to COPY file: %m")));
526 			}
527 			break;
528 		case COPY_OLD_FE:
529 			/* The FE/BE protocol uses \n as newline for all platforms */
530 			if (!cstate->binary)
531 				CopySendChar(cstate, '\n');
532 
533 			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
534 			{
535 				/* no hope of recovering connection sync, so FATAL */
536 				ereport(FATAL,
537 						(errcode(ERRCODE_CONNECTION_FAILURE),
538 						 errmsg("connection lost during COPY to stdout")));
539 			}
540 			break;
541 		case COPY_NEW_FE:
542 			/* The FE/BE protocol uses \n as newline for all platforms */
543 			if (!cstate->binary)
544 				CopySendChar(cstate, '\n');
545 
546 			/* Dump the accumulated row as one CopyData message */
547 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
548 			break;
549 	}
550 
551 	resetStringInfo(fe_msgbuf);
552 }
553 
554 /*
555  * CopyGetData reads data from the source (file or frontend)
556  *
557  * We attempt to read at least minread, and at most maxread, bytes from
558  * the source.  The actual number of bytes read is returned; if this is
559  * less than minread, EOF was detected.
560  *
561  * Note: when copying from the frontend, we expect a proper EOF mark per
562  * protocol; if the frontend simply drops the connection, we raise error.
563  * It seems unwise to allow the COPY IN to complete normally in that case.
564  *
565  * NB: no data conversion is applied here.
566  */
567 static int
CopyGetData(CopyState cstate,void * databuf,int minread,int maxread)568 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
569 {
570 	int			bytesread = 0;
571 
572 	switch (cstate->copy_dest)
573 	{
574 		case COPY_FILE:
575 			bytesread = fread(databuf, 1, maxread, cstate->copy_file);
576 			if (ferror(cstate->copy_file))
577 				ereport(ERROR,
578 						(errcode_for_file_access(),
579 						 errmsg("could not read from COPY file: %m")));
580 			break;
581 		case COPY_OLD_FE:
582 
583 			/*
584 			 * We cannot read more than minread bytes (which in practice is 1)
585 			 * because old protocol doesn't have any clear way of separating
586 			 * the COPY stream from following data.  This is slow, but not any
587 			 * slower than the code path was originally, and we don't care
588 			 * much anymore about the performance of old protocol.
589 			 */
590 			if (pq_getbytes((char *) databuf, minread))
591 			{
592 				/* Only a \. terminator is legal EOF in old protocol */
593 				ereport(ERROR,
594 						(errcode(ERRCODE_CONNECTION_FAILURE),
595 						 errmsg("unexpected EOF on client connection with an open transaction")));
596 			}
597 			bytesread = minread;
598 			break;
599 		case COPY_NEW_FE:
600 			while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
601 			{
602 				int			avail;
603 
604 				while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
605 				{
606 					/* Try to receive another message */
607 					int			mtype;
608 
609 			readmessage:
610 					HOLD_CANCEL_INTERRUPTS();
611 					pq_startmsgread();
612 					mtype = pq_getbyte();
613 					if (mtype == EOF)
614 						ereport(ERROR,
615 								(errcode(ERRCODE_CONNECTION_FAILURE),
616 								 errmsg("unexpected EOF on client connection with an open transaction")));
617 					if (pq_getmessage(cstate->fe_msgbuf, 0))
618 						ereport(ERROR,
619 								(errcode(ERRCODE_CONNECTION_FAILURE),
620 								 errmsg("unexpected EOF on client connection with an open transaction")));
621 					RESUME_CANCEL_INTERRUPTS();
622 					switch (mtype)
623 					{
624 						case 'd':		/* CopyData */
625 							break;
626 						case 'c':		/* CopyDone */
627 							/* COPY IN correctly terminated by frontend */
628 							cstate->fe_eof = true;
629 							return bytesread;
630 						case 'f':		/* CopyFail */
631 							ereport(ERROR,
632 									(errcode(ERRCODE_QUERY_CANCELED),
633 									 errmsg("COPY from stdin failed: %s",
634 									   pq_getmsgstring(cstate->fe_msgbuf))));
635 							break;
636 						case 'H':		/* Flush */
637 						case 'S':		/* Sync */
638 
639 							/*
640 							 * Ignore Flush/Sync for the convenience of client
641 							 * libraries (such as libpq) that may send those
642 							 * without noticing that the command they just
643 							 * sent was COPY.
644 							 */
645 							goto readmessage;
646 						default:
647 							ereport(ERROR,
648 									(errcode(ERRCODE_PROTOCOL_VIOLATION),
649 									 errmsg("unexpected message type 0x%02X during COPY from stdin",
650 											mtype)));
651 							break;
652 					}
653 				}
654 				avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
655 				if (avail > maxread)
656 					avail = maxread;
657 				pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
658 				databuf = (void *) ((char *) databuf + avail);
659 				maxread -= avail;
660 				bytesread += avail;
661 			}
662 			break;
663 	}
664 
665 	return bytesread;
666 }
667 
668 
669 /*
670  * These functions do apply some data conversion
671  */
672 
673 /*
674  * CopySendInt32 sends an int32 in network byte order
675  */
676 static void
CopySendInt32(CopyState cstate,int32 val)677 CopySendInt32(CopyState cstate, int32 val)
678 {
679 	uint32		buf;
680 
681 	buf = htonl((uint32) val);
682 	CopySendData(cstate, &buf, sizeof(buf));
683 }
684 
685 /*
686  * CopyGetInt32 reads an int32 that appears in network byte order
687  *
688  * Returns true if OK, false if EOF
689  */
690 static bool
CopyGetInt32(CopyState cstate,int32 * val)691 CopyGetInt32(CopyState cstate, int32 *val)
692 {
693 	uint32		buf;
694 
695 	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
696 	{
697 		*val = 0;				/* suppress compiler warning */
698 		return false;
699 	}
700 	*val = (int32) ntohl(buf);
701 	return true;
702 }
703 
704 /*
705  * CopySendInt16 sends an int16 in network byte order
706  */
707 static void
CopySendInt16(CopyState cstate,int16 val)708 CopySendInt16(CopyState cstate, int16 val)
709 {
710 	uint16		buf;
711 
712 	buf = htons((uint16) val);
713 	CopySendData(cstate, &buf, sizeof(buf));
714 }
715 
716 /*
717  * CopyGetInt16 reads an int16 that appears in network byte order
718  */
719 static bool
CopyGetInt16(CopyState cstate,int16 * val)720 CopyGetInt16(CopyState cstate, int16 *val)
721 {
722 	uint16		buf;
723 
724 	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
725 	{
726 		*val = 0;				/* suppress compiler warning */
727 		return false;
728 	}
729 	*val = (int16) ntohs(buf);
730 	return true;
731 }
732 
733 
734 /*
735  * CopyLoadRawBuf loads some more data into raw_buf
736  *
737  * Returns TRUE if able to obtain at least one more byte, else FALSE.
738  *
739  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
740  * down to the start of the buffer and then we load more data after that.
741  * This case is used only when a frontend multibyte character crosses a
742  * bufferload boundary.
743  */
744 static bool
CopyLoadRawBuf(CopyState cstate)745 CopyLoadRawBuf(CopyState cstate)
746 {
747 	int			nbytes;
748 	int			inbytes;
749 
750 	if (cstate->raw_buf_index < cstate->raw_buf_len)
751 	{
752 		/* Copy down the unprocessed data */
753 		nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
754 		memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
755 				nbytes);
756 	}
757 	else
758 		nbytes = 0;				/* no data need be saved */
759 
760 	inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
761 						  1, RAW_BUF_SIZE - nbytes);
762 	nbytes += inbytes;
763 	cstate->raw_buf[nbytes] = '\0';
764 	cstate->raw_buf_index = 0;
765 	cstate->raw_buf_len = nbytes;
766 	return (inbytes > 0);
767 }
768 
769 
770 /*
771  *	 DoCopy executes the SQL COPY statement
772  *
773  * Either unload or reload contents of table <relation>, depending on <from>.
774  * (<from> = TRUE means we are inserting into the table.)  In the "TO" case
775  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
776  * or DELETE query.
777  *
778  * If <pipe> is false, transfer is between the table and the file named
779  * <filename>.  Otherwise, transfer is between the table and our regular
780  * input/output stream. The latter could be either stdin/stdout or a
781  * socket, depending on whether we're running under Postmaster control.
782  *
783  * Do not allow a Postgres user without superuser privilege to read from
784  * or write to a file.
785  *
786  * Do not allow the copy if user doesn't have proper permission to access
787  * the table or the specifically requested columns.
788  */
789 Oid
DoCopy(const CopyStmt * stmt,const char * queryString,uint64 * processed)790 DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
791 {
792 	CopyState	cstate;
793 	bool		is_from = stmt->is_from;
794 	bool		pipe = (stmt->filename == NULL);
795 	Relation	rel;
796 	Oid			relid;
797 	Node	   *query = NULL;
798 	List	   *range_table = NIL;
799 
800 	/* Disallow COPY to/from file or program except to superusers. */
801 	if (!pipe && !superuser())
802 	{
803 		if (stmt->is_program)
804 			ereport(ERROR,
805 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
806 					 errmsg("must be superuser to COPY to or from an external program"),
807 					 errhint("Anyone can COPY to stdout or from stdin. "
808 						   "psql's \\copy command also works for anyone.")));
809 		else
810 			ereport(ERROR,
811 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
812 					 errmsg("must be superuser to COPY to or from a file"),
813 					 errhint("Anyone can COPY to stdout or from stdin. "
814 						   "psql's \\copy command also works for anyone.")));
815 	}
816 
817 	if (stmt->relation)
818 	{
819 		TupleDesc	tupDesc;
820 		AclMode		required_access = (is_from ? ACL_INSERT : ACL_SELECT);
821 		List	   *attnums;
822 		ListCell   *cur;
823 		RangeTblEntry *rte;
824 
825 		Assert(!stmt->query);
826 
827 		/* Open and lock the relation, using the appropriate lock type. */
828 		rel = heap_openrv(stmt->relation,
829 						  (is_from ? RowExclusiveLock : AccessShareLock));
830 
831 		relid = RelationGetRelid(rel);
832 
833 		rte = makeNode(RangeTblEntry);
834 		rte->rtekind = RTE_RELATION;
835 		rte->relid = RelationGetRelid(rel);
836 		rte->relkind = rel->rd_rel->relkind;
837 		rte->requiredPerms = required_access;
838 		range_table = list_make1(rte);
839 
840 		tupDesc = RelationGetDescr(rel);
841 		attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
842 		foreach(cur, attnums)
843 		{
844 			int			attno = lfirst_int(cur) -
845 			FirstLowInvalidHeapAttributeNumber;
846 
847 			if (is_from)
848 				rte->insertedCols = bms_add_member(rte->insertedCols, attno);
849 			else
850 				rte->selectedCols = bms_add_member(rte->selectedCols, attno);
851 		}
852 		ExecCheckRTPerms(range_table, true);
853 
854 		/*
855 		 * Permission check for row security policies.
856 		 *
857 		 * check_enable_rls will ereport(ERROR) if the user has requested
858 		 * something invalid and will otherwise indicate if we should enable
859 		 * RLS (returns RLS_ENABLED) or not for this COPY statement.
860 		 *
861 		 * If the relation has a row security policy and we are to apply it
862 		 * then perform a "query" copy and allow the normal query processing
863 		 * to handle the policies.
864 		 *
865 		 * If RLS is not enabled for this, then just fall through to the
866 		 * normal non-filtering relation handling.
867 		 */
868 		if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
869 		{
870 			SelectStmt *select;
871 			ColumnRef  *cr;
872 			ResTarget  *target;
873 			RangeVar   *from;
874 			List	   *targetList = NIL;
875 
876 			if (is_from)
877 				ereport(ERROR,
878 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879 				   errmsg("COPY FROM not supported with row-level security"),
880 						 errhint("Use INSERT statements instead.")));
881 
882 			/*
883 			 * Build target list
884 			 *
885 			 * If no columns are specified in the attribute list of the COPY
886 			 * command, then the target list is 'all' columns. Therefore, '*'
887 			 * should be used as the target list for the resulting SELECT
888 			 * statement.
889 			 *
890 			 * In the case that columns are specified in the attribute list,
891 			 * create a ColumnRef and ResTarget for each column and add them to
892 			 * the target list for the resulting SELECT statement.
893 			 */
894 			if (!stmt->attlist)
895 			{
896 				cr = makeNode(ColumnRef);
897 				cr->fields = list_make1(makeNode(A_Star));
898 				cr->location = -1;
899 
900 				target = makeNode(ResTarget);
901 				target->name = NULL;
902 				target->indirection = NIL;
903 				target->val = (Node *) cr;
904 				target->location = -1;
905 
906 				targetList = list_make1(target);
907 			}
908 			else
909 			{
910 				ListCell   *lc;
911 
912 				foreach(lc, stmt->attlist)
913 				{
914 					/*
915 					 * Build the ColumnRef for each column.  The ColumnRef
916 					 * 'fields' property is a String 'Value' node (see
917 					 * nodes/value.h) that corresponds to the column name
918 					 * respectively.
919 					 */
920 					cr = makeNode(ColumnRef);
921 					cr->fields = list_make1(lfirst(lc));
922 					cr->location = -1;
923 
924 					/* Build the ResTarget and add the ColumnRef to it. */
925 					target = makeNode(ResTarget);
926 					target->name = NULL;
927 					target->indirection = NIL;
928 					target->val = (Node *) cr;
929 					target->location = -1;
930 
931 					/* Add each column to the SELECT statement's target list */
932 					targetList = lappend(targetList, target);
933 				}
934 			}
935 
936 			/*
937 			 * Build RangeVar for from clause, fully qualified based on the
938 			 * relation which we have opened and locked.
939 			 */
940 			from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
941 								pstrdup(RelationGetRelationName(rel)),
942 								-1);
943 
944 			/* Build query */
945 			select = makeNode(SelectStmt);
946 			select->targetList = targetList;
947 			select->fromClause = list_make1(from);
948 
949 			query = (Node *) select;
950 
951 			/*
952 			 * Close the relation for now, but keep the lock on it to prevent
953 			 * changes between now and when we start the query-based COPY.
954 			 *
955 			 * We'll reopen it later as part of the query-based COPY.
956 			 */
957 			heap_close(rel, NoLock);
958 			rel = NULL;
959 		}
960 	}
961 	else
962 	{
963 		Assert(stmt->query);
964 
965 		query = stmt->query;
966 		relid = InvalidOid;
967 		rel = NULL;
968 	}
969 
970 	if (is_from)
971 	{
972 		Assert(rel);
973 
974 		/* check read-only transaction and parallel mode */
975 		if (XactReadOnly && !rel->rd_islocaltemp)
976 			PreventCommandIfReadOnly("COPY FROM");
977 		PreventCommandIfParallelMode("COPY FROM");
978 
979 		cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
980 							   stmt->attlist, stmt->options);
981 		cstate->range_table = range_table;
982 		*processed = CopyFrom(cstate);	/* copy from file to database */
983 		EndCopyFrom(cstate);
984 	}
985 	else
986 	{
987 		cstate = BeginCopyTo(rel, query, queryString, relid,
988 							 stmt->filename, stmt->is_program,
989 							 stmt->attlist, stmt->options);
990 		*processed = DoCopyTo(cstate);	/* copy from database to file */
991 		EndCopyTo(cstate);
992 	}
993 
994 	/*
995 	 * Close the relation. If reading, we can release the AccessShareLock we
996 	 * got; if writing, we should hold the lock until end of transaction to
997 	 * ensure that updates will be committed before lock is released.
998 	 */
999 	if (rel != NULL)
1000 		heap_close(rel, (is_from ? NoLock : AccessShareLock));
1001 
1002 	return relid;
1003 }
1004 
1005 /*
1006  * Process the statement option list for COPY.
1007  *
1008  * Scan the options list (a list of DefElem) and transpose the information
1009  * into cstate, applying appropriate error checking.
1010  *
1011  * cstate is assumed to be filled with zeroes initially.
1012  *
1013  * This is exported so that external users of the COPY API can sanity-check
1014  * a list of options.  In that usage, cstate should be passed as NULL
1015  * (since external users don't know sizeof(CopyStateData)) and the collected
1016  * data is just leaked until CurrentMemoryContext is reset.
1017  *
1018  * Note that additional checking, such as whether column names listed in FORCE
1019  * QUOTE actually exist, has to be applied later.  This just checks for
1020  * self-consistency of the options list.
1021  */
1022 void
ProcessCopyOptions(CopyState cstate,bool is_from,List * options)1023 ProcessCopyOptions(CopyState cstate,
1024 				   bool is_from,
1025 				   List *options)
1026 {
1027 	bool		format_specified = false;
1028 	ListCell   *option;
1029 
1030 	/* Support external use for option sanity checking */
1031 	if (cstate == NULL)
1032 		cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1033 
1034 	cstate->file_encoding = -1;
1035 
1036 	/* Extract options from the statement node tree */
1037 	foreach(option, options)
1038 	{
1039 		DefElem    *defel = (DefElem *) lfirst(option);
1040 
1041 		if (strcmp(defel->defname, "format") == 0)
1042 		{
1043 			char	   *fmt = defGetString(defel);
1044 
1045 			if (format_specified)
1046 				ereport(ERROR,
1047 						(errcode(ERRCODE_SYNTAX_ERROR),
1048 						 errmsg("conflicting or redundant options")));
1049 			format_specified = true;
1050 			if (strcmp(fmt, "text") == 0)
1051 				 /* default format */ ;
1052 			else if (strcmp(fmt, "csv") == 0)
1053 				cstate->csv_mode = true;
1054 			else if (strcmp(fmt, "binary") == 0)
1055 				cstate->binary = true;
1056 			else
1057 				ereport(ERROR,
1058 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1059 						 errmsg("COPY format \"%s\" not recognized", fmt)));
1060 		}
1061 		else if (strcmp(defel->defname, "oids") == 0)
1062 		{
1063 			if (cstate->oids)
1064 				ereport(ERROR,
1065 						(errcode(ERRCODE_SYNTAX_ERROR),
1066 						 errmsg("conflicting or redundant options")));
1067 			cstate->oids = defGetBoolean(defel);
1068 		}
1069 		else if (strcmp(defel->defname, "freeze") == 0)
1070 		{
1071 			if (cstate->freeze)
1072 				ereport(ERROR,
1073 						(errcode(ERRCODE_SYNTAX_ERROR),
1074 						 errmsg("conflicting or redundant options")));
1075 			cstate->freeze = defGetBoolean(defel);
1076 		}
1077 		else if (strcmp(defel->defname, "delimiter") == 0)
1078 		{
1079 			if (cstate->delim)
1080 				ereport(ERROR,
1081 						(errcode(ERRCODE_SYNTAX_ERROR),
1082 						 errmsg("conflicting or redundant options")));
1083 			cstate->delim = defGetString(defel);
1084 		}
1085 		else if (strcmp(defel->defname, "null") == 0)
1086 		{
1087 			if (cstate->null_print)
1088 				ereport(ERROR,
1089 						(errcode(ERRCODE_SYNTAX_ERROR),
1090 						 errmsg("conflicting or redundant options")));
1091 			cstate->null_print = defGetString(defel);
1092 		}
1093 		else if (strcmp(defel->defname, "header") == 0)
1094 		{
1095 			if (cstate->header_line)
1096 				ereport(ERROR,
1097 						(errcode(ERRCODE_SYNTAX_ERROR),
1098 						 errmsg("conflicting or redundant options")));
1099 			cstate->header_line = defGetBoolean(defel);
1100 		}
1101 		else if (strcmp(defel->defname, "quote") == 0)
1102 		{
1103 			if (cstate->quote)
1104 				ereport(ERROR,
1105 						(errcode(ERRCODE_SYNTAX_ERROR),
1106 						 errmsg("conflicting or redundant options")));
1107 			cstate->quote = defGetString(defel);
1108 		}
1109 		else if (strcmp(defel->defname, "escape") == 0)
1110 		{
1111 			if (cstate->escape)
1112 				ereport(ERROR,
1113 						(errcode(ERRCODE_SYNTAX_ERROR),
1114 						 errmsg("conflicting or redundant options")));
1115 			cstate->escape = defGetString(defel);
1116 		}
1117 		else if (strcmp(defel->defname, "force_quote") == 0)
1118 		{
1119 			if (cstate->force_quote || cstate->force_quote_all)
1120 				ereport(ERROR,
1121 						(errcode(ERRCODE_SYNTAX_ERROR),
1122 						 errmsg("conflicting or redundant options")));
1123 			if (defel->arg && IsA(defel->arg, A_Star))
1124 				cstate->force_quote_all = true;
1125 			else if (defel->arg && IsA(defel->arg, List))
1126 				cstate->force_quote = (List *) defel->arg;
1127 			else
1128 				ereport(ERROR,
1129 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1130 						 errmsg("argument to option \"%s\" must be a list of column names",
1131 								defel->defname)));
1132 		}
1133 		else if (strcmp(defel->defname, "force_not_null") == 0)
1134 		{
1135 			if (cstate->force_notnull)
1136 				ereport(ERROR,
1137 						(errcode(ERRCODE_SYNTAX_ERROR),
1138 						 errmsg("conflicting or redundant options")));
1139 			if (defel->arg && IsA(defel->arg, List))
1140 				cstate->force_notnull = (List *) defel->arg;
1141 			else
1142 				ereport(ERROR,
1143 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1144 						 errmsg("argument to option \"%s\" must be a list of column names",
1145 								defel->defname)));
1146 		}
1147 		else if (strcmp(defel->defname, "force_null") == 0)
1148 		{
1149 			if (cstate->force_null)
1150 				ereport(ERROR,
1151 						(errcode(ERRCODE_SYNTAX_ERROR),
1152 						 errmsg("conflicting or redundant options")));
1153 			if (defel->arg && IsA(defel->arg, List))
1154 				cstate->force_null = (List *) defel->arg;
1155 			else
1156 				ereport(ERROR,
1157 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1158 						 errmsg("argument to option \"%s\" must be a list of column names",
1159 								defel->defname)));
1160 		}
1161 		else if (strcmp(defel->defname, "convert_selectively") == 0)
1162 		{
1163 			/*
1164 			 * Undocumented, not-accessible-from-SQL option: convert only the
1165 			 * named columns to binary form, storing the rest as NULLs. It's
1166 			 * allowed for the column list to be NIL.
1167 			 */
1168 			if (cstate->convert_selectively)
1169 				ereport(ERROR,
1170 						(errcode(ERRCODE_SYNTAX_ERROR),
1171 						 errmsg("conflicting or redundant options")));
1172 			cstate->convert_selectively = true;
1173 			if (defel->arg == NULL || IsA(defel->arg, List))
1174 				cstate->convert_select = (List *) defel->arg;
1175 			else
1176 				ereport(ERROR,
1177 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1178 						 errmsg("argument to option \"%s\" must be a list of column names",
1179 								defel->defname)));
1180 		}
1181 		else if (strcmp(defel->defname, "encoding") == 0)
1182 		{
1183 			if (cstate->file_encoding >= 0)
1184 				ereport(ERROR,
1185 						(errcode(ERRCODE_SYNTAX_ERROR),
1186 						 errmsg("conflicting or redundant options")));
1187 			cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1188 			if (cstate->file_encoding < 0)
1189 				ereport(ERROR,
1190 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191 						 errmsg("argument to option \"%s\" must be a valid encoding name",
1192 								defel->defname)));
1193 		}
1194 		else
1195 			ereport(ERROR,
1196 					(errcode(ERRCODE_SYNTAX_ERROR),
1197 					 errmsg("option \"%s\" not recognized",
1198 							defel->defname)));
1199 	}
1200 
1201 	/*
1202 	 * Check for incompatible options (must do these two before inserting
1203 	 * defaults)
1204 	 */
1205 	if (cstate->binary && cstate->delim)
1206 		ereport(ERROR,
1207 				(errcode(ERRCODE_SYNTAX_ERROR),
1208 				 errmsg("cannot specify DELIMITER in BINARY mode")));
1209 
1210 	if (cstate->binary && cstate->null_print)
1211 		ereport(ERROR,
1212 				(errcode(ERRCODE_SYNTAX_ERROR),
1213 				 errmsg("cannot specify NULL in BINARY mode")));
1214 
1215 	/* Set defaults for omitted options */
1216 	if (!cstate->delim)
1217 		cstate->delim = cstate->csv_mode ? "," : "\t";
1218 
1219 	if (!cstate->null_print)
1220 		cstate->null_print = cstate->csv_mode ? "" : "\\N";
1221 	cstate->null_print_len = strlen(cstate->null_print);
1222 
1223 	if (cstate->csv_mode)
1224 	{
1225 		if (!cstate->quote)
1226 			cstate->quote = "\"";
1227 		if (!cstate->escape)
1228 			cstate->escape = cstate->quote;
1229 	}
1230 
1231 	/* Only single-byte delimiter strings are supported. */
1232 	if (strlen(cstate->delim) != 1)
1233 		ereport(ERROR,
1234 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1235 			  errmsg("COPY delimiter must be a single one-byte character")));
1236 
1237 	/* Disallow end-of-line characters */
1238 	if (strchr(cstate->delim, '\r') != NULL ||
1239 		strchr(cstate->delim, '\n') != NULL)
1240 		ereport(ERROR,
1241 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1242 			 errmsg("COPY delimiter cannot be newline or carriage return")));
1243 
1244 	if (strchr(cstate->null_print, '\r') != NULL ||
1245 		strchr(cstate->null_print, '\n') != NULL)
1246 		ereport(ERROR,
1247 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1248 				 errmsg("COPY null representation cannot use newline or carriage return")));
1249 
1250 	/*
1251 	 * Disallow unsafe delimiter characters in non-CSV mode.  We can't allow
1252 	 * backslash because it would be ambiguous.  We can't allow the other
1253 	 * cases because data characters matching the delimiter must be
1254 	 * backslashed, and certain backslash combinations are interpreted
1255 	 * non-literally by COPY IN.  Disallowing all lower case ASCII letters is
1256 	 * more than strictly necessary, but seems best for consistency and
1257 	 * future-proofing.  Likewise we disallow all digits though only octal
1258 	 * digits are actually dangerous.
1259 	 */
1260 	if (!cstate->csv_mode &&
1261 		strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1262 			   cstate->delim[0]) != NULL)
1263 		ereport(ERROR,
1264 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1265 				 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1266 
1267 	/* Check header */
1268 	if (!cstate->csv_mode && cstate->header_line)
1269 		ereport(ERROR,
1270 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1271 				 errmsg("COPY HEADER available only in CSV mode")));
1272 
1273 	/* Check quote */
1274 	if (!cstate->csv_mode && cstate->quote != NULL)
1275 		ereport(ERROR,
1276 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1277 				 errmsg("COPY quote available only in CSV mode")));
1278 
1279 	if (cstate->csv_mode && strlen(cstate->quote) != 1)
1280 		ereport(ERROR,
1281 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1282 				 errmsg("COPY quote must be a single one-byte character")));
1283 
1284 	if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1285 		ereport(ERROR,
1286 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1287 				 errmsg("COPY delimiter and quote must be different")));
1288 
1289 	/* Check escape */
1290 	if (!cstate->csv_mode && cstate->escape != NULL)
1291 		ereport(ERROR,
1292 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1293 				 errmsg("COPY escape available only in CSV mode")));
1294 
1295 	if (cstate->csv_mode && strlen(cstate->escape) != 1)
1296 		ereport(ERROR,
1297 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1298 				 errmsg("COPY escape must be a single one-byte character")));
1299 
1300 	/* Check force_quote */
1301 	if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1302 		ereport(ERROR,
1303 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1304 				 errmsg("COPY force quote available only in CSV mode")));
1305 	if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1306 		ereport(ERROR,
1307 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1308 				 errmsg("COPY force quote only available using COPY TO")));
1309 
1310 	/* Check force_notnull */
1311 	if (!cstate->csv_mode && cstate->force_notnull != NIL)
1312 		ereport(ERROR,
1313 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1314 				 errmsg("COPY force not null available only in CSV mode")));
1315 	if (cstate->force_notnull != NIL && !is_from)
1316 		ereport(ERROR,
1317 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1318 			  errmsg("COPY force not null only available using COPY FROM")));
1319 
1320 	/* Check force_null */
1321 	if (!cstate->csv_mode && cstate->force_null != NIL)
1322 		ereport(ERROR,
1323 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1324 				 errmsg("COPY force null available only in CSV mode")));
1325 
1326 	if (cstate->force_null != NIL && !is_from)
1327 		ereport(ERROR,
1328 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1329 				 errmsg("COPY force null only available using COPY FROM")));
1330 
1331 	/* Don't allow the delimiter to appear in the null string. */
1332 	if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1333 		ereport(ERROR,
1334 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1335 		errmsg("COPY delimiter must not appear in the NULL specification")));
1336 
1337 	/* Don't allow the CSV quote char to appear in the null string. */
1338 	if (cstate->csv_mode &&
1339 		strchr(cstate->null_print, cstate->quote[0]) != NULL)
1340 		ereport(ERROR,
1341 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1342 				 errmsg("CSV quote character must not appear in the NULL specification")));
1343 }
1344 
1345 /*
1346  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1347  *
1348  * Iff <binary>, unload or reload in the binary format, as opposed to the
1349  * more wasteful but more robust and portable text format.
1350  *
1351  * Iff <oids>, unload or reload the format that includes OID information.
1352  * On input, we accept OIDs whether or not the table has an OID column,
1353  * but silently drop them if it does not.  On output, we report an error
1354  * if the user asks for OIDs in a table that has none (not providing an
1355  * OID column might seem friendlier, but could seriously confuse programs).
1356  *
1357  * If in the text format, delimit columns with delimiter <delim> and print
1358  * NULL values as <null_print>.
1359  */
1360 static CopyState
BeginCopy(bool is_from,Relation rel,Node * raw_query,const char * queryString,const Oid queryRelId,List * attnamelist,List * options)1361 BeginCopy(bool is_from,
1362 		  Relation rel,
1363 		  Node *raw_query,
1364 		  const char *queryString,
1365 		  const Oid queryRelId,
1366 		  List *attnamelist,
1367 		  List *options)
1368 {
1369 	CopyState	cstate;
1370 	TupleDesc	tupDesc;
1371 	int			num_phys_attrs;
1372 	MemoryContext oldcontext;
1373 
1374 	/* Allocate workspace and zero all fields */
1375 	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1376 
1377 	/*
1378 	 * We allocate everything used by a cstate in a new memory context. This
1379 	 * avoids memory leaks during repeated use of COPY in a query.
1380 	 */
1381 	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1382 												"COPY",
1383 												ALLOCSET_DEFAULT_SIZES);
1384 
1385 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1386 
1387 	/* Extract options from the statement node tree */
1388 	ProcessCopyOptions(cstate, is_from, options);
1389 
1390 	/* Process the source/target relation or query */
1391 	if (rel)
1392 	{
1393 		Assert(!raw_query);
1394 
1395 		cstate->rel = rel;
1396 
1397 		tupDesc = RelationGetDescr(cstate->rel);
1398 
1399 		/* Don't allow COPY w/ OIDs to or from a table without them */
1400 		if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1401 			ereport(ERROR,
1402 					(errcode(ERRCODE_UNDEFINED_COLUMN),
1403 					 errmsg("table \"%s\" does not have OIDs",
1404 							RelationGetRelationName(cstate->rel))));
1405 	}
1406 	else
1407 	{
1408 		List	   *rewritten;
1409 		Query	   *query;
1410 		PlannedStmt *plan;
1411 		DestReceiver *dest;
1412 
1413 		Assert(!is_from);
1414 		cstate->rel = NULL;
1415 
1416 		/* Don't allow COPY w/ OIDs from a query */
1417 		if (cstate->oids)
1418 			ereport(ERROR,
1419 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1420 					 errmsg("COPY (query) WITH OIDS is not supported")));
1421 
1422 		/*
1423 		 * Run parse analysis and rewrite.  Note this also acquires sufficient
1424 		 * locks on the source table(s).
1425 		 *
1426 		 * Because the parser and planner tend to scribble on their input, we
1427 		 * make a preliminary copy of the source querytree.  This prevents
1428 		 * problems in the case that the COPY is in a portal or plpgsql
1429 		 * function and is executed repeatedly.  (See also the same hack in
1430 		 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
1431 		 */
1432 		rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
1433 										   queryString, NULL, 0);
1434 
1435 		/* check that we got back something we can work with */
1436 		if (rewritten == NIL)
1437 		{
1438 			ereport(ERROR,
1439 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1440 			 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1441 		}
1442 		else if (list_length(rewritten) > 1)
1443 		{
1444 			ListCell   *lc;
1445 
1446 			/* examine queries to determine which error message to issue */
1447 			foreach(lc, rewritten)
1448 			{
1449 				Query	   *q = (Query *) lfirst(lc);
1450 
1451 				if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1452 					ereport(ERROR,
1453 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1454 							 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1455 				if (q->querySource == QSRC_NON_INSTEAD_RULE)
1456 					ereport(ERROR,
1457 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1458 					errmsg("DO ALSO rules are not supported for the COPY")));
1459 			}
1460 
1461 			ereport(ERROR,
1462 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1463 					 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1464 		}
1465 
1466 		query = (Query *) linitial(rewritten);
1467 
1468 		/* The grammar allows SELECT INTO, but we don't support that */
1469 		if (query->utilityStmt != NULL &&
1470 			IsA(query->utilityStmt, CreateTableAsStmt))
1471 			ereport(ERROR,
1472 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1473 					 errmsg("COPY (SELECT INTO) is not supported")));
1474 
1475 		Assert(query->utilityStmt == NULL);
1476 
1477 		/*
1478 		 * Similarly the grammar doesn't enforce the presence of a RETURNING
1479 		 * clause, but this is required here.
1480 		 */
1481 		if (query->commandType != CMD_SELECT &&
1482 			query->returningList == NIL)
1483 		{
1484 			Assert(query->commandType == CMD_INSERT ||
1485 				   query->commandType == CMD_UPDATE ||
1486 				   query->commandType == CMD_DELETE);
1487 
1488 			ereport(ERROR,
1489 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1490 					 errmsg("COPY query must have a RETURNING clause")));
1491 		}
1492 
1493 		/* plan the query */
1494 		plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1495 
1496 		/*
1497 		 * With row level security and a user using "COPY relation TO", we
1498 		 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1499 		 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1500 		 * in any RLS clauses.
1501 		 *
1502 		 * When this happens, we are passed in the relid of the originally
1503 		 * found relation (which we have locked).  As the planner will look up
1504 		 * the relation again, we double-check here to make sure it found the
1505 		 * same one that we have locked.
1506 		 */
1507 		if (queryRelId != InvalidOid)
1508 		{
1509 			/*
1510 			 * Note that with RLS involved there may be multiple relations,
1511 			 * and while the one we need is almost certainly first, we don't
1512 			 * make any guarantees of that in the planner, so check the whole
1513 			 * list and make sure we find the original relation.
1514 			 */
1515 			if (!list_member_oid(plan->relationOids, queryRelId))
1516 				ereport(ERROR,
1517 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1518 				errmsg("relation referenced by COPY statement has changed")));
1519 		}
1520 
1521 		/*
1522 		 * Use a snapshot with an updated command ID to ensure this query sees
1523 		 * results of any previously executed queries.
1524 		 */
1525 		PushCopiedSnapshot(GetActiveSnapshot());
1526 		UpdateActiveSnapshotCommandId();
1527 
1528 		/* Create dest receiver for COPY OUT */
1529 		dest = CreateDestReceiver(DestCopyOut);
1530 		((DR_copy *) dest)->cstate = cstate;
1531 
1532 		/* Create a QueryDesc requesting no output */
1533 		cstate->queryDesc = CreateQueryDesc(plan, queryString,
1534 											GetActiveSnapshot(),
1535 											InvalidSnapshot,
1536 											dest, NULL, 0);
1537 
1538 		/*
1539 		 * Call ExecutorStart to prepare the plan for execution.
1540 		 *
1541 		 * ExecutorStart computes a result tupdesc for us
1542 		 */
1543 		ExecutorStart(cstate->queryDesc, 0);
1544 
1545 		tupDesc = cstate->queryDesc->tupDesc;
1546 	}
1547 
1548 	/* Generate or convert list of attributes to process */
1549 	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1550 
1551 	num_phys_attrs = tupDesc->natts;
1552 
1553 	/* Convert FORCE_QUOTE name list to per-column flags, check validity */
1554 	cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1555 	if (cstate->force_quote_all)
1556 	{
1557 		int			i;
1558 
1559 		for (i = 0; i < num_phys_attrs; i++)
1560 			cstate->force_quote_flags[i] = true;
1561 	}
1562 	else if (cstate->force_quote)
1563 	{
1564 		List	   *attnums;
1565 		ListCell   *cur;
1566 
1567 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1568 
1569 		foreach(cur, attnums)
1570 		{
1571 			int			attnum = lfirst_int(cur);
1572 
1573 			if (!list_member_int(cstate->attnumlist, attnum))
1574 				ereport(ERROR,
1575 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1576 				   errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1577 						  NameStr(tupDesc->attrs[attnum - 1]->attname))));
1578 			cstate->force_quote_flags[attnum - 1] = true;
1579 		}
1580 	}
1581 
1582 	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1583 	cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1584 	if (cstate->force_notnull)
1585 	{
1586 		List	   *attnums;
1587 		ListCell   *cur;
1588 
1589 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1590 
1591 		foreach(cur, attnums)
1592 		{
1593 			int			attnum = lfirst_int(cur);
1594 
1595 			if (!list_member_int(cstate->attnumlist, attnum))
1596 				ereport(ERROR,
1597 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1598 				errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1599 					   NameStr(tupDesc->attrs[attnum - 1]->attname))));
1600 			cstate->force_notnull_flags[attnum - 1] = true;
1601 		}
1602 	}
1603 
1604 	/* Convert FORCE_NULL name list to per-column flags, check validity */
1605 	cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1606 	if (cstate->force_null)
1607 	{
1608 		List	   *attnums;
1609 		ListCell   *cur;
1610 
1611 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1612 
1613 		foreach(cur, attnums)
1614 		{
1615 			int			attnum = lfirst_int(cur);
1616 
1617 			if (!list_member_int(cstate->attnumlist, attnum))
1618 				ereport(ERROR,
1619 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1620 					errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1621 						   NameStr(tupDesc->attrs[attnum - 1]->attname))));
1622 			cstate->force_null_flags[attnum - 1] = true;
1623 		}
1624 	}
1625 
1626 	/* Convert convert_selectively name list to per-column flags */
1627 	if (cstate->convert_selectively)
1628 	{
1629 		List	   *attnums;
1630 		ListCell   *cur;
1631 
1632 		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1633 
1634 		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1635 
1636 		foreach(cur, attnums)
1637 		{
1638 			int			attnum = lfirst_int(cur);
1639 
1640 			if (!list_member_int(cstate->attnumlist, attnum))
1641 				ereport(ERROR,
1642 						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1643 						 errmsg_internal("selected column \"%s\" not referenced by COPY",
1644 							 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1645 			cstate->convert_select_flags[attnum - 1] = true;
1646 		}
1647 	}
1648 
1649 	/* Use client encoding when ENCODING option is not specified. */
1650 	if (cstate->file_encoding < 0)
1651 		cstate->file_encoding = pg_get_client_encoding();
1652 
1653 	/*
1654 	 * Set up encoding conversion info.  Even if the file and server encodings
1655 	 * are the same, we must apply pg_any_to_server() to validate data in
1656 	 * multibyte encodings.
1657 	 */
1658 	cstate->need_transcoding =
1659 		(cstate->file_encoding != GetDatabaseEncoding() ||
1660 		 pg_database_encoding_max_length() > 1);
1661 	/* See Multibyte encoding comment above */
1662 	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1663 
1664 	cstate->copy_dest = COPY_FILE;		/* default */
1665 
1666 	MemoryContextSwitchTo(oldcontext);
1667 
1668 	return cstate;
1669 }
1670 
1671 /*
1672  * Closes the pipe to an external program, checking the pclose() return code.
1673  */
1674 static void
ClosePipeToProgram(CopyState cstate)1675 ClosePipeToProgram(CopyState cstate)
1676 {
1677 	int			pclose_rc;
1678 
1679 	Assert(cstate->is_program);
1680 
1681 	pclose_rc = ClosePipeStream(cstate->copy_file);
1682 	if (pclose_rc == -1)
1683 		ereport(ERROR,
1684 				(errcode_for_file_access(),
1685 				 errmsg("could not close pipe to external command: %m")));
1686 	else if (pclose_rc != 0)
1687 		ereport(ERROR,
1688 				(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1689 				 errmsg("program \"%s\" failed",
1690 						cstate->filename),
1691 				 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1692 }
1693 
1694 /*
1695  * Release resources allocated in a cstate for COPY TO/FROM.
1696  */
1697 static void
EndCopy(CopyState cstate)1698 EndCopy(CopyState cstate)
1699 {
1700 	if (cstate->is_program)
1701 	{
1702 		ClosePipeToProgram(cstate);
1703 	}
1704 	else
1705 	{
1706 		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1707 			ereport(ERROR,
1708 					(errcode_for_file_access(),
1709 					 errmsg("could not close file \"%s\": %m",
1710 							cstate->filename)));
1711 	}
1712 
1713 	MemoryContextDelete(cstate->copycontext);
1714 	pfree(cstate);
1715 }
1716 
1717 /*
1718  * Setup CopyState to read tuples from a table or a query for COPY TO.
1719  */
1720 static CopyState
BeginCopyTo(Relation rel,Node * query,const char * queryString,const Oid queryRelId,const char * filename,bool is_program,List * attnamelist,List * options)1721 BeginCopyTo(Relation rel,
1722 			Node *query,
1723 			const char *queryString,
1724 			const Oid queryRelId,
1725 			const char *filename,
1726 			bool is_program,
1727 			List *attnamelist,
1728 			List *options)
1729 {
1730 	CopyState	cstate;
1731 	bool		pipe = (filename == NULL);
1732 	MemoryContext oldcontext;
1733 
1734 	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1735 	{
1736 		if (rel->rd_rel->relkind == RELKIND_VIEW)
1737 			ereport(ERROR,
1738 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1739 					 errmsg("cannot copy from view \"%s\"",
1740 							RelationGetRelationName(rel)),
1741 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1742 		else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1743 			ereport(ERROR,
1744 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1745 					 errmsg("cannot copy from materialized view \"%s\"",
1746 							RelationGetRelationName(rel)),
1747 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1748 		else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1749 			ereport(ERROR,
1750 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1751 					 errmsg("cannot copy from foreign table \"%s\"",
1752 							RelationGetRelationName(rel)),
1753 					 errhint("Try the COPY (SELECT ...) TO variant.")));
1754 		else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1755 			ereport(ERROR,
1756 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1757 					 errmsg("cannot copy from sequence \"%s\"",
1758 							RelationGetRelationName(rel))));
1759 		else
1760 			ereport(ERROR,
1761 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1762 					 errmsg("cannot copy from non-table relation \"%s\"",
1763 							RelationGetRelationName(rel))));
1764 	}
1765 
1766 	cstate = BeginCopy(false, rel, query, queryString, queryRelId, attnamelist,
1767 					   options);
1768 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1769 
1770 	if (pipe)
1771 	{
1772 		Assert(!is_program);	/* the grammar does not allow this */
1773 		if (whereToSendOutput != DestRemote)
1774 			cstate->copy_file = stdout;
1775 	}
1776 	else
1777 	{
1778 		cstate->filename = pstrdup(filename);
1779 		cstate->is_program = is_program;
1780 
1781 		if (is_program)
1782 		{
1783 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1784 			if (cstate->copy_file == NULL)
1785 				ereport(ERROR,
1786 						(errcode_for_file_access(),
1787 						 errmsg("could not execute command \"%s\": %m",
1788 								cstate->filename)));
1789 		}
1790 		else
1791 		{
1792 			mode_t		oumask; /* Pre-existing umask value */
1793 			struct stat st;
1794 
1795 			/*
1796 			 * Prevent write to relative path ... too easy to shoot oneself in
1797 			 * the foot by overwriting a database file ...
1798 			 */
1799 			if (!is_absolute_path(filename))
1800 				ereport(ERROR,
1801 						(errcode(ERRCODE_INVALID_NAME),
1802 					  errmsg("relative path not allowed for COPY to file")));
1803 
1804 			oumask = umask(S_IWGRP | S_IWOTH);
1805 			PG_TRY();
1806 			{
1807 				cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1808 			}
1809 			PG_CATCH();
1810 			{
1811 				umask(oumask);
1812 				PG_RE_THROW();
1813 			}
1814 			PG_END_TRY();
1815 			umask(oumask);
1816 			if (cstate->copy_file == NULL)
1817 				ereport(ERROR,
1818 						(errcode_for_file_access(),
1819 						 errmsg("could not open file \"%s\" for writing: %m",
1820 								cstate->filename)));
1821 
1822 			if (fstat(fileno(cstate->copy_file), &st))
1823 				ereport(ERROR,
1824 						(errcode_for_file_access(),
1825 						 errmsg("could not stat file \"%s\": %m",
1826 								cstate->filename)));
1827 
1828 			if (S_ISDIR(st.st_mode))
1829 				ereport(ERROR,
1830 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1831 						 errmsg("\"%s\" is a directory", cstate->filename)));
1832 		}
1833 	}
1834 
1835 	MemoryContextSwitchTo(oldcontext);
1836 
1837 	return cstate;
1838 }
1839 
1840 /*
1841  * This intermediate routine exists mainly to localize the effects of setjmp
1842  * so we don't need to plaster a lot of variables with "volatile".
1843  */
1844 static uint64
DoCopyTo(CopyState cstate)1845 DoCopyTo(CopyState cstate)
1846 {
1847 	bool		pipe = (cstate->filename == NULL);
1848 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
1849 	uint64		processed;
1850 
1851 	PG_TRY();
1852 	{
1853 		if (fe_copy)
1854 			SendCopyBegin(cstate);
1855 
1856 		processed = CopyTo(cstate);
1857 
1858 		if (fe_copy)
1859 			SendCopyEnd(cstate);
1860 	}
1861 	PG_CATCH();
1862 	{
1863 		/*
1864 		 * Make sure we turn off old-style COPY OUT mode upon error. It is
1865 		 * okay to do this in all cases, since it does nothing if the mode is
1866 		 * not on.
1867 		 */
1868 		pq_endcopyout(true);
1869 		PG_RE_THROW();
1870 	}
1871 	PG_END_TRY();
1872 
1873 	return processed;
1874 }
1875 
1876 /*
1877  * Clean up storage and release resources for COPY TO.
1878  */
1879 static void
EndCopyTo(CopyState cstate)1880 EndCopyTo(CopyState cstate)
1881 {
1882 	if (cstate->queryDesc != NULL)
1883 	{
1884 		/* Close down the query and free resources. */
1885 		ExecutorFinish(cstate->queryDesc);
1886 		ExecutorEnd(cstate->queryDesc);
1887 		FreeQueryDesc(cstate->queryDesc);
1888 		PopActiveSnapshot();
1889 	}
1890 
1891 	/* Clean up storage */
1892 	EndCopy(cstate);
1893 }
1894 
1895 /*
1896  * Copy from relation or query TO file.
1897  */
1898 static uint64
CopyTo(CopyState cstate)1899 CopyTo(CopyState cstate)
1900 {
1901 	TupleDesc	tupDesc;
1902 	int			num_phys_attrs;
1903 	Form_pg_attribute *attr;
1904 	ListCell   *cur;
1905 	uint64		processed;
1906 
1907 	if (cstate->rel)
1908 		tupDesc = RelationGetDescr(cstate->rel);
1909 	else
1910 		tupDesc = cstate->queryDesc->tupDesc;
1911 	attr = tupDesc->attrs;
1912 	num_phys_attrs = tupDesc->natts;
1913 	cstate->null_print_client = cstate->null_print;		/* default */
1914 
1915 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1916 	cstate->fe_msgbuf = makeStringInfo();
1917 
1918 	/* Get info about the columns we need to process. */
1919 	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1920 	foreach(cur, cstate->attnumlist)
1921 	{
1922 		int			attnum = lfirst_int(cur);
1923 		Oid			out_func_oid;
1924 		bool		isvarlena;
1925 
1926 		if (cstate->binary)
1927 			getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1928 									&out_func_oid,
1929 									&isvarlena);
1930 		else
1931 			getTypeOutputInfo(attr[attnum - 1]->atttypid,
1932 							  &out_func_oid,
1933 							  &isvarlena);
1934 		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1935 	}
1936 
1937 	/*
1938 	 * Create a temporary memory context that we can reset once per row to
1939 	 * recover palloc'd memory.  This avoids any problems with leaks inside
1940 	 * datatype output routines, and should be faster than retail pfree's
1941 	 * anyway.  (We don't need a whole econtext as CopyFrom does.)
1942 	 */
1943 	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1944 											   "COPY TO",
1945 											   ALLOCSET_DEFAULT_SIZES);
1946 
1947 	if (cstate->binary)
1948 	{
1949 		/* Generate header for a binary copy */
1950 		int32		tmp;
1951 
1952 		/* Signature */
1953 		CopySendData(cstate, BinarySignature, 11);
1954 		/* Flags field */
1955 		tmp = 0;
1956 		if (cstate->oids)
1957 			tmp |= (1 << 16);
1958 		CopySendInt32(cstate, tmp);
1959 		/* No header extension */
1960 		tmp = 0;
1961 		CopySendInt32(cstate, tmp);
1962 	}
1963 	else
1964 	{
1965 		/*
1966 		 * For non-binary copy, we need to convert null_print to file
1967 		 * encoding, because it will be sent directly with CopySendString.
1968 		 */
1969 		if (cstate->need_transcoding)
1970 			cstate->null_print_client = pg_server_to_any(cstate->null_print,
1971 													  cstate->null_print_len,
1972 													  cstate->file_encoding);
1973 
1974 		/* if a header has been requested send the line */
1975 		if (cstate->header_line)
1976 		{
1977 			bool		hdr_delim = false;
1978 
1979 			foreach(cur, cstate->attnumlist)
1980 			{
1981 				int			attnum = lfirst_int(cur);
1982 				char	   *colname;
1983 
1984 				if (hdr_delim)
1985 					CopySendChar(cstate, cstate->delim[0]);
1986 				hdr_delim = true;
1987 
1988 				colname = NameStr(attr[attnum - 1]->attname);
1989 
1990 				CopyAttributeOutCSV(cstate, colname, false,
1991 									list_length(cstate->attnumlist) == 1);
1992 			}
1993 
1994 			CopySendEndOfRow(cstate);
1995 		}
1996 	}
1997 
1998 	if (cstate->rel)
1999 	{
2000 		Datum	   *values;
2001 		bool	   *nulls;
2002 		HeapScanDesc scandesc;
2003 		HeapTuple	tuple;
2004 
2005 		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2006 		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2007 
2008 		scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2009 
2010 		processed = 0;
2011 		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2012 		{
2013 			CHECK_FOR_INTERRUPTS();
2014 
2015 			/* Deconstruct the tuple ... faster than repeated heap_getattr */
2016 			heap_deform_tuple(tuple, tupDesc, values, nulls);
2017 
2018 			/* Format and send the data */
2019 			CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2020 			processed++;
2021 		}
2022 
2023 		heap_endscan(scandesc);
2024 
2025 		pfree(values);
2026 		pfree(nulls);
2027 	}
2028 	else
2029 	{
2030 		/* run the plan --- the dest receiver will send tuples */
2031 		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
2032 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2033 	}
2034 
2035 	if (cstate->binary)
2036 	{
2037 		/* Generate trailer for a binary copy */
2038 		CopySendInt16(cstate, -1);
2039 		/* Need to flush out the trailer */
2040 		CopySendEndOfRow(cstate);
2041 	}
2042 
2043 	MemoryContextDelete(cstate->rowcontext);
2044 
2045 	return processed;
2046 }
2047 
2048 /*
2049  * Emit one row during CopyTo().
2050  */
2051 static void
CopyOneRowTo(CopyState cstate,Oid tupleOid,Datum * values,bool * nulls)2052 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2053 {
2054 	bool		need_delim = false;
2055 	FmgrInfo   *out_functions = cstate->out_functions;
2056 	MemoryContext oldcontext;
2057 	ListCell   *cur;
2058 	char	   *string;
2059 
2060 	MemoryContextReset(cstate->rowcontext);
2061 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2062 
2063 	if (cstate->binary)
2064 	{
2065 		/* Binary per-tuple header */
2066 		CopySendInt16(cstate, list_length(cstate->attnumlist));
2067 		/* Send OID if wanted --- note attnumlist doesn't include it */
2068 		if (cstate->oids)
2069 		{
2070 			/* Hack --- assume Oid is same size as int32 */
2071 			CopySendInt32(cstate, sizeof(int32));
2072 			CopySendInt32(cstate, tupleOid);
2073 		}
2074 	}
2075 	else
2076 	{
2077 		/* Text format has no per-tuple header, but send OID if wanted */
2078 		/* Assume digits don't need any quoting or encoding conversion */
2079 		if (cstate->oids)
2080 		{
2081 			string = DatumGetCString(DirectFunctionCall1(oidout,
2082 												ObjectIdGetDatum(tupleOid)));
2083 			CopySendString(cstate, string);
2084 			need_delim = true;
2085 		}
2086 	}
2087 
2088 	foreach(cur, cstate->attnumlist)
2089 	{
2090 		int			attnum = lfirst_int(cur);
2091 		Datum		value = values[attnum - 1];
2092 		bool		isnull = nulls[attnum - 1];
2093 
2094 		if (!cstate->binary)
2095 		{
2096 			if (need_delim)
2097 				CopySendChar(cstate, cstate->delim[0]);
2098 			need_delim = true;
2099 		}
2100 
2101 		if (isnull)
2102 		{
2103 			if (!cstate->binary)
2104 				CopySendString(cstate, cstate->null_print_client);
2105 			else
2106 				CopySendInt32(cstate, -1);
2107 		}
2108 		else
2109 		{
2110 			if (!cstate->binary)
2111 			{
2112 				string = OutputFunctionCall(&out_functions[attnum - 1],
2113 											value);
2114 				if (cstate->csv_mode)
2115 					CopyAttributeOutCSV(cstate, string,
2116 										cstate->force_quote_flags[attnum - 1],
2117 										list_length(cstate->attnumlist) == 1);
2118 				else
2119 					CopyAttributeOutText(cstate, string);
2120 			}
2121 			else
2122 			{
2123 				bytea	   *outputbytes;
2124 
2125 				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2126 											   value);
2127 				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2128 				CopySendData(cstate, VARDATA(outputbytes),
2129 							 VARSIZE(outputbytes) - VARHDRSZ);
2130 			}
2131 		}
2132 	}
2133 
2134 	CopySendEndOfRow(cstate);
2135 
2136 	MemoryContextSwitchTo(oldcontext);
2137 }
2138 
2139 
2140 /*
2141  * error context callback for COPY FROM
2142  *
2143  * The argument for the error context must be CopyState.
2144  */
2145 void
CopyFromErrorCallback(void * arg)2146 CopyFromErrorCallback(void *arg)
2147 {
2148 	CopyState	cstate = (CopyState) arg;
2149 	char		curlineno_str[32];
2150 
2151 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2152 			 cstate->cur_lineno);
2153 
2154 	if (cstate->binary)
2155 	{
2156 		/* can't usefully display the data */
2157 		if (cstate->cur_attname)
2158 			errcontext("COPY %s, line %s, column %s",
2159 					   cstate->cur_relname, curlineno_str,
2160 					   cstate->cur_attname);
2161 		else
2162 			errcontext("COPY %s, line %s",
2163 					   cstate->cur_relname, curlineno_str);
2164 	}
2165 	else
2166 	{
2167 		if (cstate->cur_attname && cstate->cur_attval)
2168 		{
2169 			/* error is relevant to a particular column */
2170 			char	   *attval;
2171 
2172 			attval = limit_printout_length(cstate->cur_attval);
2173 			errcontext("COPY %s, line %s, column %s: \"%s\"",
2174 					   cstate->cur_relname, curlineno_str,
2175 					   cstate->cur_attname, attval);
2176 			pfree(attval);
2177 		}
2178 		else if (cstate->cur_attname)
2179 		{
2180 			/* error is relevant to a particular column, value is NULL */
2181 			errcontext("COPY %s, line %s, column %s: null input",
2182 					   cstate->cur_relname, curlineno_str,
2183 					   cstate->cur_attname);
2184 		}
2185 		else
2186 		{
2187 			/*
2188 			 * Error is relevant to a particular line.
2189 			 *
2190 			 * If line_buf still contains the correct line, and it's already
2191 			 * transcoded, print it. If it's still in a foreign encoding, it's
2192 			 * quite likely that the error is precisely a failure to do
2193 			 * encoding conversion (ie, bad data). We dare not try to convert
2194 			 * it, and at present there's no way to regurgitate it without
2195 			 * conversion. So we have to punt and just report the line number.
2196 			 */
2197 			if (cstate->line_buf_valid &&
2198 				(cstate->line_buf_converted || !cstate->need_transcoding))
2199 			{
2200 				char	   *lineval;
2201 
2202 				lineval = limit_printout_length(cstate->line_buf.data);
2203 				errcontext("COPY %s, line %s: \"%s\"",
2204 						   cstate->cur_relname, curlineno_str, lineval);
2205 				pfree(lineval);
2206 			}
2207 			else
2208 			{
2209 				errcontext("COPY %s, line %s",
2210 						   cstate->cur_relname, curlineno_str);
2211 			}
2212 		}
2213 	}
2214 }
2215 
/*
 * limit_printout_length --- cap the amount of COPY data shown in a message.
 *
 * Returns a palloc'd string: a plain copy of 'str' when it is at most
 * MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix clipped with
 * pg_mbcliplen and followed by "...".
 *
 * Using the sprintf "precision" limit would look simpler, but some glibc
 * versions make vsnprintf fail outright (return -1) when asked to truncate
 * a string holding byte sequences invalid in the current encoding.  Hence
 * the truncation is done by hand.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Truncate in an encoding-aware way */
		clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the prefix, the three dots and the terminating NUL */
		clipped = (char *) palloc(clip_len + 4);
		memcpy(clipped, str, clip_len);
		memcpy(clipped + clip_len, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back an unmodified copy */
	return pstrdup(str);
}
2250 
2251 /*
2252  * Copy FROM file to relation.
2253  */
2254 static uint64
CopyFrom(CopyState cstate)2255 CopyFrom(CopyState cstate)
2256 {
2257 	HeapTuple	tuple;
2258 	TupleDesc	tupDesc;
2259 	Datum	   *values;
2260 	bool	   *nulls;
2261 	ResultRelInfo *resultRelInfo;
2262 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
2263 	ExprContext *econtext;
2264 	TupleTableSlot *myslot;
2265 	MemoryContext oldcontext = CurrentMemoryContext;
2266 
2267 	ErrorContextCallback errcallback;
2268 	CommandId	mycid = GetCurrentCommandId(true);
2269 	int			hi_options = 0; /* start with default heap_insert options */
2270 	BulkInsertState bistate;
2271 	uint64		processed = 0;
2272 	bool		useHeapMultiInsert;
2273 	int			nBufferedTuples = 0;
2274 
2275 #define MAX_BUFFERED_TUPLES 1000
2276 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
2277 	Size		bufferedTuplesSize = 0;
2278 	uint64		firstBufferedLineNo = 0;
2279 
2280 	Assert(cstate->rel);
2281 
2282 	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
2283 	{
2284 		if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2285 			ereport(ERROR,
2286 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2287 					 errmsg("cannot copy to view \"%s\"",
2288 							RelationGetRelationName(cstate->rel))));
2289 		else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2290 			ereport(ERROR,
2291 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2292 					 errmsg("cannot copy to materialized view \"%s\"",
2293 							RelationGetRelationName(cstate->rel))));
2294 		else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2295 			ereport(ERROR,
2296 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2297 					 errmsg("cannot copy to foreign table \"%s\"",
2298 							RelationGetRelationName(cstate->rel))));
2299 		else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2300 			ereport(ERROR,
2301 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2302 					 errmsg("cannot copy to sequence \"%s\"",
2303 							RelationGetRelationName(cstate->rel))));
2304 		else
2305 			ereport(ERROR,
2306 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2307 					 errmsg("cannot copy to non-table relation \"%s\"",
2308 							RelationGetRelationName(cstate->rel))));
2309 	}
2310 
2311 	tupDesc = RelationGetDescr(cstate->rel);
2312 
2313 	/*----------
2314 	 * Check to see if we can avoid writing WAL
2315 	 *
2316 	 * If archive logging/streaming is not enabled *and* either
2317 	 *	- table was created in same transaction as this COPY
2318 	 *	- data is being written to relfilenode created in this transaction
2319 	 * then we can skip writing WAL.  It's safe because if the transaction
2320 	 * doesn't commit, we'll discard the table (or the new relfilenode file).
2321 	 * If it does commit, we'll have done the heap_sync at the bottom of this
2322 	 * routine first.
2323 	 *
2324 	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2325 	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2326 	 * can be cleared before the end of the transaction. The exact case is
2327 	 * when a relation sets a new relfilenode twice in same transaction, yet
2328 	 * the second one fails in an aborted subtransaction, e.g.
2329 	 *
2330 	 * BEGIN;
2331 	 * TRUNCATE t;
2332 	 * SAVEPOINT save;
2333 	 * TRUNCATE t;
2334 	 * ROLLBACK TO save;
2335 	 * COPY ...
2336 	 *
2337 	 * Also, if the target file is new-in-transaction, we assume that checking
2338 	 * FSM for free space is a waste of time, even if we must use WAL because
2339 	 * of archiving.  This could possibly be wrong, but it's unlikely.
2340 	 *
2341 	 * The comments for heap_insert and RelationGetBufferForTuple specify that
2342 	 * skipping WAL logging is only safe if we ensure that our tuples do not
2343 	 * go into pages containing tuples from any other transactions --- but this
2344 	 * must be the case if we have a new table or new relfilenode, so we need
2345 	 * no additional work to enforce that.
2346 	 *----------
2347 	 */
2348 	/* createSubid is creation check, newRelfilenodeSubid is truncation check */
2349 	if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2350 		cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2351 	{
2352 		hi_options |= HEAP_INSERT_SKIP_FSM;
2353 		if (!XLogIsNeeded())
2354 			hi_options |= HEAP_INSERT_SKIP_WAL;
2355 	}
2356 
2357 	/*
2358 	 * Optimize if new relfilenode was created in this subxact or one of its
2359 	 * committed children and we won't see those rows later as part of an
2360 	 * earlier scan or command. The subxact test ensures that if this subxact
2361 	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
2362 	 * that the stronger test of exactly which subtransaction created it is
2363 	 * crucial for correctness of this optimisation. The test for an earlier
2364 	 * scan or command tolerates false negatives. FREEZE causes other sessions
2365 	 * to see rows they would not see under MVCC, and a false negative merely
2366 	 * spreads that anomaly to the current session.
2367 	 */
2368 	if (cstate->freeze)
2369 	{
2370 		/*
2371 		 * Tolerate one registration for the benefit of FirstXactSnapshot.
2372 		 * Scan-bearing queries generally create at least two registrations,
2373 		 * though relying on that is fragile, as is ignoring ActiveSnapshot.
2374 		 * Clear CatalogSnapshot to avoid counting its registration.  We'll
2375 		 * still detect ongoing catalog scans, each of which separately
2376 		 * registers the snapshot it uses.
2377 		 */
2378 		InvalidateCatalogSnapshot();
2379 		if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2380 			ereport(ERROR,
2381 					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2382 					 errmsg("cannot perform FREEZE because of prior transaction activity")));
2383 
2384 		if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2385 		 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2386 			ereport(ERROR,
2387 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2388 					 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2389 
2390 		hi_options |= HEAP_INSERT_FROZEN;
2391 	}
2392 
2393 	/*
2394 	 * We need a ResultRelInfo so we can use the regular executor's
2395 	 * index-entry-making machinery.  (There used to be a huge amount of code
2396 	 * here that basically duplicated execUtils.c ...)
2397 	 */
2398 	resultRelInfo = makeNode(ResultRelInfo);
2399 	InitResultRelInfo(resultRelInfo,
2400 					  cstate->rel,
2401 					  1,		/* dummy rangetable index */
2402 					  0);
2403 
2404 	ExecOpenIndices(resultRelInfo, false);
2405 
2406 	estate->es_result_relations = resultRelInfo;
2407 	estate->es_num_result_relations = 1;
2408 	estate->es_result_relation_info = resultRelInfo;
2409 	estate->es_range_table = cstate->range_table;
2410 
2411 	/* Set up a tuple slot too */
2412 	myslot = ExecInitExtraTupleSlot(estate);
2413 	ExecSetSlotDescriptor(myslot, tupDesc);
2414 	/* Triggers might need a slot as well */
2415 	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2416 
2417 	/*
2418 	 * It's more efficient to prepare a bunch of tuples for insertion, and
2419 	 * insert them in one heap_multi_insert() call, than call heap_insert()
2420 	 * separately for every tuple. However, we can't do that if there are
2421 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2422 	 * expressions. Such triggers or expressions might query the table we're
2423 	 * inserting to, and act differently if the tuples that have already been
2424 	 * processed and prepared for insertion are not there.
2425 	 */
2426 	if ((resultRelInfo->ri_TrigDesc != NULL &&
2427 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2428 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2429 		cstate->volatile_defexprs)
2430 	{
2431 		useHeapMultiInsert = false;
2432 	}
2433 	else
2434 	{
2435 		useHeapMultiInsert = true;
2436 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2437 	}
2438 
2439 	/* Prepare to catch AFTER triggers. */
2440 	AfterTriggerBeginQuery();
2441 
2442 	/*
2443 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2444 	 * should do this for COPY, since it's not really an "INSERT" statement as
2445 	 * such. However, executing these triggers maintains consistency with the
2446 	 * EACH ROW triggers that we already fire on COPY.
2447 	 */
2448 	ExecBSInsertTriggers(estate, resultRelInfo);
2449 
2450 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2451 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2452 
2453 	bistate = GetBulkInsertState();
2454 	econtext = GetPerTupleExprContext(estate);
2455 
2456 	/* Set up callback to identify error line number */
2457 	errcallback.callback = CopyFromErrorCallback;
2458 	errcallback.arg = (void *) cstate;
2459 	errcallback.previous = error_context_stack;
2460 	error_context_stack = &errcallback;
2461 
2462 	for (;;)
2463 	{
2464 		TupleTableSlot *slot;
2465 		bool		skip_tuple;
2466 		Oid			loaded_oid = InvalidOid;
2467 
2468 		CHECK_FOR_INTERRUPTS();
2469 
2470 		if (nBufferedTuples == 0)
2471 		{
2472 			/*
2473 			 * Reset the per-tuple exprcontext. We can only do this if the
2474 			 * tuple buffer is empty. (Calling the context the per-tuple
2475 			 * memory context is a bit of a misnomer now.)
2476 			 */
2477 			ResetPerTupleExprContext(estate);
2478 		}
2479 
2480 		/* Switch into its memory context */
2481 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2482 
2483 		if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2484 			break;
2485 
2486 		/* And now we can form the input tuple. */
2487 		tuple = heap_form_tuple(tupDesc, values, nulls);
2488 
2489 		if (loaded_oid != InvalidOid)
2490 			HeapTupleSetOid(tuple, loaded_oid);
2491 
2492 		/*
2493 		 * Constraints might reference the tableoid column, so initialize
2494 		 * t_tableOid before evaluating them.
2495 		 */
2496 		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2497 
2498 		/* Triggers and stuff need to be invoked in query context. */
2499 		MemoryContextSwitchTo(oldcontext);
2500 
2501 		/* Place tuple in tuple slot --- but slot shouldn't free it */
2502 		slot = myslot;
2503 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2504 
2505 		skip_tuple = false;
2506 
2507 		/* BEFORE ROW INSERT Triggers */
2508 		if (resultRelInfo->ri_TrigDesc &&
2509 			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2510 		{
2511 			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2512 
2513 			if (slot == NULL)	/* "do nothing" */
2514 				skip_tuple = true;
2515 			else	/* trigger might have changed tuple */
2516 				tuple = ExecMaterializeSlot(slot);
2517 		}
2518 
2519 		if (!skip_tuple)
2520 		{
2521 			/* Check the constraints of the tuple */
2522 			if (cstate->rel->rd_att->constr)
2523 				ExecConstraints(resultRelInfo, slot, estate);
2524 
2525 			if (useHeapMultiInsert)
2526 			{
2527 				/* Add this tuple to the tuple buffer */
2528 				if (nBufferedTuples == 0)
2529 					firstBufferedLineNo = cstate->cur_lineno;
2530 				bufferedTuples[nBufferedTuples++] = tuple;
2531 				bufferedTuplesSize += tuple->t_len;
2532 
2533 				/*
2534 				 * If the buffer filled up, flush it. Also flush if the total
2535 				 * size of all the tuples in the buffer becomes large, to
2536 				 * avoid using large amounts of memory for the buffers when
2537 				 * the tuples are exceptionally wide.
2538 				 */
2539 				if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2540 					bufferedTuplesSize > 65535)
2541 				{
2542 					CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2543 										resultRelInfo, myslot, bistate,
2544 										nBufferedTuples, bufferedTuples,
2545 										firstBufferedLineNo);
2546 					nBufferedTuples = 0;
2547 					bufferedTuplesSize = 0;
2548 				}
2549 			}
2550 			else
2551 			{
2552 				List	   *recheckIndexes = NIL;
2553 
2554 				/* OK, store the tuple and create index entries for it */
2555 				heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
2556 
2557 				if (resultRelInfo->ri_NumIndices > 0)
2558 					recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
2559 														 estate, false, NULL,
2560 														   NIL);
2561 
2562 				/* AFTER ROW INSERT Triggers */
2563 				ExecARInsertTriggers(estate, resultRelInfo, tuple,
2564 									 recheckIndexes);
2565 
2566 				list_free(recheckIndexes);
2567 			}
2568 
2569 			/*
2570 			 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2571 			 * this is the same definition used by execMain.c for counting
2572 			 * tuples inserted by an INSERT command.
2573 			 */
2574 			processed++;
2575 		}
2576 	}
2577 
2578 	/* Flush any remaining buffered tuples */
2579 	if (nBufferedTuples > 0)
2580 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2581 							resultRelInfo, myslot, bistate,
2582 							nBufferedTuples, bufferedTuples,
2583 							firstBufferedLineNo);
2584 
2585 	/* Done, clean up */
2586 	error_context_stack = errcallback.previous;
2587 
2588 	FreeBulkInsertState(bistate);
2589 
2590 	MemoryContextSwitchTo(oldcontext);
2591 
2592 	/*
2593 	 * In the old protocol, tell pqcomm that we can process normal protocol
2594 	 * messages again.
2595 	 */
2596 	if (cstate->copy_dest == COPY_OLD_FE)
2597 		pq_endmsgread();
2598 
2599 	/* Execute AFTER STATEMENT insertion triggers */
2600 	ExecASInsertTriggers(estate, resultRelInfo);
2601 
2602 	/* Handle queued AFTER triggers */
2603 	AfterTriggerEndQuery(estate);
2604 
2605 	pfree(values);
2606 	pfree(nulls);
2607 
2608 	ExecResetTupleTable(estate->es_tupleTable, false);
2609 
2610 	ExecCloseIndices(resultRelInfo);
2611 
2612 	FreeExecutorState(estate);
2613 
2614 	/*
2615 	 * If we skipped writing WAL, then we need to sync the heap (but not
2616 	 * indexes since those use WAL anyway)
2617 	 */
2618 	if (hi_options & HEAP_INSERT_SKIP_WAL)
2619 		heap_sync(cstate->rel);
2620 
2621 	return processed;
2622 }
2623 
2624 /*
2625  * A subroutine of CopyFrom, to write the current batch of buffered heap
2626  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2627  * triggers.
2628  */
2629 static void
CopyFromInsertBatch(CopyState cstate,EState * estate,CommandId mycid,int hi_options,ResultRelInfo * resultRelInfo,TupleTableSlot * myslot,BulkInsertState bistate,int nBufferedTuples,HeapTuple * bufferedTuples,uint64 firstBufferedLineNo)2630 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2631 					int hi_options, ResultRelInfo *resultRelInfo,
2632 					TupleTableSlot *myslot, BulkInsertState bistate,
2633 					int nBufferedTuples, HeapTuple *bufferedTuples,
2634 					uint64 firstBufferedLineNo)
2635 {
2636 	MemoryContext oldcontext;
2637 	int			i;
2638 	uint64		save_cur_lineno;
2639 
2640 	/*
2641 	 * Print error context information correctly, if one of the operations
2642 	 * below fail.
2643 	 */
2644 	cstate->line_buf_valid = false;
2645 	save_cur_lineno = cstate->cur_lineno;
2646 
2647 	/*
2648 	 * heap_multi_insert leaks memory, so switch to short-lived memory context
2649 	 * before calling it.
2650 	 */
2651 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2652 	heap_multi_insert(cstate->rel,
2653 					  bufferedTuples,
2654 					  nBufferedTuples,
2655 					  mycid,
2656 					  hi_options,
2657 					  bistate);
2658 	MemoryContextSwitchTo(oldcontext);
2659 
2660 	/*
2661 	 * If there are any indexes, update them for all the inserted tuples, and
2662 	 * run AFTER ROW INSERT triggers.
2663 	 */
2664 	if (resultRelInfo->ri_NumIndices > 0)
2665 	{
2666 		for (i = 0; i < nBufferedTuples; i++)
2667 		{
2668 			List	   *recheckIndexes;
2669 
2670 			cstate->cur_lineno = firstBufferedLineNo + i;
2671 			ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2672 			recheckIndexes =
2673 				ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2674 									  estate, false, NULL, NIL);
2675 			ExecARInsertTriggers(estate, resultRelInfo,
2676 								 bufferedTuples[i],
2677 								 recheckIndexes);
2678 			list_free(recheckIndexes);
2679 		}
2680 	}
2681 
2682 	/*
2683 	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2684 	 * anyway.
2685 	 */
2686 	else if (resultRelInfo->ri_TrigDesc != NULL &&
2687 			 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
2688 	{
2689 		for (i = 0; i < nBufferedTuples; i++)
2690 		{
2691 			cstate->cur_lineno = firstBufferedLineNo + i;
2692 			ExecARInsertTriggers(estate, resultRelInfo,
2693 								 bufferedTuples[i],
2694 								 NIL);
2695 		}
2696 	}
2697 
2698 	/* reset cur_lineno to where we were */
2699 	cstate->cur_lineno = save_cur_lineno;
2700 }
2701 
2702 /*
2703  * Setup to read tuples from a file for COPY FROM.
2704  *
2705  * 'rel': Used as a template for the tuples
2706  * 'filename': Name of server-local file to read
2707  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2708  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2709  *
2710  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2711  */
2712 CopyState
BeginCopyFrom(Relation rel,const char * filename,bool is_program,List * attnamelist,List * options)2713 BeginCopyFrom(Relation rel,
2714 			  const char *filename,
2715 			  bool is_program,
2716 			  List *attnamelist,
2717 			  List *options)
2718 {
2719 	CopyState	cstate;
2720 	bool		pipe = (filename == NULL);
2721 	TupleDesc	tupDesc;
2722 	Form_pg_attribute *attr;
2723 	AttrNumber	num_phys_attrs,
2724 				num_defaults;
2725 	FmgrInfo   *in_functions;
2726 	Oid		   *typioparams;
2727 	int			attnum;
2728 	Oid			in_func_oid;
2729 	int		   *defmap;
2730 	ExprState **defexprs;
2731 	MemoryContext oldcontext;
2732 	bool		volatile_defexprs;
2733 
2734 	cstate = BeginCopy(true, rel, NULL, NULL, InvalidOid, attnamelist, options);
2735 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2736 
2737 	/* Initialize state variables */
2738 	cstate->fe_eof = false;
2739 	cstate->eol_type = EOL_UNKNOWN;
2740 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
2741 	cstate->cur_lineno = 0;
2742 	cstate->cur_attname = NULL;
2743 	cstate->cur_attval = NULL;
2744 
2745 	/* Set up variables to avoid per-attribute overhead. */
2746 	initStringInfo(&cstate->attribute_buf);
2747 	initStringInfo(&cstate->line_buf);
2748 	cstate->line_buf_converted = false;
2749 	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2750 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
2751 
2752 	tupDesc = RelationGetDescr(cstate->rel);
2753 	attr = tupDesc->attrs;
2754 	num_phys_attrs = tupDesc->natts;
2755 	num_defaults = 0;
2756 	volatile_defexprs = false;
2757 
2758 	/*
2759 	 * Pick up the required catalog information for each attribute in the
2760 	 * relation, including the input function, the element type (to pass to
2761 	 * the input function), and info about defaults and constraints. (Which
2762 	 * input function we use depends on text/binary format choice.)
2763 	 */
2764 	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2765 	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2766 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2767 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2768 
2769 	for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2770 	{
2771 		/* We don't need info for dropped attributes */
2772 		if (attr[attnum - 1]->attisdropped)
2773 			continue;
2774 
2775 		/* Fetch the input function and typioparam info */
2776 		if (cstate->binary)
2777 			getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2778 								   &in_func_oid, &typioparams[attnum - 1]);
2779 		else
2780 			getTypeInputInfo(attr[attnum - 1]->atttypid,
2781 							 &in_func_oid, &typioparams[attnum - 1]);
2782 		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2783 
2784 		/* Get default info if needed */
2785 		if (!list_member_int(cstate->attnumlist, attnum))
2786 		{
2787 			/* attribute is NOT to be copied from input */
2788 			/* use default value if one exists */
2789 			Expr	   *defexpr = (Expr *) build_column_default(cstate->rel,
2790 																attnum);
2791 
2792 			if (defexpr != NULL)
2793 			{
2794 				/* Run the expression through planner */
2795 				defexpr = expression_planner(defexpr);
2796 
2797 				/* Initialize executable expression in copycontext */
2798 				defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
2799 				defmap[num_defaults] = attnum - 1;
2800 				num_defaults++;
2801 
2802 				/*
2803 				 * If a default expression looks at the table being loaded,
2804 				 * then it could give the wrong answer when using
2805 				 * multi-insert. Since database access can be dynamic this is
2806 				 * hard to test for exactly, so we use the much wider test of
2807 				 * whether the default expression is volatile. We allow for
2808 				 * the special case of when the default expression is the
2809 				 * nextval() of a sequence which in this specific case is
2810 				 * known to be safe for use with the multi-insert
2811 				 * optimisation. Hence we use this special case function
2812 				 * checker rather than the standard check for
2813 				 * contain_volatile_functions().
2814 				 */
2815 				if (!volatile_defexprs)
2816 					volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
2817 			}
2818 		}
2819 	}
2820 
2821 	/* We keep those variables in cstate. */
2822 	cstate->in_functions = in_functions;
2823 	cstate->typioparams = typioparams;
2824 	cstate->defmap = defmap;
2825 	cstate->defexprs = defexprs;
2826 	cstate->volatile_defexprs = volatile_defexprs;
2827 	cstate->num_defaults = num_defaults;
2828 	cstate->is_program = is_program;
2829 
2830 	if (pipe)
2831 	{
2832 		Assert(!is_program);	/* the grammar does not allow this */
2833 		if (whereToSendOutput == DestRemote)
2834 			ReceiveCopyBegin(cstate);
2835 		else
2836 			cstate->copy_file = stdin;
2837 	}
2838 	else
2839 	{
2840 		cstate->filename = pstrdup(filename);
2841 
2842 		if (cstate->is_program)
2843 		{
2844 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
2845 			if (cstate->copy_file == NULL)
2846 				ereport(ERROR,
2847 						(errcode_for_file_access(),
2848 						 errmsg("could not execute command \"%s\": %m",
2849 								cstate->filename)));
2850 		}
2851 		else
2852 		{
2853 			struct stat st;
2854 
2855 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
2856 			if (cstate->copy_file == NULL)
2857 				ereport(ERROR,
2858 						(errcode_for_file_access(),
2859 						 errmsg("could not open file \"%s\" for reading: %m",
2860 								cstate->filename)));
2861 
2862 			if (fstat(fileno(cstate->copy_file), &st))
2863 				ereport(ERROR,
2864 						(errcode_for_file_access(),
2865 						 errmsg("could not stat file \"%s\": %m",
2866 								cstate->filename)));
2867 
2868 			if (S_ISDIR(st.st_mode))
2869 				ereport(ERROR,
2870 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2871 						 errmsg("\"%s\" is a directory", cstate->filename)));
2872 		}
2873 	}
2874 
2875 	if (!cstate->binary)
2876 	{
2877 		/* must rely on user to tell us... */
2878 		cstate->file_has_oids = cstate->oids;
2879 	}
2880 	else
2881 	{
2882 		/* Read and verify binary header */
2883 		char		readSig[11];
2884 		int32		tmp;
2885 
2886 		/* Signature */
2887 		if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
2888 			memcmp(readSig, BinarySignature, 11) != 0)
2889 			ereport(ERROR,
2890 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2891 					 errmsg("COPY file signature not recognized")));
2892 		/* Flags field */
2893 		if (!CopyGetInt32(cstate, &tmp))
2894 			ereport(ERROR,
2895 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2896 					 errmsg("invalid COPY file header (missing flags)")));
2897 		cstate->file_has_oids = (tmp & (1 << 16)) != 0;
2898 		tmp &= ~(1 << 16);
2899 		if ((tmp >> 16) != 0)
2900 			ereport(ERROR,
2901 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2902 				 errmsg("unrecognized critical flags in COPY file header")));
2903 		/* Header extension length */
2904 		if (!CopyGetInt32(cstate, &tmp) ||
2905 			tmp < 0)
2906 			ereport(ERROR,
2907 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2908 					 errmsg("invalid COPY file header (missing length)")));
2909 		/* Skip extension header, if present */
2910 		while (tmp-- > 0)
2911 		{
2912 			if (CopyGetData(cstate, readSig, 1, 1) != 1)
2913 				ereport(ERROR,
2914 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2915 						 errmsg("invalid COPY file header (wrong length)")));
2916 		}
2917 	}
2918 
2919 	if (cstate->file_has_oids && cstate->binary)
2920 	{
2921 		getTypeBinaryInputInfo(OIDOID,
2922 							   &in_func_oid, &cstate->oid_typioparam);
2923 		fmgr_info(in_func_oid, &cstate->oid_in_function);
2924 	}
2925 
2926 	/* create workspace for CopyReadAttributes results */
2927 	if (!cstate->binary)
2928 	{
2929 		AttrNumber	attr_count = list_length(cstate->attnumlist);
2930 		int			nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
2931 
2932 		cstate->max_fields = nfields;
2933 		cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
2934 	}
2935 
2936 	MemoryContextSwitchTo(oldcontext);
2937 
2938 	return cstate;
2939 }
2940 
2941 /*
2942  * Read raw fields in the next line for COPY FROM in text or csv mode.
2943  * Return false if no more lines.
2944  *
2945  * An internal temporary buffer is returned via 'fields'. It is valid until
2946  * the next call of the function. Since the function returns all raw fields
2947  * in the input file, 'nfields' could be different from the number of columns
2948  * in the relation.
2949  *
2950  * NOTE: force_not_null option are not applied to the returned fields.
2951  */
2952 bool
NextCopyFromRawFields(CopyState cstate,char *** fields,int * nfields)2953 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
2954 {
2955 	int			fldct;
2956 	bool		done;
2957 
2958 	/* only available for text or csv input */
2959 	Assert(!cstate->binary);
2960 
2961 	/* on input just throw the header line away */
2962 	if (cstate->cur_lineno == 0 && cstate->header_line)
2963 	{
2964 		cstate->cur_lineno++;
2965 		if (CopyReadLine(cstate))
2966 			return false;		/* done */
2967 	}
2968 
2969 	cstate->cur_lineno++;
2970 
2971 	/* Actually read the line into memory here */
2972 	done = CopyReadLine(cstate);
2973 
2974 	/*
2975 	 * EOF at start of line means we're done.  If we see EOF after some
2976 	 * characters, we act as though it was newline followed by EOF, ie,
2977 	 * process the line and then exit loop on next iteration.
2978 	 */
2979 	if (done && cstate->line_buf.len == 0)
2980 		return false;
2981 
2982 	/* Parse the line into de-escaped field values */
2983 	if (cstate->csv_mode)
2984 		fldct = CopyReadAttributesCSV(cstate);
2985 	else
2986 		fldct = CopyReadAttributesText(cstate);
2987 
2988 	*fields = cstate->raw_fields;
2989 	*nfields = fldct;
2990 	return true;
2991 }
2992 
2993 /*
2994  * Read next tuple from file for COPY FROM. Return false if no more tuples.
2995  *
2996  * 'econtext' is used to evaluate default expression for each columns not
2997  * read from the file. It can be NULL when no default values are used, i.e.
2998  * when all columns are read from the file.
2999  *
3000  * 'values' and 'nulls' arrays must be the same length as columns of the
3001  * relation passed to BeginCopyFrom. This function fills the arrays.
3002  * Oid of the tuple is returned with 'tupleOid' separately.
3003  */
3004 bool
NextCopyFrom(CopyState cstate,ExprContext * econtext,Datum * values,bool * nulls,Oid * tupleOid)3005 NextCopyFrom(CopyState cstate, ExprContext *econtext,
3006 			 Datum *values, bool *nulls, Oid *tupleOid)
3007 {
3008 	TupleDesc	tupDesc;
3009 	Form_pg_attribute *attr;
3010 	AttrNumber	num_phys_attrs,
3011 				attr_count,
3012 				num_defaults = cstate->num_defaults;
3013 	FmgrInfo   *in_functions = cstate->in_functions;
3014 	Oid		   *typioparams = cstate->typioparams;
3015 	int			i;
3016 	int			nfields;
3017 	bool		isnull;
3018 	bool		file_has_oids = cstate->file_has_oids;
3019 	int		   *defmap = cstate->defmap;
3020 	ExprState **defexprs = cstate->defexprs;
3021 
3022 	tupDesc = RelationGetDescr(cstate->rel);
3023 	attr = tupDesc->attrs;
3024 	num_phys_attrs = tupDesc->natts;
3025 	attr_count = list_length(cstate->attnumlist);
3026 	nfields = file_has_oids ? (attr_count + 1) : attr_count;
3027 
3028 	/* Initialize all values for row to NULL */
3029 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3030 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3031 
3032 	if (!cstate->binary)
3033 	{
3034 		char	  **field_strings;
3035 		ListCell   *cur;
3036 		int			fldct;
3037 		int			fieldno;
3038 		char	   *string;
3039 
3040 		/* read raw fields in the next line */
3041 		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3042 			return false;
3043 
3044 		/* check for overflowing fields */
3045 		if (nfields > 0 && fldct > nfields)
3046 			ereport(ERROR,
3047 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3048 					 errmsg("extra data after last expected column")));
3049 
3050 		fieldno = 0;
3051 
3052 		/* Read the OID field if present */
3053 		if (file_has_oids)
3054 		{
3055 			if (fieldno >= fldct)
3056 				ereport(ERROR,
3057 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3058 						 errmsg("missing data for OID column")));
3059 			string = field_strings[fieldno++];
3060 
3061 			if (string == NULL)
3062 				ereport(ERROR,
3063 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3064 						 errmsg("null OID in COPY data")));
3065 			else if (cstate->oids && tupleOid != NULL)
3066 			{
3067 				cstate->cur_attname = "oid";
3068 				cstate->cur_attval = string;
3069 				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3070 												   CStringGetDatum(string)));
3071 				if (*tupleOid == InvalidOid)
3072 					ereport(ERROR,
3073 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3074 							 errmsg("invalid OID in COPY data")));
3075 				cstate->cur_attname = NULL;
3076 				cstate->cur_attval = NULL;
3077 			}
3078 		}
3079 
3080 		/* Loop to read the user attributes on the line. */
3081 		foreach(cur, cstate->attnumlist)
3082 		{
3083 			int			attnum = lfirst_int(cur);
3084 			int			m = attnum - 1;
3085 
3086 			if (fieldno >= fldct)
3087 				ereport(ERROR,
3088 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3089 						 errmsg("missing data for column \"%s\"",
3090 								NameStr(attr[m]->attname))));
3091 			string = field_strings[fieldno++];
3092 
3093 			if (cstate->convert_select_flags &&
3094 				!cstate->convert_select_flags[m])
3095 			{
3096 				/* ignore input field, leaving column as NULL */
3097 				continue;
3098 			}
3099 
3100 			if (cstate->csv_mode)
3101 			{
3102 				if (string == NULL &&
3103 					cstate->force_notnull_flags[m])
3104 				{
3105 					/*
3106 					 * FORCE_NOT_NULL option is set and column is NULL -
3107 					 * convert it to the NULL string.
3108 					 */
3109 					string = cstate->null_print;
3110 				}
3111 				else if (string != NULL && cstate->force_null_flags[m]
3112 						 && strcmp(string, cstate->null_print) == 0)
3113 				{
3114 					/*
3115 					 * FORCE_NULL option is set and column matches the NULL
3116 					 * string. It must have been quoted, or otherwise the
3117 					 * string would already have been set to NULL. Convert it
3118 					 * to NULL as specified.
3119 					 */
3120 					string = NULL;
3121 				}
3122 			}
3123 
3124 			cstate->cur_attname = NameStr(attr[m]->attname);
3125 			cstate->cur_attval = string;
3126 			values[m] = InputFunctionCall(&in_functions[m],
3127 										  string,
3128 										  typioparams[m],
3129 										  attr[m]->atttypmod);
3130 			if (string != NULL)
3131 				nulls[m] = false;
3132 			cstate->cur_attname = NULL;
3133 			cstate->cur_attval = NULL;
3134 		}
3135 
3136 		Assert(fieldno == nfields);
3137 	}
3138 	else
3139 	{
3140 		/* binary */
3141 		int16		fld_count;
3142 		ListCell   *cur;
3143 
3144 		cstate->cur_lineno++;
3145 
3146 		if (!CopyGetInt16(cstate, &fld_count))
3147 		{
3148 			/* EOF detected (end of file, or protocol-level EOF) */
3149 			return false;
3150 		}
3151 
3152 		if (fld_count == -1)
3153 		{
3154 			/*
3155 			 * Received EOF marker.  In a V3-protocol copy, wait for the
3156 			 * protocol-level EOF, and complain if it doesn't come
3157 			 * immediately.  This ensures that we correctly handle CopyFail,
3158 			 * if client chooses to send that now.
3159 			 *
3160 			 * Note that we MUST NOT try to read more data in an old-protocol
3161 			 * copy, since there is no protocol-level EOF marker then.  We
3162 			 * could go either way for copy from file, but choose to throw
3163 			 * error if there's data after the EOF marker, for consistency
3164 			 * with the new-protocol case.
3165 			 */
3166 			char		dummy;
3167 
3168 			if (cstate->copy_dest != COPY_OLD_FE &&
3169 				CopyGetData(cstate, &dummy, 1, 1) > 0)
3170 				ereport(ERROR,
3171 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3172 						 errmsg("received copy data after EOF marker")));
3173 			return false;
3174 		}
3175 
3176 		if (fld_count != attr_count)
3177 			ereport(ERROR,
3178 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3179 					 errmsg("row field count is %d, expected %d",
3180 							(int) fld_count, attr_count)));
3181 
3182 		if (file_has_oids)
3183 		{
3184 			Oid			loaded_oid;
3185 
3186 			cstate->cur_attname = "oid";
3187 			loaded_oid =
3188 				DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3189 														 0,
3190 													&cstate->oid_in_function,
3191 													  cstate->oid_typioparam,
3192 														 -1,
3193 														 &isnull));
3194 			if (isnull || loaded_oid == InvalidOid)
3195 				ereport(ERROR,
3196 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3197 						 errmsg("invalid OID in COPY data")));
3198 			cstate->cur_attname = NULL;
3199 			if (cstate->oids && tupleOid != NULL)
3200 				*tupleOid = loaded_oid;
3201 		}
3202 
3203 		i = 0;
3204 		foreach(cur, cstate->attnumlist)
3205 		{
3206 			int			attnum = lfirst_int(cur);
3207 			int			m = attnum - 1;
3208 
3209 			cstate->cur_attname = NameStr(attr[m]->attname);
3210 			i++;
3211 			values[m] = CopyReadBinaryAttribute(cstate,
3212 												i,
3213 												&in_functions[m],
3214 												typioparams[m],
3215 												attr[m]->atttypmod,
3216 												&nulls[m]);
3217 			cstate->cur_attname = NULL;
3218 		}
3219 	}
3220 
3221 	/*
3222 	 * Now compute and insert any defaults available for the columns not
3223 	 * provided by the input data.  Anything not processed here or above will
3224 	 * remain NULL.
3225 	 */
3226 	for (i = 0; i < num_defaults; i++)
3227 	{
3228 		/*
3229 		 * The caller must supply econtext and have switched into the
3230 		 * per-tuple memory context in it.
3231 		 */
3232 		Assert(econtext != NULL);
3233 		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3234 
3235 		values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3236 										 &nulls[defmap[i]], NULL);
3237 	}
3238 
3239 	return true;
3240 }
3241 
/*
 * Clean up storage and release resources for COPY FROM.
 *
 * 'cstate' is the CopyState being finished.  All of the work is delegated
 * to EndCopy(); nothing else is done here.
 *
 * NOTE(review): presumably cstate must not be used after this call --
 * confirm against EndCopy().
 */
void
EndCopyFrom(CopyState cstate)
{
	/* No COPY FROM related resources except memory. */

	EndCopy(cstate);
}
3252 
3253 /*
3254  * Read the next input line and stash it in line_buf, with conversion to
3255  * server encoding.
3256  *
3257  * Result is true if read was terminated by EOF, false if terminated
3258  * by newline.  The terminating newline or EOF marker is not included
3259  * in the final value of line_buf.
3260  */
3261 static bool
CopyReadLine(CopyState cstate)3262 CopyReadLine(CopyState cstate)
3263 {
3264 	bool		result;
3265 
3266 	resetStringInfo(&cstate->line_buf);
3267 	cstate->line_buf_valid = true;
3268 
3269 	/* Mark that encoding conversion hasn't occurred yet */
3270 	cstate->line_buf_converted = false;
3271 
3272 	/* Parse data and transfer into line_buf */
3273 	result = CopyReadLineText(cstate);
3274 
3275 	if (result)
3276 	{
3277 		/*
3278 		 * Reached EOF.  In protocol version 3, we should ignore anything
3279 		 * after \. up to the protocol end of copy data.  (XXX maybe better
3280 		 * not to treat \. as special?)
3281 		 */
3282 		if (cstate->copy_dest == COPY_NEW_FE)
3283 		{
3284 			do
3285 			{
3286 				cstate->raw_buf_index = cstate->raw_buf_len;
3287 			} while (CopyLoadRawBuf(cstate));
3288 		}
3289 	}
3290 	else
3291 	{
3292 		/*
3293 		 * If we didn't hit EOF, then we must have transferred the EOL marker
3294 		 * to line_buf along with the data.  Get rid of it.
3295 		 */
3296 		switch (cstate->eol_type)
3297 		{
3298 			case EOL_NL:
3299 				Assert(cstate->line_buf.len >= 1);
3300 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3301 				cstate->line_buf.len--;
3302 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3303 				break;
3304 			case EOL_CR:
3305 				Assert(cstate->line_buf.len >= 1);
3306 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3307 				cstate->line_buf.len--;
3308 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3309 				break;
3310 			case EOL_CRNL:
3311 				Assert(cstate->line_buf.len >= 2);
3312 				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3313 				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3314 				cstate->line_buf.len -= 2;
3315 				cstate->line_buf.data[cstate->line_buf.len] = '\0';
3316 				break;
3317 			case EOL_UNKNOWN:
3318 				/* shouldn't get here */
3319 				Assert(false);
3320 				break;
3321 		}
3322 	}
3323 
3324 	/* Done reading the line.  Convert it to server encoding. */
3325 	if (cstate->need_transcoding)
3326 	{
3327 		char	   *cvt;
3328 
3329 		cvt = pg_any_to_server(cstate->line_buf.data,
3330 							   cstate->line_buf.len,
3331 							   cstate->file_encoding);
3332 		if (cvt != cstate->line_buf.data)
3333 		{
3334 			/* transfer converted data back to line_buf */
3335 			resetStringInfo(&cstate->line_buf);
3336 			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3337 			pfree(cvt);
3338 		}
3339 	}
3340 
3341 	/* Now it's safe to use the buffer in error messages */
3342 	cstate->line_buf_converted = true;
3343 
3344 	return result;
3345 }
3346 
/*
 * CopyReadLineText - inner loop of CopyReadLine for text mode
 *
 * Scans raw_buf for the end of the next input line (text or CSV format) and
 * transfers the line to line_buf.  Encoding conversion is not done here.
 *
 * The result is true when the input is found to be exhausted at the top of
 * the loop or when an end-of-copy marker (\.) is recognized, and false when
 * the loop ends at a line terminator.  cstate->eol_type is set the first
 * time a line terminator is seen, and later terminators that do not match
 * it are reported as errors.
 *
 * NOTE(review): REFILL_LINEBUF, IF_NEED_REFILL_AND_NOT_EOF_CONTINUE,
 * IF_NEED_REFILL_AND_EOF_BREAK and NO_END_OF_COPY_GOTO are macros defined
 * outside this function.  They appear to rely on the local variable names
 * used here (raw_buf_ptr, copy_buf_len, need_data, hit_eof) and on the
 * not_end_of_copy label, and they can restart or leave the loop -- check
 * their definitions before renaming or reordering anything.
 */
static bool
CopyReadLineText(CopyState cstate)
{
	char	   *copy_raw_buf;
	int			raw_buf_ptr;
	int			copy_buf_len;
	bool		need_data = false;
	bool		hit_eof = false;
	bool		result = false;
	char		mblen_str[2];

	/* CSV variables */
	bool		first_char_in_line = true;
	bool		in_quote = false,
				last_was_esc = false;
	char		quotec = '\0';
	char		escapec = '\0';

	if (cstate->csv_mode)
	{
		quotec = cstate->quote[0];
		escapec = cstate->escape[0];
		/* ignore special escape processing if it's the same as quotec */
		if (quotec == escapec)
			escapec = '\0';
	}

	/* mblen_str holds a single lead byte as a NUL-terminated string */
	mblen_str[1] = '\0';

	/*
	 * The objective of this loop is to transfer the entire next input line
	 * into line_buf.  Hence, we only care for detecting newlines (\r and/or
	 * \n) and the end-of-copy marker (\.).
	 *
	 * In CSV mode, \r and \n inside a quoted field are just part of the data
	 * value and are put in line_buf.  We keep just enough state to know if we
	 * are currently in a quoted field or not.
	 *
	 * These four characters, and the CSV escape and quote characters, are
	 * assumed the same in frontend and backend encodings.
	 *
	 * For speed, we try to move data from raw_buf to line_buf in chunks
	 * rather than one character at a time.  raw_buf_ptr points to the next
	 * character to examine; any characters from raw_buf_index to raw_buf_ptr
	 * have been determined to be part of the line, but not yet transferred to
	 * line_buf.
	 *
	 * For a little extra speed within the loop, we copy raw_buf and
	 * raw_buf_len into local variables.
	 */
	copy_raw_buf = cstate->raw_buf;
	raw_buf_ptr = cstate->raw_buf_index;
	copy_buf_len = cstate->raw_buf_len;

	for (;;)
	{
		int			prev_raw_ptr;
		char		c;

		/*
		 * Load more data if needed.  Ideally we would just force four bytes
		 * of read-ahead and avoid the many calls to
		 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
		 * does not allow us to read too far ahead or we might read into the
		 * next data, so we read-ahead only as far as we know we can.  One
		 * optimization would be to read-ahead four bytes here if
		 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
		 * considering the size of the buffer.
		 */
		if (raw_buf_ptr >= copy_buf_len || need_data)
		{
			REFILL_LINEBUF;

			/*
			 * Try to read some more data.  This will certainly reset
			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
			 */
			if (!CopyLoadRawBuf(cstate))
				hit_eof = true;
			raw_buf_ptr = 0;
			copy_buf_len = cstate->raw_buf_len;

			/*
			 * If we are completely out of data, break out of the loop,
			 * reporting EOF.
			 */
			if (copy_buf_len <= 0)
			{
				result = true;
				break;
			}
			need_data = false;
		}

		/*
		 * OK to fetch a character.  Remember where it started, so that an
		 * end-of-copy marker beginning here can be left out of line_buf.
		 */
		prev_raw_ptr = raw_buf_ptr;
		c = copy_raw_buf[raw_buf_ptr++];

		if (cstate->csv_mode)
		{
			/*
			 * If character is '\\' or '\r', we may need to look ahead below.
			 * Force fetch of the next character if we don't already have it.
			 * We need to do this before changing CSV state, in case one of
			 * these characters is also the quote or escape character.
			 *
			 * Note: old-protocol does not like forced prefetch, but it's OK
			 * here since we cannot validly be at EOF.
			 */
			if (c == '\\' || c == '\r')
			{
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			}

			/*
			 * Dealing with quotes and escapes here is mildly tricky. If the
			 * quote char is also the escape char, there's no problem - we
			 * just use the char as a toggle. If they are different, we need
			 * to ensure that we only take account of an escape inside a
			 * quoted field and immediately preceding a quote char, and not
			 * the second in an escape-escape sequence.
			 */
			if (in_quote && c == escapec)
				last_was_esc = !last_was_esc;
			if (c == quotec && !last_was_esc)
				in_quote = !in_quote;
			if (c != escapec)
				last_was_esc = false;

			/*
			 * Updating the line count for embedded CR and/or LF chars is
			 * necessarily a little fragile - this test is probably about the
			 * best we can do.  (XXX it's arguable whether we should do this
			 * at all --- is cur_lineno a physical or logical count?)
			 */
			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
				cstate->cur_lineno++;
		}

		/* Process \r (in CSV mode only when outside a quoted field) */
		if (c == '\r' && (!cstate->csv_mode || !in_quote))
		{
			/* Check for \r\n on first line, _and_ handle \r\n. */
			if (cstate->eol_type == EOL_UNKNOWN ||
				cstate->eol_type == EOL_CRNL)
			{
				/*
				 * If need more data, go back to loop top to load it.
				 *
				 * Note that if we are at EOF, c will wind up as '\0' because
				 * of the guaranteed pad of raw_buf.
				 */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);

				/* get next char */
				c = copy_raw_buf[raw_buf_ptr];

				if (c == '\n')
				{
					raw_buf_ptr++;		/* eat newline */
					cstate->eol_type = EOL_CRNL;		/* in case not set yet */
				}
				else
				{
					/* found \r, but no \n */
					if (cstate->eol_type == EOL_CRNL)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 !cstate->csv_mode ?
							errmsg("literal carriage return found in data") :
							errmsg("unquoted carriage return found in data"),
								 !cstate->csv_mode ?
						errhint("Use \"\\r\" to represent carriage return.") :
								 errhint("Use quoted CSV field to represent carriage return.")));

					/*
					 * if we got here, it is the first line and we didn't find
					 * \n, so don't consume the peeked character
					 */
					cstate->eol_type = EOL_CR;
				}
			}
			else if (cstate->eol_type == EOL_NL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal carriage return found in data") :
						 errmsg("unquoted carriage return found in data"),
						 !cstate->csv_mode ?
					   errhint("Use \"\\r\" to represent carriage return.") :
						 errhint("Use quoted CSV field to represent carriage return.")));
			/* If reach here, we have found the line terminator */
			break;
		}

		/* Process \n (in CSV mode only when outside a quoted field) */
		if (c == '\n' && (!cstate->csv_mode || !in_quote))
		{
			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal newline found in data") :
						 errmsg("unquoted newline found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\n\" to represent newline.") :
					 errhint("Use quoted CSV field to represent newline.")));
			cstate->eol_type = EOL_NL;	/* in case not set yet */
			/* If reach here, we have found the line terminator */
			break;
		}

		/*
		 * In CSV mode, we only recognize \. alone on a line.  This is because
		 * \. is a valid CSV data value.
		 */
		if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
		{
			char		c2;

			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			IF_NEED_REFILL_AND_EOF_BREAK(0);

			/* -----
			 * get next character
			 * Note: we do not change c so if it isn't \., we can fall
			 * through and continue processing for file encoding.
			 * -----
			 */
			c2 = copy_raw_buf[raw_buf_ptr];

			if (c2 == '.')
			{
				raw_buf_ptr++;	/* consume the '.' */

				/*
				 * Note: if we loop back for more data here, it does not
				 * matter that the CSV state change checks are re-executed; we
				 * will come back here with no important state changed.
				 */
				if (cstate->eol_type == EOL_CRNL)
				{
					/* Get the next character */
					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
					/* if hit_eof, c2 will become '\0' */
					c2 = copy_raw_buf[raw_buf_ptr++];

					/* With \r\n line ends, \. must be followed by \r first */
					if (c2 == '\n')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker does not match previous newline style")));
						else
							NO_END_OF_COPY_GOTO;
					}
					else if (c2 != '\r')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker corrupt")));
						else
							NO_END_OF_COPY_GOTO;
					}
				}

				/* Get the next character */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
				/* if hit_eof, c2 will become '\0' */
				c2 = copy_raw_buf[raw_buf_ptr++];

				/* The marker must be followed directly by a line terminator */
				if (c2 != '\r' && c2 != '\n')
				{
					if (!cstate->csv_mode)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 errmsg("end-of-copy marker corrupt")));
					else
						NO_END_OF_COPY_GOTO;
				}

				/* ... and that terminator must match the established style */
				if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
					(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
					(cstate->eol_type == EOL_CR && c2 != '\r'))
				{
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("end-of-copy marker does not match previous newline style")));
				}

				/*
				 * Transfer only the data before the \. into line_buf, then
				 * discard the data and the \. sequence.
				 */
				if (prev_raw_ptr > cstate->raw_buf_index)
					appendBinaryStringInfo(&cstate->line_buf,
									 cstate->raw_buf + cstate->raw_buf_index,
									   prev_raw_ptr - cstate->raw_buf_index);
				cstate->raw_buf_index = raw_buf_ptr;
				result = true;	/* report EOF */
				break;
			}
			else if (!cstate->csv_mode)
			{
				/*
				 * If we are here, it means we found a backslash followed by
				 * something other than a period.  In non-CSV mode, anything
				 * after a backslash is special, so we skip over that second
				 * character too.  If we didn't do that \\. would be
				 * considered an end-of-copy marker, while in non-CSV mode it
				 * is a literal backslash followed by a period.  In CSV mode,
				 * backslashes are not special, so we want to process the
				 * character after the backslash just like a normal character,
				 * so we don't increment in those cases.
				 *
				 * Set 'c' to skip whole character correctly in multi-byte
				 * encodings.  If we don't have the whole character in the
				 * buffer yet, we might loop back to process it, after all,
				 * but that's OK because multi-byte characters cannot have any
				 * special meaning.
				 */
				raw_buf_ptr++;
				c = c2;
			}
		}

		/*
		 * This label is for CSV cases where \. appears at the start of a
		 * line, but there is more text after it, meaning it was a data value.
		 * We are more strict for \. in CSV mode because \. could be a data
		 * value, while in non-CSV mode, \. cannot be a data value.
		 */
not_end_of_copy:

		/*
		 * Process all bytes of a multi-byte character as a group.
		 *
		 * We only support multi-byte sequences where the first byte has the
		 * high-bit set, so as an optimization we can avoid this block
		 * entirely if it is not set.
		 */
		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
		{
			int			mblen;

			mblen_str[0] = c;
			/* All our encodings only read the first byte to get the length */
			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
			/* skip the remaining bytes of the character */
			raw_buf_ptr += mblen - 1;
		}
		/* at least one character of this line has now been processed */
		first_char_in_line = false;
	}							/* end of outer loop */

	/*
	 * Transfer any still-uncopied data to line_buf.
	 */
	REFILL_LINEBUF;

	return result;
}
3713 
/*
 * GetDecimalFromHex
 *		Convert a single hexadecimal digit character to its numeric value.
 *
 * Decimal digits map to 0..9.  Any other character is folded to lower case
 * and measured relative to 'a', so 'a'/'A' gives 10 and 'f'/'F' gives 15.
 * No validation is performed here; the result is only meaningful when the
 * argument really is a hexadecimal digit.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char uch = (unsigned char) hex;

	return isdigit(uch) ? (hex - '0') : (tolower(uch) - 'a' + 10);
}
3725 
3726 /*
3727  * Parse the current line into separate attributes (fields),
3728  * performing de-escaping as needed.
3729  *
3730  * The input is in line_buf.  We use attribute_buf to hold the result
3731  * strings.  cstate->raw_fields[k] is set to point to the k'th attribute
3732  * string, or NULL when the input matches the null marker string.
3733  * This array is expanded as necessary.
3734  *
3735  * (Note that the caller cannot check for nulls since the returned
3736  * string would be the post-de-escaping equivalent, which may look
3737  * the same as some valid data string.)
3738  *
3739  * delim is the column delimiter string (must be just one byte for now).
3740  * null_print is the null marker string.  Note that this is compared to
3741  * the pre-de-escaped input string.
3742  *
3743  * The return value is the number of fields actually read.
3744  */
3745 static int
CopyReadAttributesText(CopyState cstate)3746 CopyReadAttributesText(CopyState cstate)
3747 {
3748 	char		delimc = cstate->delim[0];
3749 	int			fieldno;
3750 	char	   *output_ptr;
3751 	char	   *cur_ptr;
3752 	char	   *line_end_ptr;
3753 
3754 	/*
3755 	 * We need a special case for zero-column tables: check that the input
3756 	 * line is empty, and return.
3757 	 */
3758 	if (cstate->max_fields <= 0)
3759 	{
3760 		if (cstate->line_buf.len != 0)
3761 			ereport(ERROR,
3762 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3763 					 errmsg("extra data after last expected column")));
3764 		return 0;
3765 	}
3766 
3767 	resetStringInfo(&cstate->attribute_buf);
3768 
3769 	/*
3770 	 * The de-escaped attributes will certainly not be longer than the input
3771 	 * data line, so we can just force attribute_buf to be large enough and
3772 	 * then transfer data without any checks for enough space.  We need to do
3773 	 * it this way because enlarging attribute_buf mid-stream would invalidate
3774 	 * pointers already stored into cstate->raw_fields[].
3775 	 */
3776 	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3777 		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3778 	output_ptr = cstate->attribute_buf.data;
3779 
3780 	/* set pointer variables for loop */
3781 	cur_ptr = cstate->line_buf.data;
3782 	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3783 
3784 	/* Outer loop iterates over fields */
3785 	fieldno = 0;
3786 	for (;;)
3787 	{
3788 		bool		found_delim = false;
3789 		char	   *start_ptr;
3790 		char	   *end_ptr;
3791 		int			input_len;
3792 		bool		saw_non_ascii = false;
3793 
3794 		/* Make sure there is enough space for the next value */
3795 		if (fieldno >= cstate->max_fields)
3796 		{
3797 			cstate->max_fields *= 2;
3798 			cstate->raw_fields =
3799 				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3800 		}
3801 
3802 		/* Remember start of field on both input and output sides */
3803 		start_ptr = cur_ptr;
3804 		cstate->raw_fields[fieldno] = output_ptr;
3805 
3806 		/*
3807 		 * Scan data for field.
3808 		 *
3809 		 * Note that in this loop, we are scanning to locate the end of field
3810 		 * and also speculatively performing de-escaping.  Once we find the
3811 		 * end-of-field, we can match the raw field contents against the null
3812 		 * marker string.  Only after that comparison fails do we know that
3813 		 * de-escaping is actually the right thing to do; therefore we *must
3814 		 * not* throw any syntax errors before we've done the null-marker
3815 		 * check.
3816 		 */
3817 		for (;;)
3818 		{
3819 			char		c;
3820 
3821 			end_ptr = cur_ptr;
3822 			if (cur_ptr >= line_end_ptr)
3823 				break;
3824 			c = *cur_ptr++;
3825 			if (c == delimc)
3826 			{
3827 				found_delim = true;
3828 				break;
3829 			}
3830 			if (c == '\\')
3831 			{
3832 				if (cur_ptr >= line_end_ptr)
3833 					break;
3834 				c = *cur_ptr++;
3835 				switch (c)
3836 				{
3837 					case '0':
3838 					case '1':
3839 					case '2':
3840 					case '3':
3841 					case '4':
3842 					case '5':
3843 					case '6':
3844 					case '7':
3845 						{
3846 							/* handle \013 */
3847 							int			val;
3848 
3849 							val = OCTVALUE(c);
3850 							if (cur_ptr < line_end_ptr)
3851 							{
3852 								c = *cur_ptr;
3853 								if (ISOCTAL(c))
3854 								{
3855 									cur_ptr++;
3856 									val = (val << 3) + OCTVALUE(c);
3857 									if (cur_ptr < line_end_ptr)
3858 									{
3859 										c = *cur_ptr;
3860 										if (ISOCTAL(c))
3861 										{
3862 											cur_ptr++;
3863 											val = (val << 3) + OCTVALUE(c);
3864 										}
3865 									}
3866 								}
3867 							}
3868 							c = val & 0377;
3869 							if (c == '\0' || IS_HIGHBIT_SET(c))
3870 								saw_non_ascii = true;
3871 						}
3872 						break;
3873 					case 'x':
3874 						/* Handle \x3F */
3875 						if (cur_ptr < line_end_ptr)
3876 						{
3877 							char		hexchar = *cur_ptr;
3878 
3879 							if (isxdigit((unsigned char) hexchar))
3880 							{
3881 								int			val = GetDecimalFromHex(hexchar);
3882 
3883 								cur_ptr++;
3884 								if (cur_ptr < line_end_ptr)
3885 								{
3886 									hexchar = *cur_ptr;
3887 									if (isxdigit((unsigned char) hexchar))
3888 									{
3889 										cur_ptr++;
3890 										val = (val << 4) + GetDecimalFromHex(hexchar);
3891 									}
3892 								}
3893 								c = val & 0xff;
3894 								if (c == '\0' || IS_HIGHBIT_SET(c))
3895 									saw_non_ascii = true;
3896 							}
3897 						}
3898 						break;
3899 					case 'b':
3900 						c = '\b';
3901 						break;
3902 					case 'f':
3903 						c = '\f';
3904 						break;
3905 					case 'n':
3906 						c = '\n';
3907 						break;
3908 					case 'r':
3909 						c = '\r';
3910 						break;
3911 					case 't':
3912 						c = '\t';
3913 						break;
3914 					case 'v':
3915 						c = '\v';
3916 						break;
3917 
3918 						/*
3919 						 * in all other cases, take the char after '\'
3920 						 * literally
3921 						 */
3922 				}
3923 			}
3924 
3925 			/* Add c to output string */
3926 			*output_ptr++ = c;
3927 		}
3928 
3929 		/* Check whether raw input matched null marker */
3930 		input_len = end_ptr - start_ptr;
3931 		if (input_len == cstate->null_print_len &&
3932 			strncmp(start_ptr, cstate->null_print, input_len) == 0)
3933 			cstate->raw_fields[fieldno] = NULL;
3934 		else
3935 		{
3936 			/*
3937 			 * At this point we know the field is supposed to contain data.
3938 			 *
3939 			 * If we de-escaped any non-7-bit-ASCII chars, make sure the
3940 			 * resulting string is valid data for the db encoding.
3941 			 */
3942 			if (saw_non_ascii)
3943 			{
3944 				char	   *fld = cstate->raw_fields[fieldno];
3945 
3946 				pg_verifymbstr(fld, output_ptr - fld, false);
3947 			}
3948 		}
3949 
3950 		/* Terminate attribute value in output area */
3951 		*output_ptr++ = '\0';
3952 
3953 		fieldno++;
3954 		/* Done if we hit EOL instead of a delim */
3955 		if (!found_delim)
3956 			break;
3957 	}
3958 
3959 	/* Clean up state of attribute_buf */
3960 	output_ptr--;
3961 	Assert(*output_ptr == '\0');
3962 	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
3963 
3964 	return fieldno;
3965 }
3966 
3967 /*
3968  * Parse the current line into separate attributes (fields),
3969  * performing de-escaping as needed.  This has exactly the same API as
3970  * CopyReadAttributesText, except we parse the fields according to
3971  * "standard" (i.e. common) CSV usage.
3972  */
3973 static int
CopyReadAttributesCSV(CopyState cstate)3974 CopyReadAttributesCSV(CopyState cstate)
3975 {
3976 	char		delimc = cstate->delim[0];
3977 	char		quotec = cstate->quote[0];
3978 	char		escapec = cstate->escape[0];
3979 	int			fieldno;
3980 	char	   *output_ptr;
3981 	char	   *cur_ptr;
3982 	char	   *line_end_ptr;
3983 
3984 	/*
3985 	 * We need a special case for zero-column tables: check that the input
3986 	 * line is empty, and return.
3987 	 */
3988 	if (cstate->max_fields <= 0)
3989 	{
3990 		if (cstate->line_buf.len != 0)
3991 			ereport(ERROR,
3992 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3993 					 errmsg("extra data after last expected column")));
3994 		return 0;
3995 	}
3996 
3997 	resetStringInfo(&cstate->attribute_buf);
3998 
3999 	/*
4000 	 * The de-escaped attributes will certainly not be longer than the input
4001 	 * data line, so we can just force attribute_buf to be large enough and
4002 	 * then transfer data without any checks for enough space.  We need to do
4003 	 * it this way because enlarging attribute_buf mid-stream would invalidate
4004 	 * pointers already stored into cstate->raw_fields[].
4005 	 */
4006 	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4007 		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4008 	output_ptr = cstate->attribute_buf.data;
4009 
4010 	/* set pointer variables for loop */
4011 	cur_ptr = cstate->line_buf.data;
4012 	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4013 
4014 	/* Outer loop iterates over fields */
4015 	fieldno = 0;
4016 	for (;;)
4017 	{
4018 		bool		found_delim = false;
4019 		bool		saw_quote = false;
4020 		char	   *start_ptr;
4021 		char	   *end_ptr;
4022 		int			input_len;
4023 
4024 		/* Make sure there is enough space for the next value */
4025 		if (fieldno >= cstate->max_fields)
4026 		{
4027 			cstate->max_fields *= 2;
4028 			cstate->raw_fields =
4029 				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4030 		}
4031 
4032 		/* Remember start of field on both input and output sides */
4033 		start_ptr = cur_ptr;
4034 		cstate->raw_fields[fieldno] = output_ptr;
4035 
4036 		/*
4037 		 * Scan data for field,
4038 		 *
4039 		 * The loop starts in "not quote" mode and then toggles between that
4040 		 * and "in quote" mode. The loop exits normally if it is in "not
4041 		 * quote" mode and a delimiter or line end is seen.
4042 		 */
4043 		for (;;)
4044 		{
4045 			char		c;
4046 
4047 			/* Not in quote */
4048 			for (;;)
4049 			{
4050 				end_ptr = cur_ptr;
4051 				if (cur_ptr >= line_end_ptr)
4052 					goto endfield;
4053 				c = *cur_ptr++;
4054 				/* unquoted field delimiter */
4055 				if (c == delimc)
4056 				{
4057 					found_delim = true;
4058 					goto endfield;
4059 				}
4060 				/* start of quoted field (or part of field) */
4061 				if (c == quotec)
4062 				{
4063 					saw_quote = true;
4064 					break;
4065 				}
4066 				/* Add c to output string */
4067 				*output_ptr++ = c;
4068 			}
4069 
4070 			/* In quote */
4071 			for (;;)
4072 			{
4073 				end_ptr = cur_ptr;
4074 				if (cur_ptr >= line_end_ptr)
4075 					ereport(ERROR,
4076 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4077 							 errmsg("unterminated CSV quoted field")));
4078 
4079 				c = *cur_ptr++;
4080 
4081 				/* escape within a quoted field */
4082 				if (c == escapec)
4083 				{
4084 					/*
4085 					 * peek at the next char if available, and escape it if it
4086 					 * is an escape char or a quote char
4087 					 */
4088 					if (cur_ptr < line_end_ptr)
4089 					{
4090 						char		nextc = *cur_ptr;
4091 
4092 						if (nextc == escapec || nextc == quotec)
4093 						{
4094 							*output_ptr++ = nextc;
4095 							cur_ptr++;
4096 							continue;
4097 						}
4098 					}
4099 				}
4100 
4101 				/*
4102 				 * end of quoted field. Must do this test after testing for
4103 				 * escape in case quote char and escape char are the same
4104 				 * (which is the common case).
4105 				 */
4106 				if (c == quotec)
4107 					break;
4108 
4109 				/* Add c to output string */
4110 				*output_ptr++ = c;
4111 			}
4112 		}
4113 endfield:
4114 
4115 		/* Terminate attribute value in output area */
4116 		*output_ptr++ = '\0';
4117 
4118 		/* Check whether raw input matched null marker */
4119 		input_len = end_ptr - start_ptr;
4120 		if (!saw_quote && input_len == cstate->null_print_len &&
4121 			strncmp(start_ptr, cstate->null_print, input_len) == 0)
4122 			cstate->raw_fields[fieldno] = NULL;
4123 
4124 		fieldno++;
4125 		/* Done if we hit EOL instead of a delim */
4126 		if (!found_delim)
4127 			break;
4128 	}
4129 
4130 	/* Clean up state of attribute_buf */
4131 	output_ptr--;
4132 	Assert(*output_ptr == '\0');
4133 	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4134 
4135 	return fieldno;
4136 }
4137 
4138 
4139 /*
4140  * Read a binary attribute
4141  */
4142 static Datum
CopyReadBinaryAttribute(CopyState cstate,int column_no,FmgrInfo * flinfo,Oid typioparam,int32 typmod,bool * isnull)4143 CopyReadBinaryAttribute(CopyState cstate,
4144 						int column_no, FmgrInfo *flinfo,
4145 						Oid typioparam, int32 typmod,
4146 						bool *isnull)
4147 {
4148 	int32		fld_size;
4149 	Datum		result;
4150 
4151 	if (!CopyGetInt32(cstate, &fld_size))
4152 		ereport(ERROR,
4153 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4154 				 errmsg("unexpected EOF in COPY data")));
4155 	if (fld_size == -1)
4156 	{
4157 		*isnull = true;
4158 		return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4159 	}
4160 	if (fld_size < 0)
4161 		ereport(ERROR,
4162 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4163 				 errmsg("invalid field size")));
4164 
4165 	/* reset attribute_buf to empty, and load raw data in it */
4166 	resetStringInfo(&cstate->attribute_buf);
4167 
4168 	enlargeStringInfo(&cstate->attribute_buf, fld_size);
4169 	if (CopyGetData(cstate, cstate->attribute_buf.data,
4170 					fld_size, fld_size) != fld_size)
4171 		ereport(ERROR,
4172 				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4173 				 errmsg("unexpected EOF in COPY data")));
4174 
4175 	cstate->attribute_buf.len = fld_size;
4176 	cstate->attribute_buf.data[fld_size] = '\0';
4177 
4178 	/* Call the column type's binary input converter */
4179 	result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4180 								 typioparam, typmod);
4181 
4182 	/* Trouble if it didn't eat the whole buffer */
4183 	if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4184 		ereport(ERROR,
4185 				(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4186 				 errmsg("incorrect binary data format")));
4187 
4188 	*isnull = false;
4189 	return result;
4190 }
4191 
4192 /*
4193  * Send text representation of one attribute, with conversion and escaping
4194  */
4195 #define DUMPSOFAR() \
4196 	do { \
4197 		if (ptr > start) \
4198 			CopySendData(cstate, start, ptr - start); \
4199 	} while (0)
4200 
4201 static void
CopyAttributeOutText(CopyState cstate,char * string)4202 CopyAttributeOutText(CopyState cstate, char *string)
4203 {
4204 	char	   *ptr;
4205 	char	   *start;
4206 	char		c;
4207 	char		delimc = cstate->delim[0];
4208 
4209 	if (cstate->need_transcoding)
4210 		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4211 	else
4212 		ptr = string;
4213 
4214 	/*
4215 	 * We have to grovel through the string searching for control characters
4216 	 * and instances of the delimiter character.  In most cases, though, these
4217 	 * are infrequent.  To avoid overhead from calling CopySendData once per
4218 	 * character, we dump out all characters between escaped characters in a
4219 	 * single call.  The loop invariant is that the data from "start" to "ptr"
4220 	 * can be sent literally, but hasn't yet been.
4221 	 *
4222 	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4223 	 * in valid backend encodings, extra bytes of a multibyte character never
4224 	 * look like ASCII.  This loop is sufficiently performance-critical that
4225 	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4226 	 * of the normal safe-encoding path.
4227 	 */
4228 	if (cstate->encoding_embeds_ascii)
4229 	{
4230 		start = ptr;
4231 		while ((c = *ptr) != '\0')
4232 		{
4233 			if ((unsigned char) c < (unsigned char) 0x20)
4234 			{
4235 				/*
4236 				 * \r and \n must be escaped, the others are traditional. We
4237 				 * prefer to dump these using the C-like notation, rather than
4238 				 * a backslash and the literal character, because it makes the
4239 				 * dump file a bit more proof against Microsoftish data
4240 				 * mangling.
4241 				 */
4242 				switch (c)
4243 				{
4244 					case '\b':
4245 						c = 'b';
4246 						break;
4247 					case '\f':
4248 						c = 'f';
4249 						break;
4250 					case '\n':
4251 						c = 'n';
4252 						break;
4253 					case '\r':
4254 						c = 'r';
4255 						break;
4256 					case '\t':
4257 						c = 't';
4258 						break;
4259 					case '\v':
4260 						c = 'v';
4261 						break;
4262 					default:
4263 						/* If it's the delimiter, must backslash it */
4264 						if (c == delimc)
4265 							break;
4266 						/* All ASCII control chars are length 1 */
4267 						ptr++;
4268 						continue;		/* fall to end of loop */
4269 				}
4270 				/* if we get here, we need to convert the control char */
4271 				DUMPSOFAR();
4272 				CopySendChar(cstate, '\\');
4273 				CopySendChar(cstate, c);
4274 				start = ++ptr;	/* do not include char in next run */
4275 			}
4276 			else if (c == '\\' || c == delimc)
4277 			{
4278 				DUMPSOFAR();
4279 				CopySendChar(cstate, '\\');
4280 				start = ptr++;	/* we include char in next run */
4281 			}
4282 			else if (IS_HIGHBIT_SET(c))
4283 				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4284 			else
4285 				ptr++;
4286 		}
4287 	}
4288 	else
4289 	{
4290 		start = ptr;
4291 		while ((c = *ptr) != '\0')
4292 		{
4293 			if ((unsigned char) c < (unsigned char) 0x20)
4294 			{
4295 				/*
4296 				 * \r and \n must be escaped, the others are traditional. We
4297 				 * prefer to dump these using the C-like notation, rather than
4298 				 * a backslash and the literal character, because it makes the
4299 				 * dump file a bit more proof against Microsoftish data
4300 				 * mangling.
4301 				 */
4302 				switch (c)
4303 				{
4304 					case '\b':
4305 						c = 'b';
4306 						break;
4307 					case '\f':
4308 						c = 'f';
4309 						break;
4310 					case '\n':
4311 						c = 'n';
4312 						break;
4313 					case '\r':
4314 						c = 'r';
4315 						break;
4316 					case '\t':
4317 						c = 't';
4318 						break;
4319 					case '\v':
4320 						c = 'v';
4321 						break;
4322 					default:
4323 						/* If it's the delimiter, must backslash it */
4324 						if (c == delimc)
4325 							break;
4326 						/* All ASCII control chars are length 1 */
4327 						ptr++;
4328 						continue;		/* fall to end of loop */
4329 				}
4330 				/* if we get here, we need to convert the control char */
4331 				DUMPSOFAR();
4332 				CopySendChar(cstate, '\\');
4333 				CopySendChar(cstate, c);
4334 				start = ++ptr;	/* do not include char in next run */
4335 			}
4336 			else if (c == '\\' || c == delimc)
4337 			{
4338 				DUMPSOFAR();
4339 				CopySendChar(cstate, '\\');
4340 				start = ptr++;	/* we include char in next run */
4341 			}
4342 			else
4343 				ptr++;
4344 		}
4345 	}
4346 
4347 	DUMPSOFAR();
4348 }
4349 
4350 /*
4351  * Send text representation of one attribute, with conversion and
4352  * CSV-style escaping
4353  */
4354 static void
CopyAttributeOutCSV(CopyState cstate,char * string,bool use_quote,bool single_attr)4355 CopyAttributeOutCSV(CopyState cstate, char *string,
4356 					bool use_quote, bool single_attr)
4357 {
4358 	char	   *ptr;
4359 	char	   *start;
4360 	char		c;
4361 	char		delimc = cstate->delim[0];
4362 	char		quotec = cstate->quote[0];
4363 	char		escapec = cstate->escape[0];
4364 
4365 	/* force quoting if it matches null_print (before conversion!) */
4366 	if (!use_quote && strcmp(string, cstate->null_print) == 0)
4367 		use_quote = true;
4368 
4369 	if (cstate->need_transcoding)
4370 		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4371 	else
4372 		ptr = string;
4373 
4374 	/*
4375 	 * Make a preliminary pass to discover if it needs quoting
4376 	 */
4377 	if (!use_quote)
4378 	{
4379 		/*
4380 		 * Because '\.' can be a data value, quote it if it appears alone on a
4381 		 * line so it is not interpreted as the end-of-data marker.
4382 		 */
4383 		if (single_attr && strcmp(ptr, "\\.") == 0)
4384 			use_quote = true;
4385 		else
4386 		{
4387 			char	   *tptr = ptr;
4388 
4389 			while ((c = *tptr) != '\0')
4390 			{
4391 				if (c == delimc || c == quotec || c == '\n' || c == '\r')
4392 				{
4393 					use_quote = true;
4394 					break;
4395 				}
4396 				if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4397 					tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4398 				else
4399 					tptr++;
4400 			}
4401 		}
4402 	}
4403 
4404 	if (use_quote)
4405 	{
4406 		CopySendChar(cstate, quotec);
4407 
4408 		/*
4409 		 * We adopt the same optimization strategy as in CopyAttributeOutText
4410 		 */
4411 		start = ptr;
4412 		while ((c = *ptr) != '\0')
4413 		{
4414 			if (c == quotec || c == escapec)
4415 			{
4416 				DUMPSOFAR();
4417 				CopySendChar(cstate, escapec);
4418 				start = ptr;	/* we include char in next run */
4419 			}
4420 			if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4421 				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4422 			else
4423 				ptr++;
4424 		}
4425 		DUMPSOFAR();
4426 
4427 		CopySendChar(cstate, quotec);
4428 	}
4429 	else
4430 	{
4431 		/* If it doesn't need quoting, we can just dump it as-is */
4432 		CopySendString(cstate, ptr);
4433 	}
4434 }
4435 
4436 /*
4437  * CopyGetAttnums - build an integer list of attnums to be copied
4438  *
4439  * The input attnamelist is either the user-specified column list,
4440  * or NIL if there was none (in which case we want all the non-dropped
4441  * columns).
4442  *
4443  * rel can be NULL ... it's only used for error reports.
4444  */
4445 static List *
CopyGetAttnums(TupleDesc tupDesc,Relation rel,List * attnamelist)4446 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4447 {
4448 	List	   *attnums = NIL;
4449 
4450 	if (attnamelist == NIL)
4451 	{
4452 		/* Generate default column list */
4453 		Form_pg_attribute *attr = tupDesc->attrs;
4454 		int			attr_count = tupDesc->natts;
4455 		int			i;
4456 
4457 		for (i = 0; i < attr_count; i++)
4458 		{
4459 			if (attr[i]->attisdropped)
4460 				continue;
4461 			attnums = lappend_int(attnums, i + 1);
4462 		}
4463 	}
4464 	else
4465 	{
4466 		/* Validate the user-supplied list and extract attnums */
4467 		ListCell   *l;
4468 
4469 		foreach(l, attnamelist)
4470 		{
4471 			char	   *name = strVal(lfirst(l));
4472 			int			attnum;
4473 			int			i;
4474 
4475 			/* Lookup column name */
4476 			attnum = InvalidAttrNumber;
4477 			for (i = 0; i < tupDesc->natts; i++)
4478 			{
4479 				if (tupDesc->attrs[i]->attisdropped)
4480 					continue;
4481 				if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4482 				{
4483 					attnum = tupDesc->attrs[i]->attnum;
4484 					break;
4485 				}
4486 			}
4487 			if (attnum == InvalidAttrNumber)
4488 			{
4489 				if (rel != NULL)
4490 					ereport(ERROR,
4491 							(errcode(ERRCODE_UNDEFINED_COLUMN),
4492 					errmsg("column \"%s\" of relation \"%s\" does not exist",
4493 						   name, RelationGetRelationName(rel))));
4494 				else
4495 					ereport(ERROR,
4496 							(errcode(ERRCODE_UNDEFINED_COLUMN),
4497 							 errmsg("column \"%s\" does not exist",
4498 									name)));
4499 			}
4500 			/* Check for duplicates */
4501 			if (list_member_int(attnums, attnum))
4502 				ereport(ERROR,
4503 						(errcode(ERRCODE_DUPLICATE_COLUMN),
4504 						 errmsg("column \"%s\" specified more than once",
4505 								name)));
4506 			attnums = lappend_int(attnums, attnum);
4507 		}
4508 	}
4509 
4510 	return attnums;
4511 }
4512 
4513 
/*
 * copy_dest_startup --- executor startup
 *
 * This is the rStartup callback that CreateCopyDestReceiver installs.  It
 * has nothing to do: all three arguments are ignored.
 */
static void
copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	/* no-op */
}
4522 
4523 /*
4524  * copy_dest_receive --- receive one tuple
4525  */
4526 static bool
copy_dest_receive(TupleTableSlot * slot,DestReceiver * self)4527 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4528 {
4529 	DR_copy    *myState = (DR_copy *) self;
4530 	CopyState	cstate = myState->cstate;
4531 
4532 	/* Make sure the tuple is fully deconstructed */
4533 	slot_getallattrs(slot);
4534 
4535 	/* And send the data */
4536 	CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4537 	myState->processed++;
4538 
4539 	return true;
4540 }
4541 
/*
 * copy_dest_shutdown --- executor end
 *
 * This is the rShutdown callback that CreateCopyDestReceiver installs.  It
 * has nothing to do: the argument is ignored.
 */
static void
copy_dest_shutdown(DestReceiver *self)
{
	/* no-op */
}
4550 
/*
 * copy_dest_destroy --- release DestReceiver object
 *
 * This is the rDestroy callback that CreateCopyDestReceiver installs.  The
 * receiver object itself is freed with pfree; nothing else is released here.
 */
static void
copy_dest_destroy(DestReceiver *self)
{
	/* the object was obtained from palloc in CreateCopyDestReceiver */
	pfree(self);
}
4559 
4560 /*
4561  * CreateCopyDestReceiver -- create a suitable DestReceiver object
4562  */
4563 DestReceiver *
CreateCopyDestReceiver(void)4564 CreateCopyDestReceiver(void)
4565 {
4566 	DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));
4567 
4568 	self->pub.receiveSlot = copy_dest_receive;
4569 	self->pub.rStartup = copy_dest_startup;
4570 	self->pub.rShutdown = copy_dest_shutdown;
4571 	self->pub.rDestroy = copy_dest_destroy;
4572 	self->pub.mydest = DestCopyOut;
4573 
4574 	self->cstate = NULL;		/* will be set later */
4575 	self->processed = 0;
4576 
4577 	return (DestReceiver *) self;
4578 }
4579