1 /*-------------------------------------------------------------------------
2 *
3 * copy.c
4 * Implements the COPY utility command
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/copy.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "parser/parse_relation.h"
41 #include "rewrite/rewriteHandler.h"
42 #include "storage/fd.h"
43 #include "tcop/tcopprot.h"
44 #include "utils/builtins.h"
45 #include "utils/lsyscache.h"
46 #include "utils/memutils.h"
47 #include "utils/portal.h"
48 #include "utils/rel.h"
49 #include "utils/rls.h"
50 #include "utils/snapmgr.h"
51
52
/* True when c is an ASCII octal digit, '0' through '7'. */
#define ISOCTAL(c)		((c) >= '0' && (c) <= '7')
/* Numeric value of an octal digit; only meaningful when ISOCTAL(c). */
#define OCTVALUE(c)		((c) - '0')
55
/*
 * Where COPY data physically comes from or goes to.  The lowest-level
 * send/receive routines (CopySendEndOfRow, CopyGetData) switch on this.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* server-side file, or a piped program */
	COPY_OLD_FE,				/* client connection, 2.0 protocol */
	COPY_NEW_FE,				/* client connection, 3.0 protocol */
	COPY_CALLBACK,				/* caller-supplied data callback */
} CopyDest;
67
/*
 * Which line terminator the COPY input uses.
 */
typedef enum EolType
{
	EOL_UNKNOWN,				/* terminator not identified */
	EOL_NL,						/* "\n" */
	EOL_CR,						/* "\r" */
	EOL_CRNL,					/* "\r\n" */
} EolType;
78
79 /*
80 * This struct contains all the state variables used throughout a COPY
81 * operation. For simplicity, we use the same struct for all variants of COPY,
82 * even though some fields are used in only some cases.
83 *
84 * Multi-byte encodings: all supported client-side encodings encode multi-byte
85 * characters by having the first byte's high bit set. Subsequent bytes of the
86 * character can have the high bit not set. When scanning data in such an
87 * encoding to look for a match to a single-byte (ie ASCII) character, we must
88 * use the full pg_encoding_mblen() machinery to skip over multibyte
89 * characters, else we might find a false match to a trailing byte. In
90 * supported server encodings, there is no possibility of a false match, and
91 * it's faster to make useless comparisons to trailing bytes than it is to
92 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
93 * when we have to do it the hard way.
94 */
95 typedef struct CopyStateData
96 {
97 /* low-level state data */
98 CopyDest copy_dest; /* type of copy source/destination */
99 FILE *copy_file; /* used if copy_dest == COPY_FILE */
100 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
101 * dest == COPY_NEW_FE in COPY FROM */
102 bool is_copy_from; /* COPY TO, or COPY FROM? */
103 bool reached_eof; /* true if we read to end of copy data (not
104 * all copy_dest types maintain this) */
105 EolType eol_type; /* EOL type of input */
106 int file_encoding; /* file or remote side's character encoding */
107 bool need_transcoding; /* file encoding diff from server? */
108 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
109
110 /* parameters from the COPY command */
111 Relation rel; /* relation to copy to or from */
112 QueryDesc *queryDesc; /* executable query to copy from */
113 List *attnumlist; /* integer list of attnums to copy */
114 char *filename; /* filename, or NULL for STDIN/STDOUT */
115 bool is_program; /* is 'filename' a program to popen? */
116 copy_data_source_cb data_source_cb; /* function for reading data */
117 bool binary; /* binary format? */
118 bool oids; /* include OIDs? */
119 bool freeze; /* freeze rows on loading? */
120 bool csv_mode; /* Comma Separated Value format? */
121 bool header_line; /* CSV header line? */
122 char *null_print; /* NULL marker string (server encoding!) */
123 int null_print_len; /* length of same */
124 char *null_print_client; /* same converted to file encoding */
125 char *delim; /* column delimiter (must be 1 byte) */
126 char *quote; /* CSV quote char (must be 1 byte) */
127 char *escape; /* CSV escape char (must be 1 byte) */
128 List *force_quote; /* list of column names */
129 bool force_quote_all; /* FORCE_QUOTE *? */
130 bool *force_quote_flags; /* per-column CSV FQ flags */
131 List *force_notnull; /* list of column names */
132 bool *force_notnull_flags; /* per-column CSV FNN flags */
133 List *force_null; /* list of column names */
134 bool *force_null_flags; /* per-column CSV FN flags */
135 bool convert_selectively; /* do selective binary conversion? */
136 List *convert_select; /* list of column names (can be NIL) */
137 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
138
139 /* these are just for error messages, see CopyFromErrorCallback */
140 const char *cur_relname; /* table name for error messages */
141 uint64 cur_lineno; /* line number for error messages */
142 const char *cur_attname; /* current att for error messages */
143 const char *cur_attval; /* current att value for error messages */
144
145 /*
146 * Working state for COPY TO/FROM
147 */
148 MemoryContext copycontext; /* per-copy execution context */
149
150 /*
151 * Working state for COPY TO
152 */
153 FmgrInfo *out_functions; /* lookup info for output functions */
154 MemoryContext rowcontext; /* per-row evaluation context */
155
156 /*
157 * Working state for COPY FROM
158 */
159 AttrNumber num_defaults;
160 bool file_has_oids;
161 FmgrInfo oid_in_function;
162 Oid oid_typioparam;
163 FmgrInfo *in_functions; /* array of input functions for each attrs */
164 Oid *typioparams; /* array of element types for in_functions */
165 int *defmap; /* array of default att numbers */
166 ExprState **defexprs; /* array of default att expressions */
167 bool volatile_defexprs; /* is any of defexprs volatile? */
168 List *range_table;
169
170 PartitionDispatch *partition_dispatch_info;
171 int num_dispatch; /* Number of entries in the above array */
172 int num_partitions; /* Number of members in the following arrays */
173 ResultRelInfo *partitions; /* Per partition result relation */
174 TupleConversionMap **partition_tupconv_maps;
175 TupleTableSlot *partition_tuple_slot;
176 TransitionCaptureState *transition_capture;
177 TupleConversionMap **transition_tupconv_maps;
178
179 /*
180 * These variables are used to reduce overhead in textual COPY FROM.
181 *
182 * attribute_buf holds the separated, de-escaped text for each field of
183 * the current line. The CopyReadAttributes functions return arrays of
184 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
185 * the buffer on each cycle.
186 */
187 StringInfoData attribute_buf;
188
189 /* field raw data pointers found by COPY FROM */
190
191 int max_fields;
192 char **raw_fields;
193
194 /*
195 * Similarly, line_buf holds the whole input line being processed. The
196 * input cycle is first to read the whole line into line_buf, convert it
197 * to server encoding there, and then extract the individual attribute
198 * fields into attribute_buf. line_buf is preserved unmodified so that we
199 * can display it in error messages if appropriate.
200 */
201 StringInfoData line_buf;
202 bool line_buf_converted; /* converted to server encoding? */
203 bool line_buf_valid; /* contains the row being processed? */
204
205 /*
206 * Finally, raw_buf holds raw data read from the data source (file or
207 * client connection). CopyReadLine parses this data sufficiently to
208 * locate line boundaries, then transfers the data to line_buf and
209 * converts it. Note: we guarantee that there is a \0 at
210 * raw_buf[raw_buf_len].
211 */
212 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
213 char *raw_buf;
214 int raw_buf_index; /* next byte to process */
215 int raw_buf_len; /* total # of bytes stored */
216 } CopyStateData;
217
218 /* DestReceiver for COPY (query) TO */
219 typedef struct
220 {
221 DestReceiver pub; /* publicly-known function pointers */
222 CopyState cstate; /* CopyStateData for the command */
223 uint64 processed; /* # of tuples processed */
224 } DR_copy;
225
226
/*
 * The macros below gather logic that processes the line_buf and raw_buf
 * buffers.  They are macros rather than functions for two reasons.  First,
 * they need to "continue" or "break" the caller's loop.  Second, they avoid
 * function-call overhead inside tight COPY loops.
 *
 * The usual "do { ... } while (0)" wrapper cannot be used, because
 * continue/break would then bind to that dummy loop instead of the
 * caller's.  Each macro is therefore written as
 * "if (1) { ... } else ((void) 0)".  The trailing else keeps the macro's
 * "if" from capturing an "else" in the calling code, and it avoids
 * empty-statement warnings.
 *
 * The macros read and assign the caller's local variables by name.
 */

/*
 * If the look-ahead would run past the bytes currently buffered and the
 * input is not yet exhausted, request a refill: rewind to the character
 * fetched at the top of the loop and restart the loop.  This keeps that
 * character in the buffer even when more than one byte of read-ahead is
 * needed.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		need_data = true; \
		raw_buf_ptr = prev_raw_ptr;	/* undo the fetch */ \
		continue; \
	} \
} else ((void) 0)
253
/*
 * If the look-ahead would run past the buffered bytes and the input is
 * exhausted, finish up.  A non-zero extralen consumes whatever is left of
 * the buffer (the partial character).  Then set result = true and break
 * out of the caller's loop.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		/* swallow the partial character, if any */ \
		if ((extralen) != 0) \
			raw_buf_ptr = copy_buf_len; \
		/* a backslash right before EOF counts as a data character */ \
		result = true; \
		break; \
	} \
} else ((void) 0)
267
/*
 * Append the bytes accepted so far (raw_buf from raw_buf_index up to
 * raw_buf_ptr) to line_buf, and advance raw_buf_index past them.  Do this
 * before loading more data so that raw_buf is guaranteed to have room.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (cstate->raw_buf_index < raw_buf_ptr) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   &cstate->raw_buf[cstate->raw_buf_index], \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
283
/*
 * Drop any read-ahead, leaving raw_buf_ptr just past the character fetched
 * at the top of the loop, and jump to the caller's not_end_of_copy label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = 1 + prev_raw_ptr; \
	goto not_end_of_copy; \
} else ((void) 0)
291
/*
 * 11-byte signature that opens a binary-format COPY stream.  The string
 * literal's implicit terminating NUL is the signature's final byte, so
 * sizeof(BinarySignature) == 11.
 */
static const char BinarySignature[] = "PGCOPY\n\377\r\n";
293
294
295 /* non-export function prototypes */
296 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
297 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
298 List *options);
299 static void EndCopy(CopyState cstate);
300 static void ClosePipeToProgram(CopyState cstate);
301 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
302 Oid queryRelId, const char *filename, bool is_program,
303 List *attnamelist, List *options);
304 static void EndCopyTo(CopyState cstate);
305 static uint64 DoCopyTo(CopyState cstate);
306 static uint64 CopyTo(CopyState cstate);
307 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
308 Datum *values, bool *nulls);
309 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
310 CommandId mycid, int hi_options,
311 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
312 BulkInsertState bistate,
313 int nBufferedTuples, HeapTuple *bufferedTuples,
314 uint64 firstBufferedLineNo);
315 static bool CopyReadLine(CopyState cstate);
316 static bool CopyReadLineText(CopyState cstate);
317 static int CopyReadAttributesText(CopyState cstate);
318 static int CopyReadAttributesCSV(CopyState cstate);
319 static Datum CopyReadBinaryAttribute(CopyState cstate,
320 int column_no, FmgrInfo *flinfo,
321 Oid typioparam, int32 typmod,
322 bool *isnull);
323 static void CopyAttributeOutText(CopyState cstate, char *string);
324 static void CopyAttributeOutCSV(CopyState cstate, char *string,
325 bool use_quote, bool single_attr);
326 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
327 List *attnamelist);
328 static char *limit_printout_length(const char *str);
329
330 /* Low-level communications functions */
331 static void SendCopyBegin(CopyState cstate);
332 static void ReceiveCopyBegin(CopyState cstate);
333 static void SendCopyEnd(CopyState cstate);
334 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
335 static void CopySendString(CopyState cstate, const char *str);
336 static void CopySendChar(CopyState cstate, char c);
337 static void CopySendEndOfRow(CopyState cstate);
338 static int CopyGetData(CopyState cstate, void *databuf,
339 int minread, int maxread);
340 static void CopySendInt32(CopyState cstate, int32 val);
341 static bool CopyGetInt32(CopyState cstate, int32 *val);
342 static void CopySendInt16(CopyState cstate, int16 val);
343 static bool CopyGetInt16(CopyState cstate, int16 *val);
344
345
346 /*
347 * Send copy start/stop messages for frontend copies. These have changed
348 * in past protocol redesigns.
349 */
350 static void
SendCopyBegin(CopyState cstate)351 SendCopyBegin(CopyState cstate)
352 {
353 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
354 {
355 /* new way */
356 StringInfoData buf;
357 int natts = list_length(cstate->attnumlist);
358 int16 format = (cstate->binary ? 1 : 0);
359 int i;
360
361 pq_beginmessage(&buf, 'H');
362 pq_sendbyte(&buf, format); /* overall format */
363 pq_sendint(&buf, natts, 2);
364 for (i = 0; i < natts; i++)
365 pq_sendint(&buf, format, 2); /* per-column formats */
366 pq_endmessage(&buf);
367 cstate->copy_dest = COPY_NEW_FE;
368 }
369 else
370 {
371 /* old way */
372 if (cstate->binary)
373 ereport(ERROR,
374 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
375 errmsg("COPY BINARY is not supported to stdout or from stdin")));
376 pq_putemptymessage('H');
377 /* grottiness needed for old COPY OUT protocol */
378 pq_startcopyout();
379 cstate->copy_dest = COPY_OLD_FE;
380 }
381 }
382
383 static void
ReceiveCopyBegin(CopyState cstate)384 ReceiveCopyBegin(CopyState cstate)
385 {
386 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
387 {
388 /* new way */
389 StringInfoData buf;
390 int natts = list_length(cstate->attnumlist);
391 int16 format = (cstate->binary ? 1 : 0);
392 int i;
393
394 pq_beginmessage(&buf, 'G');
395 pq_sendbyte(&buf, format); /* overall format */
396 pq_sendint(&buf, natts, 2);
397 for (i = 0; i < natts; i++)
398 pq_sendint(&buf, format, 2); /* per-column formats */
399 pq_endmessage(&buf);
400 cstate->copy_dest = COPY_NEW_FE;
401 cstate->fe_msgbuf = makeStringInfo();
402 }
403 else
404 {
405 /* old way */
406 if (cstate->binary)
407 ereport(ERROR,
408 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
409 errmsg("COPY BINARY is not supported to stdout or from stdin")));
410 pq_putemptymessage('G');
411 /* any error in old protocol will make us lose sync */
412 pq_startmsgread();
413 cstate->copy_dest = COPY_OLD_FE;
414 }
415 /* We *must* flush here to ensure FE knows it can send. */
416 pq_flush();
417 }
418
419 static void
SendCopyEnd(CopyState cstate)420 SendCopyEnd(CopyState cstate)
421 {
422 if (cstate->copy_dest == COPY_NEW_FE)
423 {
424 /* Shouldn't have any unsent data */
425 Assert(cstate->fe_msgbuf->len == 0);
426 /* Send Copy Done message */
427 pq_putemptymessage('c');
428 }
429 else
430 {
431 CopySendData(cstate, "\\.", 2);
432 /* Need to flush out the trailer (this also appends a newline) */
433 CopySendEndOfRow(cstate);
434 pq_endcopyout(false);
435 }
436 }
437
438 /*----------
439 * CopySendData sends output data to the destination (file or frontend)
440 * CopySendString does the same for null-terminated strings
441 * CopySendChar does the same for single characters
442 * CopySendEndOfRow does the appropriate thing at end of each data row
443 * (data is not actually flushed except by CopySendEndOfRow)
444 *
445 * NB: no data conversion is applied by these functions
446 *----------
447 */
448 static void
CopySendData(CopyState cstate,const void * databuf,int datasize)449 CopySendData(CopyState cstate, const void *databuf, int datasize)
450 {
451 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
452 }
453
454 static void
CopySendString(CopyState cstate,const char * str)455 CopySendString(CopyState cstate, const char *str)
456 {
457 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
458 }
459
460 static void
CopySendChar(CopyState cstate,char c)461 CopySendChar(CopyState cstate, char c)
462 {
463 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
464 }
465
466 static void
CopySendEndOfRow(CopyState cstate)467 CopySendEndOfRow(CopyState cstate)
468 {
469 StringInfo fe_msgbuf = cstate->fe_msgbuf;
470
471 switch (cstate->copy_dest)
472 {
473 case COPY_FILE:
474 if (!cstate->binary)
475 {
476 /* Default line termination depends on platform */
477 #ifndef WIN32
478 CopySendChar(cstate, '\n');
479 #else
480 CopySendString(cstate, "\r\n");
481 #endif
482 }
483
484 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
485 cstate->copy_file) != 1 ||
486 ferror(cstate->copy_file))
487 {
488 if (cstate->is_program)
489 {
490 if (errno == EPIPE)
491 {
492 /*
493 * The pipe will be closed automatically on error at
494 * the end of transaction, but we might get a better
495 * error message from the subprocess' exit code than
496 * just "Broken Pipe"
497 */
498 ClosePipeToProgram(cstate);
499
500 /*
501 * If ClosePipeToProgram() didn't throw an error, the
502 * program terminated normally, but closed the pipe
503 * first. Restore errno, and throw an error.
504 */
505 errno = EPIPE;
506 }
507 ereport(ERROR,
508 (errcode_for_file_access(),
509 errmsg("could not write to COPY program: %m")));
510 }
511 else
512 ereport(ERROR,
513 (errcode_for_file_access(),
514 errmsg("could not write to COPY file: %m")));
515 }
516 break;
517 case COPY_OLD_FE:
518 /* The FE/BE protocol uses \n as newline for all platforms */
519 if (!cstate->binary)
520 CopySendChar(cstate, '\n');
521
522 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
523 {
524 /* no hope of recovering connection sync, so FATAL */
525 ereport(FATAL,
526 (errcode(ERRCODE_CONNECTION_FAILURE),
527 errmsg("connection lost during COPY to stdout")));
528 }
529 break;
530 case COPY_NEW_FE:
531 /* The FE/BE protocol uses \n as newline for all platforms */
532 if (!cstate->binary)
533 CopySendChar(cstate, '\n');
534
535 /* Dump the accumulated row as one CopyData message */
536 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
537 break;
538 case COPY_CALLBACK:
539 Assert(false); /* Not yet supported. */
540 break;
541 }
542
543 resetStringInfo(fe_msgbuf);
544 }
545
546 /*
547 * CopyGetData reads data from the source (file or frontend)
548 *
549 * We attempt to read at least minread, and at most maxread, bytes from
550 * the source. The actual number of bytes read is returned; if this is
551 * less than minread, EOF was detected.
552 *
553 * Note: when copying from the frontend, we expect a proper EOF mark per
554 * protocol; if the frontend simply drops the connection, we raise error.
555 * It seems unwise to allow the COPY IN to complete normally in that case.
556 *
557 * NB: no data conversion is applied here.
558 */
559 static int
CopyGetData(CopyState cstate,void * databuf,int minread,int maxread)560 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
561 {
562 int bytesread = 0;
563
564 switch (cstate->copy_dest)
565 {
566 case COPY_FILE:
567 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
568 if (ferror(cstate->copy_file))
569 ereport(ERROR,
570 (errcode_for_file_access(),
571 errmsg("could not read from COPY file: %m")));
572 if (bytesread == 0)
573 cstate->reached_eof = true;
574 break;
575 case COPY_OLD_FE:
576
577 /*
578 * We cannot read more than minread bytes (which in practice is 1)
579 * because old protocol doesn't have any clear way of separating
580 * the COPY stream from following data. This is slow, but not any
581 * slower than the code path was originally, and we don't care
582 * much anymore about the performance of old protocol.
583 */
584 if (pq_getbytes((char *) databuf, minread))
585 {
586 /* Only a \. terminator is legal EOF in old protocol */
587 ereport(ERROR,
588 (errcode(ERRCODE_CONNECTION_FAILURE),
589 errmsg("unexpected EOF on client connection with an open transaction")));
590 }
591 bytesread = minread;
592 break;
593 case COPY_NEW_FE:
594 while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
595 {
596 int avail;
597
598 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
599 {
600 /* Try to receive another message */
601 int mtype;
602
603 readmessage:
604 HOLD_CANCEL_INTERRUPTS();
605 pq_startmsgread();
606 mtype = pq_getbyte();
607 if (mtype == EOF)
608 ereport(ERROR,
609 (errcode(ERRCODE_CONNECTION_FAILURE),
610 errmsg("unexpected EOF on client connection with an open transaction")));
611 if (pq_getmessage(cstate->fe_msgbuf, 0))
612 ereport(ERROR,
613 (errcode(ERRCODE_CONNECTION_FAILURE),
614 errmsg("unexpected EOF on client connection with an open transaction")));
615 RESUME_CANCEL_INTERRUPTS();
616 switch (mtype)
617 {
618 case 'd': /* CopyData */
619 break;
620 case 'c': /* CopyDone */
621 /* COPY IN correctly terminated by frontend */
622 cstate->reached_eof = true;
623 return bytesread;
624 case 'f': /* CopyFail */
625 ereport(ERROR,
626 (errcode(ERRCODE_QUERY_CANCELED),
627 errmsg("COPY from stdin failed: %s",
628 pq_getmsgstring(cstate->fe_msgbuf))));
629 break;
630 case 'H': /* Flush */
631 case 'S': /* Sync */
632
633 /*
634 * Ignore Flush/Sync for the convenience of client
635 * libraries (such as libpq) that may send those
636 * without noticing that the command they just
637 * sent was COPY.
638 */
639 goto readmessage;
640 default:
641 ereport(ERROR,
642 (errcode(ERRCODE_PROTOCOL_VIOLATION),
643 errmsg("unexpected message type 0x%02X during COPY from stdin",
644 mtype)));
645 break;
646 }
647 }
648 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
649 if (avail > maxread)
650 avail = maxread;
651 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
652 databuf = (void *) ((char *) databuf + avail);
653 maxread -= avail;
654 bytesread += avail;
655 }
656 break;
657 case COPY_CALLBACK:
658 bytesread = cstate->data_source_cb(databuf, minread, maxread);
659 break;
660 }
661
662 return bytesread;
663 }
664
665
666 /*
667 * These functions do apply some data conversion
668 */
669
670 /*
671 * CopySendInt32 sends an int32 in network byte order
672 */
673 static void
CopySendInt32(CopyState cstate,int32 val)674 CopySendInt32(CopyState cstate, int32 val)
675 {
676 uint32 buf;
677
678 buf = htonl((uint32) val);
679 CopySendData(cstate, &buf, sizeof(buf));
680 }
681
682 /*
683 * CopyGetInt32 reads an int32 that appears in network byte order
684 *
685 * Returns true if OK, false if EOF
686 */
687 static bool
CopyGetInt32(CopyState cstate,int32 * val)688 CopyGetInt32(CopyState cstate, int32 *val)
689 {
690 uint32 buf;
691
692 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
693 {
694 *val = 0; /* suppress compiler warning */
695 return false;
696 }
697 *val = (int32) ntohl(buf);
698 return true;
699 }
700
701 /*
702 * CopySendInt16 sends an int16 in network byte order
703 */
704 static void
CopySendInt16(CopyState cstate,int16 val)705 CopySendInt16(CopyState cstate, int16 val)
706 {
707 uint16 buf;
708
709 buf = htons((uint16) val);
710 CopySendData(cstate, &buf, sizeof(buf));
711 }
712
713 /*
714 * CopyGetInt16 reads an int16 that appears in network byte order
715 */
716 static bool
CopyGetInt16(CopyState cstate,int16 * val)717 CopyGetInt16(CopyState cstate, int16 *val)
718 {
719 uint16 buf;
720
721 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
722 {
723 *val = 0; /* suppress compiler warning */
724 return false;
725 }
726 *val = (int16) ntohs(buf);
727 return true;
728 }
729
730
731 /*
732 * CopyLoadRawBuf loads some more data into raw_buf
733 *
734 * Returns TRUE if able to obtain at least one more byte, else FALSE.
735 *
736 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
737 * down to the start of the buffer and then we load more data after that.
738 * This case is used only when a frontend multibyte character crosses a
739 * bufferload boundary.
740 */
741 static bool
CopyLoadRawBuf(CopyState cstate)742 CopyLoadRawBuf(CopyState cstate)
743 {
744 int nbytes;
745 int inbytes;
746
747 if (cstate->raw_buf_index < cstate->raw_buf_len)
748 {
749 /* Copy down the unprocessed data */
750 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
751 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
752 nbytes);
753 }
754 else
755 nbytes = 0; /* no data need be saved */
756
757 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
758 1, RAW_BUF_SIZE - nbytes);
759 nbytes += inbytes;
760 cstate->raw_buf[nbytes] = '\0';
761 cstate->raw_buf_index = 0;
762 cstate->raw_buf_len = nbytes;
763 return (inbytes > 0);
764 }
765
766
767 /*
768 * DoCopy executes the SQL COPY statement
769 *
770 * Either unload or reload contents of table <relation>, depending on <from>.
771 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
772 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
773 * or DELETE query.
774 *
775 * If <pipe> is false, transfer is between the table and the file named
776 * <filename>. Otherwise, transfer is between the table and our regular
777 * input/output stream. The latter could be either stdin/stdout or a
778 * socket, depending on whether we're running under Postmaster control.
779 *
780 * Do not allow a Postgres user without superuser privilege to read from
781 * or write to a file.
782 *
783 * Do not allow the copy if user doesn't have proper permission to access
784 * the table or the specifically requested columns.
785 */
786 void
DoCopy(ParseState * pstate,const CopyStmt * stmt,int stmt_location,int stmt_len,uint64 * processed)787 DoCopy(ParseState *pstate, const CopyStmt *stmt,
788 int stmt_location, int stmt_len,
789 uint64 *processed)
790 {
791 CopyState cstate;
792 bool is_from = stmt->is_from;
793 bool pipe = (stmt->filename == NULL);
794 Relation rel;
795 Oid relid;
796 RawStmt *query = NULL;
797
798 /* Disallow COPY to/from file or program except to superusers. */
799 if (!pipe && !superuser())
800 {
801 if (stmt->is_program)
802 ereport(ERROR,
803 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
804 errmsg("must be superuser to COPY to or from an external program"),
805 errhint("Anyone can COPY to stdout or from stdin. "
806 "psql's \\copy command also works for anyone.")));
807 else
808 ereport(ERROR,
809 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
810 errmsg("must be superuser to COPY to or from a file"),
811 errhint("Anyone can COPY to stdout or from stdin. "
812 "psql's \\copy command also works for anyone.")));
813 }
814
815 if (stmt->relation)
816 {
817 TupleDesc tupDesc;
818 List *attnums;
819 ListCell *cur;
820 RangeTblEntry *rte;
821
822 Assert(!stmt->query);
823
824 /* Open and lock the relation, using the appropriate lock type. */
825 rel = heap_openrv(stmt->relation,
826 (is_from ? RowExclusiveLock : AccessShareLock));
827
828 relid = RelationGetRelid(rel);
829
830 rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
831 rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
832
833 tupDesc = RelationGetDescr(rel);
834 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
835 foreach(cur, attnums)
836 {
837 int attno = lfirst_int(cur) -
838 FirstLowInvalidHeapAttributeNumber;
839
840 if (is_from)
841 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
842 else
843 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
844 }
845 ExecCheckRTPerms(pstate->p_rtable, true);
846
847 /*
848 * Permission check for row security policies.
849 *
850 * check_enable_rls will ereport(ERROR) if the user has requested
851 * something invalid and will otherwise indicate if we should enable
852 * RLS (returns RLS_ENABLED) or not for this COPY statement.
853 *
854 * If the relation has a row security policy and we are to apply it
855 * then perform a "query" copy and allow the normal query processing
856 * to handle the policies.
857 *
858 * If RLS is not enabled for this, then just fall through to the
859 * normal non-filtering relation handling.
860 */
861 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
862 {
863 SelectStmt *select;
864 ColumnRef *cr;
865 ResTarget *target;
866 RangeVar *from;
867 List *targetList = NIL;
868
869 if (is_from)
870 ereport(ERROR,
871 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 errmsg("COPY FROM not supported with row-level security"),
873 errhint("Use INSERT statements instead.")));
874
875 /*
876 * Build target list
877 *
878 * If no columns are specified in the attribute list of the COPY
879 * command, then the target list is 'all' columns. Therefore, '*'
880 * should be used as the target list for the resulting SELECT
881 * statement.
882 *
883 * In the case that columns are specified in the attribute list,
884 * create a ColumnRef and ResTarget for each column and add them
885 * to the target list for the resulting SELECT statement.
886 */
887 if (!stmt->attlist)
888 {
889 cr = makeNode(ColumnRef);
890 cr->fields = list_make1(makeNode(A_Star));
891 cr->location = -1;
892
893 target = makeNode(ResTarget);
894 target->name = NULL;
895 target->indirection = NIL;
896 target->val = (Node *) cr;
897 target->location = -1;
898
899 targetList = list_make1(target);
900 }
901 else
902 {
903 ListCell *lc;
904
905 foreach(lc, stmt->attlist)
906 {
907 /*
908 * Build the ColumnRef for each column. The ColumnRef
909 * 'fields' property is a String 'Value' node (see
910 * nodes/value.h) that corresponds to the column name
911 * respectively.
912 */
913 cr = makeNode(ColumnRef);
914 cr->fields = list_make1(lfirst(lc));
915 cr->location = -1;
916
917 /* Build the ResTarget and add the ColumnRef to it. */
918 target = makeNode(ResTarget);
919 target->name = NULL;
920 target->indirection = NIL;
921 target->val = (Node *) cr;
922 target->location = -1;
923
924 /* Add each column to the SELECT statement's target list */
925 targetList = lappend(targetList, target);
926 }
927 }
928
929 /*
930 * Build RangeVar for from clause, fully qualified based on the
931 * relation which we have opened and locked.
932 */
933 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
934 pstrdup(RelationGetRelationName(rel)),
935 -1);
936
937 /* Build query */
938 select = makeNode(SelectStmt);
939 select->targetList = targetList;
940 select->fromClause = list_make1(from);
941
942 query = makeNode(RawStmt);
943 query->stmt = (Node *) select;
944 query->stmt_location = stmt_location;
945 query->stmt_len = stmt_len;
946
947 /*
948 * Close the relation for now, but keep the lock on it to prevent
949 * changes between now and when we start the query-based COPY.
950 *
951 * We'll reopen it later as part of the query-based COPY.
952 */
953 heap_close(rel, NoLock);
954 rel = NULL;
955 }
956 }
957 else
958 {
959 Assert(stmt->query);
960
961 query = makeNode(RawStmt);
962 query->stmt = stmt->query;
963 query->stmt_location = stmt_location;
964 query->stmt_len = stmt_len;
965
966 relid = InvalidOid;
967 rel = NULL;
968 }
969
970 if (is_from)
971 {
972 Assert(rel);
973
974 /* check read-only transaction and parallel mode */
975 if (XactReadOnly && !rel->rd_islocaltemp)
976 PreventCommandIfReadOnly("COPY FROM");
977 PreventCommandIfParallelMode("COPY FROM");
978
979 cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
980 NULL, stmt->attlist, stmt->options);
981 *processed = CopyFrom(cstate); /* copy from file to database */
982 EndCopyFrom(cstate);
983 }
984 else
985 {
986 cstate = BeginCopyTo(pstate, rel, query, relid,
987 stmt->filename, stmt->is_program,
988 stmt->attlist, stmt->options);
989 *processed = DoCopyTo(cstate); /* copy from database to file */
990 EndCopyTo(cstate);
991 }
992
993 /*
994 * Close the relation. If reading, we can release the AccessShareLock we
995 * got; if writing, we should hold the lock until end of transaction to
996 * ensure that updates will be committed before lock is released.
997 */
998 if (rel != NULL)
999 heap_close(rel, (is_from ? NoLock : AccessShareLock));
1000 }
1001
1002 /*
1003 * Process the statement option list for COPY.
1004 *
1005 * Scan the options list (a list of DefElem) and transpose the information
1006 * into cstate, applying appropriate error checking.
1007 *
1008 * cstate is assumed to be filled with zeroes initially.
1009 *
1010 * This is exported so that external users of the COPY API can sanity-check
1011 * a list of options. In that usage, cstate should be passed as NULL
1012 * (since external users don't know sizeof(CopyStateData)) and the collected
1013 * data is just leaked until CurrentMemoryContext is reset.
1014 *
1015 * Note that additional checking, such as whether column names listed in FORCE
1016 * QUOTE actually exist, has to be applied later. This just checks for
1017 * self-consistency of the options list.
1018 */
1019 void
ProcessCopyOptions(ParseState * pstate,CopyState cstate,bool is_from,List * options)1020 ProcessCopyOptions(ParseState *pstate,
1021 CopyState cstate,
1022 bool is_from,
1023 List *options)
1024 {
1025 bool format_specified = false;
1026 ListCell *option;
1027
1028 /* Support external use for option sanity checking */
1029 if (cstate == NULL)
1030 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1031
1032 cstate->is_copy_from = is_from;
1033
1034 cstate->file_encoding = -1;
1035
1036 /* Extract options from the statement node tree */
1037 foreach(option, options)
1038 {
1039 DefElem *defel = lfirst_node(DefElem, option);
1040
1041 if (strcmp(defel->defname, "format") == 0)
1042 {
1043 char *fmt = defGetString(defel);
1044
1045 if (format_specified)
1046 ereport(ERROR,
1047 (errcode(ERRCODE_SYNTAX_ERROR),
1048 errmsg("conflicting or redundant options"),
1049 parser_errposition(pstate, defel->location)));
1050 format_specified = true;
1051 if (strcmp(fmt, "text") == 0)
1052 /* default format */ ;
1053 else if (strcmp(fmt, "csv") == 0)
1054 cstate->csv_mode = true;
1055 else if (strcmp(fmt, "binary") == 0)
1056 cstate->binary = true;
1057 else
1058 ereport(ERROR,
1059 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1060 errmsg("COPY format \"%s\" not recognized", fmt),
1061 parser_errposition(pstate, defel->location)));
1062 }
1063 else if (strcmp(defel->defname, "oids") == 0)
1064 {
1065 if (cstate->oids)
1066 ereport(ERROR,
1067 (errcode(ERRCODE_SYNTAX_ERROR),
1068 errmsg("conflicting or redundant options"),
1069 parser_errposition(pstate, defel->location)));
1070 cstate->oids = defGetBoolean(defel);
1071 }
1072 else if (strcmp(defel->defname, "freeze") == 0)
1073 {
1074 if (cstate->freeze)
1075 ereport(ERROR,
1076 (errcode(ERRCODE_SYNTAX_ERROR),
1077 errmsg("conflicting or redundant options"),
1078 parser_errposition(pstate, defel->location)));
1079 cstate->freeze = defGetBoolean(defel);
1080 }
1081 else if (strcmp(defel->defname, "delimiter") == 0)
1082 {
1083 if (cstate->delim)
1084 ereport(ERROR,
1085 (errcode(ERRCODE_SYNTAX_ERROR),
1086 errmsg("conflicting or redundant options"),
1087 parser_errposition(pstate, defel->location)));
1088 cstate->delim = defGetString(defel);
1089 }
1090 else if (strcmp(defel->defname, "null") == 0)
1091 {
1092 if (cstate->null_print)
1093 ereport(ERROR,
1094 (errcode(ERRCODE_SYNTAX_ERROR),
1095 errmsg("conflicting or redundant options"),
1096 parser_errposition(pstate, defel->location)));
1097 cstate->null_print = defGetString(defel);
1098 }
1099 else if (strcmp(defel->defname, "header") == 0)
1100 {
1101 if (cstate->header_line)
1102 ereport(ERROR,
1103 (errcode(ERRCODE_SYNTAX_ERROR),
1104 errmsg("conflicting or redundant options"),
1105 parser_errposition(pstate, defel->location)));
1106 cstate->header_line = defGetBoolean(defel);
1107 }
1108 else if (strcmp(defel->defname, "quote") == 0)
1109 {
1110 if (cstate->quote)
1111 ereport(ERROR,
1112 (errcode(ERRCODE_SYNTAX_ERROR),
1113 errmsg("conflicting or redundant options"),
1114 parser_errposition(pstate, defel->location)));
1115 cstate->quote = defGetString(defel);
1116 }
1117 else if (strcmp(defel->defname, "escape") == 0)
1118 {
1119 if (cstate->escape)
1120 ereport(ERROR,
1121 (errcode(ERRCODE_SYNTAX_ERROR),
1122 errmsg("conflicting or redundant options"),
1123 parser_errposition(pstate, defel->location)));
1124 cstate->escape = defGetString(defel);
1125 }
1126 else if (strcmp(defel->defname, "force_quote") == 0)
1127 {
1128 if (cstate->force_quote || cstate->force_quote_all)
1129 ereport(ERROR,
1130 (errcode(ERRCODE_SYNTAX_ERROR),
1131 errmsg("conflicting or redundant options"),
1132 parser_errposition(pstate, defel->location)));
1133 if (defel->arg && IsA(defel->arg, A_Star))
1134 cstate->force_quote_all = true;
1135 else if (defel->arg && IsA(defel->arg, List))
1136 cstate->force_quote = castNode(List, defel->arg);
1137 else
1138 ereport(ERROR,
1139 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1140 errmsg("argument to option \"%s\" must be a list of column names",
1141 defel->defname),
1142 parser_errposition(pstate, defel->location)));
1143 }
1144 else if (strcmp(defel->defname, "force_not_null") == 0)
1145 {
1146 if (cstate->force_notnull)
1147 ereport(ERROR,
1148 (errcode(ERRCODE_SYNTAX_ERROR),
1149 errmsg("conflicting or redundant options"),
1150 parser_errposition(pstate, defel->location)));
1151 if (defel->arg && IsA(defel->arg, List))
1152 cstate->force_notnull = castNode(List, defel->arg);
1153 else
1154 ereport(ERROR,
1155 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1156 errmsg("argument to option \"%s\" must be a list of column names",
1157 defel->defname),
1158 parser_errposition(pstate, defel->location)));
1159 }
1160 else if (strcmp(defel->defname, "force_null") == 0)
1161 {
1162 if (cstate->force_null)
1163 ereport(ERROR,
1164 (errcode(ERRCODE_SYNTAX_ERROR),
1165 errmsg("conflicting or redundant options")));
1166 if (defel->arg && IsA(defel->arg, List))
1167 cstate->force_null = castNode(List, defel->arg);
1168 else
1169 ereport(ERROR,
1170 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1171 errmsg("argument to option \"%s\" must be a list of column names",
1172 defel->defname),
1173 parser_errposition(pstate, defel->location)));
1174 }
1175 else if (strcmp(defel->defname, "convert_selectively") == 0)
1176 {
1177 /*
1178 * Undocumented, not-accessible-from-SQL option: convert only the
1179 * named columns to binary form, storing the rest as NULLs. It's
1180 * allowed for the column list to be NIL.
1181 */
1182 if (cstate->convert_selectively)
1183 ereport(ERROR,
1184 (errcode(ERRCODE_SYNTAX_ERROR),
1185 errmsg("conflicting or redundant options"),
1186 parser_errposition(pstate, defel->location)));
1187 cstate->convert_selectively = true;
1188 if (defel->arg == NULL || IsA(defel->arg, List))
1189 cstate->convert_select = castNode(List, defel->arg);
1190 else
1191 ereport(ERROR,
1192 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1193 errmsg("argument to option \"%s\" must be a list of column names",
1194 defel->defname),
1195 parser_errposition(pstate, defel->location)));
1196 }
1197 else if (strcmp(defel->defname, "encoding") == 0)
1198 {
1199 if (cstate->file_encoding >= 0)
1200 ereport(ERROR,
1201 (errcode(ERRCODE_SYNTAX_ERROR),
1202 errmsg("conflicting or redundant options"),
1203 parser_errposition(pstate, defel->location)));
1204 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1205 if (cstate->file_encoding < 0)
1206 ereport(ERROR,
1207 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1208 errmsg("argument to option \"%s\" must be a valid encoding name",
1209 defel->defname),
1210 parser_errposition(pstate, defel->location)));
1211 }
1212 else
1213 ereport(ERROR,
1214 (errcode(ERRCODE_SYNTAX_ERROR),
1215 errmsg("option \"%s\" not recognized",
1216 defel->defname),
1217 parser_errposition(pstate, defel->location)));
1218 }
1219
1220 /*
1221 * Check for incompatible options (must do these two before inserting
1222 * defaults)
1223 */
1224 if (cstate->binary && cstate->delim)
1225 ereport(ERROR,
1226 (errcode(ERRCODE_SYNTAX_ERROR),
1227 errmsg("cannot specify DELIMITER in BINARY mode")));
1228
1229 if (cstate->binary && cstate->null_print)
1230 ereport(ERROR,
1231 (errcode(ERRCODE_SYNTAX_ERROR),
1232 errmsg("cannot specify NULL in BINARY mode")));
1233
1234 /* Set defaults for omitted options */
1235 if (!cstate->delim)
1236 cstate->delim = cstate->csv_mode ? "," : "\t";
1237
1238 if (!cstate->null_print)
1239 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1240 cstate->null_print_len = strlen(cstate->null_print);
1241
1242 if (cstate->csv_mode)
1243 {
1244 if (!cstate->quote)
1245 cstate->quote = "\"";
1246 if (!cstate->escape)
1247 cstate->escape = cstate->quote;
1248 }
1249
1250 /* Only single-byte delimiter strings are supported. */
1251 if (strlen(cstate->delim) != 1)
1252 ereport(ERROR,
1253 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1254 errmsg("COPY delimiter must be a single one-byte character")));
1255
1256 /* Disallow end-of-line characters */
1257 if (strchr(cstate->delim, '\r') != NULL ||
1258 strchr(cstate->delim, '\n') != NULL)
1259 ereport(ERROR,
1260 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1261 errmsg("COPY delimiter cannot be newline or carriage return")));
1262
1263 if (strchr(cstate->null_print, '\r') != NULL ||
1264 strchr(cstate->null_print, '\n') != NULL)
1265 ereport(ERROR,
1266 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1267 errmsg("COPY null representation cannot use newline or carriage return")));
1268
1269 /*
1270 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1271 * backslash because it would be ambiguous. We can't allow the other
1272 * cases because data characters matching the delimiter must be
1273 * backslashed, and certain backslash combinations are interpreted
1274 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1275 * more than strictly necessary, but seems best for consistency and
1276 * future-proofing. Likewise we disallow all digits though only octal
1277 * digits are actually dangerous.
1278 */
1279 if (!cstate->csv_mode &&
1280 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1281 cstate->delim[0]) != NULL)
1282 ereport(ERROR,
1283 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1284 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1285
1286 /* Check header */
1287 if (!cstate->csv_mode && cstate->header_line)
1288 ereport(ERROR,
1289 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1290 errmsg("COPY HEADER available only in CSV mode")));
1291
1292 /* Check quote */
1293 if (!cstate->csv_mode && cstate->quote != NULL)
1294 ereport(ERROR,
1295 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1296 errmsg("COPY quote available only in CSV mode")));
1297
1298 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1299 ereport(ERROR,
1300 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1301 errmsg("COPY quote must be a single one-byte character")));
1302
1303 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1304 ereport(ERROR,
1305 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1306 errmsg("COPY delimiter and quote must be different")));
1307
1308 /* Check escape */
1309 if (!cstate->csv_mode && cstate->escape != NULL)
1310 ereport(ERROR,
1311 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1312 errmsg("COPY escape available only in CSV mode")));
1313
1314 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1315 ereport(ERROR,
1316 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1317 errmsg("COPY escape must be a single one-byte character")));
1318
1319 /* Check force_quote */
1320 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1321 ereport(ERROR,
1322 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1323 errmsg("COPY force quote available only in CSV mode")));
1324 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1325 ereport(ERROR,
1326 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327 errmsg("COPY force quote only available using COPY TO")));
1328
1329 /* Check force_notnull */
1330 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1331 ereport(ERROR,
1332 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1333 errmsg("COPY force not null available only in CSV mode")));
1334 if (cstate->force_notnull != NIL && !is_from)
1335 ereport(ERROR,
1336 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1337 errmsg("COPY force not null only available using COPY FROM")));
1338
1339 /* Check force_null */
1340 if (!cstate->csv_mode && cstate->force_null != NIL)
1341 ereport(ERROR,
1342 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1343 errmsg("COPY force null available only in CSV mode")));
1344
1345 if (cstate->force_null != NIL && !is_from)
1346 ereport(ERROR,
1347 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1348 errmsg("COPY force null only available using COPY FROM")));
1349
1350 /* Don't allow the delimiter to appear in the null string. */
1351 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1352 ereport(ERROR,
1353 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1354 errmsg("COPY delimiter must not appear in the NULL specification")));
1355
1356 /* Don't allow the CSV quote char to appear in the null string. */
1357 if (cstate->csv_mode &&
1358 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1359 ereport(ERROR,
1360 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1361 errmsg("CSV quote character must not appear in the NULL specification")));
1362 }
1363
/*
 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
 *
 * Iff <binary>, unload or reload in the binary format, as opposed to the
 * more wasteful but more robust and portable text format.
 *
 * Iff <oids>, unload or reload the format that includes OID information.
 * This is rejected with an error, for both COPY TO and COPY FROM, when the
 * table has no OID column, and is not supported at all for a query-based
 * COPY (not providing an OID column might seem friendlier, but could
 * seriously confuse programs).
 *
 * If in the text format, delimit columns with delimiter <delim> and print
 * NULL values as <null_print>.
 *
 * Parameters:
 *	pstate		parse state; supplies the query source text and error positions
 *	is_from		true for COPY FROM, false for COPY TO
 *	rel			relation to copy to/from, or NULL for a query-based COPY TO
 *	raw_query	raw statement for a query-based COPY TO (must be NULL if rel
 *				is given)
 *	queryRelId	if valid, OID of the relation the caller locked before turning
 *				"COPY relation TO" into a query; re-checked against the plan
 *	attnamelist	column names listed in the COPY command, resolved through
 *				CopyGetAttnums
 *	options		list of DefElem options, validated by ProcessCopyOptions
 *
 * Returns a newly allocated CopyState.  Everything it owns is allocated in
 * cstate->copycontext; the caller's memory context is current again on
 * return.  For a query-based COPY the query is planned and ExecutorStart'ed
 * here and a copied snapshot is pushed; EndCopyTo finishes the executor and
 * pops that snapshot.
 */
static CopyState
BeginCopy(ParseState *pstate,
		  bool is_from,
		  Relation rel,
		  RawStmt *raw_query,
		  Oid queryRelId,
		  List *attnamelist,
		  List *options)
{
	CopyState	cstate;
	TupleDesc	tupDesc;
	int			num_phys_attrs;
	MemoryContext oldcontext;

	/* Allocate workspace and zero all fields */
	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));

	/*
	 * We allocate everything used by a cstate in a new memory context. This
	 * avoids memory leaks during repeated use of COPY in a query.
	 */
	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
												"COPY",
												ALLOCSET_DEFAULT_SIZES);

	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	/*
	 * Extract options from the statement node tree.  cstate was just zeroed
	 * by palloc0, which ProcessCopyOptions assumes.
	 */
	ProcessCopyOptions(pstate, cstate, is_from, options);

	/* Process the source/target relation or query */
	if (rel)
	{
		Assert(!raw_query);

		cstate->rel = rel;

		tupDesc = RelationGetDescr(cstate->rel);

		/* Don't allow COPY w/ OIDs to or from a table without them */
		if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("table \"%s\" does not have OIDs",
							RelationGetRelationName(cstate->rel))));
	}
	else
	{
		List	   *rewritten;
		Query	   *query;
		PlannedStmt *plan;
		DestReceiver *dest;

		/* Only COPY TO can take its data from a query */
		Assert(!is_from);
		cstate->rel = NULL;

		/* Don't allow COPY w/ OIDs from a query */
		if (cstate->oids)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY (query) WITH OIDS is not supported")));

		/*
		 * Run parse analysis and rewrite.  Note this also acquires sufficient
		 * locks on the source table(s).
		 *
		 * Because the parser and planner tend to scribble on their input, we
		 * make a preliminary copy of the source querytree.  This prevents
		 * problems in the case that the COPY is in a portal or plpgsql
		 * function and is executed repeatedly.  (See also the same hack in
		 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
		 */
		rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
										   pstate->p_sourcetext, NULL, 0,
										   NULL);

		/* check that we got back something we can work with */
		if (rewritten == NIL)
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
		}
		else if (list_length(rewritten) > 1)
		{
			ListCell   *lc;

			/* examine queries to determine which error message to issue */
			foreach(lc, rewritten)
			{
				Query	   *q = lfirst_node(Query, lc);

				if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
				if (q->querySource == QSRC_NON_INSTEAD_RULE)
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							 errmsg("DO ALSO rules are not supported for the COPY")));
			}

			/* No more specific message applied; report the generic case */
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
		}

		/* The checks above guarantee exactly one query is left */
		query = linitial_node(Query, rewritten);

		/* The grammar allows SELECT INTO, but we don't support that */
		if (query->utilityStmt != NULL &&
			IsA(query->utilityStmt, CreateTableAsStmt))
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY (SELECT INTO) is not supported")));

		Assert(query->utilityStmt == NULL);

		/*
		 * Similarly the grammar doesn't enforce the presence of a RETURNING
		 * clause, but this is required here.
		 */
		if (query->commandType != CMD_SELECT &&
			query->returningList == NIL)
		{
			Assert(query->commandType == CMD_INSERT ||
				   query->commandType == CMD_UPDATE ||
				   query->commandType == CMD_DELETE);

			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY query must have a RETURNING clause")));
		}

		/* plan the query */
		plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);

		/*
		 * With row level security and a user using "COPY relation TO", we
		 * have to convert the "COPY relation TO" to a query-based COPY (eg:
		 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
		 * in any RLS clauses.
		 *
		 * When this happens, we are passed in the relid of the originally
		 * found relation (which we have locked).  As the planner will look up
		 * the relation again, we double-check here to make sure it found the
		 * same one that we have locked.
		 */
		if (queryRelId != InvalidOid)
		{
			/*
			 * Note that with RLS involved there may be multiple relations,
			 * and while the one we need is almost certainly first, we don't
			 * make any guarantees of that in the planner, so check the whole
			 * list and make sure we find the original relation.
			 */
			if (!list_member_oid(plan->relationOids, queryRelId))
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						 errmsg("relation referenced by COPY statement has changed")));
		}

		/*
		 * Use a snapshot with an updated command ID to ensure this query sees
		 * results of any previously executed queries.  This snapshot stays
		 * pushed until EndCopyTo pops it.
		 */
		PushCopiedSnapshot(GetActiveSnapshot());
		UpdateActiveSnapshotCommandId();

		/*
		 * Create dest receiver for COPY OUT, and give it our cstate
		 * (presumably so its per-tuple callback can emit rows through this
		 * COPY's output machinery).
		 */
		dest = CreateDestReceiver(DestCopyOut);
		((DR_copy *) dest)->cstate = cstate;

		/*
		 * Create a QueryDesc delivering result rows to the dest receiver
		 * above.  The trailing NULL, NULL, 0 mean no parameters, no query
		 * environment and no instrumentation.
		 */
		cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
											GetActiveSnapshot(),
											InvalidSnapshot,
											dest, NULL, NULL, 0);

		/*
		 * Call ExecutorStart to prepare the plan for execution.
		 *
		 * ExecutorStart computes a result tupdesc for us
		 */
		ExecutorStart(cstate->queryDesc, 0);

		tupDesc = cstate->queryDesc->tupDesc;
	}

	/* Generate or convert list of attributes to process */
	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

	/*
	 * The per-column flag arrays built below are sized for every physical
	 * attribute of tupDesc (not only the copied ones) and indexed by
	 * attnum - 1.
	 */
	num_phys_attrs = tupDesc->natts;

	/* Convert FORCE_QUOTE name list to per-column flags, check validity */
	cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->force_quote_all)
	{
		int			i;

		/* FORCE_QUOTE *: flag every physical column */
		for (i = 0; i < num_phys_attrs; i++)
			cstate->force_quote_flags[i] = true;
	}
	else if (cstate->force_quote)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			/* Each listed column must also be one that is being copied */
			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
								NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->force_quote_flags[attnum - 1] = true;
		}
	}

	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
	cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->force_notnull)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
								NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->force_notnull_flags[attnum - 1] = true;
		}
	}

	/* Convert FORCE_NULL name list to per-column flags, check validity */
	cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->force_null)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
								NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->force_null_flags[attnum - 1] = true;
		}
	}

	/*
	 * Convert convert_selectively name list to per-column flags.  Unlike the
	 * arrays above, convert_select_flags is only allocated when the option
	 * was given.
	 */
	if (cstate->convert_selectively)
	{
		List	   *attnums;
		ListCell   *cur;

		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg_internal("selected column \"%s\" not referenced by COPY",
										 NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->convert_select_flags[attnum - 1] = true;
		}
	}

	/* Use client encoding when ENCODING option is not specified. */
	if (cstate->file_encoding < 0)
		cstate->file_encoding = pg_get_client_encoding();

	/*
	 * Set up encoding conversion info.  Even if the file and server encodings
	 * are the same, we must apply pg_any_to_server() to validate data in
	 * multibyte encodings.
	 */
	cstate->need_transcoding =
		(cstate->file_encoding != GetDatabaseEncoding() ||
		 pg_database_encoding_max_length() > 1);
	/* See Multibyte encoding comment above */
	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);

	cstate->copy_dest = COPY_FILE;	/* default */

	MemoryContextSwitchTo(oldcontext);

	return cstate;
}
1690
1691 /*
1692 * Closes the pipe to an external program, checking the pclose() return code.
1693 */
1694 static void
ClosePipeToProgram(CopyState cstate)1695 ClosePipeToProgram(CopyState cstate)
1696 {
1697 int pclose_rc;
1698
1699 Assert(cstate->is_program);
1700
1701 pclose_rc = ClosePipeStream(cstate->copy_file);
1702 if (pclose_rc == -1)
1703 ereport(ERROR,
1704 (errcode_for_file_access(),
1705 errmsg("could not close pipe to external command: %m")));
1706 else if (pclose_rc != 0)
1707 {
1708 /*
1709 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1710 * expectable for the called program to fail with SIGPIPE, and we
1711 * should not report that as an error. Otherwise, SIGPIPE indicates a
1712 * problem.
1713 */
1714 if (cstate->is_copy_from && !cstate->reached_eof &&
1715 wait_result_is_signal(pclose_rc, SIGPIPE))
1716 return;
1717
1718 ereport(ERROR,
1719 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1720 errmsg("program \"%s\" failed",
1721 cstate->filename),
1722 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1723 }
1724 }
1725
1726 /*
1727 * Release resources allocated in a cstate for COPY TO/FROM.
1728 */
1729 static void
EndCopy(CopyState cstate)1730 EndCopy(CopyState cstate)
1731 {
1732 if (cstate->is_program)
1733 {
1734 ClosePipeToProgram(cstate);
1735 }
1736 else
1737 {
1738 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1739 ereport(ERROR,
1740 (errcode_for_file_access(),
1741 errmsg("could not close file \"%s\": %m",
1742 cstate->filename)));
1743 }
1744
1745 MemoryContextDelete(cstate->copycontext);
1746 pfree(cstate);
1747 }
1748
1749 /*
1750 * Setup CopyState to read tuples from a table or a query for COPY TO.
1751 */
1752 static CopyState
BeginCopyTo(ParseState * pstate,Relation rel,RawStmt * query,Oid queryRelId,const char * filename,bool is_program,List * attnamelist,List * options)1753 BeginCopyTo(ParseState *pstate,
1754 Relation rel,
1755 RawStmt *query,
1756 Oid queryRelId,
1757 const char *filename,
1758 bool is_program,
1759 List *attnamelist,
1760 List *options)
1761 {
1762 CopyState cstate;
1763 bool pipe = (filename == NULL);
1764 MemoryContext oldcontext;
1765
1766 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1767 {
1768 if (rel->rd_rel->relkind == RELKIND_VIEW)
1769 ereport(ERROR,
1770 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1771 errmsg("cannot copy from view \"%s\"",
1772 RelationGetRelationName(rel)),
1773 errhint("Try the COPY (SELECT ...) TO variant.")));
1774 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1775 ereport(ERROR,
1776 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1777 errmsg("cannot copy from materialized view \"%s\"",
1778 RelationGetRelationName(rel)),
1779 errhint("Try the COPY (SELECT ...) TO variant.")));
1780 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1781 ereport(ERROR,
1782 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1783 errmsg("cannot copy from foreign table \"%s\"",
1784 RelationGetRelationName(rel)),
1785 errhint("Try the COPY (SELECT ...) TO variant.")));
1786 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1787 ereport(ERROR,
1788 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1789 errmsg("cannot copy from sequence \"%s\"",
1790 RelationGetRelationName(rel))));
1791 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1792 ereport(ERROR,
1793 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1794 errmsg("cannot copy from partitioned table \"%s\"",
1795 RelationGetRelationName(rel)),
1796 errhint("Try the COPY (SELECT ...) TO variant.")));
1797 else
1798 ereport(ERROR,
1799 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1800 errmsg("cannot copy from non-table relation \"%s\"",
1801 RelationGetRelationName(rel))));
1802 }
1803
1804 cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1805 options);
1806 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1807
1808 if (pipe)
1809 {
1810 Assert(!is_program); /* the grammar does not allow this */
1811 if (whereToSendOutput != DestRemote)
1812 cstate->copy_file = stdout;
1813 }
1814 else
1815 {
1816 cstate->filename = pstrdup(filename);
1817 cstate->is_program = is_program;
1818
1819 if (is_program)
1820 {
1821 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1822 if (cstate->copy_file == NULL)
1823 ereport(ERROR,
1824 (errcode_for_file_access(),
1825 errmsg("could not execute command \"%s\": %m",
1826 cstate->filename)));
1827 }
1828 else
1829 {
1830 mode_t oumask; /* Pre-existing umask value */
1831 struct stat st;
1832
1833 /*
1834 * Prevent write to relative path ... too easy to shoot oneself in
1835 * the foot by overwriting a database file ...
1836 */
1837 if (!is_absolute_path(filename))
1838 ereport(ERROR,
1839 (errcode(ERRCODE_INVALID_NAME),
1840 errmsg("relative path not allowed for COPY to file")));
1841
1842 oumask = umask(S_IWGRP | S_IWOTH);
1843 PG_TRY();
1844 {
1845 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1846 }
1847 PG_CATCH();
1848 {
1849 umask(oumask);
1850 PG_RE_THROW();
1851 }
1852 PG_END_TRY();
1853 umask(oumask);
1854 if (cstate->copy_file == NULL)
1855 {
1856 /* copy errno because ereport subfunctions might change it */
1857 int save_errno = errno;
1858
1859 ereport(ERROR,
1860 (errcode_for_file_access(),
1861 errmsg("could not open file \"%s\" for writing: %m",
1862 cstate->filename),
1863 (save_errno == ENOENT || save_errno == EACCES) ?
1864 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1865 "You may want a client-side facility such as psql's \\copy.") : 0));
1866 }
1867
1868 if (fstat(fileno(cstate->copy_file), &st))
1869 ereport(ERROR,
1870 (errcode_for_file_access(),
1871 errmsg("could not stat file \"%s\": %m",
1872 cstate->filename)));
1873
1874 if (S_ISDIR(st.st_mode))
1875 ereport(ERROR,
1876 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1877 errmsg("\"%s\" is a directory", cstate->filename)));
1878 }
1879 }
1880
1881 MemoryContextSwitchTo(oldcontext);
1882
1883 return cstate;
1884 }
1885
1886 /*
1887 * This intermediate routine exists mainly to localize the effects of setjmp
1888 * so we don't need to plaster a lot of variables with "volatile".
1889 */
1890 static uint64
DoCopyTo(CopyState cstate)1891 DoCopyTo(CopyState cstate)
1892 {
1893 bool pipe = (cstate->filename == NULL);
1894 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1895 uint64 processed;
1896
1897 PG_TRY();
1898 {
1899 if (fe_copy)
1900 SendCopyBegin(cstate);
1901
1902 processed = CopyTo(cstate);
1903
1904 if (fe_copy)
1905 SendCopyEnd(cstate);
1906 }
1907 PG_CATCH();
1908 {
1909 /*
1910 * Make sure we turn off old-style COPY OUT mode upon error. It is
1911 * okay to do this in all cases, since it does nothing if the mode is
1912 * not on.
1913 */
1914 pq_endcopyout(true);
1915 PG_RE_THROW();
1916 }
1917 PG_END_TRY();
1918
1919 return processed;
1920 }
1921
1922 /*
1923 * Clean up storage and release resources for COPY TO.
1924 */
1925 static void
EndCopyTo(CopyState cstate)1926 EndCopyTo(CopyState cstate)
1927 {
1928 if (cstate->queryDesc != NULL)
1929 {
1930 /* Close down the query and free resources. */
1931 ExecutorFinish(cstate->queryDesc);
1932 ExecutorEnd(cstate->queryDesc);
1933 FreeQueryDesc(cstate->queryDesc);
1934 PopActiveSnapshot();
1935 }
1936
1937 /* Clean up storage */
1938 EndCopy(cstate);
1939 }
1940
1941 /*
1942 * Copy from relation or query TO file.
1943 */
1944 static uint64
CopyTo(CopyState cstate)1945 CopyTo(CopyState cstate)
1946 {
1947 TupleDesc tupDesc;
1948 int num_phys_attrs;
1949 Form_pg_attribute *attr;
1950 ListCell *cur;
1951 uint64 processed;
1952
1953 if (cstate->rel)
1954 tupDesc = RelationGetDescr(cstate->rel);
1955 else
1956 tupDesc = cstate->queryDesc->tupDesc;
1957 attr = tupDesc->attrs;
1958 num_phys_attrs = tupDesc->natts;
1959 cstate->null_print_client = cstate->null_print; /* default */
1960
1961 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1962 cstate->fe_msgbuf = makeStringInfo();
1963
1964 /* Get info about the columns we need to process. */
1965 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1966 foreach(cur, cstate->attnumlist)
1967 {
1968 int attnum = lfirst_int(cur);
1969 Oid out_func_oid;
1970 bool isvarlena;
1971
1972 if (cstate->binary)
1973 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1974 &out_func_oid,
1975 &isvarlena);
1976 else
1977 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1978 &out_func_oid,
1979 &isvarlena);
1980 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1981 }
1982
1983 /*
1984 * Create a temporary memory context that we can reset once per row to
1985 * recover palloc'd memory. This avoids any problems with leaks inside
1986 * datatype output routines, and should be faster than retail pfree's
1987 * anyway. (We don't need a whole econtext as CopyFrom does.)
1988 */
1989 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1990 "COPY TO",
1991 ALLOCSET_DEFAULT_SIZES);
1992
1993 if (cstate->binary)
1994 {
1995 /* Generate header for a binary copy */
1996 int32 tmp;
1997
1998 /* Signature */
1999 CopySendData(cstate, BinarySignature, 11);
2000 /* Flags field */
2001 tmp = 0;
2002 if (cstate->oids)
2003 tmp |= (1 << 16);
2004 CopySendInt32(cstate, tmp);
2005 /* No header extension */
2006 tmp = 0;
2007 CopySendInt32(cstate, tmp);
2008 }
2009 else
2010 {
2011 /*
2012 * For non-binary copy, we need to convert null_print to file
2013 * encoding, because it will be sent directly with CopySendString.
2014 */
2015 if (cstate->need_transcoding)
2016 cstate->null_print_client = pg_server_to_any(cstate->null_print,
2017 cstate->null_print_len,
2018 cstate->file_encoding);
2019
2020 /* if a header has been requested send the line */
2021 if (cstate->header_line)
2022 {
2023 bool hdr_delim = false;
2024
2025 foreach(cur, cstate->attnumlist)
2026 {
2027 int attnum = lfirst_int(cur);
2028 char *colname;
2029
2030 if (hdr_delim)
2031 CopySendChar(cstate, cstate->delim[0]);
2032 hdr_delim = true;
2033
2034 colname = NameStr(attr[attnum - 1]->attname);
2035
2036 CopyAttributeOutCSV(cstate, colname, false,
2037 list_length(cstate->attnumlist) == 1);
2038 }
2039
2040 CopySendEndOfRow(cstate);
2041 }
2042 }
2043
2044 if (cstate->rel)
2045 {
2046 Datum *values;
2047 bool *nulls;
2048 HeapScanDesc scandesc;
2049 HeapTuple tuple;
2050
2051 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2052 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2053
2054 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2055
2056 processed = 0;
2057 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2058 {
2059 CHECK_FOR_INTERRUPTS();
2060
2061 /* Deconstruct the tuple ... faster than repeated heap_getattr */
2062 heap_deform_tuple(tuple, tupDesc, values, nulls);
2063
2064 /* Format and send the data */
2065 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2066 processed++;
2067 }
2068
2069 heap_endscan(scandesc);
2070
2071 pfree(values);
2072 pfree(nulls);
2073 }
2074 else
2075 {
2076 /* run the plan --- the dest receiver will send tuples */
2077 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2078 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2079 }
2080
2081 if (cstate->binary)
2082 {
2083 /* Generate trailer for a binary copy */
2084 CopySendInt16(cstate, -1);
2085 /* Need to flush out the trailer */
2086 CopySendEndOfRow(cstate);
2087 }
2088
2089 MemoryContextDelete(cstate->rowcontext);
2090
2091 return processed;
2092 }
2093
2094 /*
2095 * Emit one row during CopyTo().
2096 */
2097 static void
CopyOneRowTo(CopyState cstate,Oid tupleOid,Datum * values,bool * nulls)2098 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2099 {
2100 bool need_delim = false;
2101 FmgrInfo *out_functions = cstate->out_functions;
2102 MemoryContext oldcontext;
2103 ListCell *cur;
2104 char *string;
2105
2106 MemoryContextReset(cstate->rowcontext);
2107 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2108
2109 if (cstate->binary)
2110 {
2111 /* Binary per-tuple header */
2112 CopySendInt16(cstate, list_length(cstate->attnumlist));
2113 /* Send OID if wanted --- note attnumlist doesn't include it */
2114 if (cstate->oids)
2115 {
2116 /* Hack --- assume Oid is same size as int32 */
2117 CopySendInt32(cstate, sizeof(int32));
2118 CopySendInt32(cstate, tupleOid);
2119 }
2120 }
2121 else
2122 {
2123 /* Text format has no per-tuple header, but send OID if wanted */
2124 /* Assume digits don't need any quoting or encoding conversion */
2125 if (cstate->oids)
2126 {
2127 string = DatumGetCString(DirectFunctionCall1(oidout,
2128 ObjectIdGetDatum(tupleOid)));
2129 CopySendString(cstate, string);
2130 need_delim = true;
2131 }
2132 }
2133
2134 foreach(cur, cstate->attnumlist)
2135 {
2136 int attnum = lfirst_int(cur);
2137 Datum value = values[attnum - 1];
2138 bool isnull = nulls[attnum - 1];
2139
2140 if (!cstate->binary)
2141 {
2142 if (need_delim)
2143 CopySendChar(cstate, cstate->delim[0]);
2144 need_delim = true;
2145 }
2146
2147 if (isnull)
2148 {
2149 if (!cstate->binary)
2150 CopySendString(cstate, cstate->null_print_client);
2151 else
2152 CopySendInt32(cstate, -1);
2153 }
2154 else
2155 {
2156 if (!cstate->binary)
2157 {
2158 string = OutputFunctionCall(&out_functions[attnum - 1],
2159 value);
2160 if (cstate->csv_mode)
2161 CopyAttributeOutCSV(cstate, string,
2162 cstate->force_quote_flags[attnum - 1],
2163 list_length(cstate->attnumlist) == 1);
2164 else
2165 CopyAttributeOutText(cstate, string);
2166 }
2167 else
2168 {
2169 bytea *outputbytes;
2170
2171 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2172 value);
2173 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2174 CopySendData(cstate, VARDATA(outputbytes),
2175 VARSIZE(outputbytes) - VARHDRSZ);
2176 }
2177 }
2178 }
2179
2180 CopySendEndOfRow(cstate);
2181
2182 MemoryContextSwitchTo(oldcontext);
2183 }
2184
2185
2186 /*
2187 * error context callback for COPY FROM
2188 *
2189 * The argument for the error context must be CopyState.
2190 */
2191 void
CopyFromErrorCallback(void * arg)2192 CopyFromErrorCallback(void *arg)
2193 {
2194 CopyState cstate = (CopyState) arg;
2195 char curlineno_str[32];
2196
2197 snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2198 cstate->cur_lineno);
2199
2200 if (cstate->binary)
2201 {
2202 /* can't usefully display the data */
2203 if (cstate->cur_attname)
2204 errcontext("COPY %s, line %s, column %s",
2205 cstate->cur_relname, curlineno_str,
2206 cstate->cur_attname);
2207 else
2208 errcontext("COPY %s, line %s",
2209 cstate->cur_relname, curlineno_str);
2210 }
2211 else
2212 {
2213 if (cstate->cur_attname && cstate->cur_attval)
2214 {
2215 /* error is relevant to a particular column */
2216 char *attval;
2217
2218 attval = limit_printout_length(cstate->cur_attval);
2219 errcontext("COPY %s, line %s, column %s: \"%s\"",
2220 cstate->cur_relname, curlineno_str,
2221 cstate->cur_attname, attval);
2222 pfree(attval);
2223 }
2224 else if (cstate->cur_attname)
2225 {
2226 /* error is relevant to a particular column, value is NULL */
2227 errcontext("COPY %s, line %s, column %s: null input",
2228 cstate->cur_relname, curlineno_str,
2229 cstate->cur_attname);
2230 }
2231 else
2232 {
2233 /*
2234 * Error is relevant to a particular line.
2235 *
2236 * If line_buf still contains the correct line, and it's already
2237 * transcoded, print it. If it's still in a foreign encoding, it's
2238 * quite likely that the error is precisely a failure to do
2239 * encoding conversion (ie, bad data). We dare not try to convert
2240 * it, and at present there's no way to regurgitate it without
2241 * conversion. So we have to punt and just report the line number.
2242 */
2243 if (cstate->line_buf_valid &&
2244 (cstate->line_buf_converted || !cstate->need_transcoding))
2245 {
2246 char *lineval;
2247
2248 lineval = limit_printout_length(cstate->line_buf.data);
2249 errcontext("COPY %s, line %s: \"%s\"",
2250 cstate->cur_relname, curlineno_str, lineval);
2251 pfree(lineval);
2252 }
2253 else
2254 {
2255 errcontext("COPY %s, line %s",
2256 cstate->cur_relname, curlineno_str);
2257 }
2258 }
2259 }
2260 }
2261
/*
 * limit_printout_length -- cap how much COPY data goes into a message.
 *
 * Returns a palloc'd copy of str.  If str is longer than
 * MAX_COPY_DATA_DISPLAY bytes, the copy is clipped with pg_mbcliplen() so
 * that no multibyte character is split, and "..." is appended.
 *
 * Truncation is done here rather than with a printf precision ("%.*s")
 * because some glibc versions make vsnprintf fail (return -1) when asked to
 * truncate a string that contains byte sequences invalid in the current
 * encoding.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			fulllen = strlen(str);
	int			cliplen;
	char	   *result;

	/* Short enough to show in full */
	if (fulllen <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Encoding-aware clip to at most MAX_COPY_DATA_DISPLAY bytes */
	cliplen = pg_mbcliplen(str, fulllen, MAX_COPY_DATA_DISPLAY);

	/* clipped bytes + "..." + terminating NUL */
	result = (char *) palloc(cliplen + 4);
	memcpy(result, str, cliplen);
	memcpy(result + cliplen, "...", 4);

	return result;
}
2296
/*
 * Copy FROM file to relation.
 *
 * Reads input rows one at a time with NextCopyFrom() and forms a heap tuple
 * for each.  Each tuple is inserted into cstate->rel or, when that is a
 * partitioned table, into the leaf partition chosen by ExecFindPartition().
 * Along the way this function:
 *   - fires BEFORE/AFTER STATEMENT INSERT triggers;
 *   - fires BEFORE, INSTEAD OF and AFTER ROW INSERT triggers;
 *   - checks constraints with ExecConstraints();
 *   - makes index entries.
 * When there are no BEFORE/INSTEAD OF row triggers, no tuple routing and no
 * volatile default expressions, tuples are buffered and written in batches
 * by CopyFromInsertBatch().
 *
 * If the table (or its current relfilenode) was created in this transaction,
 * FSM lookups are skipped, and WAL is skipped too when XLogIsNeeded() is
 * false; in the latter case the heap is synced with heap_sync() at the end.
 *
 * Returns the number of rows inserted.  Rows suppressed by a BEFORE ROW
 * trigger are not counted.
 *
 * While rows are processed, CopyFromErrorCallback is pushed onto the error
 * context stack so that errors identify the offending input line.
 */
uint64
CopyFrom(CopyState cstate)
{
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum	   *values;
	bool	   *nulls;
	ResultRelInfo *resultRelInfo;
	ResultRelInfo *saved_resultRelInfo = NULL;
	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
	ExprContext *econtext;
	TupleTableSlot *myslot;
	MemoryContext oldcontext = CurrentMemoryContext;

	ErrorContextCallback errcallback;
	CommandId	mycid = GetCurrentCommandId(true);
	int			hi_options = 0; /* start with default heap_insert options */
	BulkInsertState bistate;
	uint64		processed = 0;
	bool		useHeapMultiInsert;
	int			nBufferedTuples = 0;
	/* leaf partition that received the previous routed tuple; -1 = none yet */
	int			prev_leaf_part_index = -1;

	/* batch is flushed once this many tuples are buffered (see main loop) */
#define MAX_BUFFERED_TUPLES 1000
	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
	Size		bufferedTuplesSize = 0;
	uint64		firstBufferedLineNo = 0;

	Assert(cstate->rel);

	/*
	 * The target must be a plain relation or have an INSTEAD OF INSERT row
	 * trigger.  (Currently, such triggers are only allowed on views, so we
	 * only hint about them in the view case.)
	 */
	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
		!(cstate->rel->trigdesc &&
		  cstate->rel->trigdesc->trig_insert_instead_row))
	{
		if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to view \"%s\"",
							RelationGetRelationName(cstate->rel)),
					 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
		else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to materialized view \"%s\"",
							RelationGetRelationName(cstate->rel))));
		else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to foreign table \"%s\"",
							RelationGetRelationName(cstate->rel))));
		else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to sequence \"%s\"",
							RelationGetRelationName(cstate->rel))));
		else
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to non-table relation \"%s\"",
							RelationGetRelationName(cstate->rel))));
	}

	tupDesc = RelationGetDescr(cstate->rel);

	/*----------
	 * Check to see if we can avoid writing WAL
	 *
	 * If archive logging/streaming is not enabled *and* either
	 *	- table was created in same transaction as this COPY
	 *	- data is being written to relfilenode created in this transaction
	 * then we can skip writing WAL.  It's safe because if the transaction
	 * doesn't commit, we'll discard the table (or the new relfilenode file).
	 * If it does commit, we'll have done the heap_sync at the bottom of this
	 * routine first.
	 *
	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
	 * can be cleared before the end of the transaction. The exact case is
	 * when a relation sets a new relfilenode twice in same transaction, yet
	 * the second one fails in an aborted subtransaction, e.g.
	 *
	 * BEGIN;
	 * TRUNCATE t;
	 * SAVEPOINT save;
	 * TRUNCATE t;
	 * ROLLBACK TO save;
	 * COPY ...
	 *
	 * Also, if the target file is new-in-transaction, we assume that checking
	 * FSM for free space is a waste of time, even if we must use WAL because
	 * of archiving.  This could possibly be wrong, but it's unlikely.
	 *
	 * The comments for heap_insert and RelationGetBufferForTuple specify that
	 * skipping WAL logging is only safe if we ensure that our tuples do not
	 * go into pages containing tuples from any other transactions --- but this
	 * must be the case if we have a new table or new relfilenode, so we need
	 * no additional work to enforce that.
	 *
	 * We currently don't support this optimization if the COPY target is a
	 * partitioned table as we currently only lazily initialize partition
	 * information when routing the first tuple to the partition.  We cannot
	 * know at this stage if we can perform this optimization.  It should be
	 * possible to improve on this, but it does mean maintaining heap insert
	 * option flags per partition and setting them when we first open the
	 * partition.
	 *
	 * This optimization is not supported for relation types which do not
	 * have any physical storage, with views entering in this category.
	 * Partitioned tables are not supported as per the description above.
	 *----------
	 */
	/* createSubid is creation check, newRelfilenodeSubid is truncation check */
	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
		cstate->rel->rd_rel->relkind != RELKIND_VIEW &&
		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
		 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
	{
		hi_options |= HEAP_INSERT_SKIP_FSM;
		if (!XLogIsNeeded())
			hi_options |= HEAP_INSERT_SKIP_WAL;
	}

	/*
	 * Optimize if new relfilenode was created in this subxact or one of its
	 * committed children and we won't see those rows later as part of an
	 * earlier scan or command. The subxact test ensures that if this subxact
	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
	 * that the stronger test of exactly which subtransaction created it is
	 * crucial for correctness of this optimization. The test for an earlier
	 * scan or command tolerates false negatives. FREEZE causes other sessions
	 * to see rows they would not see under MVCC, and a false negative merely
	 * spreads that anomaly to the current session.
	 */
	if (cstate->freeze)
	{
		/*
		 * We currently disallow COPY FREEZE on partitioned tables.  The
		 * reason for this is that we've simply not yet opened the partitions
		 * to determine if the optimization can be applied to them.  We could
		 * go and open them all here, but doing so may be quite a costly
		 * overhead for small copies.  In any case, we may just end up routing
		 * tuples to a small number of partitions.  It seems better just to
		 * raise an ERROR for partitioned tables.
		 */
		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot perform FREEZE on a partitioned table")));
		}

		/*
		 * Tolerate one registration for the benefit of FirstXactSnapshot.
		 * Scan-bearing queries generally create at least two registrations,
		 * though relying on that is fragile, as is ignoring ActiveSnapshot.
		 * Clear CatalogSnapshot to avoid counting its registration.  We'll
		 * still detect ongoing catalog scans, each of which separately
		 * registers the snapshot it uses.
		 */
		InvalidateCatalogSnapshot();
		if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
					 errmsg("cannot perform FREEZE because of prior transaction activity")));

		if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
			cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));

		hi_options |= HEAP_INSERT_FROZEN;
	}

	/*
	 * We need a ResultRelInfo so we can use the regular executor's
	 * index-entry-making machinery.  (There used to be a huge amount of code
	 * here that basically duplicated execUtils.c ...)
	 */
	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo,
					  cstate->rel,
					  1,		/* dummy rangetable index */
					  NULL,
					  0);

	ExecOpenIndices(resultRelInfo, false);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;
	estate->es_range_table = cstate->range_table;

	/* Set up a tuple slot too */
	myslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(myslot, tupDesc);
	/* Triggers might need a slot as well */
	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	/*
	 * If there are any triggers with transition tables on the named relation,
	 * we need to be prepared to capture transition tuples.
	 */
	cstate->transition_capture =
		MakeTransitionCaptureState(cstate->rel->trigdesc,
								   RelationGetRelid(cstate->rel),
								   CMD_INSERT);

	/*
	 * If the named relation is a partitioned table, initialize state for
	 * CopyFrom tuple routing.
	 */
	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
	{
		PartitionDispatch *partition_dispatch_info;
		ResultRelInfo *partitions;
		TupleConversionMap **partition_tupconv_maps;
		TupleTableSlot *partition_tuple_slot;
		int			num_parted,
					num_partitions;

		ExecSetupPartitionTupleRouting(cstate->rel,
									   1,
									   estate,
									   &partition_dispatch_info,
									   &partitions,
									   &partition_tupconv_maps,
									   &partition_tuple_slot,
									   &num_parted, &num_partitions);
		cstate->partition_dispatch_info = partition_dispatch_info;
		cstate->num_dispatch = num_parted;
		cstate->partitions = partitions;
		cstate->num_partitions = num_partitions;
		cstate->partition_tupconv_maps = partition_tupconv_maps;
		cstate->partition_tuple_slot = partition_tuple_slot;

		/*
		 * If we are capturing transition tuples, they may need to be
		 * converted from partition format back to partitioned table format
		 * (this is only ever necessary if a BEFORE trigger modifies the
		 * tuple).
		 */
		if (cstate->transition_capture != NULL)
		{
			int			i;

			cstate->transition_tupconv_maps = (TupleConversionMap **)
				palloc0(sizeof(TupleConversionMap *) * cstate->num_partitions);
			for (i = 0; i < cstate->num_partitions; ++i)
			{
				cstate->transition_tupconv_maps[i] =
					convert_tuples_by_name(RelationGetDescr(cstate->partitions[i].ri_RelationDesc),
										   RelationGetDescr(cstate->rel),
										   gettext_noop("could not convert row type"));
			}
		}
	}

	/*
	 * It's more efficient to prepare a bunch of tuples for insertion, and
	 * insert them in one heap_multi_insert() call, than call heap_insert()
	 * separately for every tuple. However, we can't do that if there are
	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
	 * expressions. Such triggers or expressions might query the table we're
	 * inserting to, and act differently if the tuples that have already been
	 * processed and prepared for insertion are not there.  We also can't do
	 * it if the table is partitioned.
	 */
	if ((resultRelInfo->ri_TrigDesc != NULL &&
		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
		cstate->partition_dispatch_info != NULL ||
		cstate->volatile_defexprs)
	{
		useHeapMultiInsert = false;
	}
	else
	{
		useHeapMultiInsert = true;
		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
	}

	/*
	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
	 * should do this for COPY, since it's not really an "INSERT" statement as
	 * such. However, executing these triggers maintains consistency with the
	 * EACH ROW triggers that we already fire on COPY.
	 */
	ExecBSInsertTriggers(estate, resultRelInfo);

	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));

	bistate = GetBulkInsertState();
	econtext = GetPerTupleExprContext(estate);

	/* Set up callback to identify error line number */
	errcallback.callback = CopyFromErrorCallback;
	errcallback.arg = (void *) cstate;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Process input rows until NextCopyFrom() reports there are no more */
	for (;;)
	{
		TupleTableSlot *slot;
		bool		skip_tuple;
		Oid			loaded_oid = InvalidOid;

		CHECK_FOR_INTERRUPTS();

		if (nBufferedTuples == 0)
		{
			/*
			 * Reset the per-tuple exprcontext. We can only do this if the
			 * tuple buffer is empty. (Calling the context the per-tuple
			 * memory context is a bit of a misnomer now.)
			 */
			ResetPerTupleExprContext(estate);
		}

		/* Switch into its memory context */
		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

		if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
			break;

		/* And now we can form the input tuple. */
		tuple = heap_form_tuple(tupDesc, values, nulls);

		if (loaded_oid != InvalidOid)
			HeapTupleSetOid(tuple, loaded_oid);

		/*
		 * Constraints might reference the tableoid column, so initialize
		 * t_tableOid before evaluating them.
		 */
		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);

		/* Triggers and stuff need to be invoked in query context. */
		MemoryContextSwitchTo(oldcontext);

		/* Place tuple in tuple slot --- but slot shouldn't free it */
		slot = myslot;
		ExecStoreTuple(tuple, slot, InvalidBuffer, false);

		/* Determine the partition to heap_insert the tuple into */
		if (cstate->partition_dispatch_info)
		{
			int			leaf_part_index;
			TupleConversionMap *map;

			/*
			 * Away we go ... If we end up not finding a partition after all,
			 * ExecFindPartition() does not return and errors out instead.
			 * Otherwise, the returned value is to be used as an index into
			 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
			 * will get us the ResultRelInfo and TupleConversionMap for the
			 * partition, respectively.
			 */
			leaf_part_index = ExecFindPartition(resultRelInfo,
												cstate->partition_dispatch_info,
												slot,
												estate);
			Assert(leaf_part_index >= 0 &&
				   leaf_part_index < cstate->num_partitions);

			/*
			 * If this tuple is mapped to a partition that is not same as the
			 * previous one, we'd better make the bulk insert mechanism gets a
			 * new buffer.
			 */
			if (prev_leaf_part_index != leaf_part_index)
			{
				ReleaseBulkInsertStatePin(bistate);
				prev_leaf_part_index = leaf_part_index;
			}

			/*
			 * Save the old ResultRelInfo and switch to the one corresponding
			 * to the selected partition.
			 */
			saved_resultRelInfo = resultRelInfo;
			resultRelInfo = cstate->partitions + leaf_part_index;

			/* We do not yet have a way to insert into a foreign partition */
			if (resultRelInfo->ri_FdwRoutine)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("cannot route inserted tuples to a foreign table")));

			/*
			 * For ExecInsertIndexTuples() to work on the partition's indexes
			 */
			estate->es_result_relation_info = resultRelInfo;

			/*
			 * If we're capturing transition tuples, we might need to convert
			 * from the partition rowtype to parent rowtype.
			 */
			if (cstate->transition_capture != NULL)
			{
				if (resultRelInfo->ri_TrigDesc &&
					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
				{
					/*
					 * If there are any BEFORE triggers on the partition,
					 * we'll have to be ready to convert their result back to
					 * tuplestore format.
					 */
					cstate->transition_capture->tcs_original_insert_tuple = NULL;
					cstate->transition_capture->tcs_map =
						cstate->transition_tupconv_maps[leaf_part_index];
				}
				else
				{
					/*
					 * Otherwise, just remember the original unconverted
					 * tuple, to avoid a needless round trip conversion.
					 */
					cstate->transition_capture->tcs_original_insert_tuple = tuple;
					cstate->transition_capture->tcs_map = NULL;
				}
			}

			/*
			 * We might need to convert from the parent rowtype to the
			 * partition rowtype.
			 */
			map = cstate->partition_tupconv_maps[leaf_part_index];
			if (map)
			{
				Relation	partrel = resultRelInfo->ri_RelationDesc;

				tuple = do_convert_tuple(tuple, map);

				/*
				 * We must use the partition's tuple descriptor from this
				 * point on.  Use a dedicated slot from this point on until
				 * we're finished dealing with the partition.
				 */
				slot = cstate->partition_tuple_slot;
				Assert(slot != NULL);
				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
				/* unlike myslot above, this slot takes ownership of the tuple */
				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
			}

			/* the tuple now belongs to the partition, not the parent */
			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
		}

		skip_tuple = false;

		/* BEFORE ROW INSERT Triggers */
		if (resultRelInfo->ri_TrigDesc &&
			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
		{
			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);

			if (slot == NULL)	/* "do nothing" */
				skip_tuple = true;
			else				/* trigger might have changed tuple */
				tuple = ExecMaterializeSlot(slot);
		}

		if (!skip_tuple)
		{
			if (resultRelInfo->ri_TrigDesc &&
				resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
			{
				/* Pass the data to the INSTEAD ROW INSERT trigger */
				ExecIRInsertTriggers(estate, resultRelInfo, slot);
			}
			else
			{
				/*
				 * We always check the partition constraint, including when
				 * the tuple got here via tuple-routing.  However we don't
				 * need to in the latter case if no BR trigger is defined on
				 * the partition.  Note that a BR trigger might modify the
				 * tuple such that the partition constraint is no longer
				 * satisfied, so we need to check in that case.
				 */
				bool		check_partition_constr =
				(resultRelInfo->ri_PartitionCheck != NIL);

				/* saved_resultRelInfo != NULL means the tuple was routed */
				if (saved_resultRelInfo != NULL &&
					!(resultRelInfo->ri_TrigDesc &&
					  resultRelInfo->ri_TrigDesc->trig_insert_before_row))
					check_partition_constr = false;

				/* Check the constraints of the tuple */
				if (resultRelInfo->ri_RelationDesc->rd_att->constr ||
					check_partition_constr)
					ExecConstraints(resultRelInfo, slot, estate);

				if (useHeapMultiInsert)
				{
					/* Add this tuple to the tuple buffer */
					if (nBufferedTuples == 0)
						firstBufferedLineNo = cstate->cur_lineno;
					bufferedTuples[nBufferedTuples++] = tuple;
					bufferedTuplesSize += tuple->t_len;

					/*
					 * If the buffer filled up, flush it.  Also flush if the
					 * total size of all the tuples in the buffer becomes
					 * large, to avoid using large amounts of memory for the
					 * buffer when the tuples are exceptionally wide.
					 */
					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
						bufferedTuplesSize > 65535)
					{
						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
											resultRelInfo, myslot, bistate,
											nBufferedTuples, bufferedTuples,
											firstBufferedLineNo);
						nBufferedTuples = 0;
						bufferedTuplesSize = 0;
					}
				}
				else
				{
					List	   *recheckIndexes = NIL;

					/* OK, store the tuple and create index entries for it */
					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
								hi_options, bistate);

					if (resultRelInfo->ri_NumIndices > 0)
						recheckIndexes = ExecInsertIndexTuples(slot,
															   &(tuple->t_self),
															   estate,
															   false,
															   NULL,
															   NIL);

					/* AFTER ROW INSERT Triggers */
					ExecARInsertTriggers(estate, resultRelInfo, tuple,
										 recheckIndexes, cstate->transition_capture);

					list_free(recheckIndexes);
				}
			}

			/*
			 * We count only tuples not suppressed by a BEFORE INSERT trigger;
			 * this is the same definition used by execMain.c for counting
			 * tuples inserted by an INSERT command.
			 */
			processed++;
		}

		/* Restore the saved ResultRelInfo */
		if (saved_resultRelInfo)
		{
			resultRelInfo = saved_resultRelInfo;
			estate->es_result_relation_info = resultRelInfo;
		}
	}

	/* Flush any remaining buffered tuples */
	if (nBufferedTuples > 0)
		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
							resultRelInfo, myslot, bistate,
							nBufferedTuples, bufferedTuples,
							firstBufferedLineNo);

	/* Done, clean up */
	error_context_stack = errcallback.previous;

	FreeBulkInsertState(bistate);

	MemoryContextSwitchTo(oldcontext);

	/*
	 * In the old protocol, tell pqcomm that we can process normal protocol
	 * messages again.
	 */
	if (cstate->copy_dest == COPY_OLD_FE)
		pq_endmsgread();

	/* Execute AFTER STATEMENT insertion triggers */
	ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);

	/* Handle queued AFTER triggers */
	AfterTriggerEndQuery(estate);

	pfree(values);
	pfree(nulls);

	ExecResetTupleTable(estate->es_tupleTable, false);

	ExecCloseIndices(resultRelInfo);

	/* Close all the partitioned tables, leaf partitions, and their indices */
	if (cstate->partition_dispatch_info)
	{
		int			i;

		/*
		 * Remember cstate->partition_dispatch_info[0] corresponds to the root
		 * partitioned table, which we must not try to close, because it is
		 * the main target table of COPY that will be closed eventually by
		 * DoCopy().  Also, tupslot is NULL for the root partitioned table.
		 */
		for (i = 1; i < cstate->num_dispatch; i++)
		{
			PartitionDispatch pd = cstate->partition_dispatch_info[i];

			heap_close(pd->reldesc, NoLock);
			ExecDropSingleTupleTableSlot(pd->tupslot);
		}
		for (i = 0; i < cstate->num_partitions; i++)
		{
			/* NB: deliberately shadows the outer resultRelInfo */
			ResultRelInfo *resultRelInfo = cstate->partitions + i;

			ExecCloseIndices(resultRelInfo);
			heap_close(resultRelInfo->ri_RelationDesc, NoLock);
		}

		/* Release the standalone partition tuple descriptor */
		ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
	}

	/* Close any trigger target relations */
	ExecCleanUpTriggerState(estate);

	FreeExecutorState(estate);

	/*
	 * If we skipped writing WAL, then we need to sync the heap (but not
	 * indexes since those use WAL anyway)
	 */
	if (hi_options & HEAP_INSERT_SKIP_WAL)
		heap_sync(cstate->rel);

	return processed;
}
2946
2947 /*
2948 * A subroutine of CopyFrom, to write the current batch of buffered heap
2949 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2950 * triggers.
2951 */
2952 static void
CopyFromInsertBatch(CopyState cstate,EState * estate,CommandId mycid,int hi_options,ResultRelInfo * resultRelInfo,TupleTableSlot * myslot,BulkInsertState bistate,int nBufferedTuples,HeapTuple * bufferedTuples,uint64 firstBufferedLineNo)2953 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2954 int hi_options, ResultRelInfo *resultRelInfo,
2955 TupleTableSlot *myslot, BulkInsertState bistate,
2956 int nBufferedTuples, HeapTuple *bufferedTuples,
2957 uint64 firstBufferedLineNo)
2958 {
2959 MemoryContext oldcontext;
2960 int i;
2961 uint64 save_cur_lineno;
2962
2963 /*
2964 * Print error context information correctly, if one of the operations
2965 * below fail.
2966 */
2967 cstate->line_buf_valid = false;
2968 save_cur_lineno = cstate->cur_lineno;
2969
2970 /*
2971 * heap_multi_insert leaks memory, so switch to short-lived memory context
2972 * before calling it.
2973 */
2974 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2975 heap_multi_insert(cstate->rel,
2976 bufferedTuples,
2977 nBufferedTuples,
2978 mycid,
2979 hi_options,
2980 bistate);
2981 MemoryContextSwitchTo(oldcontext);
2982
2983 /*
2984 * If there are any indexes, update them for all the inserted tuples, and
2985 * run AFTER ROW INSERT triggers.
2986 */
2987 if (resultRelInfo->ri_NumIndices > 0)
2988 {
2989 for (i = 0; i < nBufferedTuples; i++)
2990 {
2991 List *recheckIndexes;
2992
2993 cstate->cur_lineno = firstBufferedLineNo + i;
2994 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2995 recheckIndexes =
2996 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2997 estate, false, NULL, NIL);
2998 ExecARInsertTriggers(estate, resultRelInfo,
2999 bufferedTuples[i],
3000 recheckIndexes, cstate->transition_capture);
3001 list_free(recheckIndexes);
3002 }
3003 }
3004
3005 /*
3006 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
3007 * anyway.
3008 */
3009 else if (resultRelInfo->ri_TrigDesc != NULL &&
3010 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
3011 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
3012 {
3013 for (i = 0; i < nBufferedTuples; i++)
3014 {
3015 cstate->cur_lineno = firstBufferedLineNo + i;
3016 ExecARInsertTriggers(estate, resultRelInfo,
3017 bufferedTuples[i],
3018 NIL, cstate->transition_capture);
3019 }
3020 }
3021
3022 /* reset cur_lineno to where we were */
3023 cstate->cur_lineno = save_cur_lineno;
3024 }
3025
/*
 * Setup to read tuples from a file for COPY FROM.
 *
 * 'pstate': parse state; when not NULL, its range table (p_rtable) is saved
 *		in the CopyState for later use by CopyFrom.
 * 'rel': Used as a template for the tuples
 * 'filename': Name of server-local file to read, or the shell command to run
 *		when 'is_program' is true.  NULL means read from the client connection
 *		(or from stdin when whereToSendOutput is not DestRemote).
 * 'is_program': if true, 'filename' is executed and its output is read.
 * 'data_source_cb': if not NULL, input is obtained through this callback and
 *		'filename' is not used.
 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
 *
 * For binary format, the file header is read and validated here, so a bad
 * header is reported before the first NextCopyFrom call.
 *
 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
 */
CopyState
BeginCopyFrom(ParseState *pstate,
			  Relation rel,
			  const char *filename,
			  bool is_program,
			  copy_data_source_cb data_source_cb,
			  List *attnamelist,
			  List *options)
{
	CopyState	cstate;
	bool		pipe = (filename == NULL);
	TupleDesc	tupDesc;
	Form_pg_attribute *attr;
	AttrNumber	num_phys_attrs,
				num_defaults;
	FmgrInfo   *in_functions;
	Oid		   *typioparams;
	int			attnum;
	Oid			in_func_oid;
	int		   *defmap;
	ExprState **defexprs;
	MemoryContext oldcontext;
	bool		volatile_defexprs;

	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);

	/* Everything allocated below lives in the COPY's own memory context. */
	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	/* Initialize state variables */
	cstate->reached_eof = false;
	cstate->eol_type = EOL_UNKNOWN;
	cstate->cur_relname = RelationGetRelationName(cstate->rel);
	cstate->cur_lineno = 0;
	cstate->cur_attname = NULL;
	cstate->cur_attval = NULL;

	/* Set up variables to avoid per-attribute overhead. */
	initStringInfo(&cstate->attribute_buf);
	initStringInfo(&cstate->line_buf);
	cstate->line_buf_converted = false;
	/* one extra byte beyond RAW_BUF_SIZE */
	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
	cstate->raw_buf_index = cstate->raw_buf_len = 0;

	/* Assign range table, we'll need it in CopyFrom. */
	if (pstate)
		cstate->range_table = pstate->p_rtable;

	tupDesc = RelationGetDescr(cstate->rel);
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;
	num_defaults = 0;
	volatile_defexprs = false;

	/*
	 * Pick up the required catalog information for each attribute in the
	 * relation, including the input function, the element type (to pass to
	 * the input function), and info about defaults and constraints. (Which
	 * input function we use depends on text/binary format choice.)
	 *
	 * in_functions/typioparams are indexed by 0-based attribute number.
	 * defexprs/defmap are packed: entry i holds a default expression and the
	 * 0-based attribute number it belongs to, for i < num_defaults.
	 */
	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	for (attnum = 1; attnum <= num_phys_attrs; attnum++)
	{
		/* We don't need info for dropped attributes */
		if (attr[attnum - 1]->attisdropped)
			continue;

		/* Fetch the input function and typioparam info */
		if (cstate->binary)
			getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
								   &in_func_oid, &typioparams[attnum - 1]);
		else
			getTypeInputInfo(attr[attnum - 1]->atttypid,
							 &in_func_oid, &typioparams[attnum - 1]);
		fmgr_info(in_func_oid, &in_functions[attnum - 1]);

		/* Get default info if needed */
		if (!list_member_int(cstate->attnumlist, attnum))
		{
			/* attribute is NOT to be copied from input */
			/* use default value if one exists */
			Expr	   *defexpr = (Expr *) build_column_default(cstate->rel,
																attnum);

			if (defexpr != NULL)
			{
				/* Run the expression through planner */
				defexpr = expression_planner(defexpr);

				/* Initialize executable expression in copycontext */
				defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
				defmap[num_defaults] = attnum - 1;
				num_defaults++;

				/*
				 * If a default expression looks at the table being loaded,
				 * then it could give the wrong answer when using
				 * multi-insert. Since database access can be dynamic this is
				 * hard to test for exactly, so we use the much wider test of
				 * whether the default expression is volatile. We allow for
				 * the special case of when the default expression is the
				 * nextval() of a sequence which in this specific case is
				 * known to be safe for use with the multi-insert
				 * optimization. Hence we use this special case function
				 * checker rather than the standard check for
				 * contain_volatile_functions().
				 */
				if (!volatile_defexprs)
					volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
			}
		}
	}

	/* We keep those variables in cstate. */
	cstate->in_functions = in_functions;
	cstate->typioparams = typioparams;
	cstate->defmap = defmap;
	cstate->defexprs = defexprs;
	cstate->volatile_defexprs = volatile_defexprs;
	cstate->num_defaults = num_defaults;
	cstate->is_program = is_program;

	/*
	 * Choose the data source: a caller-supplied callback takes precedence;
	 * otherwise a NULL filename means the client (or stdin), and a non-NULL
	 * filename is either a program to run or a server-side file to open.
	 */
	if (data_source_cb)
	{
		cstate->copy_dest = COPY_CALLBACK;
		cstate->data_source_cb = data_source_cb;
	}
	else if (pipe)
	{
		Assert(!is_program);	/* the grammar does not allow this */
		if (whereToSendOutput == DestRemote)
			ReceiveCopyBegin(cstate);
		else
			cstate->copy_file = stdin;
	}
	else
	{
		cstate->filename = pstrdup(filename);

		if (cstate->is_program)
		{
			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
			if (cstate->copy_file == NULL)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not execute command \"%s\": %m",
								cstate->filename)));
		}
		else
		{
			struct stat st;

			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
			if (cstate->copy_file == NULL)
			{
				/* copy errno because ereport subfunctions might change it */
				int			save_errno = errno;

				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\" for reading: %m",
								cstate->filename),
						 (save_errno == ENOENT || save_errno == EACCES) ?
						 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
								 "You may want a client-side facility such as psql's \\copy.") : 0));
			}

			/* Opening a directory can succeed; reject it explicitly. */
			if (fstat(fileno(cstate->copy_file), &st))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m",
								cstate->filename)));

			if (S_ISDIR(st.st_mode))
				ereport(ERROR,
						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
						 errmsg("\"%s\" is a directory", cstate->filename)));
		}
	}

	if (!cstate->binary)
	{
		/* must rely on user to tell us... */
		cstate->file_has_oids = cstate->oids;
	}
	else
	{
		/* Read and verify binary header */
		char		readSig[11];
		int32		tmp;

		/* Signature */
		if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
			memcmp(readSig, BinarySignature, 11) != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("COPY file signature not recognized")));
		/*
		 * Flags field: bit 16 says whether the file carries OIDs.  Any other
		 * bit set in the high 16 bits is an unknown critical flag.
		 */
		if (!CopyGetInt32(cstate, &tmp))
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("invalid COPY file header (missing flags)")));
		cstate->file_has_oids = (tmp & (1 << 16)) != 0;
		tmp &= ~(1 << 16);
		if ((tmp >> 16) != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("unrecognized critical flags in COPY file header")));
		/* Header extension length */
		if (!CopyGetInt32(cstate, &tmp) ||
			tmp < 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("invalid COPY file header (missing length)")));
		/* Skip extension header, if present (one byte at a time) */
		while (tmp-- > 0)
		{
			if (CopyGetData(cstate, readSig, 1, 1) != 1)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("invalid COPY file header (wrong length)")));
		}
	}

	/*
	 * Binary-mode OIDs are decoded through the oid type's binary input
	 * function; text mode instead parses them with oidin in NextCopyFrom.
	 */
	if (cstate->file_has_oids && cstate->binary)
	{
		getTypeBinaryInputInfo(OIDOID,
							   &in_func_oid, &cstate->oid_typioparam);
		fmgr_info(in_func_oid, &cstate->oid_in_function);
	}

	/*
	 * create workspace for CopyReadAttributes results: one slot per
	 * requested column, plus one for the OID field when present
	 */
	if (!cstate->binary)
	{
		AttrNumber	attr_count = list_length(cstate->attnumlist);
		int			nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;

		cstate->max_fields = nfields;
		cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
	}

	MemoryContextSwitchTo(oldcontext);

	return cstate;
}
3283
3284 /*
3285 * Read raw fields in the next line for COPY FROM in text or csv mode.
3286 * Return false if no more lines.
3287 *
3288 * An internal temporary buffer is returned via 'fields'. It is valid until
3289 * the next call of the function. Since the function returns all raw fields
3290 * in the input file, 'nfields' could be different from the number of columns
3291 * in the relation.
3292 *
3293 * NOTE: force_not_null option are not applied to the returned fields.
3294 */
3295 bool
NextCopyFromRawFields(CopyState cstate,char *** fields,int * nfields)3296 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3297 {
3298 int fldct;
3299 bool done;
3300
3301 /* only available for text or csv input */
3302 Assert(!cstate->binary);
3303
3304 /* on input just throw the header line away */
3305 if (cstate->cur_lineno == 0 && cstate->header_line)
3306 {
3307 cstate->cur_lineno++;
3308 if (CopyReadLine(cstate))
3309 return false; /* done */
3310 }
3311
3312 cstate->cur_lineno++;
3313
3314 /* Actually read the line into memory here */
3315 done = CopyReadLine(cstate);
3316
3317 /*
3318 * EOF at start of line means we're done. If we see EOF after some
3319 * characters, we act as though it was newline followed by EOF, ie,
3320 * process the line and then exit loop on next iteration.
3321 */
3322 if (done && cstate->line_buf.len == 0)
3323 return false;
3324
3325 /* Parse the line into de-escaped field values */
3326 if (cstate->csv_mode)
3327 fldct = CopyReadAttributesCSV(cstate);
3328 else
3329 fldct = CopyReadAttributesText(cstate);
3330
3331 *fields = cstate->raw_fields;
3332 *nfields = fldct;
3333 return true;
3334 }
3335
/*
 * Read next tuple from file for COPY FROM. Return false if no more tuples.
 *
 * 'econtext' is used to evaluate the default expressions of columns that are
 * not read from the file.  It can be NULL when there are no defaults to
 * compute (cstate->num_defaults == 0), e.g. when all columns are read from
 * the file.  When it is not NULL, the caller must already have switched into
 * econtext's per-tuple memory context.
 *
 * 'values' and 'nulls' arrays must be the same length as columns of the
 * relation passed to BeginCopyFrom. This function fills the arrays: columns
 * that are neither read from input nor given a default stay NULL.
 *
 * The tuple's OID, if the input carries OIDs, is returned in *tupleOid.
 * *tupleOid is written only when cstate->oids is set and tupleOid is not
 * NULL; otherwise it is left untouched.
 */
bool
NextCopyFrom(CopyState cstate, ExprContext *econtext,
			 Datum *values, bool *nulls, Oid *tupleOid)
{
	TupleDesc	tupDesc;
	Form_pg_attribute *attr;
	AttrNumber	num_phys_attrs,
				attr_count,
				num_defaults = cstate->num_defaults;
	FmgrInfo   *in_functions = cstate->in_functions;
	Oid		   *typioparams = cstate->typioparams;
	int			i;
	int			nfields;
	bool		isnull;
	bool		file_has_oids = cstate->file_has_oids;
	int		   *defmap = cstate->defmap;
	ExprState **defexprs = cstate->defexprs;

	tupDesc = RelationGetDescr(cstate->rel);
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;
	attr_count = list_length(cstate->attnumlist);
	/* expected number of input fields, counting the OID field if present */
	nfields = file_has_oids ? (attr_count + 1) : attr_count;

	/* Initialize all values for row to NULL */
	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
	MemSet(nulls, true, num_phys_attrs * sizeof(bool));

	if (!cstate->binary)
	{
		char	  **field_strings;
		ListCell   *cur;
		int			fldct;
		int			fieldno;
		char	   *string;

		/* read raw fields in the next line */
		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
			return false;

		/*
		 * check for overflowing fields (skipped when no fields are expected
		 * at all -- presumably because even an empty line parses as one
		 * field; NOTE(review): confirm against CopyReadAttributesText)
		 */
		if (nfields > 0 && fldct > nfields)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));

		fieldno = 0;

		/* Read the OID field if present */
		if (file_has_oids)
		{
			if (fieldno >= fldct)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("missing data for OID column")));
			string = field_strings[fieldno++];

			if (string == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("null OID in COPY data")));
			else if (cstate->oids && tupleOid != NULL)
			{
				/* expose the field to the error context callback while parsing */
				cstate->cur_attname = "oid";
				cstate->cur_attval = string;
				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
																 CStringGetDatum(string)));
				if (*tupleOid == InvalidOid)
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("invalid OID in COPY data")));
				cstate->cur_attname = NULL;
				cstate->cur_attval = NULL;
			}
		}

		/* Loop to read the user attributes on the line. */
		foreach(cur, cstate->attnumlist)
		{
			int			attnum = lfirst_int(cur);
			int			m = attnum - 1;

			if (fieldno >= fldct)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("missing data for column \"%s\"",
								NameStr(attr[m]->attname))));
			string = field_strings[fieldno++];

			/*
			 * When a column-selection filter is present, input for columns it
			 * does not select is consumed but not converted.
			 */
			if (cstate->convert_select_flags &&
				!cstate->convert_select_flags[m])
			{
				/* ignore input field, leaving column as NULL */
				continue;
			}

			if (cstate->csv_mode)
			{
				if (string == NULL &&
					cstate->force_notnull_flags[m])
				{
					/*
					 * FORCE_NOT_NULL option is set and column is NULL -
					 * convert it to the NULL string.
					 */
					string = cstate->null_print;
				}
				else if (string != NULL && cstate->force_null_flags[m]
						 && strcmp(string, cstate->null_print) == 0)
				{
					/*
					 * FORCE_NULL option is set and column matches the NULL
					 * string. It must have been quoted, or otherwise the
					 * string would already have been set to NULL. Convert it
					 * to NULL as specified.
					 */
					string = NULL;
				}
			}

			/*
			 * The input function is called even for NULL (string == NULL);
			 * cur_attname/cur_attval let the error context callback name the
			 * column and value if it fails.
			 */
			cstate->cur_attname = NameStr(attr[m]->attname);
			cstate->cur_attval = string;
			values[m] = InputFunctionCall(&in_functions[m],
										  string,
										  typioparams[m],
										  attr[m]->atttypmod);
			if (string != NULL)
				nulls[m] = false;
			cstate->cur_attname = NULL;
			cstate->cur_attval = NULL;
		}

		Assert(fieldno == nfields);
	}
	else
	{
		/* binary */
		int16		fld_count;
		ListCell   *cur;

		/* binary input has no lines; count tuples instead */
		cstate->cur_lineno++;

		if (!CopyGetInt16(cstate, &fld_count))
		{
			/* EOF detected (end of file, or protocol-level EOF) */
			return false;
		}

		if (fld_count == -1)
		{
			/*
			 * Received EOF marker. In a V3-protocol copy, wait for the
			 * protocol-level EOF, and complain if it doesn't come
			 * immediately. This ensures that we correctly handle CopyFail,
			 * if client chooses to send that now.
			 *
			 * Note that we MUST NOT try to read more data in an old-protocol
			 * copy, since there is no protocol-level EOF marker then. We
			 * could go either way for copy from file, but choose to throw
			 * error if there's data after the EOF marker, for consistency
			 * with the new-protocol case.
			 */
			char		dummy;

			if (cstate->copy_dest != COPY_OLD_FE &&
				CopyGetData(cstate, &dummy, 1, 1) > 0)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("received copy data after EOF marker")));
			return false;
		}

		/*
		 * The field count must equal the number of requested columns; the
		 * OID field, if present, is read separately and not counted.
		 */
		if (fld_count != attr_count)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("row field count is %d, expected %d",
							(int) fld_count, attr_count)));

		if (file_has_oids)
		{
			Oid			loaded_oid;

			cstate->cur_attname = "oid";
			loaded_oid =
				DatumGetObjectId(CopyReadBinaryAttribute(cstate,
														 0,
														 &cstate->oid_in_function,
														 cstate->oid_typioparam,
														 -1,
														 &isnull));
			if (isnull || loaded_oid == InvalidOid)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("invalid OID in COPY data")));
			cstate->cur_attname = NULL;
			if (cstate->oids && tupleOid != NULL)
				*tupleOid = loaded_oid;
		}

		/* user columns are numbered from 1 (0 was used for the OID above) */
		i = 0;
		foreach(cur, cstate->attnumlist)
		{
			int			attnum = lfirst_int(cur);
			int			m = attnum - 1;

			cstate->cur_attname = NameStr(attr[m]->attname);
			i++;
			values[m] = CopyReadBinaryAttribute(cstate,
												i,
												&in_functions[m],
												typioparams[m],
												attr[m]->atttypmod,
												&nulls[m]);
			cstate->cur_attname = NULL;
		}
	}

	/*
	 * Now compute and insert any defaults available for the columns not
	 * provided by the input data. Anything not processed here or above will
	 * remain NULL.
	 */
	for (i = 0; i < num_defaults; i++)
	{
		/*
		 * The caller must supply econtext and have switched into the
		 * per-tuple memory context in it.
		 */
		Assert(econtext != NULL);
		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);

		values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
										 &nulls[defmap[i]]);
	}

	return true;
}
3584
3585 /*
3586 * Clean up storage and release resources for COPY FROM.
3587 */
3588 void
EndCopyFrom(CopyState cstate)3589 EndCopyFrom(CopyState cstate)
3590 {
3591 /* No COPY FROM related resources except memory. */
3592
3593 EndCopy(cstate);
3594 }
3595
3596 /*
3597 * Read the next input line and stash it in line_buf, with conversion to
3598 * server encoding.
3599 *
3600 * Result is true if read was terminated by EOF, false if terminated
3601 * by newline. The terminating newline or EOF marker is not included
3602 * in the final value of line_buf.
3603 */
3604 static bool
CopyReadLine(CopyState cstate)3605 CopyReadLine(CopyState cstate)
3606 {
3607 bool result;
3608
3609 resetStringInfo(&cstate->line_buf);
3610 cstate->line_buf_valid = true;
3611
3612 /* Mark that encoding conversion hasn't occurred yet */
3613 cstate->line_buf_converted = false;
3614
3615 /* Parse data and transfer into line_buf */
3616 result = CopyReadLineText(cstate);
3617
3618 if (result)
3619 {
3620 /*
3621 * Reached EOF. In protocol version 3, we should ignore anything
3622 * after \. up to the protocol end of copy data. (XXX maybe better
3623 * not to treat \. as special?)
3624 */
3625 if (cstate->copy_dest == COPY_NEW_FE)
3626 {
3627 do
3628 {
3629 cstate->raw_buf_index = cstate->raw_buf_len;
3630 } while (CopyLoadRawBuf(cstate));
3631 }
3632 }
3633 else
3634 {
3635 /*
3636 * If we didn't hit EOF, then we must have transferred the EOL marker
3637 * to line_buf along with the data. Get rid of it.
3638 */
3639 switch (cstate->eol_type)
3640 {
3641 case EOL_NL:
3642 Assert(cstate->line_buf.len >= 1);
3643 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3644 cstate->line_buf.len--;
3645 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3646 break;
3647 case EOL_CR:
3648 Assert(cstate->line_buf.len >= 1);
3649 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3650 cstate->line_buf.len--;
3651 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3652 break;
3653 case EOL_CRNL:
3654 Assert(cstate->line_buf.len >= 2);
3655 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3656 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3657 cstate->line_buf.len -= 2;
3658 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3659 break;
3660 case EOL_UNKNOWN:
3661 /* shouldn't get here */
3662 Assert(false);
3663 break;
3664 }
3665 }
3666
3667 /* Done reading the line. Convert it to server encoding. */
3668 if (cstate->need_transcoding)
3669 {
3670 char *cvt;
3671
3672 cvt = pg_any_to_server(cstate->line_buf.data,
3673 cstate->line_buf.len,
3674 cstate->file_encoding);
3675 if (cvt != cstate->line_buf.data)
3676 {
3677 /* transfer converted data back to line_buf */
3678 resetStringInfo(&cstate->line_buf);
3679 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3680 pfree(cvt);
3681 }
3682 }
3683
3684 /* Now it's safe to use the buffer in error messages */
3685 cstate->line_buf_converted = true;
3686
3687 return result;
3688 }
3689
3690 /*
3691 * CopyReadLineText - inner loop of CopyReadLine for text mode
3692 */
3693 static bool
CopyReadLineText(CopyState cstate)3694 CopyReadLineText(CopyState cstate)
3695 {
3696 char *copy_raw_buf;
3697 int raw_buf_ptr;
3698 int copy_buf_len;
3699 bool need_data = false;
3700 bool hit_eof = false;
3701 bool result = false;
3702 char mblen_str[2];
3703
3704 /* CSV variables */
3705 bool first_char_in_line = true;
3706 bool in_quote = false,
3707 last_was_esc = false;
3708 char quotec = '\0';
3709 char escapec = '\0';
3710
3711 if (cstate->csv_mode)
3712 {
3713 quotec = cstate->quote[0];
3714 escapec = cstate->escape[0];
3715 /* ignore special escape processing if it's the same as quotec */
3716 if (quotec == escapec)
3717 escapec = '\0';
3718 }
3719
3720 mblen_str[1] = '\0';
3721
3722 /*
3723 * The objective of this loop is to transfer the entire next input line
3724 * into line_buf. Hence, we only care for detecting newlines (\r and/or
3725 * \n) and the end-of-copy marker (\.).
3726 *
3727 * In CSV mode, \r and \n inside a quoted field are just part of the data
3728 * value and are put in line_buf. We keep just enough state to know if we
3729 * are currently in a quoted field or not.
3730 *
3731 * These four characters, and the CSV escape and quote characters, are
3732 * assumed the same in frontend and backend encodings.
3733 *
3734 * For speed, we try to move data from raw_buf to line_buf in chunks
3735 * rather than one character at a time. raw_buf_ptr points to the next
3736 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3737 * have been determined to be part of the line, but not yet transferred to
3738 * line_buf.
3739 *
3740 * For a little extra speed within the loop, we copy raw_buf and
3741 * raw_buf_len into local variables.
3742 */
3743 copy_raw_buf = cstate->raw_buf;
3744 raw_buf_ptr = cstate->raw_buf_index;
3745 copy_buf_len = cstate->raw_buf_len;
3746
3747 for (;;)
3748 {
3749 int prev_raw_ptr;
3750 char c;
3751
3752 /*
3753 * Load more data if needed. Ideally we would just force four bytes
3754 * of read-ahead and avoid the many calls to
3755 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3756 * does not allow us to read too far ahead or we might read into the
3757 * next data, so we read-ahead only as far we know we can. One
3758 * optimization would be to read-ahead four byte here if
3759 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3760 * considering the size of the buffer.
3761 */
3762 if (raw_buf_ptr >= copy_buf_len || need_data)
3763 {
3764 REFILL_LINEBUF;
3765
3766 /*
3767 * Try to read some more data. This will certainly reset
3768 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3769 */
3770 if (!CopyLoadRawBuf(cstate))
3771 hit_eof = true;
3772 raw_buf_ptr = 0;
3773 copy_buf_len = cstate->raw_buf_len;
3774
3775 /*
3776 * If we are completely out of data, break out of the loop,
3777 * reporting EOF.
3778 */
3779 if (copy_buf_len <= 0)
3780 {
3781 result = true;
3782 break;
3783 }
3784 need_data = false;
3785 }
3786
3787 /* OK to fetch a character */
3788 prev_raw_ptr = raw_buf_ptr;
3789 c = copy_raw_buf[raw_buf_ptr++];
3790
3791 if (cstate->csv_mode)
3792 {
3793 /*
3794 * If character is '\\' or '\r', we may need to look ahead below.
3795 * Force fetch of the next character if we don't already have it.
3796 * We need to do this before changing CSV state, in case one of
3797 * these characters is also the quote or escape character.
3798 *
3799 * Note: old-protocol does not like forced prefetch, but it's OK
3800 * here since we cannot validly be at EOF.
3801 */
3802 if (c == '\\' || c == '\r')
3803 {
3804 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3805 }
3806
3807 /*
3808 * Dealing with quotes and escapes here is mildly tricky. If the
3809 * quote char is also the escape char, there's no problem - we
3810 * just use the char as a toggle. If they are different, we need
3811 * to ensure that we only take account of an escape inside a
3812 * quoted field and immediately preceding a quote char, and not
3813 * the second in an escape-escape sequence.
3814 */
3815 if (in_quote && c == escapec)
3816 last_was_esc = !last_was_esc;
3817 if (c == quotec && !last_was_esc)
3818 in_quote = !in_quote;
3819 if (c != escapec)
3820 last_was_esc = false;
3821
3822 /*
3823 * Updating the line count for embedded CR and/or LF chars is
3824 * necessarily a little fragile - this test is probably about the
3825 * best we can do. (XXX it's arguable whether we should do this
3826 * at all --- is cur_lineno a physical or logical count?)
3827 */
3828 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3829 cstate->cur_lineno++;
3830 }
3831
3832 /* Process \r */
3833 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3834 {
3835 /* Check for \r\n on first line, _and_ handle \r\n. */
3836 if (cstate->eol_type == EOL_UNKNOWN ||
3837 cstate->eol_type == EOL_CRNL)
3838 {
3839 /*
3840 * If need more data, go back to loop top to load it.
3841 *
3842 * Note that if we are at EOF, c will wind up as '\0' because
3843 * of the guaranteed pad of raw_buf.
3844 */
3845 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3846
3847 /* get next char */
3848 c = copy_raw_buf[raw_buf_ptr];
3849
3850 if (c == '\n')
3851 {
3852 raw_buf_ptr++; /* eat newline */
3853 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3854 }
3855 else
3856 {
3857 /* found \r, but no \n */
3858 if (cstate->eol_type == EOL_CRNL)
3859 ereport(ERROR,
3860 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3861 !cstate->csv_mode ?
3862 errmsg("literal carriage return found in data") :
3863 errmsg("unquoted carriage return found in data"),
3864 !cstate->csv_mode ?
3865 errhint("Use \"\\r\" to represent carriage return.") :
3866 errhint("Use quoted CSV field to represent carriage return.")));
3867
3868 /*
3869 * if we got here, it is the first line and we didn't find
3870 * \n, so don't consume the peeked character
3871 */
3872 cstate->eol_type = EOL_CR;
3873 }
3874 }
3875 else if (cstate->eol_type == EOL_NL)
3876 ereport(ERROR,
3877 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3878 !cstate->csv_mode ?
3879 errmsg("literal carriage return found in data") :
3880 errmsg("unquoted carriage return found in data"),
3881 !cstate->csv_mode ?
3882 errhint("Use \"\\r\" to represent carriage return.") :
3883 errhint("Use quoted CSV field to represent carriage return.")));
3884 /* If reach here, we have found the line terminator */
3885 break;
3886 }
3887
3888 /* Process \n */
3889 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3890 {
3891 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3892 ereport(ERROR,
3893 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3894 !cstate->csv_mode ?
3895 errmsg("literal newline found in data") :
3896 errmsg("unquoted newline found in data"),
3897 !cstate->csv_mode ?
3898 errhint("Use \"\\n\" to represent newline.") :
3899 errhint("Use quoted CSV field to represent newline.")));
3900 cstate->eol_type = EOL_NL; /* in case not set yet */
3901 /* If reach here, we have found the line terminator */
3902 break;
3903 }
3904
3905 /*
3906 * In CSV mode, we only recognize \. alone on a line. This is because
3907 * \. is a valid CSV data value.
3908 */
3909 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3910 {
3911 char c2;
3912
3913 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3914 IF_NEED_REFILL_AND_EOF_BREAK(0);
3915
3916 /* -----
3917 * get next character
3918 * Note: we do not change c so if it isn't \., we can fall
3919 * through and continue processing for file encoding.
3920 * -----
3921 */
3922 c2 = copy_raw_buf[raw_buf_ptr];
3923
3924 if (c2 == '.')
3925 {
3926 raw_buf_ptr++; /* consume the '.' */
3927
3928 /*
3929 * Note: if we loop back for more data here, it does not
3930 * matter that the CSV state change checks are re-executed; we
3931 * will come back here with no important state changed.
3932 */
3933 if (cstate->eol_type == EOL_CRNL)
3934 {
3935 /* Get the next character */
3936 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3937 /* if hit_eof, c2 will become '\0' */
3938 c2 = copy_raw_buf[raw_buf_ptr++];
3939
3940 if (c2 == '\n')
3941 {
3942 if (!cstate->csv_mode)
3943 ereport(ERROR,
3944 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3945 errmsg("end-of-copy marker does not match previous newline style")));
3946 else
3947 NO_END_OF_COPY_GOTO;
3948 }
3949 else if (c2 != '\r')
3950 {
3951 if (!cstate->csv_mode)
3952 ereport(ERROR,
3953 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3954 errmsg("end-of-copy marker corrupt")));
3955 else
3956 NO_END_OF_COPY_GOTO;
3957 }
3958 }
3959
3960 /* Get the next character */
3961 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3962 /* if hit_eof, c2 will become '\0' */
3963 c2 = copy_raw_buf[raw_buf_ptr++];
3964
3965 if (c2 != '\r' && c2 != '\n')
3966 {
3967 if (!cstate->csv_mode)
3968 ereport(ERROR,
3969 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3970 errmsg("end-of-copy marker corrupt")));
3971 else
3972 NO_END_OF_COPY_GOTO;
3973 }
3974
3975 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3976 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3977 (cstate->eol_type == EOL_CR && c2 != '\r'))
3978 {
3979 ereport(ERROR,
3980 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3981 errmsg("end-of-copy marker does not match previous newline style")));
3982 }
3983
3984 /*
3985 * Transfer only the data before the \. into line_buf, then
3986 * discard the data and the \. sequence.
3987 */
3988 if (prev_raw_ptr > cstate->raw_buf_index)
3989 appendBinaryStringInfo(&cstate->line_buf,
3990 cstate->raw_buf + cstate->raw_buf_index,
3991 prev_raw_ptr - cstate->raw_buf_index);
3992 cstate->raw_buf_index = raw_buf_ptr;
3993 result = true; /* report EOF */
3994 break;
3995 }
3996 else if (!cstate->csv_mode)
3997 {
3998 /*
3999 * If we are here, it means we found a backslash followed by
4000 * something other than a period. In non-CSV mode, anything
4001 * after a backslash is special, so we skip over that second
4002 * character too. If we didn't do that \\. would be
4003 * considered an eof-of copy, while in non-CSV mode it is a
4004 * literal backslash followed by a period. In CSV mode,
4005 * backslashes are not special, so we want to process the
4006 * character after the backslash just like a normal character,
4007 * so we don't increment in those cases.
4008 *
4009 * Set 'c' to skip whole character correctly in multi-byte
4010 * encodings. If we don't have the whole character in the
4011 * buffer yet, we might loop back to process it, after all,
4012 * but that's OK because multi-byte characters cannot have any
4013 * special meaning.
4014 */
4015 raw_buf_ptr++;
4016 c = c2;
4017 }
4018 }
4019
4020 /*
4021 * This label is for CSV cases where \. appears at the start of a
4022 * line, but there is more text after it, meaning it was a data value.
4023 * We are more strict for \. in CSV mode because \. could be a data
4024 * value, while in non-CSV mode, \. cannot be a data value.
4025 */
4026 not_end_of_copy:
4027
4028 /*
4029 * Process all bytes of a multi-byte character as a group.
4030 *
4031 * We only support multi-byte sequences where the first byte has the
4032 * high-bit set, so as an optimization we can avoid this block
4033 * entirely if it is not set.
4034 */
4035 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
4036 {
4037 int mblen;
4038
4039 mblen_str[0] = c;
4040 /* All our encodings only read the first byte to get the length */
4041 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
4042 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
4043 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
4044 raw_buf_ptr += mblen - 1;
4045 }
4046 first_char_in_line = false;
4047 } /* end of outer loop */
4048
4049 /*
4050 * Transfer any still-uncopied data to line_buf.
4051 */
4052 REFILL_LINEBUF;
4053
4054 return result;
4055 }
4056
/*
 * GetDecimalFromHex
 *		Map one hexadecimal digit character ('0'-'9', 'a'-'f', 'A'-'F') to
 *		its numeric value 0..15.
 *
 * The caller is expected to have checked the character with isxdigit();
 * no validation is done here.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char uc = (unsigned char) hex;

	/* letters: fold case, then offset from 'a' (which is worth 10) */
	if (!isdigit(uc))
		return tolower(uc) - 'a' + 10;

	return uc - '0';
}
4068
/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.
 *
 * The input is in line_buf.  We use attribute_buf to hold the result
 * strings.  cstate->raw_fields[k] is set to point to the k'th attribute
 * string, or NULL when the input matches the null marker string.
 * This array is expanded as necessary.
 *
 * (Note that the caller cannot check for nulls since the returned
 * string would be the post-de-escaping equivalent, which may look
 * the same as some valid data string.)
 *
 * The column delimiter is the first byte of cstate->delim.  The null marker
 * is cstate->null_print (length cstate->null_print_len); note that it is
 * compared to the pre-de-escaped input string.
 *
 * Backslash escapes recognized: \b \f \n \r \t \v; one to three octal
 * digits; and \x followed by one or two hex digits.  A backslash before any
 * other character yields that character literally.  A backslash that is the
 * very last byte of the line is dropped.
 *
 * The return value is the number of fields actually read.  For a zero-column
 * table (cstate->max_fields <= 0) the line must be empty, else ERROR.
 */
static int
CopyReadAttributesText(CopyState cstate)
{
	char		delimc = cstate->delim[0];
	int			fieldno;
	char	   *output_ptr;
	char	   *cur_ptr;
	char	   *line_end_ptr;

	/*
	 * We need a special case for zero-column tables: check that the input
	 * line is empty, and return.
	 */
	if (cstate->max_fields <= 0)
	{
		if (cstate->line_buf.len != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));
		return 0;
	}

	resetStringInfo(&cstate->attribute_buf);

	/*
	 * The de-escaped attributes will certainly not be longer than the input
	 * data line, so we can just force attribute_buf to be large enough and
	 * then transfer data without any checks for enough space.  We need to do
	 * it this way because enlarging attribute_buf mid-stream would invalidate
	 * pointers already stored into cstate->raw_fields[].
	 */
	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
	output_ptr = cstate->attribute_buf.data;

	/* set pointer variables for loop */
	cur_ptr = cstate->line_buf.data;
	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

	/* Outer loop iterates over fields */
	fieldno = 0;
	for (;;)
	{
		bool		found_delim = false;	/* false => field ended at EOL */
		char	   *start_ptr;
		char	   *end_ptr;
		int			input_len;
		bool		saw_non_ascii = false;	/* an escape produced \0 or a
											 * high-bit byte */

		/* Make sure there is enough space for the next value */
		if (fieldno >= cstate->max_fields)
		{
			cstate->max_fields *= 2;
			cstate->raw_fields =
				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
		}

		/* Remember start of field on both input and output sides */
		start_ptr = cur_ptr;
		cstate->raw_fields[fieldno] = output_ptr;

		/*
		 * Scan data for field.
		 *
		 * Note that in this loop, we are scanning to locate the end of field
		 * and also speculatively performing de-escaping.  Once we find the
		 * end-of-field, we can match the raw field contents against the null
		 * marker string.  Only after that comparison fails do we know that
		 * de-escaping is actually the right thing to do; therefore we *must
		 * not* throw any syntax errors before we've done the null-marker
		 * check.
		 */
		for (;;)
		{
			char		c;

			/*
			 * end_ptr records where the raw field ends (excluding the
			 * delimiter or a trailing lone backslash); it is used for the
			 * null-marker comparison below.
			 */
			end_ptr = cur_ptr;
			if (cur_ptr >= line_end_ptr)
				break;
			c = *cur_ptr++;
			if (c == delimc)
			{
				found_delim = true;
				break;
			}
			if (c == '\\')
			{
				/* backslash as the last byte of the line: dropped */
				if (cur_ptr >= line_end_ptr)
					break;
				c = *cur_ptr++;
				switch (c)
				{
					case '0':
					case '1':
					case '2':
					case '3':
					case '4':
					case '5':
					case '6':
					case '7':
						{
							/* handle \013 */
							int			val;

							/* consume up to two more octal digits */
							val = OCTVALUE(c);
							if (cur_ptr < line_end_ptr)
							{
								c = *cur_ptr;
								if (ISOCTAL(c))
								{
									cur_ptr++;
									val = (val << 3) + OCTVALUE(c);
									if (cur_ptr < line_end_ptr)
									{
										c = *cur_ptr;
										if (ISOCTAL(c))
										{
											cur_ptr++;
											val = (val << 3) + OCTVALUE(c);
										}
									}
								}
							}
							c = val & 0377;
							if (c == '\0' || IS_HIGHBIT_SET(c))
								saw_non_ascii = true;
						}
						break;
					case 'x':
						/* Handle \x3F */
						/* \x not followed by a hex digit yields a literal 'x' */
						if (cur_ptr < line_end_ptr)
						{
							char		hexchar = *cur_ptr;

							if (isxdigit((unsigned char) hexchar))
							{
								int			val = GetDecimalFromHex(hexchar);

								cur_ptr++;
								if (cur_ptr < line_end_ptr)
								{
									hexchar = *cur_ptr;
									if (isxdigit((unsigned char) hexchar))
									{
										cur_ptr++;
										val = (val << 4) + GetDecimalFromHex(hexchar);
									}
								}
								c = val & 0xff;
								if (c == '\0' || IS_HIGHBIT_SET(c))
									saw_non_ascii = true;
							}
						}
						break;
					case 'b':
						c = '\b';
						break;
					case 'f':
						c = '\f';
						break;
					case 'n':
						c = '\n';
						break;
					case 'r':
						c = '\r';
						break;
					case 't':
						c = '\t';
						break;
					case 'v':
						c = '\v';
						break;

						/*
						 * in all other cases, take the char after '\'
						 * literally
						 */
				}
			}

			/* Add c to output string */
			*output_ptr++ = c;
		}

		/* Check whether raw input matched null marker */
		input_len = end_ptr - start_ptr;
		if (input_len == cstate->null_print_len &&
			strncmp(start_ptr, cstate->null_print, input_len) == 0)
			cstate->raw_fields[fieldno] = NULL;
		else
		{
			/*
			 * At this point we know the field is supposed to contain data.
			 *
			 * If we de-escaped any non-7-bit-ASCII chars, make sure the
			 * resulting string is valid data for the db encoding.
			 */
			if (saw_non_ascii)
			{
				char	   *fld = cstate->raw_fields[fieldno];

				pg_verifymbstr(fld, output_ptr - fld, false);
			}
		}

		/* Terminate attribute value in output area */
		*output_ptr++ = '\0';

		fieldno++;
		/* Done if we hit EOL instead of a delim */
		if (!found_delim)
			break;
	}

	/*
	 * Clean up state of attribute_buf: back up over the last terminator so
	 * that attribute_buf.len does not count it.
	 */
	output_ptr--;
	Assert(*output_ptr == '\0');
	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

	return fieldno;
}
4309
/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.  This has exactly the same API as
 * CopyReadAttributesText, except we parse the fields according to
 * "standard" (i.e. common) CSV usage.
 *
 * Delimiter, quote and escape characters are the first bytes of
 * cstate->delim, cstate->quote and cstate->escape respectively.  Quote
 * characters are not copied to the output.  Inside quotes, the escape
 * character followed by an escape or quote character produces that second
 * character.  Otherwise the escape character is treated like any other
 * byte.
 *
 * A field that contained any quote character is never treated as NULL,
 * even if its raw text equals the null marker.  A line that ends while
 * inside quotes raises "unterminated CSV quoted field".
 */
static int
CopyReadAttributesCSV(CopyState cstate)
{
	char		delimc = cstate->delim[0];
	char		quotec = cstate->quote[0];
	char		escapec = cstate->escape[0];
	int			fieldno;
	char	   *output_ptr;
	char	   *cur_ptr;
	char	   *line_end_ptr;

	/*
	 * We need a special case for zero-column tables: check that the input
	 * line is empty, and return.
	 */
	if (cstate->max_fields <= 0)
	{
		if (cstate->line_buf.len != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));
		return 0;
	}

	resetStringInfo(&cstate->attribute_buf);

	/*
	 * The de-escaped attributes will certainly not be longer than the input
	 * data line, so we can just force attribute_buf to be large enough and
	 * then transfer data without any checks for enough space.  We need to do
	 * it this way because enlarging attribute_buf mid-stream would invalidate
	 * pointers already stored into cstate->raw_fields[].
	 */
	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
	output_ptr = cstate->attribute_buf.data;

	/* set pointer variables for loop */
	cur_ptr = cstate->line_buf.data;
	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

	/* Outer loop iterates over fields */
	fieldno = 0;
	for (;;)
	{
		bool		found_delim = false;	/* false => field ended at EOL */
		bool		saw_quote = false;	/* any quoting seen in this field */
		char	   *start_ptr;
		char	   *end_ptr;
		int			input_len;

		/* Make sure there is enough space for the next value */
		if (fieldno >= cstate->max_fields)
		{
			cstate->max_fields *= 2;
			cstate->raw_fields =
				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
		}

		/* Remember start of field on both input and output sides */
		start_ptr = cur_ptr;
		cstate->raw_fields[fieldno] = output_ptr;

		/*
		 * Scan data for field.
		 *
		 * The loop starts in "not quote" mode and then toggles between that
		 * and "in quote" mode.  The loop exits normally if it is in "not
		 * quote" mode and a delimiter or line end is seen.
		 */
		for (;;)
		{
			char		c;

			/* Not in quote */
			for (;;)
			{
				end_ptr = cur_ptr;
				if (cur_ptr >= line_end_ptr)
					goto endfield;
				c = *cur_ptr++;
				/* unquoted field delimiter */
				if (c == delimc)
				{
					found_delim = true;
					goto endfield;
				}
				/* start of quoted field (or part of field) */
				if (c == quotec)
				{
					saw_quote = true;
					break;
				}
				/* Add c to output string */
				*output_ptr++ = c;
			}

			/* In quote */
			for (;;)
			{
				end_ptr = cur_ptr;
				if (cur_ptr >= line_end_ptr)
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("unterminated CSV quoted field")));

				c = *cur_ptr++;

				/* escape within a quoted field */
				if (c == escapec)
				{
					/*
					 * peek at the next char if available, and escape it if it
					 * is an escape char or a quote char
					 */
					if (cur_ptr < line_end_ptr)
					{
						char		nextc = *cur_ptr;

						if (nextc == escapec || nextc == quotec)
						{
							*output_ptr++ = nextc;
							cur_ptr++;
							continue;
						}
					}
					/* otherwise fall through and treat c as an ordinary byte */
				}

				/*
				 * end of quoted field. Must do this test after testing for
				 * escape in case quote char and escape char are the same
				 * (which is the common case).
				 */
				if (c == quotec)
					break;

				/* Add c to output string */
				*output_ptr++ = c;
			}
		}
endfield:

		/* Terminate attribute value in output area */
		*output_ptr++ = '\0';

		/* Check whether raw input matched null marker */
		input_len = end_ptr - start_ptr;
		if (!saw_quote && input_len == cstate->null_print_len &&
			strncmp(start_ptr, cstate->null_print, input_len) == 0)
			cstate->raw_fields[fieldno] = NULL;

		fieldno++;
		/* Done if we hit EOL instead of a delim */
		if (!found_delim)
			break;
	}

	/*
	 * Clean up state of attribute_buf: back up over the last terminator so
	 * that attribute_buf.len does not count it.
	 */
	output_ptr--;
	Assert(*output_ptr == '\0');
	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

	return fieldno;
}
4480
4481
4482 /*
4483 * Read a binary attribute
4484 */
4485 static Datum
CopyReadBinaryAttribute(CopyState cstate,int column_no,FmgrInfo * flinfo,Oid typioparam,int32 typmod,bool * isnull)4486 CopyReadBinaryAttribute(CopyState cstate,
4487 int column_no, FmgrInfo *flinfo,
4488 Oid typioparam, int32 typmod,
4489 bool *isnull)
4490 {
4491 int32 fld_size;
4492 Datum result;
4493
4494 if (!CopyGetInt32(cstate, &fld_size))
4495 ereport(ERROR,
4496 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4497 errmsg("unexpected EOF in COPY data")));
4498 if (fld_size == -1)
4499 {
4500 *isnull = true;
4501 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4502 }
4503 if (fld_size < 0)
4504 ereport(ERROR,
4505 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4506 errmsg("invalid field size")));
4507
4508 /* reset attribute_buf to empty, and load raw data in it */
4509 resetStringInfo(&cstate->attribute_buf);
4510
4511 enlargeStringInfo(&cstate->attribute_buf, fld_size);
4512 if (CopyGetData(cstate, cstate->attribute_buf.data,
4513 fld_size, fld_size) != fld_size)
4514 ereport(ERROR,
4515 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4516 errmsg("unexpected EOF in COPY data")));
4517
4518 cstate->attribute_buf.len = fld_size;
4519 cstate->attribute_buf.data[fld_size] = '\0';
4520
4521 /* Call the column type's binary input converter */
4522 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4523 typioparam, typmod);
4524
4525 /* Trouble if it didn't eat the whole buffer */
4526 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4527 ereport(ERROR,
4528 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4529 errmsg("incorrect binary data format")));
4530
4531 *isnull = false;
4532 return result;
4533 }
4534
/*
 * DUMPSOFAR: if the pending literal run [start, ptr) is non-empty, send it
 * with CopySendData.  It refers to local variables named cstate, start and
 * ptr in whatever function expands it (CopyAttributeOutText and
 * CopyAttributeOutCSV).
 */
#define DUMPSOFAR() \
	do { \
		if (ptr > start) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)

/*
 * Send text representation of one attribute, with conversion and escaping
 *
 * string is NUL-terminated.  When cstate->need_transcoding is set, it is
 * first converted with pg_server_to_any() to cstate->file_encoding.  The
 * converted bytes are then emitted with these rules:
 *	- \b \f \n \r \t \v become a backslash followed by b f n r t v;
 *	- a backslash, or the delimiter byte (cstate->delim[0]), is preceded by
 *	  a backslash;
 *	- every other byte, including other control characters, is sent as-is.
 */
static void
CopyAttributeOutText(CopyState cstate, char *string)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->delim[0];

	if (cstate->need_transcoding)
		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
	else
		ptr = string;

	/*
	 * We have to grovel through the string searching for control characters
	 * and instances of the delimiter character.  In most cases, though, these
	 * are infrequent.  To avoid overhead from calling CopySendData once per
	 * character, we dump out all characters between escaped characters in a
	 * single call.  The loop invariant is that the data from "start" to "ptr"
	 * can be sent literally, but hasn't yet been.
	 *
	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
	 * in valid backend encodings, extra bytes of a multibyte character never
	 * look like ASCII.  This loop is sufficiently performance-critical that
	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
	 * of the normal safe-encoding path.
	 */
	if (cstate->encoding_embeds_ascii)
	{
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional.  We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				start = ptr++;	/* we include char in next run */
			}
			else if (IS_HIGHBIT_SET(c))
				/* skip the whole multibyte char so its trailing bytes are
				 * never inspected as ASCII */
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
	}
	else
	{
		/* Same as above, minus the multibyte-skipping branch */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional.  We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				start = ptr++;	/* we include char in next run */
			}
			else
				ptr++;
		}
	}

	/* flush whatever literal run remains */
	DUMPSOFAR();
}
4692
4693 /*
4694 * Send text representation of one attribute, with conversion and
4695 * CSV-style escaping
4696 */
4697 static void
CopyAttributeOutCSV(CopyState cstate,char * string,bool use_quote,bool single_attr)4698 CopyAttributeOutCSV(CopyState cstate, char *string,
4699 bool use_quote, bool single_attr)
4700 {
4701 char *ptr;
4702 char *start;
4703 char c;
4704 char delimc = cstate->delim[0];
4705 char quotec = cstate->quote[0];
4706 char escapec = cstate->escape[0];
4707
4708 /* force quoting if it matches null_print (before conversion!) */
4709 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4710 use_quote = true;
4711
4712 if (cstate->need_transcoding)
4713 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4714 else
4715 ptr = string;
4716
4717 /*
4718 * Make a preliminary pass to discover if it needs quoting
4719 */
4720 if (!use_quote)
4721 {
4722 /*
4723 * Because '\.' can be a data value, quote it if it appears alone on a
4724 * line so it is not interpreted as the end-of-data marker.
4725 */
4726 if (single_attr && strcmp(ptr, "\\.") == 0)
4727 use_quote = true;
4728 else
4729 {
4730 char *tptr = ptr;
4731
4732 while ((c = *tptr) != '\0')
4733 {
4734 if (c == delimc || c == quotec || c == '\n' || c == '\r')
4735 {
4736 use_quote = true;
4737 break;
4738 }
4739 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4740 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4741 else
4742 tptr++;
4743 }
4744 }
4745 }
4746
4747 if (use_quote)
4748 {
4749 CopySendChar(cstate, quotec);
4750
4751 /*
4752 * We adopt the same optimization strategy as in CopyAttributeOutText
4753 */
4754 start = ptr;
4755 while ((c = *ptr) != '\0')
4756 {
4757 if (c == quotec || c == escapec)
4758 {
4759 DUMPSOFAR();
4760 CopySendChar(cstate, escapec);
4761 start = ptr; /* we include char in next run */
4762 }
4763 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4764 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4765 else
4766 ptr++;
4767 }
4768 DUMPSOFAR();
4769
4770 CopySendChar(cstate, quotec);
4771 }
4772 else
4773 {
4774 /* If it doesn't need quoting, we can just dump it as-is */
4775 CopySendString(cstate, ptr);
4776 }
4777 }
4778
4779 /*
4780 * CopyGetAttnums - build an integer list of attnums to be copied
4781 *
4782 * The input attnamelist is either the user-specified column list,
4783 * or NIL if there was none (in which case we want all the non-dropped
4784 * columns).
4785 *
4786 * rel can be NULL ... it's only used for error reports.
4787 */
4788 static List *
CopyGetAttnums(TupleDesc tupDesc,Relation rel,List * attnamelist)4789 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4790 {
4791 List *attnums = NIL;
4792
4793 if (attnamelist == NIL)
4794 {
4795 /* Generate default column list */
4796 Form_pg_attribute *attr = tupDesc->attrs;
4797 int attr_count = tupDesc->natts;
4798 int i;
4799
4800 for (i = 0; i < attr_count; i++)
4801 {
4802 if (attr[i]->attisdropped)
4803 continue;
4804 attnums = lappend_int(attnums, i + 1);
4805 }
4806 }
4807 else
4808 {
4809 /* Validate the user-supplied list and extract attnums */
4810 ListCell *l;
4811
4812 foreach(l, attnamelist)
4813 {
4814 char *name = strVal(lfirst(l));
4815 int attnum;
4816 int i;
4817
4818 /* Lookup column name */
4819 attnum = InvalidAttrNumber;
4820 for (i = 0; i < tupDesc->natts; i++)
4821 {
4822 if (tupDesc->attrs[i]->attisdropped)
4823 continue;
4824 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4825 {
4826 attnum = tupDesc->attrs[i]->attnum;
4827 break;
4828 }
4829 }
4830 if (attnum == InvalidAttrNumber)
4831 {
4832 if (rel != NULL)
4833 ereport(ERROR,
4834 (errcode(ERRCODE_UNDEFINED_COLUMN),
4835 errmsg("column \"%s\" of relation \"%s\" does not exist",
4836 name, RelationGetRelationName(rel))));
4837 else
4838 ereport(ERROR,
4839 (errcode(ERRCODE_UNDEFINED_COLUMN),
4840 errmsg("column \"%s\" does not exist",
4841 name)));
4842 }
4843 /* Check for duplicates */
4844 if (list_member_int(attnums, attnum))
4845 ereport(ERROR,
4846 (errcode(ERRCODE_DUPLICATE_COLUMN),
4847 errmsg("column \"%s\" specified more than once",
4848 name)));
4849 attnums = lappend_int(attnums, attnum);
4850 }
4851 }
4852
4853 return attnums;
4854 }
4855
4856
4857 /*
4858 * copy_dest_startup --- executor startup
4859 */
4860 static void
copy_dest_startup(DestReceiver * self,int operation,TupleDesc typeinfo)4861 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4862 {
4863 /* no-op */
4864 }
4865
4866 /*
4867 * copy_dest_receive --- receive one tuple
4868 */
4869 static bool
copy_dest_receive(TupleTableSlot * slot,DestReceiver * self)4870 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4871 {
4872 DR_copy *myState = (DR_copy *) self;
4873 CopyState cstate = myState->cstate;
4874
4875 /* Make sure the tuple is fully deconstructed */
4876 slot_getallattrs(slot);
4877
4878 /* And send the data */
4879 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4880 myState->processed++;
4881
4882 return true;
4883 }
4884
4885 /*
4886 * copy_dest_shutdown --- executor end
4887 */
4888 static void
copy_dest_shutdown(DestReceiver * self)4889 copy_dest_shutdown(DestReceiver *self)
4890 {
4891 /* no-op */
4892 }
4893
4894 /*
4895 * copy_dest_destroy --- release DestReceiver object
4896 */
4897 static void
copy_dest_destroy(DestReceiver * self)4898 copy_dest_destroy(DestReceiver *self)
4899 {
4900 pfree(self);
4901 }
4902
4903 /*
4904 * CreateCopyDestReceiver -- create a suitable DestReceiver object
4905 */
4906 DestReceiver *
CreateCopyDestReceiver(void)4907 CreateCopyDestReceiver(void)
4908 {
4909 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4910
4911 self->pub.receiveSlot = copy_dest_receive;
4912 self->pub.rStartup = copy_dest_startup;
4913 self->pub.rShutdown = copy_dest_shutdown;
4914 self->pub.rDestroy = copy_dest_destroy;
4915 self->pub.mydest = DestCopyOut;
4916
4917 self->cstate = NULL; /* will be set later */
4918 self->processed = 0;
4919
4920 return (DestReceiver *) self;
4921 }
4922