1 /*-------------------------------------------------------------------------
2 *
3 * copy.c
4 * Implements the COPY utility command
5 *
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/copy.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20
21 #include "access/heapam.h"
22 #include "access/htup_details.h"
23 #include "access/sysattr.h"
24 #include "access/xact.h"
25 #include "access/xlog.h"
26 #include "catalog/dependency.h"
27 #include "catalog/pg_authid.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/execPartition.h"
33 #include "executor/executor.h"
34 #include "foreign/fdwapi.h"
35 #include "libpq/libpq.h"
36 #include "libpq/pqformat.h"
37 #include "mb/pg_wchar.h"
38 #include "miscadmin.h"
39 #include "optimizer/clauses.h"
40 #include "optimizer/planner.h"
41 #include "nodes/makefuncs.h"
42 #include "parser/parse_relation.h"
43 #include "port/pg_bswap.h"
44 #include "rewrite/rewriteHandler.h"
45 #include "storage/fd.h"
46 #include "tcop/tcopprot.h"
47 #include "utils/builtins.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/portal.h"
51 #include "utils/rel.h"
52 #include "utils/rls.h"
53 #include "utils/snapmgr.h"
54
55
/* Nonzero when c is one of the ASCII octal digits '0' through '7'. */
#define ISOCTAL(c)	((c) >= '0' && (c) <= '7')
/* Numeric value of the digit character c (c - '0'). */
#define OCTVALUE(c)	((c) - '0')
58
/*
 * The kinds of data source/destination the bottom-level I/O routines
 * have to distinguish between.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* a file, or a program reached via a pipe */
	COPY_OLD_FE,				/* the frontend, speaking protocol 2.0 */
	COPY_NEW_FE,				/* the frontend, speaking protocol 3.0 */
	COPY_CALLBACK				/* a caller-supplied callback function */
} CopyDest;
70
/*
 * The end-of-line terminator convention detected in the input.
 */
typedef enum EolType
{
	EOL_UNKNOWN = 0,			/* not determined yet */
	EOL_NL,						/* newline only */
	EOL_CR,						/* carriage return only */
	EOL_CRNL					/* carriage return followed by newline */
} EolType;
81
82 /*
83 * This struct contains all the state variables used throughout a COPY
84 * operation. For simplicity, we use the same struct for all variants of COPY,
85 * even though some fields are used in only some cases.
86 *
87 * Multi-byte encodings: all supported client-side encodings encode multi-byte
88 * characters by having the first byte's high bit set. Subsequent bytes of the
89 * character can have the high bit not set. When scanning data in such an
90 * encoding to look for a match to a single-byte (ie ASCII) character, we must
91 * use the full pg_encoding_mblen() machinery to skip over multibyte
92 * characters, else we might find a false match to a trailing byte. In
93 * supported server encodings, there is no possibility of a false match, and
94 * it's faster to make useless comparisons to trailing bytes than it is to
95 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
96 * when we have to do it the hard way.
97 */
98 typedef struct CopyStateData
99 {
100 /* low-level state data */
101 CopyDest copy_dest; /* type of copy source/destination */
102 FILE *copy_file; /* used if copy_dest == COPY_FILE */
103 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
104 * dest == COPY_NEW_FE in COPY FROM */
105 bool is_copy_from; /* COPY TO, or COPY FROM? */
106 bool reached_eof; /* true if we read to end of copy data (not
107 * all copy_dest types maintain this) */
108 EolType eol_type; /* EOL type of input */
109 int file_encoding; /* file or remote side's character encoding */
110 bool need_transcoding; /* file encoding diff from server? */
111 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
112
113 /* parameters from the COPY command */
114 Relation rel; /* relation to copy to or from */
115 QueryDesc *queryDesc; /* executable query to copy from */
116 List *attnumlist; /* integer list of attnums to copy */
117 char *filename; /* filename, or NULL for STDIN/STDOUT */
118 bool is_program; /* is 'filename' a program to popen? */
119 copy_data_source_cb data_source_cb; /* function for reading data */
120 bool binary; /* binary format? */
121 bool oids; /* include OIDs? */
122 bool freeze; /* freeze rows on loading? */
123 bool csv_mode; /* Comma Separated Value format? */
124 bool header_line; /* CSV header line? */
125 char *null_print; /* NULL marker string (server encoding!) */
126 int null_print_len; /* length of same */
127 char *null_print_client; /* same converted to file encoding */
128 char *delim; /* column delimiter (must be 1 byte) */
129 char *quote; /* CSV quote char (must be 1 byte) */
130 char *escape; /* CSV escape char (must be 1 byte) */
131 List *force_quote; /* list of column names */
132 bool force_quote_all; /* FORCE_QUOTE *? */
133 bool *force_quote_flags; /* per-column CSV FQ flags */
134 List *force_notnull; /* list of column names */
135 bool *force_notnull_flags; /* per-column CSV FNN flags */
136 List *force_null; /* list of column names */
137 bool *force_null_flags; /* per-column CSV FN flags */
138 bool convert_selectively; /* do selective binary conversion? */
139 List *convert_select; /* list of column names (can be NIL) */
140 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
141
142 /* these are just for error messages, see CopyFromErrorCallback */
143 const char *cur_relname; /* table name for error messages */
144 uint64 cur_lineno; /* line number for error messages */
145 const char *cur_attname; /* current att for error messages */
146 const char *cur_attval; /* current att value for error messages */
147
148 /*
149 * Working state for COPY TO/FROM
150 */
151 MemoryContext copycontext; /* per-copy execution context */
152
153 /*
154 * Working state for COPY TO
155 */
156 FmgrInfo *out_functions; /* lookup info for output functions */
157 MemoryContext rowcontext; /* per-row evaluation context */
158
159 /*
160 * Working state for COPY FROM
161 */
162 AttrNumber num_defaults;
163 bool file_has_oids;
164 FmgrInfo oid_in_function;
165 Oid oid_typioparam;
166 FmgrInfo *in_functions; /* array of input functions for each attrs */
167 Oid *typioparams; /* array of element types for in_functions */
168 int *defmap; /* array of default att numbers */
169 ExprState **defexprs; /* array of default att expressions */
170 bool volatile_defexprs; /* is any of defexprs volatile? */
171 List *range_table;
172
173 /* Tuple-routing support info */
174 PartitionTupleRouting *partition_tuple_routing;
175
176 TransitionCaptureState *transition_capture;
177
178 /*
179 * These variables are used to reduce overhead in textual COPY FROM.
180 *
181 * attribute_buf holds the separated, de-escaped text for each field of
182 * the current line. The CopyReadAttributes functions return arrays of
183 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
184 * the buffer on each cycle.
185 */
186 StringInfoData attribute_buf;
187
188 /* field raw data pointers found by COPY FROM */
189
190 int max_fields;
191 char **raw_fields;
192
193 /*
194 * Similarly, line_buf holds the whole input line being processed. The
195 * input cycle is first to read the whole line into line_buf, convert it
196 * to server encoding there, and then extract the individual attribute
197 * fields into attribute_buf. line_buf is preserved unmodified so that we
198 * can display it in error messages if appropriate.
199 */
200 StringInfoData line_buf;
201 bool line_buf_converted; /* converted to server encoding? */
202 bool line_buf_valid; /* contains the row being processed? */
203
204 /*
205 * Finally, raw_buf holds raw data read from the data source (file or
206 * client connection). CopyReadLine parses this data sufficiently to
207 * locate line boundaries, then transfers the data to line_buf and
208 * converts it. Note: we guarantee that there is a \0 at
209 * raw_buf[raw_buf_len].
210 */
211 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
212 char *raw_buf;
213 int raw_buf_index; /* next byte to process */
214 int raw_buf_len; /* total # of bytes stored */
215 } CopyStateData;
216
217 /* DestReceiver for COPY (query) TO */
218 typedef struct
219 {
220 DestReceiver pub; /* publicly-known function pointers */
221 CopyState cstate; /* CopyStateData for the command */
222 uint64 processed; /* # of tuples processed */
223 } DR_copy;
224
225
226 /*
227 * These macros centralize code used to process line_buf and raw_buf buffers.
228 * They are macros because they often do continue/break control and to avoid
229 * function call overhead in tight COPY loops.
230 *
231 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
232 * prevent the continue/break processing from working. We end the "if (1)"
233 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
234 * any "else" in the calling code, and to avoid any compiler warnings about
235 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
236 */
237
/*
 * If fewer than extralen+1 bytes remain before copy_buf_len and EOF has not
 * been hit, back up to prev_raw_ptr, request more data and restart the
 * enclosing loop.  Backing up keeps the character fetched at the top of
 * the loop in the buffer even when there is more than one read-ahead.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		need_data = true; \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		continue; \
	} \
} else ((void) 0)
252
/*
 * If fewer than extralen+1 bytes remain before copy_buf_len and EOF has
 * been hit, swallow whatever is left of the buffer (when extralen is
 * nonzero), set result to true and break out of the enclosing loop.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		break; \
	} \
} else ((void) 0)
266
/*
 * Append the bytes of raw_buf already accepted (those between
 * cstate->raw_buf_index and raw_buf_ptr) to line_buf and advance
 * raw_buf_index past them.  This has to happen so that raw_buf is
 * certain to have some free room.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
282
/*
 * Discard any read-ahead (leaving raw_buf_ptr just past prev_raw_ptr) and
 * jump to the not_end_of_copy label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
290
/*
 * The 11-byte signature "PGCOPY\n\377\r\n\0", spelled out byte by byte.
 * The array holds exactly these bytes; no extra terminator is stored.
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
292
293
294 /* non-export function prototypes */
295 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
296 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
297 List *options);
298 static void EndCopy(CopyState cstate);
299 static void ClosePipeToProgram(CopyState cstate);
300 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
301 Oid queryRelId, const char *filename, bool is_program,
302 List *attnamelist, List *options);
303 static void EndCopyTo(CopyState cstate);
304 static uint64 DoCopyTo(CopyState cstate);
305 static uint64 CopyTo(CopyState cstate);
306 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
307 Datum *values, bool *nulls);
308 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
309 CommandId mycid, int hi_options,
310 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
311 BulkInsertState bistate,
312 int nBufferedTuples, HeapTuple *bufferedTuples,
313 uint64 firstBufferedLineNo);
314 static bool CopyReadLine(CopyState cstate);
315 static bool CopyReadLineText(CopyState cstate);
316 static int CopyReadAttributesText(CopyState cstate);
317 static int CopyReadAttributesCSV(CopyState cstate);
318 static Datum CopyReadBinaryAttribute(CopyState cstate,
319 int column_no, FmgrInfo *flinfo,
320 Oid typioparam, int32 typmod,
321 bool *isnull);
322 static void CopyAttributeOutText(CopyState cstate, char *string);
323 static void CopyAttributeOutCSV(CopyState cstate, char *string,
324 bool use_quote, bool single_attr);
325 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
326 List *attnamelist);
327 static char *limit_printout_length(const char *str);
328
329 /* Low-level communications functions */
330 static void SendCopyBegin(CopyState cstate);
331 static void ReceiveCopyBegin(CopyState cstate);
332 static void SendCopyEnd(CopyState cstate);
333 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
334 static void CopySendString(CopyState cstate, const char *str);
335 static void CopySendChar(CopyState cstate, char c);
336 static void CopySendEndOfRow(CopyState cstate);
337 static int CopyGetData(CopyState cstate, void *databuf,
338 int minread, int maxread);
339 static void CopySendInt32(CopyState cstate, int32 val);
340 static bool CopyGetInt32(CopyState cstate, int32 *val);
341 static void CopySendInt16(CopyState cstate, int16 val);
342 static bool CopyGetInt16(CopyState cstate, int16 *val);
343
344
345 /*
346 * Send copy start/stop messages for frontend copies. These have changed
347 * in past protocol redesigns.
348 */
349 static void
SendCopyBegin(CopyState cstate)350 SendCopyBegin(CopyState cstate)
351 {
352 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
353 {
354 /* new way */
355 StringInfoData buf;
356 int natts = list_length(cstate->attnumlist);
357 int16 format = (cstate->binary ? 1 : 0);
358 int i;
359
360 pq_beginmessage(&buf, 'H');
361 pq_sendbyte(&buf, format); /* overall format */
362 pq_sendint16(&buf, natts);
363 for (i = 0; i < natts; i++)
364 pq_sendint16(&buf, format); /* per-column formats */
365 pq_endmessage(&buf);
366 cstate->copy_dest = COPY_NEW_FE;
367 }
368 else
369 {
370 /* old way */
371 if (cstate->binary)
372 ereport(ERROR,
373 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374 errmsg("COPY BINARY is not supported to stdout or from stdin")));
375 pq_putemptymessage('H');
376 /* grottiness needed for old COPY OUT protocol */
377 pq_startcopyout();
378 cstate->copy_dest = COPY_OLD_FE;
379 }
380 }
381
382 static void
ReceiveCopyBegin(CopyState cstate)383 ReceiveCopyBegin(CopyState cstate)
384 {
385 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
386 {
387 /* new way */
388 StringInfoData buf;
389 int natts = list_length(cstate->attnumlist);
390 int16 format = (cstate->binary ? 1 : 0);
391 int i;
392
393 pq_beginmessage(&buf, 'G');
394 pq_sendbyte(&buf, format); /* overall format */
395 pq_sendint16(&buf, natts);
396 for (i = 0; i < natts; i++)
397 pq_sendint16(&buf, format); /* per-column formats */
398 pq_endmessage(&buf);
399 cstate->copy_dest = COPY_NEW_FE;
400 cstate->fe_msgbuf = makeStringInfo();
401 }
402 else
403 {
404 /* old way */
405 if (cstate->binary)
406 ereport(ERROR,
407 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 errmsg("COPY BINARY is not supported to stdout or from stdin")));
409 pq_putemptymessage('G');
410 /* any error in old protocol will make us lose sync */
411 pq_startmsgread();
412 cstate->copy_dest = COPY_OLD_FE;
413 }
414 /* We *must* flush here to ensure FE knows it can send. */
415 pq_flush();
416 }
417
418 static void
SendCopyEnd(CopyState cstate)419 SendCopyEnd(CopyState cstate)
420 {
421 if (cstate->copy_dest == COPY_NEW_FE)
422 {
423 /* Shouldn't have any unsent data */
424 Assert(cstate->fe_msgbuf->len == 0);
425 /* Send Copy Done message */
426 pq_putemptymessage('c');
427 }
428 else
429 {
430 CopySendData(cstate, "\\.", 2);
431 /* Need to flush out the trailer (this also appends a newline) */
432 CopySendEndOfRow(cstate);
433 pq_endcopyout(false);
434 }
435 }
436
437 /*----------
438 * CopySendData sends output data to the destination (file or frontend)
439 * CopySendString does the same for null-terminated strings
440 * CopySendChar does the same for single characters
441 * CopySendEndOfRow does the appropriate thing at end of each data row
442 * (data is not actually flushed except by CopySendEndOfRow)
443 *
444 * NB: no data conversion is applied by these functions
445 *----------
446 */
447 static void
CopySendData(CopyState cstate,const void * databuf,int datasize)448 CopySendData(CopyState cstate, const void *databuf, int datasize)
449 {
450 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
451 }
452
453 static void
CopySendString(CopyState cstate,const char * str)454 CopySendString(CopyState cstate, const char *str)
455 {
456 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
457 }
458
459 static void
CopySendChar(CopyState cstate,char c)460 CopySendChar(CopyState cstate, char c)
461 {
462 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
463 }
464
465 static void
CopySendEndOfRow(CopyState cstate)466 CopySendEndOfRow(CopyState cstate)
467 {
468 StringInfo fe_msgbuf = cstate->fe_msgbuf;
469
470 switch (cstate->copy_dest)
471 {
472 case COPY_FILE:
473 if (!cstate->binary)
474 {
475 /* Default line termination depends on platform */
476 #ifndef WIN32
477 CopySendChar(cstate, '\n');
478 #else
479 CopySendString(cstate, "\r\n");
480 #endif
481 }
482
483 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
484 cstate->copy_file) != 1 ||
485 ferror(cstate->copy_file))
486 {
487 if (cstate->is_program)
488 {
489 if (errno == EPIPE)
490 {
491 /*
492 * The pipe will be closed automatically on error at
493 * the end of transaction, but we might get a better
494 * error message from the subprocess' exit code than
495 * just "Broken Pipe"
496 */
497 ClosePipeToProgram(cstate);
498
499 /*
500 * If ClosePipeToProgram() didn't throw an error, the
501 * program terminated normally, but closed the pipe
502 * first. Restore errno, and throw an error.
503 */
504 errno = EPIPE;
505 }
506 ereport(ERROR,
507 (errcode_for_file_access(),
508 errmsg("could not write to COPY program: %m")));
509 }
510 else
511 ereport(ERROR,
512 (errcode_for_file_access(),
513 errmsg("could not write to COPY file: %m")));
514 }
515 break;
516 case COPY_OLD_FE:
517 /* The FE/BE protocol uses \n as newline for all platforms */
518 if (!cstate->binary)
519 CopySendChar(cstate, '\n');
520
521 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
522 {
523 /* no hope of recovering connection sync, so FATAL */
524 ereport(FATAL,
525 (errcode(ERRCODE_CONNECTION_FAILURE),
526 errmsg("connection lost during COPY to stdout")));
527 }
528 break;
529 case COPY_NEW_FE:
530 /* The FE/BE protocol uses \n as newline for all platforms */
531 if (!cstate->binary)
532 CopySendChar(cstate, '\n');
533
534 /* Dump the accumulated row as one CopyData message */
535 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
536 break;
537 case COPY_CALLBACK:
538 Assert(false); /* Not yet supported. */
539 break;
540 }
541
542 resetStringInfo(fe_msgbuf);
543 }
544
545 /*
546 * CopyGetData reads data from the source (file or frontend)
547 *
548 * We attempt to read at least minread, and at most maxread, bytes from
549 * the source. The actual number of bytes read is returned; if this is
550 * less than minread, EOF was detected.
551 *
552 * Note: when copying from the frontend, we expect a proper EOF mark per
553 * protocol; if the frontend simply drops the connection, we raise error.
554 * It seems unwise to allow the COPY IN to complete normally in that case.
555 *
556 * NB: no data conversion is applied here.
557 */
558 static int
CopyGetData(CopyState cstate,void * databuf,int minread,int maxread)559 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
560 {
561 int bytesread = 0;
562
563 switch (cstate->copy_dest)
564 {
565 case COPY_FILE:
566 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
567 if (ferror(cstate->copy_file))
568 ereport(ERROR,
569 (errcode_for_file_access(),
570 errmsg("could not read from COPY file: %m")));
571 if (bytesread == 0)
572 cstate->reached_eof = true;
573 break;
574 case COPY_OLD_FE:
575
576 /*
577 * We cannot read more than minread bytes (which in practice is 1)
578 * because old protocol doesn't have any clear way of separating
579 * the COPY stream from following data. This is slow, but not any
580 * slower than the code path was originally, and we don't care
581 * much anymore about the performance of old protocol.
582 */
583 if (pq_getbytes((char *) databuf, minread))
584 {
585 /* Only a \. terminator is legal EOF in old protocol */
586 ereport(ERROR,
587 (errcode(ERRCODE_CONNECTION_FAILURE),
588 errmsg("unexpected EOF on client connection with an open transaction")));
589 }
590 bytesread = minread;
591 break;
592 case COPY_NEW_FE:
593 while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
594 {
595 int avail;
596
597 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
598 {
599 /* Try to receive another message */
600 int mtype;
601
602 readmessage:
603 HOLD_CANCEL_INTERRUPTS();
604 pq_startmsgread();
605 mtype = pq_getbyte();
606 if (mtype == EOF)
607 ereport(ERROR,
608 (errcode(ERRCODE_CONNECTION_FAILURE),
609 errmsg("unexpected EOF on client connection with an open transaction")));
610 if (pq_getmessage(cstate->fe_msgbuf, 0))
611 ereport(ERROR,
612 (errcode(ERRCODE_CONNECTION_FAILURE),
613 errmsg("unexpected EOF on client connection with an open transaction")));
614 RESUME_CANCEL_INTERRUPTS();
615 switch (mtype)
616 {
617 case 'd': /* CopyData */
618 break;
619 case 'c': /* CopyDone */
620 /* COPY IN correctly terminated by frontend */
621 cstate->reached_eof = true;
622 return bytesread;
623 case 'f': /* CopyFail */
624 ereport(ERROR,
625 (errcode(ERRCODE_QUERY_CANCELED),
626 errmsg("COPY from stdin failed: %s",
627 pq_getmsgstring(cstate->fe_msgbuf))));
628 break;
629 case 'H': /* Flush */
630 case 'S': /* Sync */
631
632 /*
633 * Ignore Flush/Sync for the convenience of client
634 * libraries (such as libpq) that may send those
635 * without noticing that the command they just
636 * sent was COPY.
637 */
638 goto readmessage;
639 default:
640 ereport(ERROR,
641 (errcode(ERRCODE_PROTOCOL_VIOLATION),
642 errmsg("unexpected message type 0x%02X during COPY from stdin",
643 mtype)));
644 break;
645 }
646 }
647 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
648 if (avail > maxread)
649 avail = maxread;
650 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
651 databuf = (void *) ((char *) databuf + avail);
652 maxread -= avail;
653 bytesread += avail;
654 }
655 break;
656 case COPY_CALLBACK:
657 bytesread = cstate->data_source_cb(databuf, minread, maxread);
658 break;
659 }
660
661 return bytesread;
662 }
663
664
665 /*
666 * These functions do apply some data conversion
667 */
668
669 /*
670 * CopySendInt32 sends an int32 in network byte order
671 */
672 static void
CopySendInt32(CopyState cstate,int32 val)673 CopySendInt32(CopyState cstate, int32 val)
674 {
675 uint32 buf;
676
677 buf = pg_hton32((uint32) val);
678 CopySendData(cstate, &buf, sizeof(buf));
679 }
680
681 /*
682 * CopyGetInt32 reads an int32 that appears in network byte order
683 *
684 * Returns true if OK, false if EOF
685 */
686 static bool
CopyGetInt32(CopyState cstate,int32 * val)687 CopyGetInt32(CopyState cstate, int32 *val)
688 {
689 uint32 buf;
690
691 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
692 {
693 *val = 0; /* suppress compiler warning */
694 return false;
695 }
696 *val = (int32) pg_ntoh32(buf);
697 return true;
698 }
699
700 /*
701 * CopySendInt16 sends an int16 in network byte order
702 */
703 static void
CopySendInt16(CopyState cstate,int16 val)704 CopySendInt16(CopyState cstate, int16 val)
705 {
706 uint16 buf;
707
708 buf = pg_hton16((uint16) val);
709 CopySendData(cstate, &buf, sizeof(buf));
710 }
711
712 /*
713 * CopyGetInt16 reads an int16 that appears in network byte order
714 */
715 static bool
CopyGetInt16(CopyState cstate,int16 * val)716 CopyGetInt16(CopyState cstate, int16 *val)
717 {
718 uint16 buf;
719
720 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
721 {
722 *val = 0; /* suppress compiler warning */
723 return false;
724 }
725 *val = (int16) pg_ntoh16(buf);
726 return true;
727 }
728
729
730 /*
731 * CopyLoadRawBuf loads some more data into raw_buf
732 *
733 * Returns true if able to obtain at least one more byte, else false.
734 *
735 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
736 * down to the start of the buffer and then we load more data after that.
737 * This case is used only when a frontend multibyte character crosses a
738 * bufferload boundary.
739 */
740 static bool
CopyLoadRawBuf(CopyState cstate)741 CopyLoadRawBuf(CopyState cstate)
742 {
743 int nbytes;
744 int inbytes;
745
746 if (cstate->raw_buf_index < cstate->raw_buf_len)
747 {
748 /* Copy down the unprocessed data */
749 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
750 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
751 nbytes);
752 }
753 else
754 nbytes = 0; /* no data need be saved */
755
756 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
757 1, RAW_BUF_SIZE - nbytes);
758 nbytes += inbytes;
759 cstate->raw_buf[nbytes] = '\0';
760 cstate->raw_buf_index = 0;
761 cstate->raw_buf_len = nbytes;
762 return (inbytes > 0);
763 }
764
765
766 /*
767 * DoCopy executes the SQL COPY statement
768 *
769 * Either unload or reload contents of table <relation>, depending on <from>.
770 * (<from> = true means we are inserting into the table.) In the "TO" case
771 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
772 * or DELETE query.
773 *
774 * If <pipe> is false, transfer is between the table and the file named
775 * <filename>. Otherwise, transfer is between the table and our regular
776 * input/output stream. The latter could be either stdin/stdout or a
777 * socket, depending on whether we're running under Postmaster control.
778 *
779 * Do not allow a Postgres user without the 'pg_access_server_files' role to
780 * read from or write to a file.
781 *
782 * Do not allow the copy if user doesn't have proper permission to access
783 * the table or the specifically requested columns.
784 */
785 void
DoCopy(ParseState * pstate,const CopyStmt * stmt,int stmt_location,int stmt_len,uint64 * processed)786 DoCopy(ParseState *pstate, const CopyStmt *stmt,
787 int stmt_location, int stmt_len,
788 uint64 *processed)
789 {
790 CopyState cstate;
791 bool is_from = stmt->is_from;
792 bool pipe = (stmt->filename == NULL);
793 Relation rel;
794 Oid relid;
795 RawStmt *query = NULL;
796
797 /*
798 * Disallow COPY to/from file or program except to users with the
799 * appropriate role.
800 */
801 if (!pipe)
802 {
803 if (stmt->is_program)
804 {
805 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
806 ereport(ERROR,
807 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
808 errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
809 errhint("Anyone can COPY to stdout or from stdin. "
810 "psql's \\copy command also works for anyone.")));
811 }
812 else
813 {
814 if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
815 ereport(ERROR,
816 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
817 errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
818 errhint("Anyone can COPY to stdout or from stdin. "
819 "psql's \\copy command also works for anyone.")));
820
821 if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
822 ereport(ERROR,
823 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
824 errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
825 errhint("Anyone can COPY to stdout or from stdin. "
826 "psql's \\copy command also works for anyone.")));
827 }
828 }
829
830 if (stmt->relation)
831 {
832 TupleDesc tupDesc;
833 List *attnums;
834 ListCell *cur;
835 RangeTblEntry *rte;
836
837 Assert(!stmt->query);
838
839 /* Open and lock the relation, using the appropriate lock type. */
840 rel = heap_openrv(stmt->relation,
841 (is_from ? RowExclusiveLock : AccessShareLock));
842
843 relid = RelationGetRelid(rel);
844
845 rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
846 rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
847
848 tupDesc = RelationGetDescr(rel);
849 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
850 foreach(cur, attnums)
851 {
852 int attno = lfirst_int(cur) -
853 FirstLowInvalidHeapAttributeNumber;
854
855 if (is_from)
856 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
857 else
858 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
859 }
860 ExecCheckRTPerms(pstate->p_rtable, true);
861
862 /*
863 * Permission check for row security policies.
864 *
865 * check_enable_rls will ereport(ERROR) if the user has requested
866 * something invalid and will otherwise indicate if we should enable
867 * RLS (returns RLS_ENABLED) or not for this COPY statement.
868 *
869 * If the relation has a row security policy and we are to apply it
870 * then perform a "query" copy and allow the normal query processing
871 * to handle the policies.
872 *
873 * If RLS is not enabled for this, then just fall through to the
874 * normal non-filtering relation handling.
875 */
876 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
877 {
878 SelectStmt *select;
879 ColumnRef *cr;
880 ResTarget *target;
881 RangeVar *from;
882 List *targetList = NIL;
883
884 if (is_from)
885 ereport(ERROR,
886 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
887 errmsg("COPY FROM not supported with row-level security"),
888 errhint("Use INSERT statements instead.")));
889
890 /*
891 * Build target list
892 *
893 * If no columns are specified in the attribute list of the COPY
894 * command, then the target list is 'all' columns. Therefore, '*'
895 * should be used as the target list for the resulting SELECT
896 * statement.
897 *
898 * In the case that columns are specified in the attribute list,
899 * create a ColumnRef and ResTarget for each column and add them
900 * to the target list for the resulting SELECT statement.
901 */
902 if (!stmt->attlist)
903 {
904 cr = makeNode(ColumnRef);
905 cr->fields = list_make1(makeNode(A_Star));
906 cr->location = -1;
907
908 target = makeNode(ResTarget);
909 target->name = NULL;
910 target->indirection = NIL;
911 target->val = (Node *) cr;
912 target->location = -1;
913
914 targetList = list_make1(target);
915 }
916 else
917 {
918 ListCell *lc;
919
920 foreach(lc, stmt->attlist)
921 {
922 /*
923 * Build the ColumnRef for each column. The ColumnRef
924 * 'fields' property is a String 'Value' node (see
925 * nodes/value.h) that corresponds to the column name
926 * respectively.
927 */
928 cr = makeNode(ColumnRef);
929 cr->fields = list_make1(lfirst(lc));
930 cr->location = -1;
931
932 /* Build the ResTarget and add the ColumnRef to it. */
933 target = makeNode(ResTarget);
934 target->name = NULL;
935 target->indirection = NIL;
936 target->val = (Node *) cr;
937 target->location = -1;
938
939 /* Add each column to the SELECT statement's target list */
940 targetList = lappend(targetList, target);
941 }
942 }
943
944 /*
945 * Build RangeVar for from clause, fully qualified based on the
946 * relation which we have opened and locked.
947 */
948 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
949 pstrdup(RelationGetRelationName(rel)),
950 -1);
951
952 /* Build query */
953 select = makeNode(SelectStmt);
954 select->targetList = targetList;
955 select->fromClause = list_make1(from);
956
957 query = makeNode(RawStmt);
958 query->stmt = (Node *) select;
959 query->stmt_location = stmt_location;
960 query->stmt_len = stmt_len;
961
962 /*
963 * Close the relation for now, but keep the lock on it to prevent
964 * changes between now and when we start the query-based COPY.
965 *
966 * We'll reopen it later as part of the query-based COPY.
967 */
968 heap_close(rel, NoLock);
969 rel = NULL;
970 }
971 }
972 else
973 {
974 Assert(stmt->query);
975
976 query = makeNode(RawStmt);
977 query->stmt = stmt->query;
978 query->stmt_location = stmt_location;
979 query->stmt_len = stmt_len;
980
981 relid = InvalidOid;
982 rel = NULL;
983 }
984
985 if (is_from)
986 {
987 Assert(rel);
988
989 /* check read-only transaction and parallel mode */
990 if (XactReadOnly && !rel->rd_islocaltemp)
991 PreventCommandIfReadOnly("COPY FROM");
992 PreventCommandIfParallelMode("COPY FROM");
993
994 cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
995 NULL, stmt->attlist, stmt->options);
996 *processed = CopyFrom(cstate); /* copy from file to database */
997 EndCopyFrom(cstate);
998 }
999 else
1000 {
1001 cstate = BeginCopyTo(pstate, rel, query, relid,
1002 stmt->filename, stmt->is_program,
1003 stmt->attlist, stmt->options);
1004 *processed = DoCopyTo(cstate); /* copy from database to file */
1005 EndCopyTo(cstate);
1006 }
1007
1008 /*
1009 * Close the relation. If reading, we can release the AccessShareLock we
1010 * got; if writing, we should hold the lock until end of transaction to
1011 * ensure that updates will be committed before lock is released.
1012 */
1013 if (rel != NULL)
1014 heap_close(rel, (is_from ? NoLock : AccessShareLock));
1015 }
1016
1017 /*
1018 * Process the statement option list for COPY.
1019 *
1020 * Scan the options list (a list of DefElem) and transpose the information
1021 * into cstate, applying appropriate error checking.
1022 *
1023 * cstate is assumed to be filled with zeroes initially.
1024 *
1025 * This is exported so that external users of the COPY API can sanity-check
1026 * a list of options. In that usage, cstate should be passed as NULL
1027 * (since external users don't know sizeof(CopyStateData)) and the collected
1028 * data is just leaked until CurrentMemoryContext is reset.
1029 *
1030 * Note that additional checking, such as whether column names listed in FORCE
1031 * QUOTE actually exist, has to be applied later. This just checks for
1032 * self-consistency of the options list.
1033 */
1034 void
ProcessCopyOptions(ParseState * pstate,CopyState cstate,bool is_from,List * options)1035 ProcessCopyOptions(ParseState *pstate,
1036 CopyState cstate,
1037 bool is_from,
1038 List *options)
1039 {
1040 bool format_specified = false;
1041 ListCell *option;
1042
1043 /* Support external use for option sanity checking */
1044 if (cstate == NULL)
1045 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1046
1047 cstate->is_copy_from = is_from;
1048
1049 cstate->file_encoding = -1;
1050
1051 /* Extract options from the statement node tree */
1052 foreach(option, options)
1053 {
1054 DefElem *defel = lfirst_node(DefElem, option);
1055
1056 if (strcmp(defel->defname, "format") == 0)
1057 {
1058 char *fmt = defGetString(defel);
1059
1060 if (format_specified)
1061 ereport(ERROR,
1062 (errcode(ERRCODE_SYNTAX_ERROR),
1063 errmsg("conflicting or redundant options"),
1064 parser_errposition(pstate, defel->location)));
1065 format_specified = true;
1066 if (strcmp(fmt, "text") == 0)
1067 /* default format */ ;
1068 else if (strcmp(fmt, "csv") == 0)
1069 cstate->csv_mode = true;
1070 else if (strcmp(fmt, "binary") == 0)
1071 cstate->binary = true;
1072 else
1073 ereport(ERROR,
1074 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1075 errmsg("COPY format \"%s\" not recognized", fmt),
1076 parser_errposition(pstate, defel->location)));
1077 }
1078 else if (strcmp(defel->defname, "oids") == 0)
1079 {
1080 if (cstate->oids)
1081 ereport(ERROR,
1082 (errcode(ERRCODE_SYNTAX_ERROR),
1083 errmsg("conflicting or redundant options"),
1084 parser_errposition(pstate, defel->location)));
1085 cstate->oids = defGetBoolean(defel);
1086 }
1087 else if (strcmp(defel->defname, "freeze") == 0)
1088 {
1089 if (cstate->freeze)
1090 ereport(ERROR,
1091 (errcode(ERRCODE_SYNTAX_ERROR),
1092 errmsg("conflicting or redundant options"),
1093 parser_errposition(pstate, defel->location)));
1094 cstate->freeze = defGetBoolean(defel);
1095 }
1096 else if (strcmp(defel->defname, "delimiter") == 0)
1097 {
1098 if (cstate->delim)
1099 ereport(ERROR,
1100 (errcode(ERRCODE_SYNTAX_ERROR),
1101 errmsg("conflicting or redundant options"),
1102 parser_errposition(pstate, defel->location)));
1103 cstate->delim = defGetString(defel);
1104 }
1105 else if (strcmp(defel->defname, "null") == 0)
1106 {
1107 if (cstate->null_print)
1108 ereport(ERROR,
1109 (errcode(ERRCODE_SYNTAX_ERROR),
1110 errmsg("conflicting or redundant options"),
1111 parser_errposition(pstate, defel->location)));
1112 cstate->null_print = defGetString(defel);
1113 }
1114 else if (strcmp(defel->defname, "header") == 0)
1115 {
1116 if (cstate->header_line)
1117 ereport(ERROR,
1118 (errcode(ERRCODE_SYNTAX_ERROR),
1119 errmsg("conflicting or redundant options"),
1120 parser_errposition(pstate, defel->location)));
1121 cstate->header_line = defGetBoolean(defel);
1122 }
1123 else if (strcmp(defel->defname, "quote") == 0)
1124 {
1125 if (cstate->quote)
1126 ereport(ERROR,
1127 (errcode(ERRCODE_SYNTAX_ERROR),
1128 errmsg("conflicting or redundant options"),
1129 parser_errposition(pstate, defel->location)));
1130 cstate->quote = defGetString(defel);
1131 }
1132 else if (strcmp(defel->defname, "escape") == 0)
1133 {
1134 if (cstate->escape)
1135 ereport(ERROR,
1136 (errcode(ERRCODE_SYNTAX_ERROR),
1137 errmsg("conflicting or redundant options"),
1138 parser_errposition(pstate, defel->location)));
1139 cstate->escape = defGetString(defel);
1140 }
1141 else if (strcmp(defel->defname, "force_quote") == 0)
1142 {
1143 if (cstate->force_quote || cstate->force_quote_all)
1144 ereport(ERROR,
1145 (errcode(ERRCODE_SYNTAX_ERROR),
1146 errmsg("conflicting or redundant options"),
1147 parser_errposition(pstate, defel->location)));
1148 if (defel->arg && IsA(defel->arg, A_Star))
1149 cstate->force_quote_all = true;
1150 else if (defel->arg && IsA(defel->arg, List))
1151 cstate->force_quote = castNode(List, defel->arg);
1152 else
1153 ereport(ERROR,
1154 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1155 errmsg("argument to option \"%s\" must be a list of column names",
1156 defel->defname),
1157 parser_errposition(pstate, defel->location)));
1158 }
1159 else if (strcmp(defel->defname, "force_not_null") == 0)
1160 {
1161 if (cstate->force_notnull)
1162 ereport(ERROR,
1163 (errcode(ERRCODE_SYNTAX_ERROR),
1164 errmsg("conflicting or redundant options"),
1165 parser_errposition(pstate, defel->location)));
1166 if (defel->arg && IsA(defel->arg, List))
1167 cstate->force_notnull = castNode(List, defel->arg);
1168 else
1169 ereport(ERROR,
1170 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1171 errmsg("argument to option \"%s\" must be a list of column names",
1172 defel->defname),
1173 parser_errposition(pstate, defel->location)));
1174 }
1175 else if (strcmp(defel->defname, "force_null") == 0)
1176 {
1177 if (cstate->force_null)
1178 ereport(ERROR,
1179 (errcode(ERRCODE_SYNTAX_ERROR),
1180 errmsg("conflicting or redundant options")));
1181 if (defel->arg && IsA(defel->arg, List))
1182 cstate->force_null = castNode(List, defel->arg);
1183 else
1184 ereport(ERROR,
1185 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1186 errmsg("argument to option \"%s\" must be a list of column names",
1187 defel->defname),
1188 parser_errposition(pstate, defel->location)));
1189 }
1190 else if (strcmp(defel->defname, "convert_selectively") == 0)
1191 {
1192 /*
1193 * Undocumented, not-accessible-from-SQL option: convert only the
1194 * named columns to binary form, storing the rest as NULLs. It's
1195 * allowed for the column list to be NIL.
1196 */
1197 if (cstate->convert_selectively)
1198 ereport(ERROR,
1199 (errcode(ERRCODE_SYNTAX_ERROR),
1200 errmsg("conflicting or redundant options"),
1201 parser_errposition(pstate, defel->location)));
1202 cstate->convert_selectively = true;
1203 if (defel->arg == NULL || IsA(defel->arg, List))
1204 cstate->convert_select = castNode(List, defel->arg);
1205 else
1206 ereport(ERROR,
1207 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1208 errmsg("argument to option \"%s\" must be a list of column names",
1209 defel->defname),
1210 parser_errposition(pstate, defel->location)));
1211 }
1212 else if (strcmp(defel->defname, "encoding") == 0)
1213 {
1214 if (cstate->file_encoding >= 0)
1215 ereport(ERROR,
1216 (errcode(ERRCODE_SYNTAX_ERROR),
1217 errmsg("conflicting or redundant options"),
1218 parser_errposition(pstate, defel->location)));
1219 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1220 if (cstate->file_encoding < 0)
1221 ereport(ERROR,
1222 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1223 errmsg("argument to option \"%s\" must be a valid encoding name",
1224 defel->defname),
1225 parser_errposition(pstate, defel->location)));
1226 }
1227 else
1228 ereport(ERROR,
1229 (errcode(ERRCODE_SYNTAX_ERROR),
1230 errmsg("option \"%s\" not recognized",
1231 defel->defname),
1232 parser_errposition(pstate, defel->location)));
1233 }
1234
1235 /*
1236 * Check for incompatible options (must do these two before inserting
1237 * defaults)
1238 */
1239 if (cstate->binary && cstate->delim)
1240 ereport(ERROR,
1241 (errcode(ERRCODE_SYNTAX_ERROR),
1242 errmsg("cannot specify DELIMITER in BINARY mode")));
1243
1244 if (cstate->binary && cstate->null_print)
1245 ereport(ERROR,
1246 (errcode(ERRCODE_SYNTAX_ERROR),
1247 errmsg("cannot specify NULL in BINARY mode")));
1248
1249 /* Set defaults for omitted options */
1250 if (!cstate->delim)
1251 cstate->delim = cstate->csv_mode ? "," : "\t";
1252
1253 if (!cstate->null_print)
1254 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1255 cstate->null_print_len = strlen(cstate->null_print);
1256
1257 if (cstate->csv_mode)
1258 {
1259 if (!cstate->quote)
1260 cstate->quote = "\"";
1261 if (!cstate->escape)
1262 cstate->escape = cstate->quote;
1263 }
1264
1265 /* Only single-byte delimiter strings are supported. */
1266 if (strlen(cstate->delim) != 1)
1267 ereport(ERROR,
1268 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1269 errmsg("COPY delimiter must be a single one-byte character")));
1270
1271 /* Disallow end-of-line characters */
1272 if (strchr(cstate->delim, '\r') != NULL ||
1273 strchr(cstate->delim, '\n') != NULL)
1274 ereport(ERROR,
1275 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1276 errmsg("COPY delimiter cannot be newline or carriage return")));
1277
1278 if (strchr(cstate->null_print, '\r') != NULL ||
1279 strchr(cstate->null_print, '\n') != NULL)
1280 ereport(ERROR,
1281 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1282 errmsg("COPY null representation cannot use newline or carriage return")));
1283
1284 /*
1285 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1286 * backslash because it would be ambiguous. We can't allow the other
1287 * cases because data characters matching the delimiter must be
1288 * backslashed, and certain backslash combinations are interpreted
1289 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1290 * more than strictly necessary, but seems best for consistency and
1291 * future-proofing. Likewise we disallow all digits though only octal
1292 * digits are actually dangerous.
1293 */
1294 if (!cstate->csv_mode &&
1295 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1296 cstate->delim[0]) != NULL)
1297 ereport(ERROR,
1298 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1299 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1300
1301 /* Check header */
1302 if (!cstate->csv_mode && cstate->header_line)
1303 ereport(ERROR,
1304 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1305 errmsg("COPY HEADER available only in CSV mode")));
1306
1307 /* Check quote */
1308 if (!cstate->csv_mode && cstate->quote != NULL)
1309 ereport(ERROR,
1310 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1311 errmsg("COPY quote available only in CSV mode")));
1312
1313 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1314 ereport(ERROR,
1315 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1316 errmsg("COPY quote must be a single one-byte character")));
1317
1318 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1319 ereport(ERROR,
1320 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1321 errmsg("COPY delimiter and quote must be different")));
1322
1323 /* Check escape */
1324 if (!cstate->csv_mode && cstate->escape != NULL)
1325 ereport(ERROR,
1326 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327 errmsg("COPY escape available only in CSV mode")));
1328
1329 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1330 ereport(ERROR,
1331 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1332 errmsg("COPY escape must be a single one-byte character")));
1333
1334 /* Check force_quote */
1335 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1336 ereport(ERROR,
1337 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338 errmsg("COPY force quote available only in CSV mode")));
1339 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1340 ereport(ERROR,
1341 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1342 errmsg("COPY force quote only available using COPY TO")));
1343
1344 /* Check force_notnull */
1345 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1346 ereport(ERROR,
1347 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1348 errmsg("COPY force not null available only in CSV mode")));
1349 if (cstate->force_notnull != NIL && !is_from)
1350 ereport(ERROR,
1351 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1352 errmsg("COPY force not null only available using COPY FROM")));
1353
1354 /* Check force_null */
1355 if (!cstate->csv_mode && cstate->force_null != NIL)
1356 ereport(ERROR,
1357 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1358 errmsg("COPY force null available only in CSV mode")));
1359
1360 if (cstate->force_null != NIL && !is_from)
1361 ereport(ERROR,
1362 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1363 errmsg("COPY force null only available using COPY FROM")));
1364
1365 /* Don't allow the delimiter to appear in the null string. */
1366 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1367 ereport(ERROR,
1368 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1369 errmsg("COPY delimiter must not appear in the NULL specification")));
1370
1371 /* Don't allow the CSV quote char to appear in the null string. */
1372 if (cstate->csv_mode &&
1373 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1374 ereport(ERROR,
1375 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1376 errmsg("CSV quote character must not appear in the NULL specification")));
1377 }
1378
1379 /*
1380 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1381 *
1382 * Iff <binary>, unload or reload in the binary format, as opposed to the
1383 * more wasteful but more robust and portable text format.
1384 *
1385 * Iff <oids>, unload or reload the format that includes OID information.
1386 * On input, we accept OIDs whether or not the table has an OID column,
1387 * but silently drop them if it does not. On output, we report an error
1388 * if the user asks for OIDs in a table that has none (not providing an
1389 * OID column might seem friendlier, but could seriously confuse programs).
1390 *
1391 * If in the text format, delimit columns with delimiter <delim> and print
1392 * NULL values as <null_print>.
1393 */
1394 static CopyState
BeginCopy(ParseState * pstate,bool is_from,Relation rel,RawStmt * raw_query,Oid queryRelId,List * attnamelist,List * options)1395 BeginCopy(ParseState *pstate,
1396 bool is_from,
1397 Relation rel,
1398 RawStmt *raw_query,
1399 Oid queryRelId,
1400 List *attnamelist,
1401 List *options)
1402 {
1403 CopyState cstate;
1404 TupleDesc tupDesc;
1405 int num_phys_attrs;
1406 MemoryContext oldcontext;
1407
1408 /* Allocate workspace and zero all fields */
1409 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1410
1411 /*
1412 * We allocate everything used by a cstate in a new memory context. This
1413 * avoids memory leaks during repeated use of COPY in a query.
1414 */
1415 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1416 "COPY",
1417 ALLOCSET_DEFAULT_SIZES);
1418
1419 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1420
1421 /* Extract options from the statement node tree */
1422 ProcessCopyOptions(pstate, cstate, is_from, options);
1423
1424 /* Process the source/target relation or query */
1425 if (rel)
1426 {
1427 Assert(!raw_query);
1428
1429 cstate->rel = rel;
1430
1431 tupDesc = RelationGetDescr(cstate->rel);
1432
1433 /* Don't allow COPY w/ OIDs to or from a table without them */
1434 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1435 ereport(ERROR,
1436 (errcode(ERRCODE_UNDEFINED_COLUMN),
1437 errmsg("table \"%s\" does not have OIDs",
1438 RelationGetRelationName(cstate->rel))));
1439 }
1440 else
1441 {
1442 List *rewritten;
1443 Query *query;
1444 PlannedStmt *plan;
1445 DestReceiver *dest;
1446
1447 Assert(!is_from);
1448 cstate->rel = NULL;
1449
1450 /* Don't allow COPY w/ OIDs from a query */
1451 if (cstate->oids)
1452 ereport(ERROR,
1453 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1454 errmsg("COPY (query) WITH OIDS is not supported")));
1455
1456 /*
1457 * Run parse analysis and rewrite. Note this also acquires sufficient
1458 * locks on the source table(s).
1459 *
1460 * Because the parser and planner tend to scribble on their input, we
1461 * make a preliminary copy of the source querytree. This prevents
1462 * problems in the case that the COPY is in a portal or plpgsql
1463 * function and is executed repeatedly. (See also the same hack in
1464 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1465 */
1466 rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1467 pstate->p_sourcetext, NULL, 0,
1468 NULL);
1469
1470 /* check that we got back something we can work with */
1471 if (rewritten == NIL)
1472 {
1473 ereport(ERROR,
1474 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1475 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1476 }
1477 else if (list_length(rewritten) > 1)
1478 {
1479 ListCell *lc;
1480
1481 /* examine queries to determine which error message to issue */
1482 foreach(lc, rewritten)
1483 {
1484 Query *q = lfirst_node(Query, lc);
1485
1486 if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1487 ereport(ERROR,
1488 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1489 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1490 if (q->querySource == QSRC_NON_INSTEAD_RULE)
1491 ereport(ERROR,
1492 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1493 errmsg("DO ALSO rules are not supported for the COPY")));
1494 }
1495
1496 ereport(ERROR,
1497 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1498 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1499 }
1500
1501 query = linitial_node(Query, rewritten);
1502
1503 /* The grammar allows SELECT INTO, but we don't support that */
1504 if (query->utilityStmt != NULL &&
1505 IsA(query->utilityStmt, CreateTableAsStmt))
1506 ereport(ERROR,
1507 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1508 errmsg("COPY (SELECT INTO) is not supported")));
1509
1510 Assert(query->utilityStmt == NULL);
1511
1512 /*
1513 * Similarly the grammar doesn't enforce the presence of a RETURNING
1514 * clause, but this is required here.
1515 */
1516 if (query->commandType != CMD_SELECT &&
1517 query->returningList == NIL)
1518 {
1519 Assert(query->commandType == CMD_INSERT ||
1520 query->commandType == CMD_UPDATE ||
1521 query->commandType == CMD_DELETE);
1522
1523 ereport(ERROR,
1524 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1525 errmsg("COPY query must have a RETURNING clause")));
1526 }
1527
1528 /* plan the query */
1529 plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1530
1531 /*
1532 * With row level security and a user using "COPY relation TO", we
1533 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1534 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1535 * in any RLS clauses.
1536 *
1537 * When this happens, we are passed in the relid of the originally
1538 * found relation (which we have locked). As the planner will look up
1539 * the relation again, we double-check here to make sure it found the
1540 * same one that we have locked.
1541 */
1542 if (queryRelId != InvalidOid)
1543 {
1544 /*
1545 * Note that with RLS involved there may be multiple relations,
1546 * and while the one we need is almost certainly first, we don't
1547 * make any guarantees of that in the planner, so check the whole
1548 * list and make sure we find the original relation.
1549 */
1550 if (!list_member_oid(plan->relationOids, queryRelId))
1551 ereport(ERROR,
1552 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1553 errmsg("relation referenced by COPY statement has changed")));
1554 }
1555
1556 /*
1557 * Use a snapshot with an updated command ID to ensure this query sees
1558 * results of any previously executed queries.
1559 */
1560 PushCopiedSnapshot(GetActiveSnapshot());
1561 UpdateActiveSnapshotCommandId();
1562
1563 /* Create dest receiver for COPY OUT */
1564 dest = CreateDestReceiver(DestCopyOut);
1565 ((DR_copy *) dest)->cstate = cstate;
1566
1567 /* Create a QueryDesc requesting no output */
1568 cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1569 GetActiveSnapshot(),
1570 InvalidSnapshot,
1571 dest, NULL, NULL, 0);
1572
1573 /*
1574 * Call ExecutorStart to prepare the plan for execution.
1575 *
1576 * ExecutorStart computes a result tupdesc for us
1577 */
1578 ExecutorStart(cstate->queryDesc, 0);
1579
1580 tupDesc = cstate->queryDesc->tupDesc;
1581 }
1582
1583 /* Generate or convert list of attributes to process */
1584 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1585
1586 num_phys_attrs = tupDesc->natts;
1587
1588 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1589 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1590 if (cstate->force_quote_all)
1591 {
1592 int i;
1593
1594 for (i = 0; i < num_phys_attrs; i++)
1595 cstate->force_quote_flags[i] = true;
1596 }
1597 else if (cstate->force_quote)
1598 {
1599 List *attnums;
1600 ListCell *cur;
1601
1602 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1603
1604 foreach(cur, attnums)
1605 {
1606 int attnum = lfirst_int(cur);
1607 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1608
1609 if (!list_member_int(cstate->attnumlist, attnum))
1610 ereport(ERROR,
1611 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1612 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1613 NameStr(attr->attname))));
1614 cstate->force_quote_flags[attnum - 1] = true;
1615 }
1616 }
1617
1618 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1619 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1620 if (cstate->force_notnull)
1621 {
1622 List *attnums;
1623 ListCell *cur;
1624
1625 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1626
1627 foreach(cur, attnums)
1628 {
1629 int attnum = lfirst_int(cur);
1630 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1631
1632 if (!list_member_int(cstate->attnumlist, attnum))
1633 ereport(ERROR,
1634 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1635 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1636 NameStr(attr->attname))));
1637 cstate->force_notnull_flags[attnum - 1] = true;
1638 }
1639 }
1640
1641 /* Convert FORCE_NULL name list to per-column flags, check validity */
1642 cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1643 if (cstate->force_null)
1644 {
1645 List *attnums;
1646 ListCell *cur;
1647
1648 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1649
1650 foreach(cur, attnums)
1651 {
1652 int attnum = lfirst_int(cur);
1653 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1654
1655 if (!list_member_int(cstate->attnumlist, attnum))
1656 ereport(ERROR,
1657 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1658 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1659 NameStr(attr->attname))));
1660 cstate->force_null_flags[attnum - 1] = true;
1661 }
1662 }
1663
1664 /* Convert convert_selectively name list to per-column flags */
1665 if (cstate->convert_selectively)
1666 {
1667 List *attnums;
1668 ListCell *cur;
1669
1670 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1671
1672 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1673
1674 foreach(cur, attnums)
1675 {
1676 int attnum = lfirst_int(cur);
1677 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1678
1679 if (!list_member_int(cstate->attnumlist, attnum))
1680 ereport(ERROR,
1681 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1682 errmsg_internal("selected column \"%s\" not referenced by COPY",
1683 NameStr(attr->attname))));
1684 cstate->convert_select_flags[attnum - 1] = true;
1685 }
1686 }
1687
1688 /* Use client encoding when ENCODING option is not specified. */
1689 if (cstate->file_encoding < 0)
1690 cstate->file_encoding = pg_get_client_encoding();
1691
1692 /*
1693 * Set up encoding conversion info. Even if the file and server encodings
1694 * are the same, we must apply pg_any_to_server() to validate data in
1695 * multibyte encodings.
1696 */
1697 cstate->need_transcoding =
1698 (cstate->file_encoding != GetDatabaseEncoding() ||
1699 pg_database_encoding_max_length() > 1);
1700 /* See Multibyte encoding comment above */
1701 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1702
1703 cstate->copy_dest = COPY_FILE; /* default */
1704
1705 MemoryContextSwitchTo(oldcontext);
1706
1707 return cstate;
1708 }
1709
1710 /*
1711 * Closes the pipe to an external program, checking the pclose() return code.
1712 */
1713 static void
ClosePipeToProgram(CopyState cstate)1714 ClosePipeToProgram(CopyState cstate)
1715 {
1716 int pclose_rc;
1717
1718 Assert(cstate->is_program);
1719
1720 pclose_rc = ClosePipeStream(cstate->copy_file);
1721 if (pclose_rc == -1)
1722 ereport(ERROR,
1723 (errcode_for_file_access(),
1724 errmsg("could not close pipe to external command: %m")));
1725 else if (pclose_rc != 0)
1726 {
1727 /*
1728 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1729 * expectable for the called program to fail with SIGPIPE, and we
1730 * should not report that as an error. Otherwise, SIGPIPE indicates a
1731 * problem.
1732 */
1733 if (cstate->is_copy_from && !cstate->reached_eof &&
1734 wait_result_is_signal(pclose_rc, SIGPIPE))
1735 return;
1736
1737 ereport(ERROR,
1738 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1739 errmsg("program \"%s\" failed",
1740 cstate->filename),
1741 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1742 }
1743 }
1744
1745 /*
1746 * Release resources allocated in a cstate for COPY TO/FROM.
1747 */
1748 static void
EndCopy(CopyState cstate)1749 EndCopy(CopyState cstate)
1750 {
1751 if (cstate->is_program)
1752 {
1753 ClosePipeToProgram(cstate);
1754 }
1755 else
1756 {
1757 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1758 ereport(ERROR,
1759 (errcode_for_file_access(),
1760 errmsg("could not close file \"%s\": %m",
1761 cstate->filename)));
1762 }
1763
1764 MemoryContextDelete(cstate->copycontext);
1765 pfree(cstate);
1766 }
1767
1768 /*
1769 * Setup CopyState to read tuples from a table or a query for COPY TO.
1770 */
1771 static CopyState
BeginCopyTo(ParseState * pstate,Relation rel,RawStmt * query,Oid queryRelId,const char * filename,bool is_program,List * attnamelist,List * options)1772 BeginCopyTo(ParseState *pstate,
1773 Relation rel,
1774 RawStmt *query,
1775 Oid queryRelId,
1776 const char *filename,
1777 bool is_program,
1778 List *attnamelist,
1779 List *options)
1780 {
1781 CopyState cstate;
1782 bool pipe = (filename == NULL);
1783 MemoryContext oldcontext;
1784
1785 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1786 {
1787 if (rel->rd_rel->relkind == RELKIND_VIEW)
1788 ereport(ERROR,
1789 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1790 errmsg("cannot copy from view \"%s\"",
1791 RelationGetRelationName(rel)),
1792 errhint("Try the COPY (SELECT ...) TO variant.")));
1793 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1794 ereport(ERROR,
1795 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1796 errmsg("cannot copy from materialized view \"%s\"",
1797 RelationGetRelationName(rel)),
1798 errhint("Try the COPY (SELECT ...) TO variant.")));
1799 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1800 ereport(ERROR,
1801 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1802 errmsg("cannot copy from foreign table \"%s\"",
1803 RelationGetRelationName(rel)),
1804 errhint("Try the COPY (SELECT ...) TO variant.")));
1805 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1806 ereport(ERROR,
1807 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1808 errmsg("cannot copy from sequence \"%s\"",
1809 RelationGetRelationName(rel))));
1810 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1811 ereport(ERROR,
1812 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1813 errmsg("cannot copy from partitioned table \"%s\"",
1814 RelationGetRelationName(rel)),
1815 errhint("Try the COPY (SELECT ...) TO variant.")));
1816 else
1817 ereport(ERROR,
1818 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1819 errmsg("cannot copy from non-table relation \"%s\"",
1820 RelationGetRelationName(rel))));
1821 }
1822
1823 cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1824 options);
1825 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1826
1827 if (pipe)
1828 {
1829 Assert(!is_program); /* the grammar does not allow this */
1830 if (whereToSendOutput != DestRemote)
1831 cstate->copy_file = stdout;
1832 }
1833 else
1834 {
1835 cstate->filename = pstrdup(filename);
1836 cstate->is_program = is_program;
1837
1838 if (is_program)
1839 {
1840 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1841 if (cstate->copy_file == NULL)
1842 ereport(ERROR,
1843 (errcode_for_file_access(),
1844 errmsg("could not execute command \"%s\": %m",
1845 cstate->filename)));
1846 }
1847 else
1848 {
1849 mode_t oumask; /* Pre-existing umask value */
1850 struct stat st;
1851
1852 /*
1853 * Prevent write to relative path ... too easy to shoot oneself in
1854 * the foot by overwriting a database file ...
1855 */
1856 if (!is_absolute_path(filename))
1857 ereport(ERROR,
1858 (errcode(ERRCODE_INVALID_NAME),
1859 errmsg("relative path not allowed for COPY to file")));
1860
1861 oumask = umask(S_IWGRP | S_IWOTH);
1862 PG_TRY();
1863 {
1864 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1865 }
1866 PG_CATCH();
1867 {
1868 umask(oumask);
1869 PG_RE_THROW();
1870 }
1871 PG_END_TRY();
1872 umask(oumask);
1873 if (cstate->copy_file == NULL)
1874 {
1875 /* copy errno because ereport subfunctions might change it */
1876 int save_errno = errno;
1877
1878 ereport(ERROR,
1879 (errcode_for_file_access(),
1880 errmsg("could not open file \"%s\" for writing: %m",
1881 cstate->filename),
1882 (save_errno == ENOENT || save_errno == EACCES) ?
1883 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1884 "You may want a client-side facility such as psql's \\copy.") : 0));
1885 }
1886
1887 if (fstat(fileno(cstate->copy_file), &st))
1888 ereport(ERROR,
1889 (errcode_for_file_access(),
1890 errmsg("could not stat file \"%s\": %m",
1891 cstate->filename)));
1892
1893 if (S_ISDIR(st.st_mode))
1894 ereport(ERROR,
1895 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1896 errmsg("\"%s\" is a directory", cstate->filename)));
1897 }
1898 }
1899
1900 MemoryContextSwitchTo(oldcontext);
1901
1902 return cstate;
1903 }
1904
1905 /*
1906 * This intermediate routine exists mainly to localize the effects of setjmp
1907 * so we don't need to plaster a lot of variables with "volatile".
1908 */
1909 static uint64
DoCopyTo(CopyState cstate)1910 DoCopyTo(CopyState cstate)
1911 {
1912 bool pipe = (cstate->filename == NULL);
1913 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1914 uint64 processed;
1915
1916 PG_TRY();
1917 {
1918 if (fe_copy)
1919 SendCopyBegin(cstate);
1920
1921 processed = CopyTo(cstate);
1922
1923 if (fe_copy)
1924 SendCopyEnd(cstate);
1925 }
1926 PG_CATCH();
1927 {
1928 /*
1929 * Make sure we turn off old-style COPY OUT mode upon error. It is
1930 * okay to do this in all cases, since it does nothing if the mode is
1931 * not on.
1932 */
1933 pq_endcopyout(true);
1934 PG_RE_THROW();
1935 }
1936 PG_END_TRY();
1937
1938 return processed;
1939 }
1940
1941 /*
1942 * Clean up storage and release resources for COPY TO.
1943 */
1944 static void
EndCopyTo(CopyState cstate)1945 EndCopyTo(CopyState cstate)
1946 {
1947 if (cstate->queryDesc != NULL)
1948 {
1949 /* Close down the query and free resources. */
1950 ExecutorFinish(cstate->queryDesc);
1951 ExecutorEnd(cstate->queryDesc);
1952 FreeQueryDesc(cstate->queryDesc);
1953 PopActiveSnapshot();
1954 }
1955
1956 /* Clean up storage */
1957 EndCopy(cstate);
1958 }
1959
1960 /*
1961 * Copy from relation or query TO file.
1962 */
1963 static uint64
CopyTo(CopyState cstate)1964 CopyTo(CopyState cstate)
1965 {
1966 TupleDesc tupDesc;
1967 int num_phys_attrs;
1968 ListCell *cur;
1969 uint64 processed;
1970
1971 if (cstate->rel)
1972 tupDesc = RelationGetDescr(cstate->rel);
1973 else
1974 tupDesc = cstate->queryDesc->tupDesc;
1975 num_phys_attrs = tupDesc->natts;
1976 cstate->null_print_client = cstate->null_print; /* default */
1977
1978 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1979 cstate->fe_msgbuf = makeStringInfo();
1980
1981 /* Get info about the columns we need to process. */
1982 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1983 foreach(cur, cstate->attnumlist)
1984 {
1985 int attnum = lfirst_int(cur);
1986 Oid out_func_oid;
1987 bool isvarlena;
1988 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1989
1990 if (cstate->binary)
1991 getTypeBinaryOutputInfo(attr->atttypid,
1992 &out_func_oid,
1993 &isvarlena);
1994 else
1995 getTypeOutputInfo(attr->atttypid,
1996 &out_func_oid,
1997 &isvarlena);
1998 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1999 }
2000
2001 /*
2002 * Create a temporary memory context that we can reset once per row to
2003 * recover palloc'd memory. This avoids any problems with leaks inside
2004 * datatype output routines, and should be faster than retail pfree's
2005 * anyway. (We don't need a whole econtext as CopyFrom does.)
2006 */
2007 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
2008 "COPY TO",
2009 ALLOCSET_DEFAULT_SIZES);
2010
2011 if (cstate->binary)
2012 {
2013 /* Generate header for a binary copy */
2014 int32 tmp;
2015
2016 /* Signature */
2017 CopySendData(cstate, BinarySignature, 11);
2018 /* Flags field */
2019 tmp = 0;
2020 if (cstate->oids)
2021 tmp |= (1 << 16);
2022 CopySendInt32(cstate, tmp);
2023 /* No header extension */
2024 tmp = 0;
2025 CopySendInt32(cstate, tmp);
2026 }
2027 else
2028 {
2029 /*
2030 * For non-binary copy, we need to convert null_print to file
2031 * encoding, because it will be sent directly with CopySendString.
2032 */
2033 if (cstate->need_transcoding)
2034 cstate->null_print_client = pg_server_to_any(cstate->null_print,
2035 cstate->null_print_len,
2036 cstate->file_encoding);
2037
2038 /* if a header has been requested send the line */
2039 if (cstate->header_line)
2040 {
2041 bool hdr_delim = false;
2042
2043 foreach(cur, cstate->attnumlist)
2044 {
2045 int attnum = lfirst_int(cur);
2046 char *colname;
2047
2048 if (hdr_delim)
2049 CopySendChar(cstate, cstate->delim[0]);
2050 hdr_delim = true;
2051
2052 colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2053
2054 CopyAttributeOutCSV(cstate, colname, false,
2055 list_length(cstate->attnumlist) == 1);
2056 }
2057
2058 CopySendEndOfRow(cstate);
2059 }
2060 }
2061
2062 if (cstate->rel)
2063 {
2064 Datum *values;
2065 bool *nulls;
2066 HeapScanDesc scandesc;
2067 HeapTuple tuple;
2068
2069 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2070 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2071
2072 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2073
2074 processed = 0;
2075 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2076 {
2077 CHECK_FOR_INTERRUPTS();
2078
2079 /* Deconstruct the tuple ... faster than repeated heap_getattr */
2080 heap_deform_tuple(tuple, tupDesc, values, nulls);
2081
2082 /* Format and send the data */
2083 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2084 processed++;
2085 }
2086
2087 heap_endscan(scandesc);
2088
2089 pfree(values);
2090 pfree(nulls);
2091 }
2092 else
2093 {
2094 /* run the plan --- the dest receiver will send tuples */
2095 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2096 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2097 }
2098
2099 if (cstate->binary)
2100 {
2101 /* Generate trailer for a binary copy */
2102 CopySendInt16(cstate, -1);
2103 /* Need to flush out the trailer */
2104 CopySendEndOfRow(cstate);
2105 }
2106
2107 MemoryContextDelete(cstate->rowcontext);
2108
2109 return processed;
2110 }
2111
2112 /*
2113 * Emit one row during CopyTo().
2114 */
2115 static void
CopyOneRowTo(CopyState cstate,Oid tupleOid,Datum * values,bool * nulls)2116 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2117 {
2118 bool need_delim = false;
2119 FmgrInfo *out_functions = cstate->out_functions;
2120 MemoryContext oldcontext;
2121 ListCell *cur;
2122 char *string;
2123
2124 MemoryContextReset(cstate->rowcontext);
2125 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2126
2127 if (cstate->binary)
2128 {
2129 /* Binary per-tuple header */
2130 CopySendInt16(cstate, list_length(cstate->attnumlist));
2131 /* Send OID if wanted --- note attnumlist doesn't include it */
2132 if (cstate->oids)
2133 {
2134 /* Hack --- assume Oid is same size as int32 */
2135 CopySendInt32(cstate, sizeof(int32));
2136 CopySendInt32(cstate, tupleOid);
2137 }
2138 }
2139 else
2140 {
2141 /* Text format has no per-tuple header, but send OID if wanted */
2142 /* Assume digits don't need any quoting or encoding conversion */
2143 if (cstate->oids)
2144 {
2145 string = DatumGetCString(DirectFunctionCall1(oidout,
2146 ObjectIdGetDatum(tupleOid)));
2147 CopySendString(cstate, string);
2148 need_delim = true;
2149 }
2150 }
2151
2152 foreach(cur, cstate->attnumlist)
2153 {
2154 int attnum = lfirst_int(cur);
2155 Datum value = values[attnum - 1];
2156 bool isnull = nulls[attnum - 1];
2157
2158 if (!cstate->binary)
2159 {
2160 if (need_delim)
2161 CopySendChar(cstate, cstate->delim[0]);
2162 need_delim = true;
2163 }
2164
2165 if (isnull)
2166 {
2167 if (!cstate->binary)
2168 CopySendString(cstate, cstate->null_print_client);
2169 else
2170 CopySendInt32(cstate, -1);
2171 }
2172 else
2173 {
2174 if (!cstate->binary)
2175 {
2176 string = OutputFunctionCall(&out_functions[attnum - 1],
2177 value);
2178 if (cstate->csv_mode)
2179 CopyAttributeOutCSV(cstate, string,
2180 cstate->force_quote_flags[attnum - 1],
2181 list_length(cstate->attnumlist) == 1);
2182 else
2183 CopyAttributeOutText(cstate, string);
2184 }
2185 else
2186 {
2187 bytea *outputbytes;
2188
2189 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2190 value);
2191 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2192 CopySendData(cstate, VARDATA(outputbytes),
2193 VARSIZE(outputbytes) - VARHDRSZ);
2194 }
2195 }
2196 }
2197
2198 CopySendEndOfRow(cstate);
2199
2200 MemoryContextSwitchTo(oldcontext);
2201 }
2202
2203
2204 /*
2205 * error context callback for COPY FROM
2206 *
2207 * The argument for the error context must be CopyState.
2208 */
2209 void
CopyFromErrorCallback(void * arg)2210 CopyFromErrorCallback(void *arg)
2211 {
2212 CopyState cstate = (CopyState) arg;
2213 char curlineno_str[32];
2214
2215 snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2216 cstate->cur_lineno);
2217
2218 if (cstate->binary)
2219 {
2220 /* can't usefully display the data */
2221 if (cstate->cur_attname)
2222 errcontext("COPY %s, line %s, column %s",
2223 cstate->cur_relname, curlineno_str,
2224 cstate->cur_attname);
2225 else
2226 errcontext("COPY %s, line %s",
2227 cstate->cur_relname, curlineno_str);
2228 }
2229 else
2230 {
2231 if (cstate->cur_attname && cstate->cur_attval)
2232 {
2233 /* error is relevant to a particular column */
2234 char *attval;
2235
2236 attval = limit_printout_length(cstate->cur_attval);
2237 errcontext("COPY %s, line %s, column %s: \"%s\"",
2238 cstate->cur_relname, curlineno_str,
2239 cstate->cur_attname, attval);
2240 pfree(attval);
2241 }
2242 else if (cstate->cur_attname)
2243 {
2244 /* error is relevant to a particular column, value is NULL */
2245 errcontext("COPY %s, line %s, column %s: null input",
2246 cstate->cur_relname, curlineno_str,
2247 cstate->cur_attname);
2248 }
2249 else
2250 {
2251 /*
2252 * Error is relevant to a particular line.
2253 *
2254 * If line_buf still contains the correct line, and it's already
2255 * transcoded, print it. If it's still in a foreign encoding, it's
2256 * quite likely that the error is precisely a failure to do
2257 * encoding conversion (ie, bad data). We dare not try to convert
2258 * it, and at present there's no way to regurgitate it without
2259 * conversion. So we have to punt and just report the line number.
2260 */
2261 if (cstate->line_buf_valid &&
2262 (cstate->line_buf_converted || !cstate->need_transcoding))
2263 {
2264 char *lineval;
2265
2266 lineval = limit_printout_length(cstate->line_buf.data);
2267 errcontext("COPY %s, line %s: \"%s\"",
2268 cstate->cur_relname, curlineno_str, lineval);
2269 pfree(lineval);
2270 }
2271 else
2272 {
2273 errcontext("COPY %s, line %s",
2274 cstate->cur_relname, curlineno_str);
2275 }
2276 }
2277 }
2278 }
2279
/*
 * Bound the amount of COPY data that can appear in a message.
 *
 * Using the sprintf "precision" limit to truncate would look simpler, but
 * some glibc versions have a bug/misfeature where vsnprintf always fails
 * (returns -1) when asked to truncate a string containing byte sequences
 * that are invalid in the current encoding.  So the truncation is done by
 * hand here.
 *
 * Always returns a freshly palloc'd string: a copy of str when it is at
 * most MAX_COPY_DATA_DISPLAY bytes long, otherwise a clipped prefix with
 * "..." appended.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Apply encoding-dependent truncation */
		clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the prefix, the three dots, and the terminator */
		clipped = (char *) palloc(clip_len + 4);
		memcpy(clipped, str, clip_len);
		memcpy(clipped + clip_len, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back a plain copy */
	return pstrdup(str);
}
2314
2315 /*
2316 * Copy FROM file to relation.
2317 */
/*
 * Overview: rows are fetched with NextCopyFrom() until it returns false.
 * Each row is formed into a heap tuple, optionally routed to a partition,
 * passed through BEFORE ROW / INSTEAD OF ROW INSERT triggers and constraint
 * checks, and then either buffered for CopyFromInsertBatch() or stored
 * one at a time via heap_insert() / the FDW's ExecForeignInsert().
 * BEFORE and AFTER STATEMENT INSERT triggers are fired around the loop.
 *
 * cstate: COPY state; cstate->rel must be set (asserted below).
 *
 * Returns the number of rows processed.  Rows suppressed by a BEFORE ROW
 * trigger or by the FDW are not counted.
 */
2318 uint64
CopyFrom(CopyState cstate)2319 CopyFrom(CopyState cstate)
2320 {
2321 HeapTuple tuple;
2322 TupleDesc tupDesc;
2323 Datum *values;
2324 bool *nulls;
2325 ResultRelInfo *resultRelInfo;
/* Set when a tuple is routed to a partition; restored at next_tuple */
2326 ResultRelInfo *saved_resultRelInfo = NULL;
2327 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2328 ModifyTableState *mtstate;
2329 ExprContext *econtext;
2330 TupleTableSlot *myslot;
2331 MemoryContext oldcontext = CurrentMemoryContext;
2332
2333 ErrorContextCallback errcallback;
2334 CommandId mycid = GetCurrentCommandId(true);
2335 int hi_options = 0; /* start with default heap_insert options */
2336 BulkInsertState bistate;
2337 uint64 processed = 0;
2338 bool useHeapMultiInsert;
2339 int nBufferedTuples = 0;
2340 int prev_leaf_part_index = -1;
2341
2342 #define MAX_BUFFERED_TUPLES 1000
2343 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
/* Running sum of t_len over the tuples currently held in bufferedTuples */
2344 Size bufferedTuplesSize = 0;
/* cur_lineno recorded when the first tuple of the current batch was buffered */
2345 uint64 firstBufferedLineNo = 0;
2346
2347 Assert(cstate->rel);
2348
2349 /*
2350 * The target must be a plain, foreign, or partitioned relation, or have
2351 * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
2352 * allowed on views, so we only hint about them in the view case.)
2353 */
2354 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2355 cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2356 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2357 !(cstate->rel->trigdesc &&
2358 cstate->rel->trigdesc->trig_insert_instead_row))
2359 {
2360 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2361 ereport(ERROR,
2362 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2363 errmsg("cannot copy to view \"%s\"",
2364 RelationGetRelationName(cstate->rel)),
2365 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2366 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2367 ereport(ERROR,
2368 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2369 errmsg("cannot copy to materialized view \"%s\"",
2370 RelationGetRelationName(cstate->rel))));
2371 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2372 ereport(ERROR,
2373 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2374 errmsg("cannot copy to sequence \"%s\"",
2375 RelationGetRelationName(cstate->rel))));
2376 else
2377 ereport(ERROR,
2378 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2379 errmsg("cannot copy to non-table relation \"%s\"",
2380 RelationGetRelationName(cstate->rel))));
2381 }
2382
2383 tupDesc = RelationGetDescr(cstate->rel);
2384
2385 /*----------
2386 * Check to see if we can avoid writing WAL
2387 *
2388 * If archive logging/streaming is not enabled *and* either
2389 * - table was created in same transaction as this COPY
2390 * - data is being written to relfilenode created in this transaction
2391 * then we can skip writing WAL. It's safe because if the transaction
2392 * doesn't commit, we'll discard the table (or the new relfilenode file).
2393 * If it does commit, we'll have done the heap_sync at the bottom of this
2394 * routine first.
2395 *
2396 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2397 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2398 * can be cleared before the end of the transaction. The exact case is
2399 * when a relation sets a new relfilenode twice in same transaction, yet
2400 * the second one fails in an aborted subtransaction, e.g.
2401 *
2402 * BEGIN;
2403 * TRUNCATE t;
2404 * SAVEPOINT save;
2405 * TRUNCATE t;
2406 * ROLLBACK TO save;
2407 * COPY ...
2408 *
2409 * Also, if the target file is new-in-transaction, we assume that checking
2410 * FSM for free space is a waste of time, even if we must use WAL because
2411 * of archiving. This could possibly be wrong, but it's unlikely.
2412 *
2413 * The comments for heap_insert and RelationGetBufferForTuple specify that
2414 * skipping WAL logging is only safe if we ensure that our tuples do not
2415 * go into pages containing tuples from any other transactions --- but this
2416 * must be the case if we have a new table or new relfilenode, so we need
2417 * no additional work to enforce that.
2418 *
2419 * We currently don't support this optimization if the COPY target is a
2420 * partitioned table as we currently only lazily initialize partition
2421 * information when routing the first tuple to the partition. We cannot
2422 * know at this stage if we can perform this optimization. It should be
2423 * possible to improve on this, but it does mean maintaining heap insert
2424 * option flags per partition and setting them when we first open the
2425 * partition.
2426 *
2427 * This optimization is not supported for relation types which do not
2428 * have any physical storage, with foreign tables and views using
2429 * INSTEAD OF triggers entering in this category. Partitioned tables
2430 * are not supported as per the description above.
2431 *----------
2432 */
2433 /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2434 if (cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2435 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2436 cstate->rel->rd_rel->relkind != RELKIND_VIEW &&
2437 (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2438 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
2439 {
2440 hi_options |= HEAP_INSERT_SKIP_FSM;
2441 if (!XLogIsNeeded())
2442 hi_options |= HEAP_INSERT_SKIP_WAL;
2443 }
2444
2445 /*
2446 * Optimize if new relfilenode was created in this subxact or one of its
2447 * committed children and we won't see those rows later as part of an
2448 * earlier scan or command. The subxact test ensures that if this subxact
2449 * aborts then the frozen rows won't be visible after xact cleanup. Note
2450 * that the stronger test of exactly which subtransaction created it is
2451 * crucial for correctness of this optimization. The test for an earlier
2452 * scan or command tolerates false negatives. FREEZE causes other sessions
2453 * to see rows they would not see under MVCC, and a false negative merely
2454 * spreads that anomaly to the current session.
2455 */
2456 if (cstate->freeze)
2457 {
2458 /*
2459 * We currently disallow COPY FREEZE on partitioned tables. The
2460 * reason for this is that we've simply not yet opened the partitions
2461 * to determine if the optimization can be applied to them. We could
2462 * go and open them all here, but doing so may be quite a costly
2463 * overhead for small copies. In any case, we may just end up routing
2464 * tuples to a small number of partitions. It seems better just to
2465 * raise an ERROR for partitioned tables.
2466 */
2467 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2468 {
2469 ereport(ERROR,
2470 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2471 errmsg("cannot perform FREEZE on a partitioned table")));
2472 }
2473
2474 /*
2475 * Tolerate one registration for the benefit of FirstXactSnapshot.
2476 * Scan-bearing queries generally create at least two registrations,
2477 * though relying on that is fragile, as is ignoring ActiveSnapshot.
2478 * Clear CatalogSnapshot to avoid counting its registration. We'll
2479 * still detect ongoing catalog scans, each of which separately
2480 * registers the snapshot it uses.
2481 */
2482 InvalidateCatalogSnapshot();
2483 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2484 ereport(ERROR,
2485 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2486 errmsg("cannot perform FREEZE because of prior transaction activity")));
2487
2488 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2489 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2490 ereport(ERROR,
2491 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2492 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2493
2494 hi_options |= HEAP_INSERT_FROZEN;
2495 }
2496
2497 /*
2498 * We need a ResultRelInfo so we can use the regular executor's
2499 * index-entry-making machinery. (There used to be a huge amount of code
2500 * here that basically duplicated execUtils.c ...)
2501 */
2502 resultRelInfo = makeNode(ResultRelInfo);
2503 InitResultRelInfo(resultRelInfo,
2504 cstate->rel,
2505 1, /* dummy rangetable index */
2506 NULL,
2507 0);
2508
2509 /* Verify the named relation is a valid target for INSERT */
2510 CheckValidResultRel(resultRelInfo, CMD_INSERT);
2511
2512 ExecOpenIndices(resultRelInfo, false);
2513
2514 estate->es_result_relations = resultRelInfo;
2515 estate->es_num_result_relations = 1;
2516 estate->es_result_relation_info = resultRelInfo;
2517 estate->es_range_table = cstate->range_table;
2518
2519 /* Set up a tuple slot too */
2520 myslot = ExecInitExtraTupleSlot(estate, tupDesc);
2521 /* Triggers might need a slot as well */
2522 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
2523
2524 /*
2525 * Set up a ModifyTableState so we can let FDW(s) init themselves for
2526 * foreign-table result relation(s).
2527 */
2528 mtstate = makeNode(ModifyTableState);
2529 mtstate->ps.plan = NULL;
2530 mtstate->ps.state = estate;
2531 mtstate->operation = CMD_INSERT;
2532 mtstate->resultRelInfo = estate->es_result_relations;
2533 mtstate->rootResultRelInfo = estate->es_result_relations;
2534
2535 if (resultRelInfo->ri_FdwRoutine != NULL &&
2536 resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
2537 resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
2538 resultRelInfo);
2539
2540 /* Prepare to catch AFTER triggers. */
2541 AfterTriggerBeginQuery();
2542
2543 /*
2544 * If there are any triggers with transition tables on the named relation,
2545 * we need to be prepared to capture transition tuples.
2546 */
2547 cstate->transition_capture =
2548 MakeTransitionCaptureState(cstate->rel->trigdesc,
2549 RelationGetRelid(cstate->rel),
2550 CMD_INSERT);
2551
2552 /*
2553 * If the named relation is a partitioned table, initialize state for
2554 * CopyFrom tuple routing.
2555 */
2556 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2557 {
2558 PartitionTupleRouting *proute;
2559
2560 proute = cstate->partition_tuple_routing =
2561 ExecSetupPartitionTupleRouting(NULL, resultRelInfo);
2562
2563 /*
2564 * If we are capturing transition tuples, they may need to be
2565 * converted from partition format back to partitioned table format
2566 * (this is only ever necessary if a BEFORE trigger modifies the
2567 * tuple).
2568 */
2569 if (cstate->transition_capture != NULL)
2570 ExecSetupChildParentMapForLeaf(proute);
2571 }
2572
2573 /*
2574 * It's more efficient to prepare a bunch of tuples for insertion, and
2575 * insert them in one heap_multi_insert() call, than call heap_insert()
2576 * separately for every tuple. However, we can't do that if there are
2577 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2578 * expressions. Such triggers or expressions might query the table we're
2579 * inserting to, and act differently if the tuples that have already been
2580 * processed and prepared for insertion are not there. We also can't do
2581 * it if the table is foreign or partitioned.
2582 */
2583 if ((resultRelInfo->ri_TrigDesc != NULL &&
2584 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2585 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2586 resultRelInfo->ri_FdwRoutine != NULL ||
2587 cstate->partition_tuple_routing != NULL ||
2588 cstate->volatile_defexprs)
2589 {
2590 useHeapMultiInsert = false;
2591 }
2592 else
2593 {
2594 useHeapMultiInsert = true;
2595 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2596 }
2597
2598 /*
2599 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2600 * should do this for COPY, since it's not really an "INSERT" statement as
2601 * such. However, executing these triggers maintains consistency with the
2602 * EACH ROW triggers that we already fire on COPY.
2603 */
2604 ExecBSInsertTriggers(estate, resultRelInfo);
2605
2606 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2607 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2608
2609 bistate = GetBulkInsertState();
2610 econtext = GetPerTupleExprContext(estate);
2611
2612 /* Set up callback to identify error line number */
2613 errcallback.callback = CopyFromErrorCallback;
2614 errcallback.arg = (void *) cstate;
2615 errcallback.previous = error_context_stack;
2616 error_context_stack = &errcallback;
2617
/* Main loop: one iteration per input row, until NextCopyFrom() returns false */
2618 for (;;)
2619 {
2620 TupleTableSlot *slot;
2621 bool skip_tuple;
2622 Oid loaded_oid = InvalidOid;
2623
2624 CHECK_FOR_INTERRUPTS();
2625
2626 if (nBufferedTuples == 0)
2627 {
2628 /*
2629 * Reset the per-tuple exprcontext. We can only do this if the
2630 * tuple buffer is empty. (Calling the context the per-tuple
2631 * memory context is a bit of a misnomer now.)
2632 */
2633 ResetPerTupleExprContext(estate);
2634 }
2635
2636 /* Switch into its memory context */
2637 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2638
2639 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2640 break;
2641
2642 /* And now we can form the input tuple. */
2643 tuple = heap_form_tuple(tupDesc, values, nulls);
2644
2645 if (loaded_oid != InvalidOid)
2646 HeapTupleSetOid(tuple, loaded_oid);
2647
2648 /*
2649 * Constraints might reference the tableoid column, so initialize
2650 * t_tableOid before evaluating them.
2651 */
2652 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2653
2654 /* Triggers and stuff need to be invoked in query context. */
2655 MemoryContextSwitchTo(oldcontext);
2656
2657 /* Place tuple in tuple slot --- but slot shouldn't free it */
2658 slot = myslot;
2659 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2660
2661 /* Determine the partition to heap_insert the tuple into */
2662 if (cstate->partition_tuple_routing)
2663 {
2664 int leaf_part_index;
2665 PartitionTupleRouting *proute = cstate->partition_tuple_routing;
2666
2667 /*
2668 * Away we go ... If we end up not finding a partition after all,
2669 * ExecFindPartition() does not return and errors out instead.
2670 * Otherwise, the returned value is to be used as an index into
2671 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2672 * will get us the ResultRelInfo and TupleConversionMap for the
2673 * partition, respectively.
2674 */
2675 leaf_part_index = ExecFindPartition(resultRelInfo,
2676 proute->partition_dispatch_info,
2677 slot,
2678 estate);
2679 Assert(leaf_part_index >= 0 &&
2680 leaf_part_index < proute->num_partitions);
2681
2682 /*
2683 * If this tuple is mapped to a partition that is not same as the
2684 * previous one, we'd better make the bulk insert mechanism gets a
2685 * new buffer.
2686 */
2687 if (prev_leaf_part_index != leaf_part_index)
2688 {
2689 ReleaseBulkInsertStatePin(bistate);
2690 prev_leaf_part_index = leaf_part_index;
2691 }
2692
2693 /*
2694 * Save the old ResultRelInfo and switch to the one corresponding
2695 * to the selected partition.
2696 */
2697 saved_resultRelInfo = resultRelInfo;
2698 resultRelInfo = proute->partitions[leaf_part_index];
/* A NULL entry means this partition's ResultRelInfo has not been built yet */
2699 if (resultRelInfo == NULL)
2700 {
2701 resultRelInfo = ExecInitPartitionInfo(mtstate,
2702 saved_resultRelInfo,
2703 proute, estate,
2704 leaf_part_index);
2705 Assert(resultRelInfo != NULL);
2706 }
2707
2708 /*
2709 * For ExecInsertIndexTuples() to work on the partition's indexes
2710 */
2711 estate->es_result_relation_info = resultRelInfo;
2712
2713 /*
2714 * If we're capturing transition tuples, we might need to convert
2715 * from the partition rowtype to parent rowtype.
2716 */
2717 if (cstate->transition_capture != NULL)
2718 {
2719 if (resultRelInfo->ri_TrigDesc &&
2720 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2721 {
2722 /*
2723 * If there are any BEFORE triggers on the partition,
2724 * we'll have to be ready to convert their result back to
2725 * tuplestore format.
2726 */
2727 cstate->transition_capture->tcs_original_insert_tuple = NULL;
2728 cstate->transition_capture->tcs_map =
2729 TupConvMapForLeaf(proute, saved_resultRelInfo,
2730 leaf_part_index);
2731 }
2732 else
2733 {
2734 /*
2735 * Otherwise, just remember the original unconverted
2736 * tuple, to avoid a needless round trip conversion.
2737 */
2738 cstate->transition_capture->tcs_original_insert_tuple = tuple;
2739 cstate->transition_capture->tcs_map = NULL;
2740 }
2741 }
2742
2743 /*
2744 * We might need to convert from the parent rowtype to the
2745 * partition rowtype.
2746 */
2747 tuple = ConvertPartitionTupleSlot(proute->parent_child_tupconv_maps[leaf_part_index],
2748 tuple,
2749 proute->partition_tuple_slot,
2750 &slot);
2751
2752 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2753 }
2754
2755 skip_tuple = false;
2756
2757 /* BEFORE ROW INSERT Triggers */
2758 if (resultRelInfo->ri_TrigDesc &&
2759 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2760 {
2761 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2762
2763 if (slot == NULL) /* "do nothing" */
2764 skip_tuple = true;
2765 else /* trigger might have changed tuple */
2766 tuple = ExecMaterializeSlot(slot);
2767 }
2768
2769 if (!skip_tuple)
2770 {
2771 if (resultRelInfo->ri_TrigDesc &&
2772 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2773 {
2774 /* Pass the data to the INSTEAD ROW INSERT trigger */
2775 ExecIRInsertTriggers(estate, resultRelInfo, slot);
2776 }
2777 else
2778 {
2779 /*
2780 * If the target is a plain table, check the constraints of
2781 * the tuple.
2782 */
2783 if (resultRelInfo->ri_FdwRoutine == NULL &&
2784 resultRelInfo->ri_RelationDesc->rd_att->constr)
2785 ExecConstraints(resultRelInfo, slot, estate);
2786
2787 /*
2788 * Also check the tuple against the partition constraint, if
2789 * there is one; except that if we got here via tuple-routing,
2790 * we don't need to if there's no BR trigger defined on the
2791 * partition.
2792 */
2793 if (resultRelInfo->ri_PartitionCheck &&
2794 (saved_resultRelInfo == NULL ||
2795 (resultRelInfo->ri_TrigDesc &&
2796 resultRelInfo->ri_TrigDesc->trig_insert_before_row)))
2797 ExecPartitionCheck(resultRelInfo, slot, estate, true);
2798
2799 if (useHeapMultiInsert)
2800 {
2801 /* Add this tuple to the tuple buffer */
2802 if (nBufferedTuples == 0)
2803 firstBufferedLineNo = cstate->cur_lineno;
2804 bufferedTuples[nBufferedTuples++] = tuple;
2805 bufferedTuplesSize += tuple->t_len;
2806
2807 /*
2808 * If the buffer filled up, flush it. Also flush if the
2809 * total size of all the tuples in the buffer becomes
2810 * large, to avoid using large amounts of memory for the
2811 * buffer when the tuples are exceptionally wide.
2812 */
2813 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2814 bufferedTuplesSize > 65535)
2815 {
2816 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2817 resultRelInfo, myslot, bistate,
2818 nBufferedTuples, bufferedTuples,
2819 firstBufferedLineNo);
2820 nBufferedTuples = 0;
2821 bufferedTuplesSize = 0;
2822 }
2823 }
2824 else
2825 {
2826 List *recheckIndexes = NIL;
2827
2828 /* OK, store the tuple */
2829 if (resultRelInfo->ri_FdwRoutine != NULL)
2830 {
2831 slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
2832 resultRelInfo,
2833 slot,
2834 NULL);
2835
/* Jumping to next_tuple bypasses processed++ for this row */
2836 if (slot == NULL) /* "do nothing" */
2837 goto next_tuple;
2838
2839 /* FDW might have changed tuple */
2840 tuple = ExecMaterializeSlot(slot);
2841
2842 /*
2843 * AFTER ROW Triggers might reference the tableoid
2844 * column, so initialize t_tableOid before evaluating
2845 * them.
2846 */
2847 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2848 }
2849 else
2850 heap_insert(resultRelInfo->ri_RelationDesc, tuple,
2851 mycid, hi_options, bistate);
2852
2853 /* And create index entries for it */
2854 if (resultRelInfo->ri_NumIndices > 0)
2855 recheckIndexes = ExecInsertIndexTuples(slot,
2856 &(tuple->t_self),
2857 estate,
2858 false,
2859 NULL,
2860 NIL);
2861
2862 /* AFTER ROW INSERT Triggers */
2863 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2864 recheckIndexes, cstate->transition_capture);
2865
2866 list_free(recheckIndexes);
2867 }
2868 }
2869
2870 /*
2871 * We count only tuples not suppressed by a BEFORE INSERT trigger
2872 * or FDW; this is the same definition used by nodeModifyTable.c
2873 * for counting tuples inserted by an INSERT command.
2874 */
2875 processed++;
2876 }
2877
2878 next_tuple:
2879 /* Restore the saved ResultRelInfo */
2880 if (saved_resultRelInfo)
2881 {
2882 resultRelInfo = saved_resultRelInfo;
2883 estate->es_result_relation_info = resultRelInfo;
2884 }
2885 }
2886
2887 /* Flush any remaining buffered tuples */
2888 if (nBufferedTuples > 0)
2889 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2890 resultRelInfo, myslot, bistate,
2891 nBufferedTuples, bufferedTuples,
2892 firstBufferedLineNo);
2893
2894 /* Done, clean up */
2895 error_context_stack = errcallback.previous;
2896
2897 FreeBulkInsertState(bistate);
2898
2899 MemoryContextSwitchTo(oldcontext);
2900
2901 /*
2902 * In the old protocol, tell pqcomm that we can process normal protocol
2903 * messages again.
2904 */
2905 if (cstate->copy_dest == COPY_OLD_FE)
2906 pq_endmsgread();
2907
2908 /* Execute AFTER STATEMENT insertion triggers */
2909 ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2910
2911 /* Handle queued AFTER triggers */
2912 AfterTriggerEndQuery(estate);
2913
2914 pfree(values);
2915 pfree(nulls);
2916
2917 ExecResetTupleTable(estate->es_tupleTable, false);
2918
2919 /* Allow the FDW to shut down */
2920 if (resultRelInfo->ri_FdwRoutine != NULL &&
2921 resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
2922 resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
2923 resultRelInfo);
2924
2925 ExecCloseIndices(resultRelInfo);
2926
2927 /* Close all the partitioned tables, leaf partitions, and their indices */
2928 if (cstate->partition_tuple_routing)
2929 ExecCleanupTupleRouting(mtstate, cstate->partition_tuple_routing);
2930
2931 /* Close any trigger target relations */
2932 ExecCleanUpTriggerState(estate);
2933
2934 FreeExecutorState(estate);
2935
2936 /*
2937 * If we skipped writing WAL, then we need to sync the heap (but not
2938 * indexes since those use WAL anyway)
2939 */
2940 if (hi_options & HEAP_INSERT_SKIP_WAL)
2941 heap_sync(cstate->rel);
2942
2943 return processed;
2944 }
2945
2946 /*
2947 * A subroutine of CopyFrom, to write the current batch of buffered heap
2948 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2949 * triggers.
2950 */
2951 static void
CopyFromInsertBatch(CopyState cstate,EState * estate,CommandId mycid,int hi_options,ResultRelInfo * resultRelInfo,TupleTableSlot * myslot,BulkInsertState bistate,int nBufferedTuples,HeapTuple * bufferedTuples,uint64 firstBufferedLineNo)2952 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2953 int hi_options, ResultRelInfo *resultRelInfo,
2954 TupleTableSlot *myslot, BulkInsertState bistate,
2955 int nBufferedTuples, HeapTuple *bufferedTuples,
2956 uint64 firstBufferedLineNo)
2957 {
2958 MemoryContext oldcontext;
2959 int i;
2960 uint64 save_cur_lineno;
2961
2962 /*
2963 * Print error context information correctly, if one of the operations
2964 * below fail.
2965 */
2966 cstate->line_buf_valid = false;
2967 save_cur_lineno = cstate->cur_lineno;
2968
2969 /*
2970 * heap_multi_insert leaks memory, so switch to short-lived memory context
2971 * before calling it.
2972 */
2973 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2974 heap_multi_insert(cstate->rel,
2975 bufferedTuples,
2976 nBufferedTuples,
2977 mycid,
2978 hi_options,
2979 bistate);
2980 MemoryContextSwitchTo(oldcontext);
2981
2982 /*
2983 * If there are any indexes, update them for all the inserted tuples, and
2984 * run AFTER ROW INSERT triggers.
2985 */
2986 if (resultRelInfo->ri_NumIndices > 0)
2987 {
2988 for (i = 0; i < nBufferedTuples; i++)
2989 {
2990 List *recheckIndexes;
2991
2992 cstate->cur_lineno = firstBufferedLineNo + i;
2993 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2994 recheckIndexes =
2995 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2996 estate, false, NULL, NIL);
2997 ExecARInsertTriggers(estate, resultRelInfo,
2998 bufferedTuples[i],
2999 recheckIndexes, cstate->transition_capture);
3000 list_free(recheckIndexes);
3001 }
3002 }
3003
3004 /*
3005 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
3006 * anyway.
3007 */
3008 else if (resultRelInfo->ri_TrigDesc != NULL &&
3009 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
3010 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
3011 {
3012 for (i = 0; i < nBufferedTuples; i++)
3013 {
3014 cstate->cur_lineno = firstBufferedLineNo + i;
3015 ExecARInsertTriggers(estate, resultRelInfo,
3016 bufferedTuples[i],
3017 NIL, cstate->transition_capture);
3018 }
3019 }
3020
3021 /* reset cur_lineno to where we were */
3022 cstate->cur_lineno = save_cur_lineno;
3023 }
3024
3025 /*
3026 * Setup to read tuples from a file for COPY FROM.
3027 *
3028 * 'rel': Used as a template for the tuples
3029 * 'filename': Name of server-local file to read
3030 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
3031 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
3032 *
3033 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
3034 */
3035 CopyState
BeginCopyFrom(ParseState * pstate,Relation rel,const char * filename,bool is_program,copy_data_source_cb data_source_cb,List * attnamelist,List * options)3036 BeginCopyFrom(ParseState *pstate,
3037 Relation rel,
3038 const char *filename,
3039 bool is_program,
3040 copy_data_source_cb data_source_cb,
3041 List *attnamelist,
3042 List *options)
3043 {
3044 CopyState cstate;
3045 bool pipe = (filename == NULL);
3046 TupleDesc tupDesc;
3047 AttrNumber num_phys_attrs,
3048 num_defaults;
3049 FmgrInfo *in_functions;
3050 Oid *typioparams;
3051 int attnum;
3052 Oid in_func_oid;
3053 int *defmap;
3054 ExprState **defexprs;
3055 MemoryContext oldcontext;
3056 bool volatile_defexprs;
3057
3058 cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
3059 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
3060
3061 /* Initialize state variables */
3062 cstate->reached_eof = false;
3063 cstate->eol_type = EOL_UNKNOWN;
3064 cstate->cur_relname = RelationGetRelationName(cstate->rel);
3065 cstate->cur_lineno = 0;
3066 cstate->cur_attname = NULL;
3067 cstate->cur_attval = NULL;
3068
3069 /* Set up variables to avoid per-attribute overhead. */
3070 initStringInfo(&cstate->attribute_buf);
3071 initStringInfo(&cstate->line_buf);
3072 cstate->line_buf_converted = false;
3073 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3074 cstate->raw_buf_index = cstate->raw_buf_len = 0;
3075
3076 /* Assign range table, we'll need it in CopyFrom. */
3077 if (pstate)
3078 cstate->range_table = pstate->p_rtable;
3079
3080 tupDesc = RelationGetDescr(cstate->rel);
3081 num_phys_attrs = tupDesc->natts;
3082 num_defaults = 0;
3083 volatile_defexprs = false;
3084
3085 /*
3086 * Pick up the required catalog information for each attribute in the
3087 * relation, including the input function, the element type (to pass to
3088 * the input function), and info about defaults and constraints. (Which
3089 * input function we use depends on text/binary format choice.)
3090 */
3091 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3092 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3093 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3094 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3095
3096 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3097 {
3098 Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3099
3100 /* We don't need info for dropped attributes */
3101 if (att->attisdropped)
3102 continue;
3103
3104 /* Fetch the input function and typioparam info */
3105 if (cstate->binary)
3106 getTypeBinaryInputInfo(att->atttypid,
3107 &in_func_oid, &typioparams[attnum - 1]);
3108 else
3109 getTypeInputInfo(att->atttypid,
3110 &in_func_oid, &typioparams[attnum - 1]);
3111 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3112
3113 /* Get default info if needed */
3114 if (!list_member_int(cstate->attnumlist, attnum))
3115 {
3116 /* attribute is NOT to be copied from input */
3117 /* use default value if one exists */
3118 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
3119 attnum);
3120
3121 if (defexpr != NULL)
3122 {
3123 /* Run the expression through planner */
3124 defexpr = expression_planner(defexpr);
3125
3126 /* Initialize executable expression in copycontext */
3127 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3128 defmap[num_defaults] = attnum - 1;
3129 num_defaults++;
3130
3131 /*
3132 * If a default expression looks at the table being loaded,
3133 * then it could give the wrong answer when using
3134 * multi-insert. Since database access can be dynamic this is
3135 * hard to test for exactly, so we use the much wider test of
3136 * whether the default expression is volatile. We allow for
3137 * the special case of when the default expression is the
3138 * nextval() of a sequence which in this specific case is
3139 * known to be safe for use with the multi-insert
3140 * optimization. Hence we use this special case function
3141 * checker rather than the standard check for
3142 * contain_volatile_functions().
3143 */
3144 if (!volatile_defexprs)
3145 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3146 }
3147 }
3148 }
3149
3150 /* We keep those variables in cstate. */
3151 cstate->in_functions = in_functions;
3152 cstate->typioparams = typioparams;
3153 cstate->defmap = defmap;
3154 cstate->defexprs = defexprs;
3155 cstate->volatile_defexprs = volatile_defexprs;
3156 cstate->num_defaults = num_defaults;
3157 cstate->is_program = is_program;
3158
3159 if (data_source_cb)
3160 {
3161 cstate->copy_dest = COPY_CALLBACK;
3162 cstate->data_source_cb = data_source_cb;
3163 }
3164 else if (pipe)
3165 {
3166 Assert(!is_program); /* the grammar does not allow this */
3167 if (whereToSendOutput == DestRemote)
3168 ReceiveCopyBegin(cstate);
3169 else
3170 cstate->copy_file = stdin;
3171 }
3172 else
3173 {
3174 cstate->filename = pstrdup(filename);
3175
3176 if (cstate->is_program)
3177 {
3178 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3179 if (cstate->copy_file == NULL)
3180 ereport(ERROR,
3181 (errcode_for_file_access(),
3182 errmsg("could not execute command \"%s\": %m",
3183 cstate->filename)));
3184 }
3185 else
3186 {
3187 struct stat st;
3188
3189 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3190 if (cstate->copy_file == NULL)
3191 {
3192 /* copy errno because ereport subfunctions might change it */
3193 int save_errno = errno;
3194
3195 ereport(ERROR,
3196 (errcode_for_file_access(),
3197 errmsg("could not open file \"%s\" for reading: %m",
3198 cstate->filename),
3199 (save_errno == ENOENT || save_errno == EACCES) ?
3200 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3201 "You may want a client-side facility such as psql's \\copy.") : 0));
3202 }
3203
3204 if (fstat(fileno(cstate->copy_file), &st))
3205 ereport(ERROR,
3206 (errcode_for_file_access(),
3207 errmsg("could not stat file \"%s\": %m",
3208 cstate->filename)));
3209
3210 if (S_ISDIR(st.st_mode))
3211 ereport(ERROR,
3212 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3213 errmsg("\"%s\" is a directory", cstate->filename)));
3214 }
3215 }
3216
3217 if (!cstate->binary)
3218 {
3219 /* must rely on user to tell us... */
3220 cstate->file_has_oids = cstate->oids;
3221 }
3222 else
3223 {
3224 /* Read and verify binary header */
3225 char readSig[11];
3226 int32 tmp;
3227
3228 /* Signature */
3229 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3230 memcmp(readSig, BinarySignature, 11) != 0)
3231 ereport(ERROR,
3232 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3233 errmsg("COPY file signature not recognized")));
3234 /* Flags field */
3235 if (!CopyGetInt32(cstate, &tmp))
3236 ereport(ERROR,
3237 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3238 errmsg("invalid COPY file header (missing flags)")));
3239 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3240 tmp &= ~(1 << 16);
3241 if ((tmp >> 16) != 0)
3242 ereport(ERROR,
3243 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3244 errmsg("unrecognized critical flags in COPY file header")));
3245 /* Header extension length */
3246 if (!CopyGetInt32(cstate, &tmp) ||
3247 tmp < 0)
3248 ereport(ERROR,
3249 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3250 errmsg("invalid COPY file header (missing length)")));
3251 /* Skip extension header, if present */
3252 while (tmp-- > 0)
3253 {
3254 if (CopyGetData(cstate, readSig, 1, 1) != 1)
3255 ereport(ERROR,
3256 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3257 errmsg("invalid COPY file header (wrong length)")));
3258 }
3259 }
3260
3261 if (cstate->file_has_oids && cstate->binary)
3262 {
3263 getTypeBinaryInputInfo(OIDOID,
3264 &in_func_oid, &cstate->oid_typioparam);
3265 fmgr_info(in_func_oid, &cstate->oid_in_function);
3266 }
3267
3268 /* create workspace for CopyReadAttributes results */
3269 if (!cstate->binary)
3270 {
3271 AttrNumber attr_count = list_length(cstate->attnumlist);
3272 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3273
3274 cstate->max_fields = nfields;
3275 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3276 }
3277
3278 MemoryContextSwitchTo(oldcontext);
3279
3280 return cstate;
3281 }
3282
3283 /*
3284 * Read raw fields in the next line for COPY FROM in text or csv mode.
3285 * Return false if no more lines.
3286 *
3287 * An internal temporary buffer is returned via 'fields'. It is valid until
3288 * the next call of the function. Since the function returns all raw fields
3289 * in the input file, 'nfields' could be different from the number of columns
3290 * in the relation.
3291 *
3292 * NOTE: force_not_null option are not applied to the returned fields.
3293 */
3294 bool
NextCopyFromRawFields(CopyState cstate,char *** fields,int * nfields)3295 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3296 {
3297 int fldct;
3298 bool done;
3299
3300 /* only available for text or csv input */
3301 Assert(!cstate->binary);
3302
3303 /* on input just throw the header line away */
3304 if (cstate->cur_lineno == 0 && cstate->header_line)
3305 {
3306 cstate->cur_lineno++;
3307 if (CopyReadLine(cstate))
3308 return false; /* done */
3309 }
3310
3311 cstate->cur_lineno++;
3312
3313 /* Actually read the line into memory here */
3314 done = CopyReadLine(cstate);
3315
3316 /*
3317 * EOF at start of line means we're done. If we see EOF after some
3318 * characters, we act as though it was newline followed by EOF, ie,
3319 * process the line and then exit loop on next iteration.
3320 */
3321 if (done && cstate->line_buf.len == 0)
3322 return false;
3323
3324 /* Parse the line into de-escaped field values */
3325 if (cstate->csv_mode)
3326 fldct = CopyReadAttributesCSV(cstate);
3327 else
3328 fldct = CopyReadAttributesText(cstate);
3329
3330 *fields = cstate->raw_fields;
3331 *nfields = fldct;
3332 return true;
3333 }
3334
3335 /*
3336 * Read next tuple from file for COPY FROM. Return false if no more tuples.
3337 *
3338 * 'econtext' is used to evaluate default expression for each columns not
3339 * read from the file. It can be NULL when no default values are used, i.e.
3340 * when all columns are read from the file.
3341 *
3342 * 'values' and 'nulls' arrays must be the same length as columns of the
3343 * relation passed to BeginCopyFrom. This function fills the arrays.
3344 * Oid of the tuple is returned with 'tupleOid' separately.
3345 */
3346 bool
NextCopyFrom(CopyState cstate,ExprContext * econtext,Datum * values,bool * nulls,Oid * tupleOid)3347 NextCopyFrom(CopyState cstate, ExprContext *econtext,
3348 Datum *values, bool *nulls, Oid *tupleOid)
3349 {
3350 TupleDesc tupDesc;
3351 AttrNumber num_phys_attrs,
3352 attr_count,
3353 num_defaults = cstate->num_defaults;
3354 FmgrInfo *in_functions = cstate->in_functions;
3355 Oid *typioparams = cstate->typioparams;
3356 int i;
3357 int nfields;
3358 bool isnull;
3359 bool file_has_oids = cstate->file_has_oids;
3360 int *defmap = cstate->defmap;
3361 ExprState **defexprs = cstate->defexprs;
3362
3363 tupDesc = RelationGetDescr(cstate->rel);
3364 num_phys_attrs = tupDesc->natts;
3365 attr_count = list_length(cstate->attnumlist);
3366 nfields = file_has_oids ? (attr_count + 1) : attr_count;
3367
3368 /* Initialize all values for row to NULL */
3369 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3370 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3371
3372 if (!cstate->binary)
3373 {
3374 char **field_strings;
3375 ListCell *cur;
3376 int fldct;
3377 int fieldno;
3378 char *string;
3379
3380 /* read raw fields in the next line */
3381 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3382 return false;
3383
3384 /* check for overflowing fields */
3385 if (nfields > 0 && fldct > nfields)
3386 ereport(ERROR,
3387 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3388 errmsg("extra data after last expected column")));
3389
3390 fieldno = 0;
3391
3392 /* Read the OID field if present */
3393 if (file_has_oids)
3394 {
3395 if (fieldno >= fldct)
3396 ereport(ERROR,
3397 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3398 errmsg("missing data for OID column")));
3399 string = field_strings[fieldno++];
3400
3401 if (string == NULL)
3402 ereport(ERROR,
3403 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3404 errmsg("null OID in COPY data")));
3405 else if (cstate->oids && tupleOid != NULL)
3406 {
3407 cstate->cur_attname = "oid";
3408 cstate->cur_attval = string;
3409 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3410 CStringGetDatum(string)));
3411 if (*tupleOid == InvalidOid)
3412 ereport(ERROR,
3413 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3414 errmsg("invalid OID in COPY data")));
3415 cstate->cur_attname = NULL;
3416 cstate->cur_attval = NULL;
3417 }
3418 }
3419
3420 /* Loop to read the user attributes on the line. */
3421 foreach(cur, cstate->attnumlist)
3422 {
3423 int attnum = lfirst_int(cur);
3424 int m = attnum - 1;
3425 Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3426
3427 if (fieldno >= fldct)
3428 ereport(ERROR,
3429 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3430 errmsg("missing data for column \"%s\"",
3431 NameStr(att->attname))));
3432 string = field_strings[fieldno++];
3433
3434 if (cstate->convert_select_flags &&
3435 !cstate->convert_select_flags[m])
3436 {
3437 /* ignore input field, leaving column as NULL */
3438 continue;
3439 }
3440
3441 if (cstate->csv_mode)
3442 {
3443 if (string == NULL &&
3444 cstate->force_notnull_flags[m])
3445 {
3446 /*
3447 * FORCE_NOT_NULL option is set and column is NULL -
3448 * convert it to the NULL string.
3449 */
3450 string = cstate->null_print;
3451 }
3452 else if (string != NULL && cstate->force_null_flags[m]
3453 && strcmp(string, cstate->null_print) == 0)
3454 {
3455 /*
3456 * FORCE_NULL option is set and column matches the NULL
3457 * string. It must have been quoted, or otherwise the
3458 * string would already have been set to NULL. Convert it
3459 * to NULL as specified.
3460 */
3461 string = NULL;
3462 }
3463 }
3464
3465 cstate->cur_attname = NameStr(att->attname);
3466 cstate->cur_attval = string;
3467 values[m] = InputFunctionCall(&in_functions[m],
3468 string,
3469 typioparams[m],
3470 att->atttypmod);
3471 if (string != NULL)
3472 nulls[m] = false;
3473 cstate->cur_attname = NULL;
3474 cstate->cur_attval = NULL;
3475 }
3476
3477 Assert(fieldno == nfields);
3478 }
3479 else
3480 {
3481 /* binary */
3482 int16 fld_count;
3483 ListCell *cur;
3484
3485 cstate->cur_lineno++;
3486
3487 if (!CopyGetInt16(cstate, &fld_count))
3488 {
3489 /* EOF detected (end of file, or protocol-level EOF) */
3490 return false;
3491 }
3492
3493 if (fld_count == -1)
3494 {
3495 /*
3496 * Received EOF marker. In a V3-protocol copy, wait for the
3497 * protocol-level EOF, and complain if it doesn't come
3498 * immediately. This ensures that we correctly handle CopyFail,
3499 * if client chooses to send that now.
3500 *
3501 * Note that we MUST NOT try to read more data in an old-protocol
3502 * copy, since there is no protocol-level EOF marker then. We
3503 * could go either way for copy from file, but choose to throw
3504 * error if there's data after the EOF marker, for consistency
3505 * with the new-protocol case.
3506 */
3507 char dummy;
3508
3509 if (cstate->copy_dest != COPY_OLD_FE &&
3510 CopyGetData(cstate, &dummy, 1, 1) > 0)
3511 ereport(ERROR,
3512 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3513 errmsg("received copy data after EOF marker")));
3514 return false;
3515 }
3516
3517 if (fld_count != attr_count)
3518 ereport(ERROR,
3519 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3520 errmsg("row field count is %d, expected %d",
3521 (int) fld_count, attr_count)));
3522
3523 if (file_has_oids)
3524 {
3525 Oid loaded_oid;
3526
3527 cstate->cur_attname = "oid";
3528 loaded_oid =
3529 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3530 0,
3531 &cstate->oid_in_function,
3532 cstate->oid_typioparam,
3533 -1,
3534 &isnull));
3535 if (isnull || loaded_oid == InvalidOid)
3536 ereport(ERROR,
3537 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3538 errmsg("invalid OID in COPY data")));
3539 cstate->cur_attname = NULL;
3540 if (cstate->oids && tupleOid != NULL)
3541 *tupleOid = loaded_oid;
3542 }
3543
3544 i = 0;
3545 foreach(cur, cstate->attnumlist)
3546 {
3547 int attnum = lfirst_int(cur);
3548 int m = attnum - 1;
3549 Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3550
3551 cstate->cur_attname = NameStr(att->attname);
3552 i++;
3553 values[m] = CopyReadBinaryAttribute(cstate,
3554 i,
3555 &in_functions[m],
3556 typioparams[m],
3557 att->atttypmod,
3558 &nulls[m]);
3559 cstate->cur_attname = NULL;
3560 }
3561 }
3562
3563 /*
3564 * Now compute and insert any defaults available for the columns not
3565 * provided by the input data. Anything not processed here or above will
3566 * remain NULL.
3567 */
3568 for (i = 0; i < num_defaults; i++)
3569 {
3570 /*
3571 * The caller must supply econtext and have switched into the
3572 * per-tuple memory context in it.
3573 */
3574 Assert(econtext != NULL);
3575 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3576
3577 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3578 &nulls[defmap[i]]);
3579 }
3580
3581 return true;
3582 }
3583
3584 /*
3585 * Clean up storage and release resources for COPY FROM.
3586 */
3587 void
EndCopyFrom(CopyState cstate)3588 EndCopyFrom(CopyState cstate)
3589 {
3590 /* No COPY FROM related resources except memory. */
3591
3592 EndCopy(cstate);
3593 }
3594
3595 /*
3596 * Read the next input line and stash it in line_buf, with conversion to
3597 * server encoding.
3598 *
3599 * Result is true if read was terminated by EOF, false if terminated
3600 * by newline. The terminating newline or EOF marker is not included
3601 * in the final value of line_buf.
3602 */
3603 static bool
CopyReadLine(CopyState cstate)3604 CopyReadLine(CopyState cstate)
3605 {
3606 bool result;
3607
3608 resetStringInfo(&cstate->line_buf);
3609 cstate->line_buf_valid = true;
3610
3611 /* Mark that encoding conversion hasn't occurred yet */
3612 cstate->line_buf_converted = false;
3613
3614 /* Parse data and transfer into line_buf */
3615 result = CopyReadLineText(cstate);
3616
3617 if (result)
3618 {
3619 /*
3620 * Reached EOF. In protocol version 3, we should ignore anything
3621 * after \. up to the protocol end of copy data. (XXX maybe better
3622 * not to treat \. as special?)
3623 */
3624 if (cstate->copy_dest == COPY_NEW_FE)
3625 {
3626 do
3627 {
3628 cstate->raw_buf_index = cstate->raw_buf_len;
3629 } while (CopyLoadRawBuf(cstate));
3630 }
3631 }
3632 else
3633 {
3634 /*
3635 * If we didn't hit EOF, then we must have transferred the EOL marker
3636 * to line_buf along with the data. Get rid of it.
3637 */
3638 switch (cstate->eol_type)
3639 {
3640 case EOL_NL:
3641 Assert(cstate->line_buf.len >= 1);
3642 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3643 cstate->line_buf.len--;
3644 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3645 break;
3646 case EOL_CR:
3647 Assert(cstate->line_buf.len >= 1);
3648 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3649 cstate->line_buf.len--;
3650 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3651 break;
3652 case EOL_CRNL:
3653 Assert(cstate->line_buf.len >= 2);
3654 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3655 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3656 cstate->line_buf.len -= 2;
3657 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3658 break;
3659 case EOL_UNKNOWN:
3660 /* shouldn't get here */
3661 Assert(false);
3662 break;
3663 }
3664 }
3665
3666 /* Done reading the line. Convert it to server encoding. */
3667 if (cstate->need_transcoding)
3668 {
3669 char *cvt;
3670
3671 cvt = pg_any_to_server(cstate->line_buf.data,
3672 cstate->line_buf.len,
3673 cstate->file_encoding);
3674 if (cvt != cstate->line_buf.data)
3675 {
3676 /* transfer converted data back to line_buf */
3677 resetStringInfo(&cstate->line_buf);
3678 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3679 pfree(cvt);
3680 }
3681 }
3682
3683 /* Now it's safe to use the buffer in error messages */
3684 cstate->line_buf_converted = true;
3685
3686 return result;
3687 }
3688
3689 /*
3690 * CopyReadLineText - inner loop of CopyReadLine for text mode
3691 */
3692 static bool
CopyReadLineText(CopyState cstate)3693 CopyReadLineText(CopyState cstate)
3694 {
3695 char *copy_raw_buf;
3696 int raw_buf_ptr;
3697 int copy_buf_len;
3698 bool need_data = false;
3699 bool hit_eof = false;
3700 bool result = false;
3701 char mblen_str[2];
3702
3703 /* CSV variables */
3704 bool first_char_in_line = true;
3705 bool in_quote = false,
3706 last_was_esc = false;
3707 char quotec = '\0';
3708 char escapec = '\0';
3709
3710 if (cstate->csv_mode)
3711 {
3712 quotec = cstate->quote[0];
3713 escapec = cstate->escape[0];
3714 /* ignore special escape processing if it's the same as quotec */
3715 if (quotec == escapec)
3716 escapec = '\0';
3717 }
3718
3719 mblen_str[1] = '\0';
3720
3721 /*
3722 * The objective of this loop is to transfer the entire next input line
3723 * into line_buf. Hence, we only care for detecting newlines (\r and/or
3724 * \n) and the end-of-copy marker (\.).
3725 *
3726 * In CSV mode, \r and \n inside a quoted field are just part of the data
3727 * value and are put in line_buf. We keep just enough state to know if we
3728 * are currently in a quoted field or not.
3729 *
3730 * These four characters, and the CSV escape and quote characters, are
3731 * assumed the same in frontend and backend encodings.
3732 *
3733 * For speed, we try to move data from raw_buf to line_buf in chunks
3734 * rather than one character at a time. raw_buf_ptr points to the next
3735 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3736 * have been determined to be part of the line, but not yet transferred to
3737 * line_buf.
3738 *
3739 * For a little extra speed within the loop, we copy raw_buf and
3740 * raw_buf_len into local variables.
3741 */
3742 copy_raw_buf = cstate->raw_buf;
3743 raw_buf_ptr = cstate->raw_buf_index;
3744 copy_buf_len = cstate->raw_buf_len;
3745
3746 for (;;)
3747 {
3748 int prev_raw_ptr;
3749 char c;
3750
3751 /*
3752 * Load more data if needed. Ideally we would just force four bytes
3753 * of read-ahead and avoid the many calls to
3754 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3755 * does not allow us to read too far ahead or we might read into the
3756 * next data, so we read-ahead only as far we know we can. One
3757 * optimization would be to read-ahead four byte here if
3758 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3759 * considering the size of the buffer.
3760 */
3761 if (raw_buf_ptr >= copy_buf_len || need_data)
3762 {
3763 REFILL_LINEBUF;
3764
3765 /*
3766 * Try to read some more data. This will certainly reset
3767 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3768 */
3769 if (!CopyLoadRawBuf(cstate))
3770 hit_eof = true;
3771 raw_buf_ptr = 0;
3772 copy_buf_len = cstate->raw_buf_len;
3773
3774 /*
3775 * If we are completely out of data, break out of the loop,
3776 * reporting EOF.
3777 */
3778 if (copy_buf_len <= 0)
3779 {
3780 result = true;
3781 break;
3782 }
3783 need_data = false;
3784 }
3785
3786 /* OK to fetch a character */
3787 prev_raw_ptr = raw_buf_ptr;
3788 c = copy_raw_buf[raw_buf_ptr++];
3789
3790 if (cstate->csv_mode)
3791 {
3792 /*
3793 * If character is '\\' or '\r', we may need to look ahead below.
3794 * Force fetch of the next character if we don't already have it.
3795 * We need to do this before changing CSV state, in case one of
3796 * these characters is also the quote or escape character.
3797 *
3798 * Note: old-protocol does not like forced prefetch, but it's OK
3799 * here since we cannot validly be at EOF.
3800 */
3801 if (c == '\\' || c == '\r')
3802 {
3803 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3804 }
3805
3806 /*
3807 * Dealing with quotes and escapes here is mildly tricky. If the
3808 * quote char is also the escape char, there's no problem - we
3809 * just use the char as a toggle. If they are different, we need
3810 * to ensure that we only take account of an escape inside a
3811 * quoted field and immediately preceding a quote char, and not
3812 * the second in an escape-escape sequence.
3813 */
3814 if (in_quote && c == escapec)
3815 last_was_esc = !last_was_esc;
3816 if (c == quotec && !last_was_esc)
3817 in_quote = !in_quote;
3818 if (c != escapec)
3819 last_was_esc = false;
3820
3821 /*
3822 * Updating the line count for embedded CR and/or LF chars is
3823 * necessarily a little fragile - this test is probably about the
3824 * best we can do. (XXX it's arguable whether we should do this
3825 * at all --- is cur_lineno a physical or logical count?)
3826 */
3827 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3828 cstate->cur_lineno++;
3829 }
3830
3831 /* Process \r */
3832 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3833 {
3834 /* Check for \r\n on first line, _and_ handle \r\n. */
3835 if (cstate->eol_type == EOL_UNKNOWN ||
3836 cstate->eol_type == EOL_CRNL)
3837 {
3838 /*
3839 * If need more data, go back to loop top to load it.
3840 *
3841 * Note that if we are at EOF, c will wind up as '\0' because
3842 * of the guaranteed pad of raw_buf.
3843 */
3844 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3845
3846 /* get next char */
3847 c = copy_raw_buf[raw_buf_ptr];
3848
3849 if (c == '\n')
3850 {
3851 raw_buf_ptr++; /* eat newline */
3852 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3853 }
3854 else
3855 {
3856 /* found \r, but no \n */
3857 if (cstate->eol_type == EOL_CRNL)
3858 ereport(ERROR,
3859 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3860 !cstate->csv_mode ?
3861 errmsg("literal carriage return found in data") :
3862 errmsg("unquoted carriage return found in data"),
3863 !cstate->csv_mode ?
3864 errhint("Use \"\\r\" to represent carriage return.") :
3865 errhint("Use quoted CSV field to represent carriage return.")));
3866
3867 /*
3868 * if we got here, it is the first line and we didn't find
3869 * \n, so don't consume the peeked character
3870 */
3871 cstate->eol_type = EOL_CR;
3872 }
3873 }
3874 else if (cstate->eol_type == EOL_NL)
3875 ereport(ERROR,
3876 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3877 !cstate->csv_mode ?
3878 errmsg("literal carriage return found in data") :
3879 errmsg("unquoted carriage return found in data"),
3880 !cstate->csv_mode ?
3881 errhint("Use \"\\r\" to represent carriage return.") :
3882 errhint("Use quoted CSV field to represent carriage return.")));
3883 /* If reach here, we have found the line terminator */
3884 break;
3885 }
3886
3887 /* Process \n */
3888 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3889 {
3890 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3891 ereport(ERROR,
3892 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3893 !cstate->csv_mode ?
3894 errmsg("literal newline found in data") :
3895 errmsg("unquoted newline found in data"),
3896 !cstate->csv_mode ?
3897 errhint("Use \"\\n\" to represent newline.") :
3898 errhint("Use quoted CSV field to represent newline.")));
3899 cstate->eol_type = EOL_NL; /* in case not set yet */
3900 /* If reach here, we have found the line terminator */
3901 break;
3902 }
3903
3904 /*
3905 * In CSV mode, we only recognize \. alone on a line. This is because
3906 * \. is a valid CSV data value.
3907 */
3908 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3909 {
3910 char c2;
3911
3912 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3913 IF_NEED_REFILL_AND_EOF_BREAK(0);
3914
3915 /* -----
3916 * get next character
3917 * Note: we do not change c so if it isn't \., we can fall
3918 * through and continue processing for file encoding.
3919 * -----
3920 */
3921 c2 = copy_raw_buf[raw_buf_ptr];
3922
3923 if (c2 == '.')
3924 {
3925 raw_buf_ptr++; /* consume the '.' */
3926
3927 /*
3928 * Note: if we loop back for more data here, it does not
3929 * matter that the CSV state change checks are re-executed; we
3930 * will come back here with no important state changed.
3931 */
3932 if (cstate->eol_type == EOL_CRNL)
3933 {
3934 /* Get the next character */
3935 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3936 /* if hit_eof, c2 will become '\0' */
3937 c2 = copy_raw_buf[raw_buf_ptr++];
3938
3939 if (c2 == '\n')
3940 {
3941 if (!cstate->csv_mode)
3942 ereport(ERROR,
3943 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3944 errmsg("end-of-copy marker does not match previous newline style")));
3945 else
3946 NO_END_OF_COPY_GOTO;
3947 }
3948 else if (c2 != '\r')
3949 {
3950 if (!cstate->csv_mode)
3951 ereport(ERROR,
3952 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3953 errmsg("end-of-copy marker corrupt")));
3954 else
3955 NO_END_OF_COPY_GOTO;
3956 }
3957 }
3958
3959 /* Get the next character */
3960 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3961 /* if hit_eof, c2 will become '\0' */
3962 c2 = copy_raw_buf[raw_buf_ptr++];
3963
3964 if (c2 != '\r' && c2 != '\n')
3965 {
3966 if (!cstate->csv_mode)
3967 ereport(ERROR,
3968 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3969 errmsg("end-of-copy marker corrupt")));
3970 else
3971 NO_END_OF_COPY_GOTO;
3972 }
3973
3974 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3975 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3976 (cstate->eol_type == EOL_CR && c2 != '\r'))
3977 {
3978 ereport(ERROR,
3979 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3980 errmsg("end-of-copy marker does not match previous newline style")));
3981 }
3982
3983 /*
3984 * Transfer only the data before the \. into line_buf, then
3985 * discard the data and the \. sequence.
3986 */
3987 if (prev_raw_ptr > cstate->raw_buf_index)
3988 appendBinaryStringInfo(&cstate->line_buf,
3989 cstate->raw_buf + cstate->raw_buf_index,
3990 prev_raw_ptr - cstate->raw_buf_index);
3991 cstate->raw_buf_index = raw_buf_ptr;
3992 result = true; /* report EOF */
3993 break;
3994 }
3995 else if (!cstate->csv_mode)
3996 {
3997 /*
3998 * If we are here, it means we found a backslash followed by
3999 * something other than a period. In non-CSV mode, anything
4000 * after a backslash is special, so we skip over that second
4001 * character too. If we didn't do that \\. would be
4002 * considered an eof-of copy, while in non-CSV mode it is a
4003 * literal backslash followed by a period. In CSV mode,
4004 * backslashes are not special, so we want to process the
4005 * character after the backslash just like a normal character,
4006 * so we don't increment in those cases.
4007 *
4008 * Set 'c' to skip whole character correctly in multi-byte
4009 * encodings. If we don't have the whole character in the
4010 * buffer yet, we might loop back to process it, after all,
4011 * but that's OK because multi-byte characters cannot have any
4012 * special meaning.
4013 */
4014 raw_buf_ptr++;
4015 c = c2;
4016 }
4017 }
4018
4019 /*
4020 * This label is for CSV cases where \. appears at the start of a
4021 * line, but there is more text after it, meaning it was a data value.
4022 * We are more strict for \. in CSV mode because \. could be a data
4023 * value, while in non-CSV mode, \. cannot be a data value.
4024 */
4025 not_end_of_copy:
4026
4027 /*
4028 * Process all bytes of a multi-byte character as a group.
4029 *
4030 * We only support multi-byte sequences where the first byte has the
4031 * high-bit set, so as an optimization we can avoid this block
4032 * entirely if it is not set.
4033 */
4034 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
4035 {
4036 int mblen;
4037
4038 mblen_str[0] = c;
4039 /* All our encodings only read the first byte to get the length */
4040 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
4041 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
4042 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
4043 raw_buf_ptr += mblen - 1;
4044 }
4045 first_char_in_line = false;
4046 } /* end of outer loop */
4047
4048 /*
4049 * Transfer any still-uncopied data to line_buf.
4050 */
4051 REFILL_LINEBUF;
4052
4053 return result;
4054 }
4055
/*
 * GetDecimalFromHex - numeric value of a single hexadecimal digit.
 *
 * '0'..'9' map to 0..9; any other character is folded to lower case and
 * measured relative to 'a', so 'a'/'A' yield 10 and 'f'/'F' yield 15.
 * No validation is performed here.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char uch = (unsigned char) hex;

	return isdigit(uch) ? (hex - '0') : (tolower(uch) - 'a' + 10);
}
4067
4068 /*
4069 * Parse the current line into separate attributes (fields),
4070 * performing de-escaping as needed.
4071 *
4072 * The input is in line_buf. We use attribute_buf to hold the result
4073 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
4074 * string, or NULL when the input matches the null marker string.
4075 * This array is expanded as necessary.
4076 *
4077 * (Note that the caller cannot check for nulls since the returned
4078 * string would be the post-de-escaping equivalent, which may look
4079 * the same as some valid data string.)
4080 *
4081 * delim is the column delimiter string (must be just one byte for now).
4082 * null_print is the null marker string. Note that this is compared to
4083 * the pre-de-escaped input string.
4084 *
4085 * The return value is the number of fields actually read.
4086 */
4087 static int
CopyReadAttributesText(CopyState cstate)4088 CopyReadAttributesText(CopyState cstate)
4089 {
4090 char delimc = cstate->delim[0];
4091 int fieldno;
4092 char *output_ptr;
4093 char *cur_ptr;
4094 char *line_end_ptr;
4095
4096 /*
4097 * We need a special case for zero-column tables: check that the input
4098 * line is empty, and return.
4099 */
4100 if (cstate->max_fields <= 0)
4101 {
4102 if (cstate->line_buf.len != 0)
4103 ereport(ERROR,
4104 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4105 errmsg("extra data after last expected column")));
4106 return 0;
4107 }
4108
4109 resetStringInfo(&cstate->attribute_buf);
4110
4111 /*
4112 * The de-escaped attributes will certainly not be longer than the input
4113 * data line, so we can just force attribute_buf to be large enough and
4114 * then transfer data without any checks for enough space. We need to do
4115 * it this way because enlarging attribute_buf mid-stream would invalidate
4116 * pointers already stored into cstate->raw_fields[].
4117 */
4118 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4119 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4120 output_ptr = cstate->attribute_buf.data;
4121
4122 /* set pointer variables for loop */
4123 cur_ptr = cstate->line_buf.data;
4124 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4125
4126 /* Outer loop iterates over fields */
4127 fieldno = 0;
4128 for (;;)
4129 {
4130 bool found_delim = false;
4131 char *start_ptr;
4132 char *end_ptr;
4133 int input_len;
4134 bool saw_non_ascii = false;
4135
4136 /* Make sure there is enough space for the next value */
4137 if (fieldno >= cstate->max_fields)
4138 {
4139 cstate->max_fields *= 2;
4140 cstate->raw_fields =
4141 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4142 }
4143
4144 /* Remember start of field on both input and output sides */
4145 start_ptr = cur_ptr;
4146 cstate->raw_fields[fieldno] = output_ptr;
4147
4148 /*
4149 * Scan data for field.
4150 *
4151 * Note that in this loop, we are scanning to locate the end of field
4152 * and also speculatively performing de-escaping. Once we find the
4153 * end-of-field, we can match the raw field contents against the null
4154 * marker string. Only after that comparison fails do we know that
4155 * de-escaping is actually the right thing to do; therefore we *must
4156 * not* throw any syntax errors before we've done the null-marker
4157 * check.
4158 */
4159 for (;;)
4160 {
4161 char c;
4162
4163 end_ptr = cur_ptr;
4164 if (cur_ptr >= line_end_ptr)
4165 break;
4166 c = *cur_ptr++;
4167 if (c == delimc)
4168 {
4169 found_delim = true;
4170 break;
4171 }
4172 if (c == '\\')
4173 {
4174 if (cur_ptr >= line_end_ptr)
4175 break;
4176 c = *cur_ptr++;
4177 switch (c)
4178 {
4179 case '0':
4180 case '1':
4181 case '2':
4182 case '3':
4183 case '4':
4184 case '5':
4185 case '6':
4186 case '7':
4187 {
4188 /* handle \013 */
4189 int val;
4190
4191 val = OCTVALUE(c);
4192 if (cur_ptr < line_end_ptr)
4193 {
4194 c = *cur_ptr;
4195 if (ISOCTAL(c))
4196 {
4197 cur_ptr++;
4198 val = (val << 3) + OCTVALUE(c);
4199 if (cur_ptr < line_end_ptr)
4200 {
4201 c = *cur_ptr;
4202 if (ISOCTAL(c))
4203 {
4204 cur_ptr++;
4205 val = (val << 3) + OCTVALUE(c);
4206 }
4207 }
4208 }
4209 }
4210 c = val & 0377;
4211 if (c == '\0' || IS_HIGHBIT_SET(c))
4212 saw_non_ascii = true;
4213 }
4214 break;
4215 case 'x':
4216 /* Handle \x3F */
4217 if (cur_ptr < line_end_ptr)
4218 {
4219 char hexchar = *cur_ptr;
4220
4221 if (isxdigit((unsigned char) hexchar))
4222 {
4223 int val = GetDecimalFromHex(hexchar);
4224
4225 cur_ptr++;
4226 if (cur_ptr < line_end_ptr)
4227 {
4228 hexchar = *cur_ptr;
4229 if (isxdigit((unsigned char) hexchar))
4230 {
4231 cur_ptr++;
4232 val = (val << 4) + GetDecimalFromHex(hexchar);
4233 }
4234 }
4235 c = val & 0xff;
4236 if (c == '\0' || IS_HIGHBIT_SET(c))
4237 saw_non_ascii = true;
4238 }
4239 }
4240 break;
4241 case 'b':
4242 c = '\b';
4243 break;
4244 case 'f':
4245 c = '\f';
4246 break;
4247 case 'n':
4248 c = '\n';
4249 break;
4250 case 'r':
4251 c = '\r';
4252 break;
4253 case 't':
4254 c = '\t';
4255 break;
4256 case 'v':
4257 c = '\v';
4258 break;
4259
4260 /*
4261 * in all other cases, take the char after '\'
4262 * literally
4263 */
4264 }
4265 }
4266
4267 /* Add c to output string */
4268 *output_ptr++ = c;
4269 }
4270
4271 /* Check whether raw input matched null marker */
4272 input_len = end_ptr - start_ptr;
4273 if (input_len == cstate->null_print_len &&
4274 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4275 cstate->raw_fields[fieldno] = NULL;
4276 else
4277 {
4278 /*
4279 * At this point we know the field is supposed to contain data.
4280 *
4281 * If we de-escaped any non-7-bit-ASCII chars, make sure the
4282 * resulting string is valid data for the db encoding.
4283 */
4284 if (saw_non_ascii)
4285 {
4286 char *fld = cstate->raw_fields[fieldno];
4287
4288 pg_verifymbstr(fld, output_ptr - fld, false);
4289 }
4290 }
4291
4292 /* Terminate attribute value in output area */
4293 *output_ptr++ = '\0';
4294
4295 fieldno++;
4296 /* Done if we hit EOL instead of a delim */
4297 if (!found_delim)
4298 break;
4299 }
4300
4301 /* Clean up state of attribute_buf */
4302 output_ptr--;
4303 Assert(*output_ptr == '\0');
4304 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4305
4306 return fieldno;
4307 }
4308
4309 /*
4310 * Parse the current line into separate attributes (fields),
4311 * performing de-escaping as needed. This has exactly the same API as
4312 * CopyReadAttributesText, except we parse the fields according to
4313 * "standard" (i.e. common) CSV usage.
4314 */
4315 static int
CopyReadAttributesCSV(CopyState cstate)4316 CopyReadAttributesCSV(CopyState cstate)
4317 {
4318 char delimc = cstate->delim[0];
4319 char quotec = cstate->quote[0];
4320 char escapec = cstate->escape[0];
4321 int fieldno;
4322 char *output_ptr;
4323 char *cur_ptr;
4324 char *line_end_ptr;
4325
4326 /*
4327 * We need a special case for zero-column tables: check that the input
4328 * line is empty, and return.
4329 */
4330 if (cstate->max_fields <= 0)
4331 {
4332 if (cstate->line_buf.len != 0)
4333 ereport(ERROR,
4334 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4335 errmsg("extra data after last expected column")));
4336 return 0;
4337 }
4338
4339 resetStringInfo(&cstate->attribute_buf);
4340
4341 /*
4342 * The de-escaped attributes will certainly not be longer than the input
4343 * data line, so we can just force attribute_buf to be large enough and
4344 * then transfer data without any checks for enough space. We need to do
4345 * it this way because enlarging attribute_buf mid-stream would invalidate
4346 * pointers already stored into cstate->raw_fields[].
4347 */
4348 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4349 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4350 output_ptr = cstate->attribute_buf.data;
4351
4352 /* set pointer variables for loop */
4353 cur_ptr = cstate->line_buf.data;
4354 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4355
4356 /* Outer loop iterates over fields */
4357 fieldno = 0;
4358 for (;;)
4359 {
4360 bool found_delim = false;
4361 bool saw_quote = false;
4362 char *start_ptr;
4363 char *end_ptr;
4364 int input_len;
4365
4366 /* Make sure there is enough space for the next value */
4367 if (fieldno >= cstate->max_fields)
4368 {
4369 cstate->max_fields *= 2;
4370 cstate->raw_fields =
4371 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4372 }
4373
4374 /* Remember start of field on both input and output sides */
4375 start_ptr = cur_ptr;
4376 cstate->raw_fields[fieldno] = output_ptr;
4377
4378 /*
4379 * Scan data for field,
4380 *
4381 * The loop starts in "not quote" mode and then toggles between that
4382 * and "in quote" mode. The loop exits normally if it is in "not
4383 * quote" mode and a delimiter or line end is seen.
4384 */
4385 for (;;)
4386 {
4387 char c;
4388
4389 /* Not in quote */
4390 for (;;)
4391 {
4392 end_ptr = cur_ptr;
4393 if (cur_ptr >= line_end_ptr)
4394 goto endfield;
4395 c = *cur_ptr++;
4396 /* unquoted field delimiter */
4397 if (c == delimc)
4398 {
4399 found_delim = true;
4400 goto endfield;
4401 }
4402 /* start of quoted field (or part of field) */
4403 if (c == quotec)
4404 {
4405 saw_quote = true;
4406 break;
4407 }
4408 /* Add c to output string */
4409 *output_ptr++ = c;
4410 }
4411
4412 /* In quote */
4413 for (;;)
4414 {
4415 end_ptr = cur_ptr;
4416 if (cur_ptr >= line_end_ptr)
4417 ereport(ERROR,
4418 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4419 errmsg("unterminated CSV quoted field")));
4420
4421 c = *cur_ptr++;
4422
4423 /* escape within a quoted field */
4424 if (c == escapec)
4425 {
4426 /*
4427 * peek at the next char if available, and escape it if it
4428 * is an escape char or a quote char
4429 */
4430 if (cur_ptr < line_end_ptr)
4431 {
4432 char nextc = *cur_ptr;
4433
4434 if (nextc == escapec || nextc == quotec)
4435 {
4436 *output_ptr++ = nextc;
4437 cur_ptr++;
4438 continue;
4439 }
4440 }
4441 }
4442
4443 /*
4444 * end of quoted field. Must do this test after testing for
4445 * escape in case quote char and escape char are the same
4446 * (which is the common case).
4447 */
4448 if (c == quotec)
4449 break;
4450
4451 /* Add c to output string */
4452 *output_ptr++ = c;
4453 }
4454 }
4455 endfield:
4456
4457 /* Terminate attribute value in output area */
4458 *output_ptr++ = '\0';
4459
4460 /* Check whether raw input matched null marker */
4461 input_len = end_ptr - start_ptr;
4462 if (!saw_quote && input_len == cstate->null_print_len &&
4463 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4464 cstate->raw_fields[fieldno] = NULL;
4465
4466 fieldno++;
4467 /* Done if we hit EOL instead of a delim */
4468 if (!found_delim)
4469 break;
4470 }
4471
4472 /* Clean up state of attribute_buf */
4473 output_ptr--;
4474 Assert(*output_ptr == '\0');
4475 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4476
4477 return fieldno;
4478 }
4479
4480
4481 /*
4482 * Read a binary attribute
4483 */
4484 static Datum
CopyReadBinaryAttribute(CopyState cstate,int column_no,FmgrInfo * flinfo,Oid typioparam,int32 typmod,bool * isnull)4485 CopyReadBinaryAttribute(CopyState cstate,
4486 int column_no, FmgrInfo *flinfo,
4487 Oid typioparam, int32 typmod,
4488 bool *isnull)
4489 {
4490 int32 fld_size;
4491 Datum result;
4492
4493 if (!CopyGetInt32(cstate, &fld_size))
4494 ereport(ERROR,
4495 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4496 errmsg("unexpected EOF in COPY data")));
4497 if (fld_size == -1)
4498 {
4499 *isnull = true;
4500 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4501 }
4502 if (fld_size < 0)
4503 ereport(ERROR,
4504 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4505 errmsg("invalid field size")));
4506
4507 /* reset attribute_buf to empty, and load raw data in it */
4508 resetStringInfo(&cstate->attribute_buf);
4509
4510 enlargeStringInfo(&cstate->attribute_buf, fld_size);
4511 if (CopyGetData(cstate, cstate->attribute_buf.data,
4512 fld_size, fld_size) != fld_size)
4513 ereport(ERROR,
4514 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4515 errmsg("unexpected EOF in COPY data")));
4516
4517 cstate->attribute_buf.len = fld_size;
4518 cstate->attribute_buf.data[fld_size] = '\0';
4519
4520 /* Call the column type's binary input converter */
4521 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4522 typioparam, typmod);
4523
4524 /* Trouble if it didn't eat the whole buffer */
4525 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4526 ereport(ERROR,
4527 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4528 errmsg("incorrect binary data format")));
4529
4530 *isnull = false;
4531 return result;
4532 }
4533
4534 /*
4535 * Send text representation of one attribute, with conversion and escaping
4536 */
4537 #define DUMPSOFAR() \
4538 do { \
4539 if (ptr > start) \
4540 CopySendData(cstate, start, ptr - start); \
4541 } while (0)
4542
4543 static void
CopyAttributeOutText(CopyState cstate,char * string)4544 CopyAttributeOutText(CopyState cstate, char *string)
4545 {
4546 char *ptr;
4547 char *start;
4548 char c;
4549 char delimc = cstate->delim[0];
4550
4551 if (cstate->need_transcoding)
4552 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4553 else
4554 ptr = string;
4555
4556 /*
4557 * We have to grovel through the string searching for control characters
4558 * and instances of the delimiter character. In most cases, though, these
4559 * are infrequent. To avoid overhead from calling CopySendData once per
4560 * character, we dump out all characters between escaped characters in a
4561 * single call. The loop invariant is that the data from "start" to "ptr"
4562 * can be sent literally, but hasn't yet been.
4563 *
4564 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4565 * in valid backend encodings, extra bytes of a multibyte character never
4566 * look like ASCII. This loop is sufficiently performance-critical that
4567 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4568 * of the normal safe-encoding path.
4569 */
4570 if (cstate->encoding_embeds_ascii)
4571 {
4572 start = ptr;
4573 while ((c = *ptr) != '\0')
4574 {
4575 if ((unsigned char) c < (unsigned char) 0x20)
4576 {
4577 /*
4578 * \r and \n must be escaped, the others are traditional. We
4579 * prefer to dump these using the C-like notation, rather than
4580 * a backslash and the literal character, because it makes the
4581 * dump file a bit more proof against Microsoftish data
4582 * mangling.
4583 */
4584 switch (c)
4585 {
4586 case '\b':
4587 c = 'b';
4588 break;
4589 case '\f':
4590 c = 'f';
4591 break;
4592 case '\n':
4593 c = 'n';
4594 break;
4595 case '\r':
4596 c = 'r';
4597 break;
4598 case '\t':
4599 c = 't';
4600 break;
4601 case '\v':
4602 c = 'v';
4603 break;
4604 default:
4605 /* If it's the delimiter, must backslash it */
4606 if (c == delimc)
4607 break;
4608 /* All ASCII control chars are length 1 */
4609 ptr++;
4610 continue; /* fall to end of loop */
4611 }
4612 /* if we get here, we need to convert the control char */
4613 DUMPSOFAR();
4614 CopySendChar(cstate, '\\');
4615 CopySendChar(cstate, c);
4616 start = ++ptr; /* do not include char in next run */
4617 }
4618 else if (c == '\\' || c == delimc)
4619 {
4620 DUMPSOFAR();
4621 CopySendChar(cstate, '\\');
4622 start = ptr++; /* we include char in next run */
4623 }
4624 else if (IS_HIGHBIT_SET(c))
4625 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4626 else
4627 ptr++;
4628 }
4629 }
4630 else
4631 {
4632 start = ptr;
4633 while ((c = *ptr) != '\0')
4634 {
4635 if ((unsigned char) c < (unsigned char) 0x20)
4636 {
4637 /*
4638 * \r and \n must be escaped, the others are traditional. We
4639 * prefer to dump these using the C-like notation, rather than
4640 * a backslash and the literal character, because it makes the
4641 * dump file a bit more proof against Microsoftish data
4642 * mangling.
4643 */
4644 switch (c)
4645 {
4646 case '\b':
4647 c = 'b';
4648 break;
4649 case '\f':
4650 c = 'f';
4651 break;
4652 case '\n':
4653 c = 'n';
4654 break;
4655 case '\r':
4656 c = 'r';
4657 break;
4658 case '\t':
4659 c = 't';
4660 break;
4661 case '\v':
4662 c = 'v';
4663 break;
4664 default:
4665 /* If it's the delimiter, must backslash it */
4666 if (c == delimc)
4667 break;
4668 /* All ASCII control chars are length 1 */
4669 ptr++;
4670 continue; /* fall to end of loop */
4671 }
4672 /* if we get here, we need to convert the control char */
4673 DUMPSOFAR();
4674 CopySendChar(cstate, '\\');
4675 CopySendChar(cstate, c);
4676 start = ++ptr; /* do not include char in next run */
4677 }
4678 else if (c == '\\' || c == delimc)
4679 {
4680 DUMPSOFAR();
4681 CopySendChar(cstate, '\\');
4682 start = ptr++; /* we include char in next run */
4683 }
4684 else
4685 ptr++;
4686 }
4687 }
4688
4689 DUMPSOFAR();
4690 }
4691
4692 /*
4693 * Send text representation of one attribute, with conversion and
4694 * CSV-style escaping
4695 */
4696 static void
CopyAttributeOutCSV(CopyState cstate,char * string,bool use_quote,bool single_attr)4697 CopyAttributeOutCSV(CopyState cstate, char *string,
4698 bool use_quote, bool single_attr)
4699 {
4700 char *ptr;
4701 char *start;
4702 char c;
4703 char delimc = cstate->delim[0];
4704 char quotec = cstate->quote[0];
4705 char escapec = cstate->escape[0];
4706
4707 /* force quoting if it matches null_print (before conversion!) */
4708 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4709 use_quote = true;
4710
4711 if (cstate->need_transcoding)
4712 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4713 else
4714 ptr = string;
4715
4716 /*
4717 * Make a preliminary pass to discover if it needs quoting
4718 */
4719 if (!use_quote)
4720 {
4721 /*
4722 * Because '\.' can be a data value, quote it if it appears alone on a
4723 * line so it is not interpreted as the end-of-data marker.
4724 */
4725 if (single_attr && strcmp(ptr, "\\.") == 0)
4726 use_quote = true;
4727 else
4728 {
4729 char *tptr = ptr;
4730
4731 while ((c = *tptr) != '\0')
4732 {
4733 if (c == delimc || c == quotec || c == '\n' || c == '\r')
4734 {
4735 use_quote = true;
4736 break;
4737 }
4738 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4739 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4740 else
4741 tptr++;
4742 }
4743 }
4744 }
4745
4746 if (use_quote)
4747 {
4748 CopySendChar(cstate, quotec);
4749
4750 /*
4751 * We adopt the same optimization strategy as in CopyAttributeOutText
4752 */
4753 start = ptr;
4754 while ((c = *ptr) != '\0')
4755 {
4756 if (c == quotec || c == escapec)
4757 {
4758 DUMPSOFAR();
4759 CopySendChar(cstate, escapec);
4760 start = ptr; /* we include char in next run */
4761 }
4762 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4763 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4764 else
4765 ptr++;
4766 }
4767 DUMPSOFAR();
4768
4769 CopySendChar(cstate, quotec);
4770 }
4771 else
4772 {
4773 /* If it doesn't need quoting, we can just dump it as-is */
4774 CopySendString(cstate, ptr);
4775 }
4776 }
4777
4778 /*
4779 * CopyGetAttnums - build an integer list of attnums to be copied
4780 *
4781 * The input attnamelist is either the user-specified column list,
4782 * or NIL if there was none (in which case we want all the non-dropped
4783 * columns).
4784 *
4785 * rel can be NULL ... it's only used for error reports.
4786 */
4787 static List *
CopyGetAttnums(TupleDesc tupDesc,Relation rel,List * attnamelist)4788 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4789 {
4790 List *attnums = NIL;
4791
4792 if (attnamelist == NIL)
4793 {
4794 /* Generate default column list */
4795 int attr_count = tupDesc->natts;
4796 int i;
4797
4798 for (i = 0; i < attr_count; i++)
4799 {
4800 if (TupleDescAttr(tupDesc, i)->attisdropped)
4801 continue;
4802 attnums = lappend_int(attnums, i + 1);
4803 }
4804 }
4805 else
4806 {
4807 /* Validate the user-supplied list and extract attnums */
4808 ListCell *l;
4809
4810 foreach(l, attnamelist)
4811 {
4812 char *name = strVal(lfirst(l));
4813 int attnum;
4814 int i;
4815
4816 /* Lookup column name */
4817 attnum = InvalidAttrNumber;
4818 for (i = 0; i < tupDesc->natts; i++)
4819 {
4820 Form_pg_attribute att = TupleDescAttr(tupDesc, i);
4821
4822 if (att->attisdropped)
4823 continue;
4824 if (namestrcmp(&(att->attname), name) == 0)
4825 {
4826 attnum = att->attnum;
4827 break;
4828 }
4829 }
4830 if (attnum == InvalidAttrNumber)
4831 {
4832 if (rel != NULL)
4833 ereport(ERROR,
4834 (errcode(ERRCODE_UNDEFINED_COLUMN),
4835 errmsg("column \"%s\" of relation \"%s\" does not exist",
4836 name, RelationGetRelationName(rel))));
4837 else
4838 ereport(ERROR,
4839 (errcode(ERRCODE_UNDEFINED_COLUMN),
4840 errmsg("column \"%s\" does not exist",
4841 name)));
4842 }
4843 /* Check for duplicates */
4844 if (list_member_int(attnums, attnum))
4845 ereport(ERROR,
4846 (errcode(ERRCODE_DUPLICATE_COLUMN),
4847 errmsg("column \"%s\" specified more than once",
4848 name)));
4849 attnums = lappend_int(attnums, attnum);
4850 }
4851 }
4852
4853 return attnums;
4854 }
4855
4856
4857 /*
4858 * copy_dest_startup --- executor startup
4859 */
4860 static void
copy_dest_startup(DestReceiver * self,int operation,TupleDesc typeinfo)4861 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4862 {
4863 /* no-op */
4864 }
4865
4866 /*
4867 * copy_dest_receive --- receive one tuple
4868 */
4869 static bool
copy_dest_receive(TupleTableSlot * slot,DestReceiver * self)4870 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4871 {
4872 DR_copy *myState = (DR_copy *) self;
4873 CopyState cstate = myState->cstate;
4874
4875 /* Make sure the tuple is fully deconstructed */
4876 slot_getallattrs(slot);
4877
4878 /* And send the data */
4879 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4880 myState->processed++;
4881
4882 return true;
4883 }
4884
4885 /*
4886 * copy_dest_shutdown --- executor end
4887 */
4888 static void
copy_dest_shutdown(DestReceiver * self)4889 copy_dest_shutdown(DestReceiver *self)
4890 {
4891 /* no-op */
4892 }
4893
4894 /*
4895 * copy_dest_destroy --- release DestReceiver object
4896 */
4897 static void
copy_dest_destroy(DestReceiver * self)4898 copy_dest_destroy(DestReceiver *self)
4899 {
4900 pfree(self);
4901 }
4902
4903 /*
4904 * CreateCopyDestReceiver -- create a suitable DestReceiver object
4905 */
4906 DestReceiver *
CreateCopyDestReceiver(void)4907 CreateCopyDestReceiver(void)
4908 {
4909 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4910
4911 self->pub.receiveSlot = copy_dest_receive;
4912 self->pub.rStartup = copy_dest_startup;
4913 self->pub.rShutdown = copy_dest_shutdown;
4914 self->pub.rDestroy = copy_dest_destroy;
4915 self->pub.mydest = DestCopyOut;
4916
4917 self->cstate = NULL; /* will be set later */
4918 self->processed = 0;
4919
4920 return (DestReceiver *) self;
4921 }
4922