1 /*-------------------------------------------------------------------------
2 *
3 * copy.c
4 * Implements the COPY utility command
5 *
6 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/copy.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "rewrite/rewriteHandler.h"
41 #include "storage/fd.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/portal.h"
47 #include "utils/rel.h"
48 #include "utils/rls.h"
49 #include "utils/snapmgr.h"
50
51
/* Nonzero when c is one of the ASCII octal digits '0' through '7'. */
#define ISOCTAL(c)	((c) >= '0' && (c) <= '7')
/* Numeric value (0..7) of an octal digit; only meaningful if ISOCTAL(c). */
#define OCTVALUE(c)	((c) - '0')
54
/*
 * Where COPY data is written to (COPY TO) or read from (COPY FROM), as seen
 * by the lowest-level send/receive routines.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* a server-side file, or a piped program */
	COPY_OLD_FE,				/* the frontend, via a pre-3.0 protocol */
	COPY_NEW_FE					/* the frontend, via the 3.0 protocol */
} CopyDest;
65
/*
 * Which line terminator the COPY FROM input uses.
 */
typedef enum EolType
{
	EOL_UNKNOWN = 0,			/* terminator not determined */
	EOL_NL,						/* "\n" */
	EOL_CR,						/* "\r" */
	EOL_CRNL					/* "\r\n" */
} EolType;
76
/*
 * This struct contains all the state variables used throughout a COPY
 * operation. For simplicity, we use the same struct for all variants of COPY,
 * even though some fields are used in only some cases.
 *
 * Multi-byte encodings: all supported client-side encodings encode multi-byte
 * characters by having the first byte's high bit set. Subsequent bytes of the
 * character can have the high bit not set. When scanning data in such an
 * encoding to look for a match to a single-byte (ie ASCII) character, we must
 * use the full pg_encoding_mblen() machinery to skip over multibyte
 * characters, else we might find a false match to a trailing byte. In
 * supported server encodings, there is no possibility of a false match, and
 * it's faster to make useless comparisons to trailing bytes than it is to
 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
 * when we have to do it the hard way.
 */
typedef struct CopyStateData
{
	/* low-level state data */
	CopyDest	copy_dest;		/* type of copy source/destination */
	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
	StringInfo	fe_msgbuf;		/* used for all dests during COPY TO, only for
								 * dest == COPY_NEW_FE in COPY FROM */
	bool		fe_eof;			/* true once the frontend sent CopyDone */
	EolType		eol_type;		/* EOL type of input */
	int			file_encoding;	/* file or remote side's character encoding */
	bool		need_transcoding;	/* file encoding differs from server's? */
	bool		encoding_embeds_ascii;	/* can an ASCII byte appear as a
										 * non-first byte of a character? */

	/* parameters from the COPY command */
	Relation	rel;			/* relation to copy to or from */
	QueryDesc  *queryDesc;		/* executable query to copy from */
	List	   *attnumlist;		/* integer list of attnums to copy */
	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
	bool		is_program;		/* is 'filename' a program to popen? */
	bool		binary;			/* binary format? */
	bool		oids;			/* include OIDs? */
	bool		freeze;			/* freeze rows on loading? */
	bool		csv_mode;		/* Comma Separated Value format? */
	bool		header_line;	/* CSV header line? */
	char	   *null_print;		/* NULL marker string (server encoding!) */
	int			null_print_len; /* length of same */
	char	   *null_print_client;		/* same converted to file encoding */
	char	   *delim;			/* column delimiter (must be 1 byte) */
	char	   *quote;			/* CSV quote char (must be 1 byte) */
	char	   *escape;			/* CSV escape char (must be 1 byte) */
	List	   *force_quote;	/* FORCE_QUOTE: list of column names */
	bool		force_quote_all;	/* FORCE_QUOTE *? */
	bool	   *force_quote_flags;		/* per-column FORCE_QUOTE flags */
	List	   *force_notnull;	/* FORCE_NOT_NULL: list of column names */
	bool	   *force_notnull_flags;	/* per-column FORCE_NOT_NULL flags */
	List	   *force_null;		/* FORCE_NULL: list of column names */
	bool	   *force_null_flags;		/* per-column FORCE_NULL flags */
	bool		convert_selectively;	/* do selective binary conversion? */
	List	   *convert_select; /* list of column names (can be NIL) */
	bool	   *convert_select_flags;	/* per-column CSV/TEXT
										 * convert-selectively flags */

	/* these are just for error messages, see CopyFromErrorCallback */
	const char *cur_relname;	/* table name for error messages */
	uint64		cur_lineno;		/* line number for error messages */
	const char *cur_attname;	/* current att for error messages */
	const char *cur_attval;		/* current att value for error messages */

	/*
	 * Working state for COPY TO/FROM
	 */
	MemoryContext copycontext;	/* per-copy execution context */

	/*
	 * Working state for COPY TO
	 */
	FmgrInfo   *out_functions;	/* lookup info for output functions */
	MemoryContext rowcontext;	/* per-row evaluation context */

	/*
	 * Working state for COPY FROM
	 */
	AttrNumber	num_defaults;	/* presumably the number of valid entries
								 * in defmap/defexprs -- confirm */
	bool		file_has_oids;	/* presumably: input rows carry an OID */
	FmgrInfo	oid_in_function;	/* presumably the input function used to
									 * parse row OIDs -- confirm */
	Oid			oid_typioparam; /* typioparam to go with oid_in_function */
	FmgrInfo   *in_functions;	/* array of input functions for each attr */
	Oid		   *typioparams;	/* array of element types for in_functions */
	int		   *defmap;			/* array of default att numbers */
	ExprState **defexprs;		/* array of default att expressions */
	bool		volatile_defexprs;		/* is any of defexprs volatile? */
	List	   *range_table;	/* range table DoCopy built for its
								 * permission check */

	/*
	 * These variables are used to reduce overhead in textual COPY FROM.
	 *
	 * attribute_buf holds the separated, de-escaped text for each field of
	 * the current line. The CopyReadAttributes functions return arrays of
	 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
	 * the buffer on each cycle.
	 */
	StringInfoData attribute_buf;

	/* field raw data pointers found by COPY FROM */

	int			max_fields;		/* allocated length of raw_fields */
	char	  **raw_fields;		/* per-field pointers into attribute_buf */

	/*
	 * Similarly, line_buf holds the whole input line being processed. The
	 * input cycle is first to read the whole line into line_buf, convert it
	 * to server encoding there, and then extract the individual attribute
	 * fields into attribute_buf. line_buf is preserved unmodified so that we
	 * can display it in error messages if appropriate.
	 */
	StringInfoData line_buf;
	bool		line_buf_converted;		/* converted to server encoding? */
	bool		line_buf_valid; /* contains the row being processed? */

	/*
	 * Finally, raw_buf holds raw data read from the data source (file or
	 * client connection). CopyReadLine parses this data sufficiently to
	 * locate line boundaries, then transfers the data to line_buf and
	 * converts it. Note: we guarantee that there is a \0 at
	 * raw_buf[raw_buf_len].
	 */
#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
	char	   *raw_buf;
	int			raw_buf_index;	/* next byte to process */
	int			raw_buf_len;	/* total # of bytes stored */
} CopyStateData;
203
/*
 * DestReceiver for COPY (query) TO.
 *
 * 'pub' is the generic DestReceiver and is deliberately the first member;
 * presumably the receiver callbacks cast their DestReceiver * back to
 * DR_copy * -- confirm in the receiver implementation.  'cstate' is the COPY
 * being fed and 'processed' counts the tuples handled so far.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	CopyState	cstate;			/* CopyStateData for the command */
	uint64		processed;		/* # of tuples processed */
} DR_copy;
211
212
/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
 * prevent the continue/break processing from working. We end the "if (1)"
 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
 * any "else" in the calling code, and to avoid any compiler warnings about
 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 *
 * The macros are not self-contained: they read and assign locals of the
 * calling scan loop (raw_buf_ptr, prev_raw_ptr, copy_buf_len, hit_eof,
 * need_data, result, cstate), and NO_END_OF_COPY_GOTO jumps to a label
 * named not_end_of_copy that the caller must define.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 *
 * If reading 'extralen' more bytes would run past the loaded data and more
 * input may still arrive, rewind raw_buf_ptr to prev_raw_ptr, set need_data
 * and "continue" the caller's loop so it can load more data and retry.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)

/*
 * This consumes the remainder of the buffer and breaks.
 *
 * If reading 'extralen' more bytes would run past the loaded data and the
 * input is exhausted, consume what is left (when extralen is nonzero), set
 * result = true and "break" out of the caller's loop.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)

/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Appends raw_buf[raw_buf_index .. raw_buf_ptr) to line_buf and advances
 * raw_buf_index to raw_buf_ptr; does nothing if no bytes were approved.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							 cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)

/*
 * Undo any read-ahead and jump out of the block.
 *
 * Resets raw_buf_ptr to just after the character fetched at prev_raw_ptr,
 * then jumps to the caller's not_end_of_copy label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
277
/*
 * The fixed 11-byte signature that starts every binary-format COPY stream.
 * Spelled out byte by byte so it is plain that the trailing '\0' is part of
 * the signature itself and that no extra string terminator is stored.
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
279
280
/* non-export function prototypes */

/* setup and teardown shared by COPY TO and COPY FROM */
static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
		  const char *queryString, const Oid queryRelId, List *attnamelist,
		  List *options);
static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);

/* COPY TO */
static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
			const Oid queryRelId, const char *filename, bool is_program,
			List *attnamelist, List *options);
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
			 Datum *values, bool *nulls);

/* COPY FROM */
static uint64 CopyFrom(CopyState cstate);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
					CommandId mycid, int hi_options,
					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
					BulkInsertState bistate,
					int nBufferedTuples, HeapTuple *bufferedTuples,
					uint64 firstBufferedLineNo);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int	CopyReadAttributesText(CopyState cstate);
static int	CopyReadAttributesCSV(CopyState cstate);
static Datum CopyReadBinaryAttribute(CopyState cstate,
						int column_no, FmgrInfo *flinfo,
						Oid typioparam, int32 typmod,
						bool *isnull);

/* per-attribute output formatting */
static void CopyAttributeOutText(CopyState cstate, char *string);
static void CopyAttributeOutCSV(CopyState cstate, char *string,
					bool use_quote, bool single_attr);

/* miscellaneous helpers */
static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
			   List *attnamelist);
static char *limit_printout_length(const char *str);

/* Low-level communications functions */
static void SendCopyBegin(CopyState cstate);
static void ReceiveCopyBegin(CopyState cstate);
static void SendCopyEnd(CopyState cstate);
static void CopySendData(CopyState cstate, const void *databuf, int datasize);
static void CopySendString(CopyState cstate, const char *str);
static void CopySendChar(CopyState cstate, char c);
static void CopySendEndOfRow(CopyState cstate);
static int CopyGetData(CopyState cstate, void *databuf,
			int minread, int maxread);
static void CopySendInt32(CopyState cstate, int32 val);
static bool CopyGetInt32(CopyState cstate, int32 *val);
static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);
331
332
333 /*
334 * Send copy start/stop messages for frontend copies. These have changed
335 * in past protocol redesigns.
336 */
337 static void
SendCopyBegin(CopyState cstate)338 SendCopyBegin(CopyState cstate)
339 {
340 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
341 {
342 /* new way */
343 StringInfoData buf;
344 int natts = list_length(cstate->attnumlist);
345 int16 format = (cstate->binary ? 1 : 0);
346 int i;
347
348 pq_beginmessage(&buf, 'H');
349 pq_sendbyte(&buf, format); /* overall format */
350 pq_sendint(&buf, natts, 2);
351 for (i = 0; i < natts; i++)
352 pq_sendint(&buf, format, 2); /* per-column formats */
353 pq_endmessage(&buf);
354 cstate->copy_dest = COPY_NEW_FE;
355 }
356 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
357 {
358 /* old way */
359 if (cstate->binary)
360 ereport(ERROR,
361 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 errmsg("COPY BINARY is not supported to stdout or from stdin")));
363 pq_putemptymessage('H');
364 /* grottiness needed for old COPY OUT protocol */
365 pq_startcopyout();
366 cstate->copy_dest = COPY_OLD_FE;
367 }
368 else
369 {
370 /* very old way */
371 if (cstate->binary)
372 ereport(ERROR,
373 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374 errmsg("COPY BINARY is not supported to stdout or from stdin")));
375 pq_putemptymessage('B');
376 /* grottiness needed for old COPY OUT protocol */
377 pq_startcopyout();
378 cstate->copy_dest = COPY_OLD_FE;
379 }
380 }
381
382 static void
ReceiveCopyBegin(CopyState cstate)383 ReceiveCopyBegin(CopyState cstate)
384 {
385 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
386 {
387 /* new way */
388 StringInfoData buf;
389 int natts = list_length(cstate->attnumlist);
390 int16 format = (cstate->binary ? 1 : 0);
391 int i;
392
393 pq_beginmessage(&buf, 'G');
394 pq_sendbyte(&buf, format); /* overall format */
395 pq_sendint(&buf, natts, 2);
396 for (i = 0; i < natts; i++)
397 pq_sendint(&buf, format, 2); /* per-column formats */
398 pq_endmessage(&buf);
399 cstate->copy_dest = COPY_NEW_FE;
400 cstate->fe_msgbuf = makeStringInfo();
401 }
402 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
403 {
404 /* old way */
405 if (cstate->binary)
406 ereport(ERROR,
407 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 errmsg("COPY BINARY is not supported to stdout or from stdin")));
409 pq_putemptymessage('G');
410 /* any error in old protocol will make us lose sync */
411 pq_startmsgread();
412 cstate->copy_dest = COPY_OLD_FE;
413 }
414 else
415 {
416 /* very old way */
417 if (cstate->binary)
418 ereport(ERROR,
419 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420 errmsg("COPY BINARY is not supported to stdout or from stdin")));
421 pq_putemptymessage('D');
422 /* any error in old protocol will make us lose sync */
423 pq_startmsgread();
424 cstate->copy_dest = COPY_OLD_FE;
425 }
426 /* We *must* flush here to ensure FE knows it can send. */
427 pq_flush();
428 }
429
430 static void
SendCopyEnd(CopyState cstate)431 SendCopyEnd(CopyState cstate)
432 {
433 if (cstate->copy_dest == COPY_NEW_FE)
434 {
435 /* Shouldn't have any unsent data */
436 Assert(cstate->fe_msgbuf->len == 0);
437 /* Send Copy Done message */
438 pq_putemptymessage('c');
439 }
440 else
441 {
442 CopySendData(cstate, "\\.", 2);
443 /* Need to flush out the trailer (this also appends a newline) */
444 CopySendEndOfRow(cstate);
445 pq_endcopyout(false);
446 }
447 }
448
449 /*----------
450 * CopySendData sends output data to the destination (file or frontend)
451 * CopySendString does the same for null-terminated strings
452 * CopySendChar does the same for single characters
453 * CopySendEndOfRow does the appropriate thing at end of each data row
454 * (data is not actually flushed except by CopySendEndOfRow)
455 *
456 * NB: no data conversion is applied by these functions
457 *----------
458 */
459 static void
CopySendData(CopyState cstate,const void * databuf,int datasize)460 CopySendData(CopyState cstate, const void *databuf, int datasize)
461 {
462 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
463 }
464
465 static void
CopySendString(CopyState cstate,const char * str)466 CopySendString(CopyState cstate, const char *str)
467 {
468 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
469 }
470
471 static void
CopySendChar(CopyState cstate,char c)472 CopySendChar(CopyState cstate, char c)
473 {
474 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
475 }
476
477 static void
CopySendEndOfRow(CopyState cstate)478 CopySendEndOfRow(CopyState cstate)
479 {
480 StringInfo fe_msgbuf = cstate->fe_msgbuf;
481
482 switch (cstate->copy_dest)
483 {
484 case COPY_FILE:
485 if (!cstate->binary)
486 {
487 /* Default line termination depends on platform */
488 #ifndef WIN32
489 CopySendChar(cstate, '\n');
490 #else
491 CopySendString(cstate, "\r\n");
492 #endif
493 }
494
495 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
496 cstate->copy_file) != 1 ||
497 ferror(cstate->copy_file))
498 {
499 if (cstate->is_program)
500 {
501 if (errno == EPIPE)
502 {
503 /*
504 * The pipe will be closed automatically on error at
505 * the end of transaction, but we might get a better
506 * error message from the subprocess' exit code than
507 * just "Broken Pipe"
508 */
509 ClosePipeToProgram(cstate);
510
511 /*
512 * If ClosePipeToProgram() didn't throw an error, the
513 * program terminated normally, but closed the pipe
514 * first. Restore errno, and throw an error.
515 */
516 errno = EPIPE;
517 }
518 ereport(ERROR,
519 (errcode_for_file_access(),
520 errmsg("could not write to COPY program: %m")));
521 }
522 else
523 ereport(ERROR,
524 (errcode_for_file_access(),
525 errmsg("could not write to COPY file: %m")));
526 }
527 break;
528 case COPY_OLD_FE:
529 /* The FE/BE protocol uses \n as newline for all platforms */
530 if (!cstate->binary)
531 CopySendChar(cstate, '\n');
532
533 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
534 {
535 /* no hope of recovering connection sync, so FATAL */
536 ereport(FATAL,
537 (errcode(ERRCODE_CONNECTION_FAILURE),
538 errmsg("connection lost during COPY to stdout")));
539 }
540 break;
541 case COPY_NEW_FE:
542 /* The FE/BE protocol uses \n as newline for all platforms */
543 if (!cstate->binary)
544 CopySendChar(cstate, '\n');
545
546 /* Dump the accumulated row as one CopyData message */
547 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
548 break;
549 }
550
551 resetStringInfo(fe_msgbuf);
552 }
553
/*
 * CopyGetData reads data from the source (file or frontend)
 *
 * We attempt to read at least minread, and at most maxread, bytes from
 * the source. The actual number of bytes read is returned; if this is
 * less than minread, EOF was detected.
 *
 * Note: when copying from the frontend, we expect a proper EOF mark per
 * protocol; if the frontend simply drops the connection, we raise error.
 * It seems unwise to allow the COPY IN to complete normally in that case.
 *
 * Per destination:
 *  - COPY_FILE: a single fread() of up to maxread bytes; a short count means
 *    EOF, and a stream error raises ERROR.
 *  - COPY_OLD_FE: reads exactly minread bytes (maxread is not used); running
 *    out of client data raises ERROR.
 *  - COPY_NEW_FE: drains CopyData ('d') messages through cstate->fe_msgbuf
 *    until minread bytes are read or maxread is used up.  CopyDone ('c')
 *    sets cstate->fe_eof and returns whatever was gathered.  CopyFail ('f')
 *    or any unexpected message type raises ERROR.  Flush ('H') and Sync
 *    ('S') are skipped.
 *
 * NB: no data conversion is applied here.
 */
static int
CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
{
	int			bytesread = 0;

	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			bytesread = fread(databuf, 1, maxread, cstate->copy_file);
			if (ferror(cstate->copy_file))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read from COPY file: %m")));
			break;
		case COPY_OLD_FE:

			/*
			 * We cannot read more than minread bytes (which in practice is 1)
			 * because old protocol doesn't have any clear way of separating
			 * the COPY stream from following data. This is slow, but not any
			 * slower than the code path was originally, and we don't care
			 * much anymore about the performance of old protocol.
			 */
			if (pq_getbytes((char *) databuf, minread))
			{
				/* Only a \. terminator is legal EOF in old protocol */
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection with an open transaction")));
			}
			bytesread = minread;
			break;
		case COPY_NEW_FE:
			/* Stop early once CopyDone has been seen (fe_eof). */
			while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
			{
				int			avail;

				/* Fetch messages until fe_msgbuf has unread payload. */
				while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
				{
					/* Try to receive another message */
					int			mtype;

			readmessage:

					/*
					 * Cancel interrupts are held while one message is read,
					 * presumably so a cancel cannot leave a message only
					 * partly consumed -- confirm.  If an ERROR is raised
					 * here, the RESUME below is skipped; presumably error
					 * recovery resets the holdoff count.
					 */
					HOLD_CANCEL_INTERRUPTS();
					pq_startmsgread();
					mtype = pq_getbyte();
					if (mtype == EOF)
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
								 errmsg("unexpected EOF on client connection with an open transaction")));
					if (pq_getmessage(cstate->fe_msgbuf, 0))
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
								 errmsg("unexpected EOF on client connection with an open transaction")));
					RESUME_CANCEL_INTERRUPTS();
					switch (mtype)
					{
						case 'd':		/* CopyData */
							break;
						case 'c':		/* CopyDone */
							/* COPY IN correctly terminated by frontend */
							/* may return fewer than minread bytes: EOF */
							cstate->fe_eof = true;
							return bytesread;
						case 'f':		/* CopyFail */
							ereport(ERROR,
									(errcode(ERRCODE_QUERY_CANCELED),
									 errmsg("COPY from stdin failed: %s",
									   pq_getmsgstring(cstate->fe_msgbuf))));
							break;
						case 'H':		/* Flush */
						case 'S':		/* Sync */

							/*
							 * Ignore Flush/Sync for the convenience of client
							 * libraries (such as libpq) that may send those
							 * without noticing that the command they just
							 * sent was COPY.
							 */
							goto readmessage;
						default:
							ereport(ERROR,
									(errcode(ERRCODE_PROTOCOL_VIOLATION),
									 errmsg("unexpected message type 0x%02X during COPY from stdin",
											mtype)));
							break;
					}
				}
				/* Copy as much of the current message as the caller wants. */
				avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
				if (avail > maxread)
					avail = maxread;
				pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
				databuf = (void *) ((char *) databuf + avail);
				maxread -= avail;
				bytesread += avail;
			}
			break;
	}

	return bytesread;
}
667
668
669 /*
670 * These functions do apply some data conversion
671 */
672
673 /*
674 * CopySendInt32 sends an int32 in network byte order
675 */
676 static void
CopySendInt32(CopyState cstate,int32 val)677 CopySendInt32(CopyState cstate, int32 val)
678 {
679 uint32 buf;
680
681 buf = htonl((uint32) val);
682 CopySendData(cstate, &buf, sizeof(buf));
683 }
684
685 /*
686 * CopyGetInt32 reads an int32 that appears in network byte order
687 *
688 * Returns true if OK, false if EOF
689 */
690 static bool
CopyGetInt32(CopyState cstate,int32 * val)691 CopyGetInt32(CopyState cstate, int32 *val)
692 {
693 uint32 buf;
694
695 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
696 {
697 *val = 0; /* suppress compiler warning */
698 return false;
699 }
700 *val = (int32) ntohl(buf);
701 return true;
702 }
703
704 /*
705 * CopySendInt16 sends an int16 in network byte order
706 */
707 static void
CopySendInt16(CopyState cstate,int16 val)708 CopySendInt16(CopyState cstate, int16 val)
709 {
710 uint16 buf;
711
712 buf = htons((uint16) val);
713 CopySendData(cstate, &buf, sizeof(buf));
714 }
715
716 /*
717 * CopyGetInt16 reads an int16 that appears in network byte order
718 */
719 static bool
CopyGetInt16(CopyState cstate,int16 * val)720 CopyGetInt16(CopyState cstate, int16 *val)
721 {
722 uint16 buf;
723
724 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
725 {
726 *val = 0; /* suppress compiler warning */
727 return false;
728 }
729 *val = (int16) ntohs(buf);
730 return true;
731 }
732
733
734 /*
735 * CopyLoadRawBuf loads some more data into raw_buf
736 *
737 * Returns TRUE if able to obtain at least one more byte, else FALSE.
738 *
739 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
740 * down to the start of the buffer and then we load more data after that.
741 * This case is used only when a frontend multibyte character crosses a
742 * bufferload boundary.
743 */
744 static bool
CopyLoadRawBuf(CopyState cstate)745 CopyLoadRawBuf(CopyState cstate)
746 {
747 int nbytes;
748 int inbytes;
749
750 if (cstate->raw_buf_index < cstate->raw_buf_len)
751 {
752 /* Copy down the unprocessed data */
753 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
754 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
755 nbytes);
756 }
757 else
758 nbytes = 0; /* no data need be saved */
759
760 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
761 1, RAW_BUF_SIZE - nbytes);
762 nbytes += inbytes;
763 cstate->raw_buf[nbytes] = '\0';
764 cstate->raw_buf_index = 0;
765 cstate->raw_buf_len = nbytes;
766 return (inbytes > 0);
767 }
768
769
770 /*
771 * DoCopy executes the SQL COPY statement
772 *
773 * Either unload or reload contents of table <relation>, depending on <from>.
774 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
775 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
776 * or DELETE query.
777 *
778 * If <pipe> is false, transfer is between the table and the file named
779 * <filename>. Otherwise, transfer is between the table and our regular
780 * input/output stream. The latter could be either stdin/stdout or a
781 * socket, depending on whether we're running under Postmaster control.
782 *
783 * Do not allow a Postgres user without superuser privilege to read from
784 * or write to a file.
785 *
786 * Do not allow the copy if user doesn't have proper permission to access
787 * the table or the specifically requested columns.
788 */
789 Oid
DoCopy(const CopyStmt * stmt,const char * queryString,uint64 * processed)790 DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
791 {
792 CopyState cstate;
793 bool is_from = stmt->is_from;
794 bool pipe = (stmt->filename == NULL);
795 Relation rel;
796 Oid relid;
797 Node *query = NULL;
798 List *range_table = NIL;
799
800 /* Disallow COPY to/from file or program except to superusers. */
801 if (!pipe && !superuser())
802 {
803 if (stmt->is_program)
804 ereport(ERROR,
805 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
806 errmsg("must be superuser to COPY to or from an external program"),
807 errhint("Anyone can COPY to stdout or from stdin. "
808 "psql's \\copy command also works for anyone.")));
809 else
810 ereport(ERROR,
811 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
812 errmsg("must be superuser to COPY to or from a file"),
813 errhint("Anyone can COPY to stdout or from stdin. "
814 "psql's \\copy command also works for anyone.")));
815 }
816
817 if (stmt->relation)
818 {
819 TupleDesc tupDesc;
820 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
821 List *attnums;
822 ListCell *cur;
823 RangeTblEntry *rte;
824
825 Assert(!stmt->query);
826
827 /* Open and lock the relation, using the appropriate lock type. */
828 rel = heap_openrv(stmt->relation,
829 (is_from ? RowExclusiveLock : AccessShareLock));
830
831 relid = RelationGetRelid(rel);
832
833 rte = makeNode(RangeTblEntry);
834 rte->rtekind = RTE_RELATION;
835 rte->relid = RelationGetRelid(rel);
836 rte->relkind = rel->rd_rel->relkind;
837 rte->requiredPerms = required_access;
838 range_table = list_make1(rte);
839
840 tupDesc = RelationGetDescr(rel);
841 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
842 foreach(cur, attnums)
843 {
844 int attno = lfirst_int(cur) -
845 FirstLowInvalidHeapAttributeNumber;
846
847 if (is_from)
848 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
849 else
850 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
851 }
852 ExecCheckRTPerms(range_table, true);
853
854 /*
855 * Permission check for row security policies.
856 *
857 * check_enable_rls will ereport(ERROR) if the user has requested
858 * something invalid and will otherwise indicate if we should enable
859 * RLS (returns RLS_ENABLED) or not for this COPY statement.
860 *
861 * If the relation has a row security policy and we are to apply it
862 * then perform a "query" copy and allow the normal query processing
863 * to handle the policies.
864 *
865 * If RLS is not enabled for this, then just fall through to the
866 * normal non-filtering relation handling.
867 */
868 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
869 {
870 SelectStmt *select;
871 ColumnRef *cr;
872 ResTarget *target;
873 RangeVar *from;
874 List *targetList = NIL;
875
876 if (is_from)
877 ereport(ERROR,
878 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879 errmsg("COPY FROM not supported with row-level security"),
880 errhint("Use INSERT statements instead.")));
881
882 /*
883 * Build target list
884 *
885 * If no columns are specified in the attribute list of the COPY
886 * command, then the target list is 'all' columns. Therefore, '*'
887 * should be used as the target list for the resulting SELECT
888 * statement.
889 *
890 * In the case that columns are specified in the attribute list,
891 * create a ColumnRef and ResTarget for each column and add them to
892 * the target list for the resulting SELECT statement.
893 */
894 if (!stmt->attlist)
895 {
896 cr = makeNode(ColumnRef);
897 cr->fields = list_make1(makeNode(A_Star));
898 cr->location = -1;
899
900 target = makeNode(ResTarget);
901 target->name = NULL;
902 target->indirection = NIL;
903 target->val = (Node *) cr;
904 target->location = -1;
905
906 targetList = list_make1(target);
907 }
908 else
909 {
910 ListCell *lc;
911
912 foreach(lc, stmt->attlist)
913 {
914 /*
915 * Build the ColumnRef for each column. The ColumnRef
916 * 'fields' property is a String 'Value' node (see
917 * nodes/value.h) that corresponds to the column name
918 * respectively.
919 */
920 cr = makeNode(ColumnRef);
921 cr->fields = list_make1(lfirst(lc));
922 cr->location = -1;
923
924 /* Build the ResTarget and add the ColumnRef to it. */
925 target = makeNode(ResTarget);
926 target->name = NULL;
927 target->indirection = NIL;
928 target->val = (Node *) cr;
929 target->location = -1;
930
931 /* Add each column to the SELECT statement's target list */
932 targetList = lappend(targetList, target);
933 }
934 }
935
936 /*
937 * Build RangeVar for from clause, fully qualified based on the
938 * relation which we have opened and locked.
939 */
940 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
941 pstrdup(RelationGetRelationName(rel)),
942 -1);
943
944 /* Build query */
945 select = makeNode(SelectStmt);
946 select->targetList = targetList;
947 select->fromClause = list_make1(from);
948
949 query = (Node *) select;
950
951 /*
952 * Close the relation for now, but keep the lock on it to prevent
953 * changes between now and when we start the query-based COPY.
954 *
955 * We'll reopen it later as part of the query-based COPY.
956 */
957 heap_close(rel, NoLock);
958 rel = NULL;
959 }
960 }
961 else
962 {
963 Assert(stmt->query);
964
965 query = stmt->query;
966 relid = InvalidOid;
967 rel = NULL;
968 }
969
970 if (is_from)
971 {
972 Assert(rel);
973
974 /* check read-only transaction and parallel mode */
975 if (XactReadOnly && !rel->rd_islocaltemp)
976 PreventCommandIfReadOnly("COPY FROM");
977 PreventCommandIfParallelMode("COPY FROM");
978
979 cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
980 stmt->attlist, stmt->options);
981 cstate->range_table = range_table;
982 *processed = CopyFrom(cstate); /* copy from file to database */
983 EndCopyFrom(cstate);
984 }
985 else
986 {
987 cstate = BeginCopyTo(rel, query, queryString, relid,
988 stmt->filename, stmt->is_program,
989 stmt->attlist, stmt->options);
990 *processed = DoCopyTo(cstate); /* copy from database to file */
991 EndCopyTo(cstate);
992 }
993
994 /*
995 * Close the relation. If reading, we can release the AccessShareLock we
996 * got; if writing, we should hold the lock until end of transaction to
997 * ensure that updates will be committed before lock is released.
998 */
999 if (rel != NULL)
1000 heap_close(rel, (is_from ? NoLock : AccessShareLock));
1001
1002 return relid;
1003 }
1004
1005 /*
1006 * Process the statement option list for COPY.
1007 *
1008 * Scan the options list (a list of DefElem) and transpose the information
1009 * into cstate, applying appropriate error checking.
1010 *
1011 * cstate is assumed to be filled with zeroes initially.
1012 *
1013 * This is exported so that external users of the COPY API can sanity-check
1014 * a list of options. In that usage, cstate should be passed as NULL
1015 * (since external users don't know sizeof(CopyStateData)) and the collected
1016 * data is just leaked until CurrentMemoryContext is reset.
1017 *
1018 * Note that additional checking, such as whether column names listed in FORCE
1019 * QUOTE actually exist, has to be applied later. This just checks for
1020 * self-consistency of the options list.
1021 */
1022 void
ProcessCopyOptions(CopyState cstate,bool is_from,List * options)1023 ProcessCopyOptions(CopyState cstate,
1024 bool is_from,
1025 List *options)
1026 {
1027 bool format_specified = false;
1028 ListCell *option;
1029
1030 /* Support external use for option sanity checking */
1031 if (cstate == NULL)
1032 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1033
1034 cstate->file_encoding = -1;
1035
1036 /* Extract options from the statement node tree */
1037 foreach(option, options)
1038 {
1039 DefElem *defel = (DefElem *) lfirst(option);
1040
1041 if (strcmp(defel->defname, "format") == 0)
1042 {
1043 char *fmt = defGetString(defel);
1044
1045 if (format_specified)
1046 ereport(ERROR,
1047 (errcode(ERRCODE_SYNTAX_ERROR),
1048 errmsg("conflicting or redundant options")));
1049 format_specified = true;
1050 if (strcmp(fmt, "text") == 0)
1051 /* default format */ ;
1052 else if (strcmp(fmt, "csv") == 0)
1053 cstate->csv_mode = true;
1054 else if (strcmp(fmt, "binary") == 0)
1055 cstate->binary = true;
1056 else
1057 ereport(ERROR,
1058 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1059 errmsg("COPY format \"%s\" not recognized", fmt)));
1060 }
1061 else if (strcmp(defel->defname, "oids") == 0)
1062 {
1063 if (cstate->oids)
1064 ereport(ERROR,
1065 (errcode(ERRCODE_SYNTAX_ERROR),
1066 errmsg("conflicting or redundant options")));
1067 cstate->oids = defGetBoolean(defel);
1068 }
1069 else if (strcmp(defel->defname, "freeze") == 0)
1070 {
1071 if (cstate->freeze)
1072 ereport(ERROR,
1073 (errcode(ERRCODE_SYNTAX_ERROR),
1074 errmsg("conflicting or redundant options")));
1075 cstate->freeze = defGetBoolean(defel);
1076 }
1077 else if (strcmp(defel->defname, "delimiter") == 0)
1078 {
1079 if (cstate->delim)
1080 ereport(ERROR,
1081 (errcode(ERRCODE_SYNTAX_ERROR),
1082 errmsg("conflicting or redundant options")));
1083 cstate->delim = defGetString(defel);
1084 }
1085 else if (strcmp(defel->defname, "null") == 0)
1086 {
1087 if (cstate->null_print)
1088 ereport(ERROR,
1089 (errcode(ERRCODE_SYNTAX_ERROR),
1090 errmsg("conflicting or redundant options")));
1091 cstate->null_print = defGetString(defel);
1092 }
1093 else if (strcmp(defel->defname, "header") == 0)
1094 {
1095 if (cstate->header_line)
1096 ereport(ERROR,
1097 (errcode(ERRCODE_SYNTAX_ERROR),
1098 errmsg("conflicting or redundant options")));
1099 cstate->header_line = defGetBoolean(defel);
1100 }
1101 else if (strcmp(defel->defname, "quote") == 0)
1102 {
1103 if (cstate->quote)
1104 ereport(ERROR,
1105 (errcode(ERRCODE_SYNTAX_ERROR),
1106 errmsg("conflicting or redundant options")));
1107 cstate->quote = defGetString(defel);
1108 }
1109 else if (strcmp(defel->defname, "escape") == 0)
1110 {
1111 if (cstate->escape)
1112 ereport(ERROR,
1113 (errcode(ERRCODE_SYNTAX_ERROR),
1114 errmsg("conflicting or redundant options")));
1115 cstate->escape = defGetString(defel);
1116 }
1117 else if (strcmp(defel->defname, "force_quote") == 0)
1118 {
1119 if (cstate->force_quote || cstate->force_quote_all)
1120 ereport(ERROR,
1121 (errcode(ERRCODE_SYNTAX_ERROR),
1122 errmsg("conflicting or redundant options")));
1123 if (defel->arg && IsA(defel->arg, A_Star))
1124 cstate->force_quote_all = true;
1125 else if (defel->arg && IsA(defel->arg, List))
1126 cstate->force_quote = (List *) defel->arg;
1127 else
1128 ereport(ERROR,
1129 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1130 errmsg("argument to option \"%s\" must be a list of column names",
1131 defel->defname)));
1132 }
1133 else if (strcmp(defel->defname, "force_not_null") == 0)
1134 {
1135 if (cstate->force_notnull)
1136 ereport(ERROR,
1137 (errcode(ERRCODE_SYNTAX_ERROR),
1138 errmsg("conflicting or redundant options")));
1139 if (defel->arg && IsA(defel->arg, List))
1140 cstate->force_notnull = (List *) defel->arg;
1141 else
1142 ereport(ERROR,
1143 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1144 errmsg("argument to option \"%s\" must be a list of column names",
1145 defel->defname)));
1146 }
1147 else if (strcmp(defel->defname, "force_null") == 0)
1148 {
1149 if (cstate->force_null)
1150 ereport(ERROR,
1151 (errcode(ERRCODE_SYNTAX_ERROR),
1152 errmsg("conflicting or redundant options")));
1153 if (defel->arg && IsA(defel->arg, List))
1154 cstate->force_null = (List *) defel->arg;
1155 else
1156 ereport(ERROR,
1157 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1158 errmsg("argument to option \"%s\" must be a list of column names",
1159 defel->defname)));
1160 }
1161 else if (strcmp(defel->defname, "convert_selectively") == 0)
1162 {
1163 /*
1164 * Undocumented, not-accessible-from-SQL option: convert only the
1165 * named columns to binary form, storing the rest as NULLs. It's
1166 * allowed for the column list to be NIL.
1167 */
1168 if (cstate->convert_selectively)
1169 ereport(ERROR,
1170 (errcode(ERRCODE_SYNTAX_ERROR),
1171 errmsg("conflicting or redundant options")));
1172 cstate->convert_selectively = true;
1173 if (defel->arg == NULL || IsA(defel->arg, List))
1174 cstate->convert_select = (List *) defel->arg;
1175 else
1176 ereport(ERROR,
1177 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1178 errmsg("argument to option \"%s\" must be a list of column names",
1179 defel->defname)));
1180 }
1181 else if (strcmp(defel->defname, "encoding") == 0)
1182 {
1183 if (cstate->file_encoding >= 0)
1184 ereport(ERROR,
1185 (errcode(ERRCODE_SYNTAX_ERROR),
1186 errmsg("conflicting or redundant options")));
1187 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1188 if (cstate->file_encoding < 0)
1189 ereport(ERROR,
1190 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191 errmsg("argument to option \"%s\" must be a valid encoding name",
1192 defel->defname)));
1193 }
1194 else
1195 ereport(ERROR,
1196 (errcode(ERRCODE_SYNTAX_ERROR),
1197 errmsg("option \"%s\" not recognized",
1198 defel->defname)));
1199 }
1200
1201 /*
1202 * Check for incompatible options (must do these two before inserting
1203 * defaults)
1204 */
1205 if (cstate->binary && cstate->delim)
1206 ereport(ERROR,
1207 (errcode(ERRCODE_SYNTAX_ERROR),
1208 errmsg("cannot specify DELIMITER in BINARY mode")));
1209
1210 if (cstate->binary && cstate->null_print)
1211 ereport(ERROR,
1212 (errcode(ERRCODE_SYNTAX_ERROR),
1213 errmsg("cannot specify NULL in BINARY mode")));
1214
1215 /* Set defaults for omitted options */
1216 if (!cstate->delim)
1217 cstate->delim = cstate->csv_mode ? "," : "\t";
1218
1219 if (!cstate->null_print)
1220 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1221 cstate->null_print_len = strlen(cstate->null_print);
1222
1223 if (cstate->csv_mode)
1224 {
1225 if (!cstate->quote)
1226 cstate->quote = "\"";
1227 if (!cstate->escape)
1228 cstate->escape = cstate->quote;
1229 }
1230
1231 /* Only single-byte delimiter strings are supported. */
1232 if (strlen(cstate->delim) != 1)
1233 ereport(ERROR,
1234 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1235 errmsg("COPY delimiter must be a single one-byte character")));
1236
1237 /* Disallow end-of-line characters */
1238 if (strchr(cstate->delim, '\r') != NULL ||
1239 strchr(cstate->delim, '\n') != NULL)
1240 ereport(ERROR,
1241 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1242 errmsg("COPY delimiter cannot be newline or carriage return")));
1243
1244 if (strchr(cstate->null_print, '\r') != NULL ||
1245 strchr(cstate->null_print, '\n') != NULL)
1246 ereport(ERROR,
1247 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1248 errmsg("COPY null representation cannot use newline or carriage return")));
1249
1250 /*
1251 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1252 * backslash because it would be ambiguous. We can't allow the other
1253 * cases because data characters matching the delimiter must be
1254 * backslashed, and certain backslash combinations are interpreted
1255 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1256 * more than strictly necessary, but seems best for consistency and
1257 * future-proofing. Likewise we disallow all digits though only octal
1258 * digits are actually dangerous.
1259 */
1260 if (!cstate->csv_mode &&
1261 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1262 cstate->delim[0]) != NULL)
1263 ereport(ERROR,
1264 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1265 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1266
1267 /* Check header */
1268 if (!cstate->csv_mode && cstate->header_line)
1269 ereport(ERROR,
1270 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1271 errmsg("COPY HEADER available only in CSV mode")));
1272
1273 /* Check quote */
1274 if (!cstate->csv_mode && cstate->quote != NULL)
1275 ereport(ERROR,
1276 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1277 errmsg("COPY quote available only in CSV mode")));
1278
1279 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1280 ereport(ERROR,
1281 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1282 errmsg("COPY quote must be a single one-byte character")));
1283
1284 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1285 ereport(ERROR,
1286 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1287 errmsg("COPY delimiter and quote must be different")));
1288
1289 /* Check escape */
1290 if (!cstate->csv_mode && cstate->escape != NULL)
1291 ereport(ERROR,
1292 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1293 errmsg("COPY escape available only in CSV mode")));
1294
1295 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1296 ereport(ERROR,
1297 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1298 errmsg("COPY escape must be a single one-byte character")));
1299
1300 /* Check force_quote */
1301 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1302 ereport(ERROR,
1303 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1304 errmsg("COPY force quote available only in CSV mode")));
1305 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1306 ereport(ERROR,
1307 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1308 errmsg("COPY force quote only available using COPY TO")));
1309
1310 /* Check force_notnull */
1311 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1312 ereport(ERROR,
1313 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1314 errmsg("COPY force not null available only in CSV mode")));
1315 if (cstate->force_notnull != NIL && !is_from)
1316 ereport(ERROR,
1317 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1318 errmsg("COPY force not null only available using COPY FROM")));
1319
1320 /* Check force_null */
1321 if (!cstate->csv_mode && cstate->force_null != NIL)
1322 ereport(ERROR,
1323 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1324 errmsg("COPY force null available only in CSV mode")));
1325
1326 if (cstate->force_null != NIL && !is_from)
1327 ereport(ERROR,
1328 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1329 errmsg("COPY force null only available using COPY FROM")));
1330
1331 /* Don't allow the delimiter to appear in the null string. */
1332 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1333 ereport(ERROR,
1334 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1335 errmsg("COPY delimiter must not appear in the NULL specification")));
1336
1337 /* Don't allow the CSV quote char to appear in the null string. */
1338 if (cstate->csv_mode &&
1339 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1340 ereport(ERROR,
1341 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1342 errmsg("CSV quote character must not appear in the NULL specification")));
1343 }
1344
/*
 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
 *
 * Builds and returns a new CopyState.  The CopyState struct itself is
 * palloc'd in the caller's CurrentMemoryContext.  Everything allocated while
 * setting it up goes into a dedicated "COPY" memory context
 * (cstate->copycontext), which EndCopy deletes.
 *
 * Exactly one of rel and raw_query is expected to be non-NULL:
 *
 * - rel: COPY to/from a table.  The table's tuple descriptor defines the
 *   available columns.
 * - raw_query: COPY (query) TO, which is only valid when !is_from.  The query
 *   is analyzed, rewritten, planned and passed to ExecutorStart here.  A copy
 *   of the active snapshot is pushed first; EndCopyTo pops it.  If queryRelId
 *   is valid, it is the OID of the table that a "COPY table TO" was converted
 *   from (for RLS), and the plan must still reference that table.
 *
 * options is the raw DefElem list and is validated by ProcessCopyOptions.
 * attnamelist is the user-supplied column list, resolved by CopyGetAttnums.
 *
 * The OIDS option is rejected, in either direction, for a table without
 * OIDs ("table ... does not have OIDs").  It is also rejected for any
 * query-based COPY.
 */
static CopyState
BeginCopy(bool is_from,
		  Relation rel,
		  Node *raw_query,
		  const char *queryString,
		  const Oid queryRelId,
		  List *attnamelist,
		  List *options)
{
	CopyState	cstate;
	TupleDesc	tupDesc;
	int			num_phys_attrs;
	MemoryContext oldcontext;

	/* Allocate workspace and zero all fields */
	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));

	/*
	 * We allocate everything used by a cstate in a new memory context. This
	 * avoids memory leaks during repeated use of COPY in a query.
	 */
	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
												"COPY",
												ALLOCSET_DEFAULT_SIZES);

	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	/* Extract options from the statement node tree */
	ProcessCopyOptions(cstate, is_from, options);

	/* Process the source/target relation or query */
	if (rel)
	{
		Assert(!raw_query);

		cstate->rel = rel;

		tupDesc = RelationGetDescr(cstate->rel);

		/* Don't allow COPY w/ OIDs to or from a table without them */
		if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_COLUMN),
					 errmsg("table \"%s\" does not have OIDs",
							RelationGetRelationName(cstate->rel))));
	}
	else
	{
		List	   *rewritten;
		Query	   *query;
		PlannedStmt *plan;
		DestReceiver *dest;

		/* Query-based COPY only supports the TO direction */
		Assert(!is_from);
		cstate->rel = NULL;

		/* Don't allow COPY w/ OIDs from a query */
		if (cstate->oids)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY (query) WITH OIDS is not supported")));

		/*
		 * Run parse analysis and rewrite.  Note this also acquires sufficient
		 * locks on the source table(s).
		 *
		 * Because the parser and planner tend to scribble on their input, we
		 * make a preliminary copy of the source querytree.  This prevents
		 * problems in the case that the COPY is in a portal or plpgsql
		 * function and is executed repeatedly.  (See also the same hack in
		 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
		 */
		rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
										   queryString, NULL, 0);

		/* check that we got back something we can work with */
		if (rewritten == NIL)
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
		}
		else if (list_length(rewritten) > 1)
		{
			ListCell   *lc;

			/* examine queries to determine which error message to issue */
			foreach(lc, rewritten)
			{
				Query	   *q = (Query *) lfirst(lc);

				if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
				if (q->querySource == QSRC_NON_INSTEAD_RULE)
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("DO ALSO rules are not supported for the COPY")));
			}

			/* No rule-specific reason found; report the generic case */
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
		}

		/* Exactly one rewritten query remains at this point */
		query = (Query *) linitial(rewritten);

		/* The grammar allows SELECT INTO, but we don't support that */
		if (query->utilityStmt != NULL &&
			IsA(query->utilityStmt, CreateTableAsStmt))
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY (SELECT INTO) is not supported")));

		Assert(query->utilityStmt == NULL);

		/*
		 * Similarly the grammar doesn't enforce the presence of a RETURNING
		 * clause, but this is required here.
		 */
		if (query->commandType != CMD_SELECT &&
			query->returningList == NIL)
		{
			Assert(query->commandType == CMD_INSERT ||
				   query->commandType == CMD_UPDATE ||
				   query->commandType == CMD_DELETE);

			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY query must have a RETURNING clause")));
		}

		/* plan the query */
		plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);

		/*
		 * With row level security and a user using "COPY relation TO", we
		 * have to convert the "COPY relation TO" to a query-based COPY (eg:
		 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
		 * in any RLS clauses.
		 *
		 * When this happens, we are passed in the relid of the originally
		 * found relation (which we have locked).  As the planner will look up
		 * the relation again, we double-check here to make sure it found the
		 * same one that we have locked.
		 */
		if (queryRelId != InvalidOid)
		{
			/*
			 * Note that with RLS involved there may be multiple relations,
			 * and while the one we need is almost certainly first, we don't
			 * make any guarantees of that in the planner, so check the whole
			 * list and make sure we find the original relation.
			 */
			if (!list_member_oid(plan->relationOids, queryRelId))
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("relation referenced by COPY statement has changed")));
		}

		/*
		 * Use a snapshot with an updated command ID to ensure this query sees
		 * results of any previously executed queries.  EndCopyTo pops this
		 * snapshot again after shutting down the executor.
		 */
		PushCopiedSnapshot(GetActiveSnapshot());
		UpdateActiveSnapshotCommandId();

		/* Create dest receiver for COPY OUT */
		dest = CreateDestReceiver(DestCopyOut);
		((DR_copy *) dest)->cstate = cstate;

		/*
		 * Create a QueryDesc whose result rows go to the COPY OUT dest
		 * receiver created above (rather than to the client as a normal
		 * query result).
		 */
		cstate->queryDesc = CreateQueryDesc(plan, queryString,
											GetActiveSnapshot(),
											InvalidSnapshot,
											dest, NULL, 0);

		/*
		 * Call ExecutorStart to prepare the plan for execution.
		 *
		 * ExecutorStart computes a result tupdesc for us
		 */
		ExecutorStart(cstate->queryDesc, 0);

		tupDesc = cstate->queryDesc->tupDesc;
	}

	/* Generate or convert list of attributes to process */
	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

	/* Per-column flag arrays below are indexed by attnum - 1 */
	num_phys_attrs = tupDesc->natts;

	/* Convert FORCE_QUOTE name list to per-column flags, check validity */
	cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->force_quote_all)
	{
		int			i;

		for (i = 0; i < num_phys_attrs; i++)
			cstate->force_quote_flags[i] = true;
	}
	else if (cstate->force_quote)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
				   errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
						  NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->force_quote_flags[attnum - 1] = true;
		}
	}

	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
	cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->force_notnull)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
				errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
					   NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->force_notnull_flags[attnum - 1] = true;
		}
	}

	/* Convert FORCE_NULL name list to per-column flags, check validity */
	cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->force_null)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
						   NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->force_null_flags[attnum - 1] = true;
		}
	}

	/* Convert convert_selectively name list to per-column flags */
	if (cstate->convert_selectively)
	{
		List	   *attnums;
		ListCell   *cur;

		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);

		foreach(cur, attnums)
		{
			int			attnum = lfirst_int(cur);

			/* internal-only option, hence errmsg_internal (no translation) */
			if (!list_member_int(cstate->attnumlist, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg_internal("selected column \"%s\" not referenced by COPY",
							 NameStr(tupDesc->attrs[attnum - 1]->attname))));
			cstate->convert_select_flags[attnum - 1] = true;
		}
	}

	/* Use client encoding when ENCODING option is not specified. */
	if (cstate->file_encoding < 0)
		cstate->file_encoding = pg_get_client_encoding();

	/*
	 * Set up encoding conversion info.  Even if the file and server encodings
	 * are the same, we must apply pg_any_to_server() to validate data in
	 * multibyte encodings.
	 */
	cstate->need_transcoding =
		(cstate->file_encoding != GetDatabaseEncoding() ||
		 pg_database_encoding_max_length() > 1);
	/* See Multibyte encoding comment above */
	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);

	cstate->copy_dest = COPY_FILE;		/* default */

	MemoryContextSwitchTo(oldcontext);

	return cstate;
}
1670
1671 /*
1672 * Closes the pipe to an external program, checking the pclose() return code.
1673 */
1674 static void
ClosePipeToProgram(CopyState cstate)1675 ClosePipeToProgram(CopyState cstate)
1676 {
1677 int pclose_rc;
1678
1679 Assert(cstate->is_program);
1680
1681 pclose_rc = ClosePipeStream(cstate->copy_file);
1682 if (pclose_rc == -1)
1683 ereport(ERROR,
1684 (errcode_for_file_access(),
1685 errmsg("could not close pipe to external command: %m")));
1686 else if (pclose_rc != 0)
1687 ereport(ERROR,
1688 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1689 errmsg("program \"%s\" failed",
1690 cstate->filename),
1691 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1692 }
1693
1694 /*
1695 * Release resources allocated in a cstate for COPY TO/FROM.
1696 */
1697 static void
EndCopy(CopyState cstate)1698 EndCopy(CopyState cstate)
1699 {
1700 if (cstate->is_program)
1701 {
1702 ClosePipeToProgram(cstate);
1703 }
1704 else
1705 {
1706 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1707 ereport(ERROR,
1708 (errcode_for_file_access(),
1709 errmsg("could not close file \"%s\": %m",
1710 cstate->filename)));
1711 }
1712
1713 MemoryContextDelete(cstate->copycontext);
1714 pfree(cstate);
1715 }
1716
1717 /*
1718 * Setup CopyState to read tuples from a table or a query for COPY TO.
1719 */
1720 static CopyState
BeginCopyTo(Relation rel,Node * query,const char * queryString,const Oid queryRelId,const char * filename,bool is_program,List * attnamelist,List * options)1721 BeginCopyTo(Relation rel,
1722 Node *query,
1723 const char *queryString,
1724 const Oid queryRelId,
1725 const char *filename,
1726 bool is_program,
1727 List *attnamelist,
1728 List *options)
1729 {
1730 CopyState cstate;
1731 bool pipe = (filename == NULL);
1732 MemoryContext oldcontext;
1733
1734 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1735 {
1736 if (rel->rd_rel->relkind == RELKIND_VIEW)
1737 ereport(ERROR,
1738 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1739 errmsg("cannot copy from view \"%s\"",
1740 RelationGetRelationName(rel)),
1741 errhint("Try the COPY (SELECT ...) TO variant.")));
1742 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1743 ereport(ERROR,
1744 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1745 errmsg("cannot copy from materialized view \"%s\"",
1746 RelationGetRelationName(rel)),
1747 errhint("Try the COPY (SELECT ...) TO variant.")));
1748 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1749 ereport(ERROR,
1750 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1751 errmsg("cannot copy from foreign table \"%s\"",
1752 RelationGetRelationName(rel)),
1753 errhint("Try the COPY (SELECT ...) TO variant.")));
1754 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1755 ereport(ERROR,
1756 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1757 errmsg("cannot copy from sequence \"%s\"",
1758 RelationGetRelationName(rel))));
1759 else
1760 ereport(ERROR,
1761 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1762 errmsg("cannot copy from non-table relation \"%s\"",
1763 RelationGetRelationName(rel))));
1764 }
1765
1766 cstate = BeginCopy(false, rel, query, queryString, queryRelId, attnamelist,
1767 options);
1768 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1769
1770 if (pipe)
1771 {
1772 Assert(!is_program); /* the grammar does not allow this */
1773 if (whereToSendOutput != DestRemote)
1774 cstate->copy_file = stdout;
1775 }
1776 else
1777 {
1778 cstate->filename = pstrdup(filename);
1779 cstate->is_program = is_program;
1780
1781 if (is_program)
1782 {
1783 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1784 if (cstate->copy_file == NULL)
1785 ereport(ERROR,
1786 (errcode_for_file_access(),
1787 errmsg("could not execute command \"%s\": %m",
1788 cstate->filename)));
1789 }
1790 else
1791 {
1792 mode_t oumask; /* Pre-existing umask value */
1793 struct stat st;
1794
1795 /*
1796 * Prevent write to relative path ... too easy to shoot oneself in
1797 * the foot by overwriting a database file ...
1798 */
1799 if (!is_absolute_path(filename))
1800 ereport(ERROR,
1801 (errcode(ERRCODE_INVALID_NAME),
1802 errmsg("relative path not allowed for COPY to file")));
1803
1804 oumask = umask(S_IWGRP | S_IWOTH);
1805 PG_TRY();
1806 {
1807 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1808 }
1809 PG_CATCH();
1810 {
1811 umask(oumask);
1812 PG_RE_THROW();
1813 }
1814 PG_END_TRY();
1815 umask(oumask);
1816 if (cstate->copy_file == NULL)
1817 ereport(ERROR,
1818 (errcode_for_file_access(),
1819 errmsg("could not open file \"%s\" for writing: %m",
1820 cstate->filename)));
1821
1822 if (fstat(fileno(cstate->copy_file), &st))
1823 ereport(ERROR,
1824 (errcode_for_file_access(),
1825 errmsg("could not stat file \"%s\": %m",
1826 cstate->filename)));
1827
1828 if (S_ISDIR(st.st_mode))
1829 ereport(ERROR,
1830 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1831 errmsg("\"%s\" is a directory", cstate->filename)));
1832 }
1833 }
1834
1835 MemoryContextSwitchTo(oldcontext);
1836
1837 return cstate;
1838 }
1839
/*
 * DoCopyTo
 *		Execute a prepared COPY TO and return the number of rows processed
 *		(the value returned by CopyTo).
 *
 * When there is no filename and output goes to DestRemote, the data is sent
 * to the frontend.  In that case the rows are bracketed by SendCopyBegin and
 * SendCopyEnd.
 *
 * This intermediate routine exists mainly to localize the effects of setjmp
 * so we don't need to plaster a lot of variables with "volatile".
 */
static uint64
DoCopyTo(CopyState cstate)
{
	bool		pipe = (cstate->filename == NULL);
	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
	uint64		processed;

	PG_TRY();
	{
		if (fe_copy)
			SendCopyBegin(cstate);

		processed = CopyTo(cstate);

		if (fe_copy)
			SendCopyEnd(cstate);
	}
	PG_CATCH();
	{
		/*
		 * Make sure we turn off old-style COPY OUT mode upon error. It is
		 * okay to do this in all cases, since it does nothing if the mode is
		 * not on.
		 */
		pq_endcopyout(true);
		PG_RE_THROW();
	}
	PG_END_TRY();

	/*
	 * Reached only when no error was thrown (the catch block always
	 * re-throws), so processed has been set by CopyTo.
	 */
	return processed;
}
1875
1876 /*
1877 * Clean up storage and release resources for COPY TO.
1878 */
1879 static void
EndCopyTo(CopyState cstate)1880 EndCopyTo(CopyState cstate)
1881 {
1882 if (cstate->queryDesc != NULL)
1883 {
1884 /* Close down the query and free resources. */
1885 ExecutorFinish(cstate->queryDesc);
1886 ExecutorEnd(cstate->queryDesc);
1887 FreeQueryDesc(cstate->queryDesc);
1888 PopActiveSnapshot();
1889 }
1890
1891 /* Clean up storage */
1892 EndCopy(cstate);
1893 }
1894
1895 /*
1896 * Copy from relation or query TO file.
1897 */
1898 static uint64
CopyTo(CopyState cstate)1899 CopyTo(CopyState cstate)
1900 {
1901 TupleDesc tupDesc;
1902 int num_phys_attrs;
1903 Form_pg_attribute *attr;
1904 ListCell *cur;
1905 uint64 processed;
1906
1907 if (cstate->rel)
1908 tupDesc = RelationGetDescr(cstate->rel);
1909 else
1910 tupDesc = cstate->queryDesc->tupDesc;
1911 attr = tupDesc->attrs;
1912 num_phys_attrs = tupDesc->natts;
1913 cstate->null_print_client = cstate->null_print; /* default */
1914
1915 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1916 cstate->fe_msgbuf = makeStringInfo();
1917
1918 /* Get info about the columns we need to process. */
1919 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1920 foreach(cur, cstate->attnumlist)
1921 {
1922 int attnum = lfirst_int(cur);
1923 Oid out_func_oid;
1924 bool isvarlena;
1925
1926 if (cstate->binary)
1927 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1928 &out_func_oid,
1929 &isvarlena);
1930 else
1931 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1932 &out_func_oid,
1933 &isvarlena);
1934 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1935 }
1936
1937 /*
1938 * Create a temporary memory context that we can reset once per row to
1939 * recover palloc'd memory. This avoids any problems with leaks inside
1940 * datatype output routines, and should be faster than retail pfree's
1941 * anyway. (We don't need a whole econtext as CopyFrom does.)
1942 */
1943 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1944 "COPY TO",
1945 ALLOCSET_DEFAULT_SIZES);
1946
1947 if (cstate->binary)
1948 {
1949 /* Generate header for a binary copy */
1950 int32 tmp;
1951
1952 /* Signature */
1953 CopySendData(cstate, BinarySignature, 11);
1954 /* Flags field */
1955 tmp = 0;
1956 if (cstate->oids)
1957 tmp |= (1 << 16);
1958 CopySendInt32(cstate, tmp);
1959 /* No header extension */
1960 tmp = 0;
1961 CopySendInt32(cstate, tmp);
1962 }
1963 else
1964 {
1965 /*
1966 * For non-binary copy, we need to convert null_print to file
1967 * encoding, because it will be sent directly with CopySendString.
1968 */
1969 if (cstate->need_transcoding)
1970 cstate->null_print_client = pg_server_to_any(cstate->null_print,
1971 cstate->null_print_len,
1972 cstate->file_encoding);
1973
1974 /* if a header has been requested send the line */
1975 if (cstate->header_line)
1976 {
1977 bool hdr_delim = false;
1978
1979 foreach(cur, cstate->attnumlist)
1980 {
1981 int attnum = lfirst_int(cur);
1982 char *colname;
1983
1984 if (hdr_delim)
1985 CopySendChar(cstate, cstate->delim[0]);
1986 hdr_delim = true;
1987
1988 colname = NameStr(attr[attnum - 1]->attname);
1989
1990 CopyAttributeOutCSV(cstate, colname, false,
1991 list_length(cstate->attnumlist) == 1);
1992 }
1993
1994 CopySendEndOfRow(cstate);
1995 }
1996 }
1997
1998 if (cstate->rel)
1999 {
2000 Datum *values;
2001 bool *nulls;
2002 HeapScanDesc scandesc;
2003 HeapTuple tuple;
2004
2005 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2006 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2007
2008 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2009
2010 processed = 0;
2011 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2012 {
2013 CHECK_FOR_INTERRUPTS();
2014
2015 /* Deconstruct the tuple ... faster than repeated heap_getattr */
2016 heap_deform_tuple(tuple, tupDesc, values, nulls);
2017
2018 /* Format and send the data */
2019 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2020 processed++;
2021 }
2022
2023 heap_endscan(scandesc);
2024
2025 pfree(values);
2026 pfree(nulls);
2027 }
2028 else
2029 {
2030 /* run the plan --- the dest receiver will send tuples */
2031 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
2032 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2033 }
2034
2035 if (cstate->binary)
2036 {
2037 /* Generate trailer for a binary copy */
2038 CopySendInt16(cstate, -1);
2039 /* Need to flush out the trailer */
2040 CopySendEndOfRow(cstate);
2041 }
2042
2043 MemoryContextDelete(cstate->rowcontext);
2044
2045 return processed;
2046 }
2047
2048 /*
2049 * Emit one row during CopyTo().
2050 */
2051 static void
CopyOneRowTo(CopyState cstate,Oid tupleOid,Datum * values,bool * nulls)2052 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2053 {
2054 bool need_delim = false;
2055 FmgrInfo *out_functions = cstate->out_functions;
2056 MemoryContext oldcontext;
2057 ListCell *cur;
2058 char *string;
2059
2060 MemoryContextReset(cstate->rowcontext);
2061 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2062
2063 if (cstate->binary)
2064 {
2065 /* Binary per-tuple header */
2066 CopySendInt16(cstate, list_length(cstate->attnumlist));
2067 /* Send OID if wanted --- note attnumlist doesn't include it */
2068 if (cstate->oids)
2069 {
2070 /* Hack --- assume Oid is same size as int32 */
2071 CopySendInt32(cstate, sizeof(int32));
2072 CopySendInt32(cstate, tupleOid);
2073 }
2074 }
2075 else
2076 {
2077 /* Text format has no per-tuple header, but send OID if wanted */
2078 /* Assume digits don't need any quoting or encoding conversion */
2079 if (cstate->oids)
2080 {
2081 string = DatumGetCString(DirectFunctionCall1(oidout,
2082 ObjectIdGetDatum(tupleOid)));
2083 CopySendString(cstate, string);
2084 need_delim = true;
2085 }
2086 }
2087
2088 foreach(cur, cstate->attnumlist)
2089 {
2090 int attnum = lfirst_int(cur);
2091 Datum value = values[attnum - 1];
2092 bool isnull = nulls[attnum - 1];
2093
2094 if (!cstate->binary)
2095 {
2096 if (need_delim)
2097 CopySendChar(cstate, cstate->delim[0]);
2098 need_delim = true;
2099 }
2100
2101 if (isnull)
2102 {
2103 if (!cstate->binary)
2104 CopySendString(cstate, cstate->null_print_client);
2105 else
2106 CopySendInt32(cstate, -1);
2107 }
2108 else
2109 {
2110 if (!cstate->binary)
2111 {
2112 string = OutputFunctionCall(&out_functions[attnum - 1],
2113 value);
2114 if (cstate->csv_mode)
2115 CopyAttributeOutCSV(cstate, string,
2116 cstate->force_quote_flags[attnum - 1],
2117 list_length(cstate->attnumlist) == 1);
2118 else
2119 CopyAttributeOutText(cstate, string);
2120 }
2121 else
2122 {
2123 bytea *outputbytes;
2124
2125 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2126 value);
2127 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2128 CopySendData(cstate, VARDATA(outputbytes),
2129 VARSIZE(outputbytes) - VARHDRSZ);
2130 }
2131 }
2132 }
2133
2134 CopySendEndOfRow(cstate);
2135
2136 MemoryContextSwitchTo(oldcontext);
2137 }
2138
2139
2140 /*
2141 * error context callback for COPY FROM
2142 *
2143 * The argument for the error context must be CopyState.
2144 */
2145 void
CopyFromErrorCallback(void * arg)2146 CopyFromErrorCallback(void *arg)
2147 {
2148 CopyState cstate = (CopyState) arg;
2149 char curlineno_str[32];
2150
2151 snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2152 cstate->cur_lineno);
2153
2154 if (cstate->binary)
2155 {
2156 /* can't usefully display the data */
2157 if (cstate->cur_attname)
2158 errcontext("COPY %s, line %s, column %s",
2159 cstate->cur_relname, curlineno_str,
2160 cstate->cur_attname);
2161 else
2162 errcontext("COPY %s, line %s",
2163 cstate->cur_relname, curlineno_str);
2164 }
2165 else
2166 {
2167 if (cstate->cur_attname && cstate->cur_attval)
2168 {
2169 /* error is relevant to a particular column */
2170 char *attval;
2171
2172 attval = limit_printout_length(cstate->cur_attval);
2173 errcontext("COPY %s, line %s, column %s: \"%s\"",
2174 cstate->cur_relname, curlineno_str,
2175 cstate->cur_attname, attval);
2176 pfree(attval);
2177 }
2178 else if (cstate->cur_attname)
2179 {
2180 /* error is relevant to a particular column, value is NULL */
2181 errcontext("COPY %s, line %s, column %s: null input",
2182 cstate->cur_relname, curlineno_str,
2183 cstate->cur_attname);
2184 }
2185 else
2186 {
2187 /*
2188 * Error is relevant to a particular line.
2189 *
2190 * If line_buf still contains the correct line, and it's already
2191 * transcoded, print it. If it's still in a foreign encoding, it's
2192 * quite likely that the error is precisely a failure to do
2193 * encoding conversion (ie, bad data). We dare not try to convert
2194 * it, and at present there's no way to regurgitate it without
2195 * conversion. So we have to punt and just report the line number.
2196 */
2197 if (cstate->line_buf_valid &&
2198 (cstate->line_buf_converted || !cstate->need_transcoding))
2199 {
2200 char *lineval;
2201
2202 lineval = limit_printout_length(cstate->line_buf.data);
2203 errcontext("COPY %s, line %s: \"%s\"",
2204 cstate->cur_relname, curlineno_str, lineval);
2205 pfree(lineval);
2206 }
2207 else
2208 {
2209 errcontext("COPY %s, line %s",
2210 cstate->cur_relname, curlineno_str);
2211 }
2212 }
2213 }
2214 }
2215
/*
 * Return a palloc'd copy of 'str' that is at most MAX_COPY_DATA_DISPLAY
 * bytes of data, clipped on a character boundary, with "..." appended when
 * anything was cut off.
 *
 * We do our own truncation rather than relying on a sprintf precision,
 * because some glibc versions make vsnprintf fail (return -1) when asked to
 * truncate a string containing byte sequences invalid in the current
 * encoding.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			keep;
	char	   *result;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* clip without splitting a multibyte character */
		keep = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		result = (char *) palloc(keep + sizeof("..."));
		memcpy(result, str, keep);
		memcpy(result + keep, "...", sizeof("..."));
	}
	else
		result = pstrdup(str);

	return result;
}
2250
2251 /*
2252 * Copy FROM file to relation.
2253 */
2254 static uint64
CopyFrom(CopyState cstate)2255 CopyFrom(CopyState cstate)
2256 {
2257 HeapTuple tuple;
2258 TupleDesc tupDesc;
2259 Datum *values;
2260 bool *nulls;
2261 ResultRelInfo *resultRelInfo;
2262 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2263 ExprContext *econtext;
2264 TupleTableSlot *myslot;
2265 MemoryContext oldcontext = CurrentMemoryContext;
2266
2267 ErrorContextCallback errcallback;
2268 CommandId mycid = GetCurrentCommandId(true);
2269 int hi_options = 0; /* start with default heap_insert options */
2270 BulkInsertState bistate;
2271 uint64 processed = 0;
2272 bool useHeapMultiInsert;
2273 int nBufferedTuples = 0;
2274
2275 #define MAX_BUFFERED_TUPLES 1000
2276 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2277 Size bufferedTuplesSize = 0;
2278 uint64 firstBufferedLineNo = 0;
2279
2280 Assert(cstate->rel);
2281
2282 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
2283 {
2284 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2285 ereport(ERROR,
2286 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2287 errmsg("cannot copy to view \"%s\"",
2288 RelationGetRelationName(cstate->rel))));
2289 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2290 ereport(ERROR,
2291 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2292 errmsg("cannot copy to materialized view \"%s\"",
2293 RelationGetRelationName(cstate->rel))));
2294 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2295 ereport(ERROR,
2296 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2297 errmsg("cannot copy to foreign table \"%s\"",
2298 RelationGetRelationName(cstate->rel))));
2299 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2300 ereport(ERROR,
2301 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2302 errmsg("cannot copy to sequence \"%s\"",
2303 RelationGetRelationName(cstate->rel))));
2304 else
2305 ereport(ERROR,
2306 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2307 errmsg("cannot copy to non-table relation \"%s\"",
2308 RelationGetRelationName(cstate->rel))));
2309 }
2310
2311 tupDesc = RelationGetDescr(cstate->rel);
2312
2313 /*----------
2314 * Check to see if we can avoid writing WAL
2315 *
2316 * If archive logging/streaming is not enabled *and* either
2317 * - table was created in same transaction as this COPY
2318 * - data is being written to relfilenode created in this transaction
2319 * then we can skip writing WAL. It's safe because if the transaction
2320 * doesn't commit, we'll discard the table (or the new relfilenode file).
2321 * If it does commit, we'll have done the heap_sync at the bottom of this
2322 * routine first.
2323 *
2324 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2325 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2326 * can be cleared before the end of the transaction. The exact case is
2327 * when a relation sets a new relfilenode twice in same transaction, yet
2328 * the second one fails in an aborted subtransaction, e.g.
2329 *
2330 * BEGIN;
2331 * TRUNCATE t;
2332 * SAVEPOINT save;
2333 * TRUNCATE t;
2334 * ROLLBACK TO save;
2335 * COPY ...
2336 *
2337 * Also, if the target file is new-in-transaction, we assume that checking
2338 * FSM for free space is a waste of time, even if we must use WAL because
2339 * of archiving. This could possibly be wrong, but it's unlikely.
2340 *
2341 * The comments for heap_insert and RelationGetBufferForTuple specify that
2342 * skipping WAL logging is only safe if we ensure that our tuples do not
2343 * go into pages containing tuples from any other transactions --- but this
2344 * must be the case if we have a new table or new relfilenode, so we need
2345 * no additional work to enforce that.
2346 *----------
2347 */
2348 /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2349 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2350 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2351 {
2352 hi_options |= HEAP_INSERT_SKIP_FSM;
2353 if (!XLogIsNeeded())
2354 hi_options |= HEAP_INSERT_SKIP_WAL;
2355 }
2356
2357 /*
2358 * Optimize if new relfilenode was created in this subxact or one of its
2359 * committed children and we won't see those rows later as part of an
2360 * earlier scan or command. The subxact test ensures that if this subxact
2361 * aborts then the frozen rows won't be visible after xact cleanup. Note
2362 * that the stronger test of exactly which subtransaction created it is
2363 * crucial for correctness of this optimisation. The test for an earlier
2364 * scan or command tolerates false negatives. FREEZE causes other sessions
2365 * to see rows they would not see under MVCC, and a false negative merely
2366 * spreads that anomaly to the current session.
2367 */
2368 if (cstate->freeze)
2369 {
2370 /*
2371 * Tolerate one registration for the benefit of FirstXactSnapshot.
2372 * Scan-bearing queries generally create at least two registrations,
2373 * though relying on that is fragile, as is ignoring ActiveSnapshot.
2374 * Clear CatalogSnapshot to avoid counting its registration. We'll
2375 * still detect ongoing catalog scans, each of which separately
2376 * registers the snapshot it uses.
2377 */
2378 InvalidateCatalogSnapshot();
2379 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2380 ereport(ERROR,
2381 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2382 errmsg("cannot perform FREEZE because of prior transaction activity")));
2383
2384 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2385 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2386 ereport(ERROR,
2387 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2388 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2389
2390 hi_options |= HEAP_INSERT_FROZEN;
2391 }
2392
2393 /*
2394 * We need a ResultRelInfo so we can use the regular executor's
2395 * index-entry-making machinery. (There used to be a huge amount of code
2396 * here that basically duplicated execUtils.c ...)
2397 */
2398 resultRelInfo = makeNode(ResultRelInfo);
2399 InitResultRelInfo(resultRelInfo,
2400 cstate->rel,
2401 1, /* dummy rangetable index */
2402 0);
2403
2404 ExecOpenIndices(resultRelInfo, false);
2405
2406 estate->es_result_relations = resultRelInfo;
2407 estate->es_num_result_relations = 1;
2408 estate->es_result_relation_info = resultRelInfo;
2409 estate->es_range_table = cstate->range_table;
2410
2411 /* Set up a tuple slot too */
2412 myslot = ExecInitExtraTupleSlot(estate);
2413 ExecSetSlotDescriptor(myslot, tupDesc);
2414 /* Triggers might need a slot as well */
2415 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2416
2417 /*
2418 * It's more efficient to prepare a bunch of tuples for insertion, and
2419 * insert them in one heap_multi_insert() call, than call heap_insert()
2420 * separately for every tuple. However, we can't do that if there are
2421 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2422 * expressions. Such triggers or expressions might query the table we're
2423 * inserting to, and act differently if the tuples that have already been
2424 * processed and prepared for insertion are not there.
2425 */
2426 if ((resultRelInfo->ri_TrigDesc != NULL &&
2427 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2428 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2429 cstate->volatile_defexprs)
2430 {
2431 useHeapMultiInsert = false;
2432 }
2433 else
2434 {
2435 useHeapMultiInsert = true;
2436 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2437 }
2438
2439 /* Prepare to catch AFTER triggers. */
2440 AfterTriggerBeginQuery();
2441
2442 /*
2443 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2444 * should do this for COPY, since it's not really an "INSERT" statement as
2445 * such. However, executing these triggers maintains consistency with the
2446 * EACH ROW triggers that we already fire on COPY.
2447 */
2448 ExecBSInsertTriggers(estate, resultRelInfo);
2449
2450 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2451 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2452
2453 bistate = GetBulkInsertState();
2454 econtext = GetPerTupleExprContext(estate);
2455
2456 /* Set up callback to identify error line number */
2457 errcallback.callback = CopyFromErrorCallback;
2458 errcallback.arg = (void *) cstate;
2459 errcallback.previous = error_context_stack;
2460 error_context_stack = &errcallback;
2461
2462 for (;;)
2463 {
2464 TupleTableSlot *slot;
2465 bool skip_tuple;
2466 Oid loaded_oid = InvalidOid;
2467
2468 CHECK_FOR_INTERRUPTS();
2469
2470 if (nBufferedTuples == 0)
2471 {
2472 /*
2473 * Reset the per-tuple exprcontext. We can only do this if the
2474 * tuple buffer is empty. (Calling the context the per-tuple
2475 * memory context is a bit of a misnomer now.)
2476 */
2477 ResetPerTupleExprContext(estate);
2478 }
2479
2480 /* Switch into its memory context */
2481 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2482
2483 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2484 break;
2485
2486 /* And now we can form the input tuple. */
2487 tuple = heap_form_tuple(tupDesc, values, nulls);
2488
2489 if (loaded_oid != InvalidOid)
2490 HeapTupleSetOid(tuple, loaded_oid);
2491
2492 /*
2493 * Constraints might reference the tableoid column, so initialize
2494 * t_tableOid before evaluating them.
2495 */
2496 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2497
2498 /* Triggers and stuff need to be invoked in query context. */
2499 MemoryContextSwitchTo(oldcontext);
2500
2501 /* Place tuple in tuple slot --- but slot shouldn't free it */
2502 slot = myslot;
2503 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2504
2505 skip_tuple = false;
2506
2507 /* BEFORE ROW INSERT Triggers */
2508 if (resultRelInfo->ri_TrigDesc &&
2509 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2510 {
2511 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2512
2513 if (slot == NULL) /* "do nothing" */
2514 skip_tuple = true;
2515 else /* trigger might have changed tuple */
2516 tuple = ExecMaterializeSlot(slot);
2517 }
2518
2519 if (!skip_tuple)
2520 {
2521 /* Check the constraints of the tuple */
2522 if (cstate->rel->rd_att->constr)
2523 ExecConstraints(resultRelInfo, slot, estate);
2524
2525 if (useHeapMultiInsert)
2526 {
2527 /* Add this tuple to the tuple buffer */
2528 if (nBufferedTuples == 0)
2529 firstBufferedLineNo = cstate->cur_lineno;
2530 bufferedTuples[nBufferedTuples++] = tuple;
2531 bufferedTuplesSize += tuple->t_len;
2532
2533 /*
2534 * If the buffer filled up, flush it. Also flush if the total
2535 * size of all the tuples in the buffer becomes large, to
2536 * avoid using large amounts of memory for the buffers when
2537 * the tuples are exceptionally wide.
2538 */
2539 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2540 bufferedTuplesSize > 65535)
2541 {
2542 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2543 resultRelInfo, myslot, bistate,
2544 nBufferedTuples, bufferedTuples,
2545 firstBufferedLineNo);
2546 nBufferedTuples = 0;
2547 bufferedTuplesSize = 0;
2548 }
2549 }
2550 else
2551 {
2552 List *recheckIndexes = NIL;
2553
2554 /* OK, store the tuple and create index entries for it */
2555 heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
2556
2557 if (resultRelInfo->ri_NumIndices > 0)
2558 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
2559 estate, false, NULL,
2560 NIL);
2561
2562 /* AFTER ROW INSERT Triggers */
2563 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2564 recheckIndexes);
2565
2566 list_free(recheckIndexes);
2567 }
2568
2569 /*
2570 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2571 * this is the same definition used by execMain.c for counting
2572 * tuples inserted by an INSERT command.
2573 */
2574 processed++;
2575 }
2576 }
2577
2578 /* Flush any remaining buffered tuples */
2579 if (nBufferedTuples > 0)
2580 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2581 resultRelInfo, myslot, bistate,
2582 nBufferedTuples, bufferedTuples,
2583 firstBufferedLineNo);
2584
2585 /* Done, clean up */
2586 error_context_stack = errcallback.previous;
2587
2588 FreeBulkInsertState(bistate);
2589
2590 MemoryContextSwitchTo(oldcontext);
2591
2592 /*
2593 * In the old protocol, tell pqcomm that we can process normal protocol
2594 * messages again.
2595 */
2596 if (cstate->copy_dest == COPY_OLD_FE)
2597 pq_endmsgread();
2598
2599 /* Execute AFTER STATEMENT insertion triggers */
2600 ExecASInsertTriggers(estate, resultRelInfo);
2601
2602 /* Handle queued AFTER triggers */
2603 AfterTriggerEndQuery(estate);
2604
2605 pfree(values);
2606 pfree(nulls);
2607
2608 ExecResetTupleTable(estate->es_tupleTable, false);
2609
2610 ExecCloseIndices(resultRelInfo);
2611
2612 FreeExecutorState(estate);
2613
2614 /*
2615 * If we skipped writing WAL, then we need to sync the heap (but not
2616 * indexes since those use WAL anyway)
2617 */
2618 if (hi_options & HEAP_INSERT_SKIP_WAL)
2619 heap_sync(cstate->rel);
2620
2621 return processed;
2622 }
2623
2624 /*
2625 * A subroutine of CopyFrom, to write the current batch of buffered heap
2626 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2627 * triggers.
2628 */
2629 static void
CopyFromInsertBatch(CopyState cstate,EState * estate,CommandId mycid,int hi_options,ResultRelInfo * resultRelInfo,TupleTableSlot * myslot,BulkInsertState bistate,int nBufferedTuples,HeapTuple * bufferedTuples,uint64 firstBufferedLineNo)2630 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2631 int hi_options, ResultRelInfo *resultRelInfo,
2632 TupleTableSlot *myslot, BulkInsertState bistate,
2633 int nBufferedTuples, HeapTuple *bufferedTuples,
2634 uint64 firstBufferedLineNo)
2635 {
2636 MemoryContext oldcontext;
2637 int i;
2638 uint64 save_cur_lineno;
2639
2640 /*
2641 * Print error context information correctly, if one of the operations
2642 * below fail.
2643 */
2644 cstate->line_buf_valid = false;
2645 save_cur_lineno = cstate->cur_lineno;
2646
2647 /*
2648 * heap_multi_insert leaks memory, so switch to short-lived memory context
2649 * before calling it.
2650 */
2651 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2652 heap_multi_insert(cstate->rel,
2653 bufferedTuples,
2654 nBufferedTuples,
2655 mycid,
2656 hi_options,
2657 bistate);
2658 MemoryContextSwitchTo(oldcontext);
2659
2660 /*
2661 * If there are any indexes, update them for all the inserted tuples, and
2662 * run AFTER ROW INSERT triggers.
2663 */
2664 if (resultRelInfo->ri_NumIndices > 0)
2665 {
2666 for (i = 0; i < nBufferedTuples; i++)
2667 {
2668 List *recheckIndexes;
2669
2670 cstate->cur_lineno = firstBufferedLineNo + i;
2671 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2672 recheckIndexes =
2673 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2674 estate, false, NULL, NIL);
2675 ExecARInsertTriggers(estate, resultRelInfo,
2676 bufferedTuples[i],
2677 recheckIndexes);
2678 list_free(recheckIndexes);
2679 }
2680 }
2681
2682 /*
2683 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2684 * anyway.
2685 */
2686 else if (resultRelInfo->ri_TrigDesc != NULL &&
2687 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
2688 {
2689 for (i = 0; i < nBufferedTuples; i++)
2690 {
2691 cstate->cur_lineno = firstBufferedLineNo + i;
2692 ExecARInsertTriggers(estate, resultRelInfo,
2693 bufferedTuples[i],
2694 NIL);
2695 }
2696 }
2697
2698 /* reset cur_lineno to where we were */
2699 cstate->cur_lineno = save_cur_lineno;
2700 }
2701
2702 /*
2703 * Setup to read tuples from a file for COPY FROM.
2704 *
2705 * 'rel': Used as a template for the tuples
2706 * 'filename': Name of server-local file to read
2707 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2708 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2709 *
2710 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2711 */
2712 CopyState
BeginCopyFrom(Relation rel,const char * filename,bool is_program,List * attnamelist,List * options)2713 BeginCopyFrom(Relation rel,
2714 const char *filename,
2715 bool is_program,
2716 List *attnamelist,
2717 List *options)
2718 {
2719 CopyState cstate;
2720 bool pipe = (filename == NULL);
2721 TupleDesc tupDesc;
2722 Form_pg_attribute *attr;
2723 AttrNumber num_phys_attrs,
2724 num_defaults;
2725 FmgrInfo *in_functions;
2726 Oid *typioparams;
2727 int attnum;
2728 Oid in_func_oid;
2729 int *defmap;
2730 ExprState **defexprs;
2731 MemoryContext oldcontext;
2732 bool volatile_defexprs;
2733
2734 cstate = BeginCopy(true, rel, NULL, NULL, InvalidOid, attnamelist, options);
2735 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2736
2737 /* Initialize state variables */
2738 cstate->fe_eof = false;
2739 cstate->eol_type = EOL_UNKNOWN;
2740 cstate->cur_relname = RelationGetRelationName(cstate->rel);
2741 cstate->cur_lineno = 0;
2742 cstate->cur_attname = NULL;
2743 cstate->cur_attval = NULL;
2744
2745 /* Set up variables to avoid per-attribute overhead. */
2746 initStringInfo(&cstate->attribute_buf);
2747 initStringInfo(&cstate->line_buf);
2748 cstate->line_buf_converted = false;
2749 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2750 cstate->raw_buf_index = cstate->raw_buf_len = 0;
2751
2752 tupDesc = RelationGetDescr(cstate->rel);
2753 attr = tupDesc->attrs;
2754 num_phys_attrs = tupDesc->natts;
2755 num_defaults = 0;
2756 volatile_defexprs = false;
2757
2758 /*
2759 * Pick up the required catalog information for each attribute in the
2760 * relation, including the input function, the element type (to pass to
2761 * the input function), and info about defaults and constraints. (Which
2762 * input function we use depends on text/binary format choice.)
2763 */
2764 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2765 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2766 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2767 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2768
2769 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2770 {
2771 /* We don't need info for dropped attributes */
2772 if (attr[attnum - 1]->attisdropped)
2773 continue;
2774
2775 /* Fetch the input function and typioparam info */
2776 if (cstate->binary)
2777 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2778 &in_func_oid, &typioparams[attnum - 1]);
2779 else
2780 getTypeInputInfo(attr[attnum - 1]->atttypid,
2781 &in_func_oid, &typioparams[attnum - 1]);
2782 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2783
2784 /* Get default info if needed */
2785 if (!list_member_int(cstate->attnumlist, attnum))
2786 {
2787 /* attribute is NOT to be copied from input */
2788 /* use default value if one exists */
2789 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
2790 attnum);
2791
2792 if (defexpr != NULL)
2793 {
2794 /* Run the expression through planner */
2795 defexpr = expression_planner(defexpr);
2796
2797 /* Initialize executable expression in copycontext */
2798 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
2799 defmap[num_defaults] = attnum - 1;
2800 num_defaults++;
2801
2802 /*
2803 * If a default expression looks at the table being loaded,
2804 * then it could give the wrong answer when using
2805 * multi-insert. Since database access can be dynamic this is
2806 * hard to test for exactly, so we use the much wider test of
2807 * whether the default expression is volatile. We allow for
2808 * the special case of when the default expression is the
2809 * nextval() of a sequence which in this specific case is
2810 * known to be safe for use with the multi-insert
2811 * optimisation. Hence we use this special case function
2812 * checker rather than the standard check for
2813 * contain_volatile_functions().
2814 */
2815 if (!volatile_defexprs)
2816 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
2817 }
2818 }
2819 }
2820
2821 /* We keep those variables in cstate. */
2822 cstate->in_functions = in_functions;
2823 cstate->typioparams = typioparams;
2824 cstate->defmap = defmap;
2825 cstate->defexprs = defexprs;
2826 cstate->volatile_defexprs = volatile_defexprs;
2827 cstate->num_defaults = num_defaults;
2828 cstate->is_program = is_program;
2829
2830 if (pipe)
2831 {
2832 Assert(!is_program); /* the grammar does not allow this */
2833 if (whereToSendOutput == DestRemote)
2834 ReceiveCopyBegin(cstate);
2835 else
2836 cstate->copy_file = stdin;
2837 }
2838 else
2839 {
2840 cstate->filename = pstrdup(filename);
2841
2842 if (cstate->is_program)
2843 {
2844 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
2845 if (cstate->copy_file == NULL)
2846 ereport(ERROR,
2847 (errcode_for_file_access(),
2848 errmsg("could not execute command \"%s\": %m",
2849 cstate->filename)));
2850 }
2851 else
2852 {
2853 struct stat st;
2854
2855 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
2856 if (cstate->copy_file == NULL)
2857 ereport(ERROR,
2858 (errcode_for_file_access(),
2859 errmsg("could not open file \"%s\" for reading: %m",
2860 cstate->filename)));
2861
2862 if (fstat(fileno(cstate->copy_file), &st))
2863 ereport(ERROR,
2864 (errcode_for_file_access(),
2865 errmsg("could not stat file \"%s\": %m",
2866 cstate->filename)));
2867
2868 if (S_ISDIR(st.st_mode))
2869 ereport(ERROR,
2870 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2871 errmsg("\"%s\" is a directory", cstate->filename)));
2872 }
2873 }
2874
2875 if (!cstate->binary)
2876 {
2877 /* must rely on user to tell us... */
2878 cstate->file_has_oids = cstate->oids;
2879 }
2880 else
2881 {
2882 /* Read and verify binary header */
2883 char readSig[11];
2884 int32 tmp;
2885
2886 /* Signature */
2887 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
2888 memcmp(readSig, BinarySignature, 11) != 0)
2889 ereport(ERROR,
2890 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2891 errmsg("COPY file signature not recognized")));
2892 /* Flags field */
2893 if (!CopyGetInt32(cstate, &tmp))
2894 ereport(ERROR,
2895 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2896 errmsg("invalid COPY file header (missing flags)")));
2897 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
2898 tmp &= ~(1 << 16);
2899 if ((tmp >> 16) != 0)
2900 ereport(ERROR,
2901 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2902 errmsg("unrecognized critical flags in COPY file header")));
2903 /* Header extension length */
2904 if (!CopyGetInt32(cstate, &tmp) ||
2905 tmp < 0)
2906 ereport(ERROR,
2907 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2908 errmsg("invalid COPY file header (missing length)")));
2909 /* Skip extension header, if present */
2910 while (tmp-- > 0)
2911 {
2912 if (CopyGetData(cstate, readSig, 1, 1) != 1)
2913 ereport(ERROR,
2914 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2915 errmsg("invalid COPY file header (wrong length)")));
2916 }
2917 }
2918
2919 if (cstate->file_has_oids && cstate->binary)
2920 {
2921 getTypeBinaryInputInfo(OIDOID,
2922 &in_func_oid, &cstate->oid_typioparam);
2923 fmgr_info(in_func_oid, &cstate->oid_in_function);
2924 }
2925
2926 /* create workspace for CopyReadAttributes results */
2927 if (!cstate->binary)
2928 {
2929 AttrNumber attr_count = list_length(cstate->attnumlist);
2930 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
2931
2932 cstate->max_fields = nfields;
2933 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
2934 }
2935
2936 MemoryContextSwitchTo(oldcontext);
2937
2938 return cstate;
2939 }
2940
2941 /*
2942 * Read raw fields in the next line for COPY FROM in text or csv mode.
2943 * Return false if no more lines.
2944 *
2945 * An internal temporary buffer is returned via 'fields'. It is valid until
2946 * the next call of the function. Since the function returns all raw fields
2947 * in the input file, 'nfields' could be different from the number of columns
2948 * in the relation.
2949 *
2950 * NOTE: force_not_null option are not applied to the returned fields.
2951 */
2952 bool
NextCopyFromRawFields(CopyState cstate,char *** fields,int * nfields)2953 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
2954 {
2955 int fldct;
2956 bool done;
2957
2958 /* only available for text or csv input */
2959 Assert(!cstate->binary);
2960
2961 /* on input just throw the header line away */
2962 if (cstate->cur_lineno == 0 && cstate->header_line)
2963 {
2964 cstate->cur_lineno++;
2965 if (CopyReadLine(cstate))
2966 return false; /* done */
2967 }
2968
2969 cstate->cur_lineno++;
2970
2971 /* Actually read the line into memory here */
2972 done = CopyReadLine(cstate);
2973
2974 /*
2975 * EOF at start of line means we're done. If we see EOF after some
2976 * characters, we act as though it was newline followed by EOF, ie,
2977 * process the line and then exit loop on next iteration.
2978 */
2979 if (done && cstate->line_buf.len == 0)
2980 return false;
2981
2982 /* Parse the line into de-escaped field values */
2983 if (cstate->csv_mode)
2984 fldct = CopyReadAttributesCSV(cstate);
2985 else
2986 fldct = CopyReadAttributesText(cstate);
2987
2988 *fields = cstate->raw_fields;
2989 *nfields = fldct;
2990 return true;
2991 }
2992
2993 /*
2994 * Read next tuple from file for COPY FROM. Return false if no more tuples.
2995 *
2996 * 'econtext' is used to evaluate default expression for each columns not
2997 * read from the file. It can be NULL when no default values are used, i.e.
2998 * when all columns are read from the file.
2999 *
3000 * 'values' and 'nulls' arrays must be the same length as columns of the
3001 * relation passed to BeginCopyFrom. This function fills the arrays.
3002 * Oid of the tuple is returned with 'tupleOid' separately.
3003 */
3004 bool
NextCopyFrom(CopyState cstate,ExprContext * econtext,Datum * values,bool * nulls,Oid * tupleOid)3005 NextCopyFrom(CopyState cstate, ExprContext *econtext,
3006 Datum *values, bool *nulls, Oid *tupleOid)
3007 {
3008 TupleDesc tupDesc;
3009 Form_pg_attribute *attr;
3010 AttrNumber num_phys_attrs,
3011 attr_count,
3012 num_defaults = cstate->num_defaults;
3013 FmgrInfo *in_functions = cstate->in_functions;
3014 Oid *typioparams = cstate->typioparams;
3015 int i;
3016 int nfields;
3017 bool isnull;
3018 bool file_has_oids = cstate->file_has_oids;
3019 int *defmap = cstate->defmap;
3020 ExprState **defexprs = cstate->defexprs;
3021
3022 tupDesc = RelationGetDescr(cstate->rel);
3023 attr = tupDesc->attrs;
3024 num_phys_attrs = tupDesc->natts;
3025 attr_count = list_length(cstate->attnumlist);
3026 nfields = file_has_oids ? (attr_count + 1) : attr_count;
3027
3028 /* Initialize all values for row to NULL */
3029 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3030 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3031
3032 if (!cstate->binary)
3033 {
3034 char **field_strings;
3035 ListCell *cur;
3036 int fldct;
3037 int fieldno;
3038 char *string;
3039
3040 /* read raw fields in the next line */
3041 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3042 return false;
3043
3044 /* check for overflowing fields */
3045 if (nfields > 0 && fldct > nfields)
3046 ereport(ERROR,
3047 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3048 errmsg("extra data after last expected column")));
3049
3050 fieldno = 0;
3051
3052 /* Read the OID field if present */
3053 if (file_has_oids)
3054 {
3055 if (fieldno >= fldct)
3056 ereport(ERROR,
3057 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3058 errmsg("missing data for OID column")));
3059 string = field_strings[fieldno++];
3060
3061 if (string == NULL)
3062 ereport(ERROR,
3063 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3064 errmsg("null OID in COPY data")));
3065 else if (cstate->oids && tupleOid != NULL)
3066 {
3067 cstate->cur_attname = "oid";
3068 cstate->cur_attval = string;
3069 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3070 CStringGetDatum(string)));
3071 if (*tupleOid == InvalidOid)
3072 ereport(ERROR,
3073 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3074 errmsg("invalid OID in COPY data")));
3075 cstate->cur_attname = NULL;
3076 cstate->cur_attval = NULL;
3077 }
3078 }
3079
3080 /* Loop to read the user attributes on the line. */
3081 foreach(cur, cstate->attnumlist)
3082 {
3083 int attnum = lfirst_int(cur);
3084 int m = attnum - 1;
3085
3086 if (fieldno >= fldct)
3087 ereport(ERROR,
3088 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3089 errmsg("missing data for column \"%s\"",
3090 NameStr(attr[m]->attname))));
3091 string = field_strings[fieldno++];
3092
3093 if (cstate->convert_select_flags &&
3094 !cstate->convert_select_flags[m])
3095 {
3096 /* ignore input field, leaving column as NULL */
3097 continue;
3098 }
3099
3100 if (cstate->csv_mode)
3101 {
3102 if (string == NULL &&
3103 cstate->force_notnull_flags[m])
3104 {
3105 /*
3106 * FORCE_NOT_NULL option is set and column is NULL -
3107 * convert it to the NULL string.
3108 */
3109 string = cstate->null_print;
3110 }
3111 else if (string != NULL && cstate->force_null_flags[m]
3112 && strcmp(string, cstate->null_print) == 0)
3113 {
3114 /*
3115 * FORCE_NULL option is set and column matches the NULL
3116 * string. It must have been quoted, or otherwise the
3117 * string would already have been set to NULL. Convert it
3118 * to NULL as specified.
3119 */
3120 string = NULL;
3121 }
3122 }
3123
3124 cstate->cur_attname = NameStr(attr[m]->attname);
3125 cstate->cur_attval = string;
3126 values[m] = InputFunctionCall(&in_functions[m],
3127 string,
3128 typioparams[m],
3129 attr[m]->atttypmod);
3130 if (string != NULL)
3131 nulls[m] = false;
3132 cstate->cur_attname = NULL;
3133 cstate->cur_attval = NULL;
3134 }
3135
3136 Assert(fieldno == nfields);
3137 }
3138 else
3139 {
3140 /* binary */
3141 int16 fld_count;
3142 ListCell *cur;
3143
3144 cstate->cur_lineno++;
3145
3146 if (!CopyGetInt16(cstate, &fld_count))
3147 {
3148 /* EOF detected (end of file, or protocol-level EOF) */
3149 return false;
3150 }
3151
3152 if (fld_count == -1)
3153 {
3154 /*
3155 * Received EOF marker. In a V3-protocol copy, wait for the
3156 * protocol-level EOF, and complain if it doesn't come
3157 * immediately. This ensures that we correctly handle CopyFail,
3158 * if client chooses to send that now.
3159 *
3160 * Note that we MUST NOT try to read more data in an old-protocol
3161 * copy, since there is no protocol-level EOF marker then. We
3162 * could go either way for copy from file, but choose to throw
3163 * error if there's data after the EOF marker, for consistency
3164 * with the new-protocol case.
3165 */
3166 char dummy;
3167
3168 if (cstate->copy_dest != COPY_OLD_FE &&
3169 CopyGetData(cstate, &dummy, 1, 1) > 0)
3170 ereport(ERROR,
3171 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3172 errmsg("received copy data after EOF marker")));
3173 return false;
3174 }
3175
3176 if (fld_count != attr_count)
3177 ereport(ERROR,
3178 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3179 errmsg("row field count is %d, expected %d",
3180 (int) fld_count, attr_count)));
3181
3182 if (file_has_oids)
3183 {
3184 Oid loaded_oid;
3185
3186 cstate->cur_attname = "oid";
3187 loaded_oid =
3188 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3189 0,
3190 &cstate->oid_in_function,
3191 cstate->oid_typioparam,
3192 -1,
3193 &isnull));
3194 if (isnull || loaded_oid == InvalidOid)
3195 ereport(ERROR,
3196 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3197 errmsg("invalid OID in COPY data")));
3198 cstate->cur_attname = NULL;
3199 if (cstate->oids && tupleOid != NULL)
3200 *tupleOid = loaded_oid;
3201 }
3202
3203 i = 0;
3204 foreach(cur, cstate->attnumlist)
3205 {
3206 int attnum = lfirst_int(cur);
3207 int m = attnum - 1;
3208
3209 cstate->cur_attname = NameStr(attr[m]->attname);
3210 i++;
3211 values[m] = CopyReadBinaryAttribute(cstate,
3212 i,
3213 &in_functions[m],
3214 typioparams[m],
3215 attr[m]->atttypmod,
3216 &nulls[m]);
3217 cstate->cur_attname = NULL;
3218 }
3219 }
3220
3221 /*
3222 * Now compute and insert any defaults available for the columns not
3223 * provided by the input data. Anything not processed here or above will
3224 * remain NULL.
3225 */
3226 for (i = 0; i < num_defaults; i++)
3227 {
3228 /*
3229 * The caller must supply econtext and have switched into the
3230 * per-tuple memory context in it.
3231 */
3232 Assert(econtext != NULL);
3233 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3234
3235 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3236 &nulls[defmap[i]], NULL);
3237 }
3238
3239 return true;
3240 }
3241
3242 /*
3243 * Clean up storage and release resources for COPY FROM.
3244 */
3245 void
EndCopyFrom(CopyState cstate)3246 EndCopyFrom(CopyState cstate)
3247 {
3248 /* No COPY FROM related resources except memory. */
3249
3250 EndCopy(cstate);
3251 }
3252
3253 /*
3254 * Read the next input line and stash it in line_buf, with conversion to
3255 * server encoding.
3256 *
3257 * Result is true if read was terminated by EOF, false if terminated
3258 * by newline. The terminating newline or EOF marker is not included
3259 * in the final value of line_buf.
3260 */
3261 static bool
CopyReadLine(CopyState cstate)3262 CopyReadLine(CopyState cstate)
3263 {
3264 bool result;
3265
3266 resetStringInfo(&cstate->line_buf);
3267 cstate->line_buf_valid = true;
3268
3269 /* Mark that encoding conversion hasn't occurred yet */
3270 cstate->line_buf_converted = false;
3271
3272 /* Parse data and transfer into line_buf */
3273 result = CopyReadLineText(cstate);
3274
3275 if (result)
3276 {
3277 /*
3278 * Reached EOF. In protocol version 3, we should ignore anything
3279 * after \. up to the protocol end of copy data. (XXX maybe better
3280 * not to treat \. as special?)
3281 */
3282 if (cstate->copy_dest == COPY_NEW_FE)
3283 {
3284 do
3285 {
3286 cstate->raw_buf_index = cstate->raw_buf_len;
3287 } while (CopyLoadRawBuf(cstate));
3288 }
3289 }
3290 else
3291 {
3292 /*
3293 * If we didn't hit EOF, then we must have transferred the EOL marker
3294 * to line_buf along with the data. Get rid of it.
3295 */
3296 switch (cstate->eol_type)
3297 {
3298 case EOL_NL:
3299 Assert(cstate->line_buf.len >= 1);
3300 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3301 cstate->line_buf.len--;
3302 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3303 break;
3304 case EOL_CR:
3305 Assert(cstate->line_buf.len >= 1);
3306 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3307 cstate->line_buf.len--;
3308 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3309 break;
3310 case EOL_CRNL:
3311 Assert(cstate->line_buf.len >= 2);
3312 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3313 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3314 cstate->line_buf.len -= 2;
3315 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3316 break;
3317 case EOL_UNKNOWN:
3318 /* shouldn't get here */
3319 Assert(false);
3320 break;
3321 }
3322 }
3323
3324 /* Done reading the line. Convert it to server encoding. */
3325 if (cstate->need_transcoding)
3326 {
3327 char *cvt;
3328
3329 cvt = pg_any_to_server(cstate->line_buf.data,
3330 cstate->line_buf.len,
3331 cstate->file_encoding);
3332 if (cvt != cstate->line_buf.data)
3333 {
3334 /* transfer converted data back to line_buf */
3335 resetStringInfo(&cstate->line_buf);
3336 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3337 pfree(cvt);
3338 }
3339 }
3340
3341 /* Now it's safe to use the buffer in error messages */
3342 cstate->line_buf_converted = true;
3343
3344 return result;
3345 }
3346
3347 /*
3348 * CopyReadLineText - inner loop of CopyReadLine for text mode
3349 */
3350 static bool
CopyReadLineText(CopyState cstate)3351 CopyReadLineText(CopyState cstate)
3352 {
3353 char *copy_raw_buf;
3354 int raw_buf_ptr;
3355 int copy_buf_len;
3356 bool need_data = false;
3357 bool hit_eof = false;
3358 bool result = false;
3359 char mblen_str[2];
3360
3361 /* CSV variables */
3362 bool first_char_in_line = true;
3363 bool in_quote = false,
3364 last_was_esc = false;
3365 char quotec = '\0';
3366 char escapec = '\0';
3367
3368 if (cstate->csv_mode)
3369 {
3370 quotec = cstate->quote[0];
3371 escapec = cstate->escape[0];
3372 /* ignore special escape processing if it's the same as quotec */
3373 if (quotec == escapec)
3374 escapec = '\0';
3375 }
3376
3377 mblen_str[1] = '\0';
3378
3379 /*
3380 * The objective of this loop is to transfer the entire next input line
3381 * into line_buf. Hence, we only care for detecting newlines (\r and/or
3382 * \n) and the end-of-copy marker (\.).
3383 *
3384 * In CSV mode, \r and \n inside a quoted field are just part of the data
3385 * value and are put in line_buf. We keep just enough state to know if we
3386 * are currently in a quoted field or not.
3387 *
3388 * These four characters, and the CSV escape and quote characters, are
3389 * assumed the same in frontend and backend encodings.
3390 *
3391 * For speed, we try to move data from raw_buf to line_buf in chunks
3392 * rather than one character at a time. raw_buf_ptr points to the next
3393 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3394 * have been determined to be part of the line, but not yet transferred to
3395 * line_buf.
3396 *
3397 * For a little extra speed within the loop, we copy raw_buf and
3398 * raw_buf_len into local variables.
3399 */
3400 copy_raw_buf = cstate->raw_buf;
3401 raw_buf_ptr = cstate->raw_buf_index;
3402 copy_buf_len = cstate->raw_buf_len;
3403
3404 for (;;)
3405 {
3406 int prev_raw_ptr;
3407 char c;
3408
3409 /*
3410 * Load more data if needed. Ideally we would just force four bytes
3411 * of read-ahead and avoid the many calls to
3412 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3413 * does not allow us to read too far ahead or we might read into the
3414 * next data, so we read-ahead only as far we know we can. One
3415 * optimization would be to read-ahead four byte here if
3416 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3417 * considering the size of the buffer.
3418 */
3419 if (raw_buf_ptr >= copy_buf_len || need_data)
3420 {
3421 REFILL_LINEBUF;
3422
3423 /*
3424 * Try to read some more data. This will certainly reset
3425 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3426 */
3427 if (!CopyLoadRawBuf(cstate))
3428 hit_eof = true;
3429 raw_buf_ptr = 0;
3430 copy_buf_len = cstate->raw_buf_len;
3431
3432 /*
3433 * If we are completely out of data, break out of the loop,
3434 * reporting EOF.
3435 */
3436 if (copy_buf_len <= 0)
3437 {
3438 result = true;
3439 break;
3440 }
3441 need_data = false;
3442 }
3443
3444 /* OK to fetch a character */
3445 prev_raw_ptr = raw_buf_ptr;
3446 c = copy_raw_buf[raw_buf_ptr++];
3447
3448 if (cstate->csv_mode)
3449 {
3450 /*
3451 * If character is '\\' or '\r', we may need to look ahead below.
3452 * Force fetch of the next character if we don't already have it.
3453 * We need to do this before changing CSV state, in case one of
3454 * these characters is also the quote or escape character.
3455 *
3456 * Note: old-protocol does not like forced prefetch, but it's OK
3457 * here since we cannot validly be at EOF.
3458 */
3459 if (c == '\\' || c == '\r')
3460 {
3461 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3462 }
3463
3464 /*
3465 * Dealing with quotes and escapes here is mildly tricky. If the
3466 * quote char is also the escape char, there's no problem - we
3467 * just use the char as a toggle. If they are different, we need
3468 * to ensure that we only take account of an escape inside a
3469 * quoted field and immediately preceding a quote char, and not
3470 * the second in an escape-escape sequence.
3471 */
3472 if (in_quote && c == escapec)
3473 last_was_esc = !last_was_esc;
3474 if (c == quotec && !last_was_esc)
3475 in_quote = !in_quote;
3476 if (c != escapec)
3477 last_was_esc = false;
3478
3479 /*
3480 * Updating the line count for embedded CR and/or LF chars is
3481 * necessarily a little fragile - this test is probably about the
3482 * best we can do. (XXX it's arguable whether we should do this
3483 * at all --- is cur_lineno a physical or logical count?)
3484 */
3485 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3486 cstate->cur_lineno++;
3487 }
3488
3489 /* Process \r */
3490 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3491 {
3492 /* Check for \r\n on first line, _and_ handle \r\n. */
3493 if (cstate->eol_type == EOL_UNKNOWN ||
3494 cstate->eol_type == EOL_CRNL)
3495 {
3496 /*
3497 * If need more data, go back to loop top to load it.
3498 *
3499 * Note that if we are at EOF, c will wind up as '\0' because
3500 * of the guaranteed pad of raw_buf.
3501 */
3502 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3503
3504 /* get next char */
3505 c = copy_raw_buf[raw_buf_ptr];
3506
3507 if (c == '\n')
3508 {
3509 raw_buf_ptr++; /* eat newline */
3510 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3511 }
3512 else
3513 {
3514 /* found \r, but no \n */
3515 if (cstate->eol_type == EOL_CRNL)
3516 ereport(ERROR,
3517 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3518 !cstate->csv_mode ?
3519 errmsg("literal carriage return found in data") :
3520 errmsg("unquoted carriage return found in data"),
3521 !cstate->csv_mode ?
3522 errhint("Use \"\\r\" to represent carriage return.") :
3523 errhint("Use quoted CSV field to represent carriage return.")));
3524
3525 /*
3526 * if we got here, it is the first line and we didn't find
3527 * \n, so don't consume the peeked character
3528 */
3529 cstate->eol_type = EOL_CR;
3530 }
3531 }
3532 else if (cstate->eol_type == EOL_NL)
3533 ereport(ERROR,
3534 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3535 !cstate->csv_mode ?
3536 errmsg("literal carriage return found in data") :
3537 errmsg("unquoted carriage return found in data"),
3538 !cstate->csv_mode ?
3539 errhint("Use \"\\r\" to represent carriage return.") :
3540 errhint("Use quoted CSV field to represent carriage return.")));
3541 /* If reach here, we have found the line terminator */
3542 break;
3543 }
3544
3545 /* Process \n */
3546 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3547 {
3548 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3549 ereport(ERROR,
3550 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3551 !cstate->csv_mode ?
3552 errmsg("literal newline found in data") :
3553 errmsg("unquoted newline found in data"),
3554 !cstate->csv_mode ?
3555 errhint("Use \"\\n\" to represent newline.") :
3556 errhint("Use quoted CSV field to represent newline.")));
3557 cstate->eol_type = EOL_NL; /* in case not set yet */
3558 /* If reach here, we have found the line terminator */
3559 break;
3560 }
3561
3562 /*
3563 * In CSV mode, we only recognize \. alone on a line. This is because
3564 * \. is a valid CSV data value.
3565 */
3566 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3567 {
3568 char c2;
3569
3570 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3571 IF_NEED_REFILL_AND_EOF_BREAK(0);
3572
3573 /* -----
3574 * get next character
3575 * Note: we do not change c so if it isn't \., we can fall
3576 * through and continue processing for file encoding.
3577 * -----
3578 */
3579 c2 = copy_raw_buf[raw_buf_ptr];
3580
3581 if (c2 == '.')
3582 {
3583 raw_buf_ptr++; /* consume the '.' */
3584
3585 /*
3586 * Note: if we loop back for more data here, it does not
3587 * matter that the CSV state change checks are re-executed; we
3588 * will come back here with no important state changed.
3589 */
3590 if (cstate->eol_type == EOL_CRNL)
3591 {
3592 /* Get the next character */
3593 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3594 /* if hit_eof, c2 will become '\0' */
3595 c2 = copy_raw_buf[raw_buf_ptr++];
3596
3597 if (c2 == '\n')
3598 {
3599 if (!cstate->csv_mode)
3600 ereport(ERROR,
3601 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3602 errmsg("end-of-copy marker does not match previous newline style")));
3603 else
3604 NO_END_OF_COPY_GOTO;
3605 }
3606 else if (c2 != '\r')
3607 {
3608 if (!cstate->csv_mode)
3609 ereport(ERROR,
3610 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3611 errmsg("end-of-copy marker corrupt")));
3612 else
3613 NO_END_OF_COPY_GOTO;
3614 }
3615 }
3616
3617 /* Get the next character */
3618 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3619 /* if hit_eof, c2 will become '\0' */
3620 c2 = copy_raw_buf[raw_buf_ptr++];
3621
3622 if (c2 != '\r' && c2 != '\n')
3623 {
3624 if (!cstate->csv_mode)
3625 ereport(ERROR,
3626 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3627 errmsg("end-of-copy marker corrupt")));
3628 else
3629 NO_END_OF_COPY_GOTO;
3630 }
3631
3632 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3633 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3634 (cstate->eol_type == EOL_CR && c2 != '\r'))
3635 {
3636 ereport(ERROR,
3637 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3638 errmsg("end-of-copy marker does not match previous newline style")));
3639 }
3640
3641 /*
3642 * Transfer only the data before the \. into line_buf, then
3643 * discard the data and the \. sequence.
3644 */
3645 if (prev_raw_ptr > cstate->raw_buf_index)
3646 appendBinaryStringInfo(&cstate->line_buf,
3647 cstate->raw_buf + cstate->raw_buf_index,
3648 prev_raw_ptr - cstate->raw_buf_index);
3649 cstate->raw_buf_index = raw_buf_ptr;
3650 result = true; /* report EOF */
3651 break;
3652 }
3653 else if (!cstate->csv_mode)
3654 {
3655 /*
3656 * If we are here, it means we found a backslash followed by
3657 * something other than a period. In non-CSV mode, anything
3658 * after a backslash is special, so we skip over that second
3659 * character too. If we didn't do that \\. would be
3660 * considered an eof-of copy, while in non-CSV mode it is a
3661 * literal backslash followed by a period. In CSV mode,
3662 * backslashes are not special, so we want to process the
3663 * character after the backslash just like a normal character,
3664 * so we don't increment in those cases.
3665 *
3666 * Set 'c' to skip whole character correctly in multi-byte
3667 * encodings. If we don't have the whole character in the
3668 * buffer yet, we might loop back to process it, after all,
3669 * but that's OK because multi-byte characters cannot have any
3670 * special meaning.
3671 */
3672 raw_buf_ptr++;
3673 c = c2;
3674 }
3675 }
3676
3677 /*
3678 * This label is for CSV cases where \. appears at the start of a
3679 * line, but there is more text after it, meaning it was a data value.
3680 * We are more strict for \. in CSV mode because \. could be a data
3681 * value, while in non-CSV mode, \. cannot be a data value.
3682 */
3683 not_end_of_copy:
3684
3685 /*
3686 * Process all bytes of a multi-byte character as a group.
3687 *
3688 * We only support multi-byte sequences where the first byte has the
3689 * high-bit set, so as an optimization we can avoid this block
3690 * entirely if it is not set.
3691 */
3692 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3693 {
3694 int mblen;
3695
3696 mblen_str[0] = c;
3697 /* All our encodings only read the first byte to get the length */
3698 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3699 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
3700 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3701 raw_buf_ptr += mblen - 1;
3702 }
3703 first_char_in_line = false;
3704 } /* end of outer loop */
3705
3706 /*
3707 * Transfer any still-uncopied data to line_buf.
3708 */
3709 REFILL_LINEBUF;
3710
3711 return result;
3712 }
3713
3714 /*
3715 * Return decimal value for a hexadecimal digit
3716 */
3717 static int
GetDecimalFromHex(char hex)3718 GetDecimalFromHex(char hex)
3719 {
3720 if (isdigit((unsigned char) hex))
3721 return hex - '0';
3722 else
3723 return tolower((unsigned char) hex) - 'a' + 10;
3724 }
3725
3726 /*
3727 * Parse the current line into separate attributes (fields),
3728 * performing de-escaping as needed.
3729 *
3730 * The input is in line_buf. We use attribute_buf to hold the result
3731 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3732 * string, or NULL when the input matches the null marker string.
3733 * This array is expanded as necessary.
3734 *
3735 * (Note that the caller cannot check for nulls since the returned
3736 * string would be the post-de-escaping equivalent, which may look
3737 * the same as some valid data string.)
3738 *
3739 * delim is the column delimiter string (must be just one byte for now).
3740 * null_print is the null marker string. Note that this is compared to
3741 * the pre-de-escaped input string.
3742 *
3743 * The return value is the number of fields actually read.
3744 */
3745 static int
CopyReadAttributesText(CopyState cstate)3746 CopyReadAttributesText(CopyState cstate)
3747 {
3748 char delimc = cstate->delim[0];
3749 int fieldno;
3750 char *output_ptr;
3751 char *cur_ptr;
3752 char *line_end_ptr;
3753
3754 /*
3755 * We need a special case for zero-column tables: check that the input
3756 * line is empty, and return.
3757 */
3758 if (cstate->max_fields <= 0)
3759 {
3760 if (cstate->line_buf.len != 0)
3761 ereport(ERROR,
3762 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3763 errmsg("extra data after last expected column")));
3764 return 0;
3765 }
3766
3767 resetStringInfo(&cstate->attribute_buf);
3768
3769 /*
3770 * The de-escaped attributes will certainly not be longer than the input
3771 * data line, so we can just force attribute_buf to be large enough and
3772 * then transfer data without any checks for enough space. We need to do
3773 * it this way because enlarging attribute_buf mid-stream would invalidate
3774 * pointers already stored into cstate->raw_fields[].
3775 */
3776 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3777 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3778 output_ptr = cstate->attribute_buf.data;
3779
3780 /* set pointer variables for loop */
3781 cur_ptr = cstate->line_buf.data;
3782 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3783
3784 /* Outer loop iterates over fields */
3785 fieldno = 0;
3786 for (;;)
3787 {
3788 bool found_delim = false;
3789 char *start_ptr;
3790 char *end_ptr;
3791 int input_len;
3792 bool saw_non_ascii = false;
3793
3794 /* Make sure there is enough space for the next value */
3795 if (fieldno >= cstate->max_fields)
3796 {
3797 cstate->max_fields *= 2;
3798 cstate->raw_fields =
3799 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3800 }
3801
3802 /* Remember start of field on both input and output sides */
3803 start_ptr = cur_ptr;
3804 cstate->raw_fields[fieldno] = output_ptr;
3805
3806 /*
3807 * Scan data for field.
3808 *
3809 * Note that in this loop, we are scanning to locate the end of field
3810 * and also speculatively performing de-escaping. Once we find the
3811 * end-of-field, we can match the raw field contents against the null
3812 * marker string. Only after that comparison fails do we know that
3813 * de-escaping is actually the right thing to do; therefore we *must
3814 * not* throw any syntax errors before we've done the null-marker
3815 * check.
3816 */
3817 for (;;)
3818 {
3819 char c;
3820
3821 end_ptr = cur_ptr;
3822 if (cur_ptr >= line_end_ptr)
3823 break;
3824 c = *cur_ptr++;
3825 if (c == delimc)
3826 {
3827 found_delim = true;
3828 break;
3829 }
3830 if (c == '\\')
3831 {
3832 if (cur_ptr >= line_end_ptr)
3833 break;
3834 c = *cur_ptr++;
3835 switch (c)
3836 {
3837 case '0':
3838 case '1':
3839 case '2':
3840 case '3':
3841 case '4':
3842 case '5':
3843 case '6':
3844 case '7':
3845 {
3846 /* handle \013 */
3847 int val;
3848
3849 val = OCTVALUE(c);
3850 if (cur_ptr < line_end_ptr)
3851 {
3852 c = *cur_ptr;
3853 if (ISOCTAL(c))
3854 {
3855 cur_ptr++;
3856 val = (val << 3) + OCTVALUE(c);
3857 if (cur_ptr < line_end_ptr)
3858 {
3859 c = *cur_ptr;
3860 if (ISOCTAL(c))
3861 {
3862 cur_ptr++;
3863 val = (val << 3) + OCTVALUE(c);
3864 }
3865 }
3866 }
3867 }
3868 c = val & 0377;
3869 if (c == '\0' || IS_HIGHBIT_SET(c))
3870 saw_non_ascii = true;
3871 }
3872 break;
3873 case 'x':
3874 /* Handle \x3F */
3875 if (cur_ptr < line_end_ptr)
3876 {
3877 char hexchar = *cur_ptr;
3878
3879 if (isxdigit((unsigned char) hexchar))
3880 {
3881 int val = GetDecimalFromHex(hexchar);
3882
3883 cur_ptr++;
3884 if (cur_ptr < line_end_ptr)
3885 {
3886 hexchar = *cur_ptr;
3887 if (isxdigit((unsigned char) hexchar))
3888 {
3889 cur_ptr++;
3890 val = (val << 4) + GetDecimalFromHex(hexchar);
3891 }
3892 }
3893 c = val & 0xff;
3894 if (c == '\0' || IS_HIGHBIT_SET(c))
3895 saw_non_ascii = true;
3896 }
3897 }
3898 break;
3899 case 'b':
3900 c = '\b';
3901 break;
3902 case 'f':
3903 c = '\f';
3904 break;
3905 case 'n':
3906 c = '\n';
3907 break;
3908 case 'r':
3909 c = '\r';
3910 break;
3911 case 't':
3912 c = '\t';
3913 break;
3914 case 'v':
3915 c = '\v';
3916 break;
3917
3918 /*
3919 * in all other cases, take the char after '\'
3920 * literally
3921 */
3922 }
3923 }
3924
3925 /* Add c to output string */
3926 *output_ptr++ = c;
3927 }
3928
3929 /* Check whether raw input matched null marker */
3930 input_len = end_ptr - start_ptr;
3931 if (input_len == cstate->null_print_len &&
3932 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3933 cstate->raw_fields[fieldno] = NULL;
3934 else
3935 {
3936 /*
3937 * At this point we know the field is supposed to contain data.
3938 *
3939 * If we de-escaped any non-7-bit-ASCII chars, make sure the
3940 * resulting string is valid data for the db encoding.
3941 */
3942 if (saw_non_ascii)
3943 {
3944 char *fld = cstate->raw_fields[fieldno];
3945
3946 pg_verifymbstr(fld, output_ptr - fld, false);
3947 }
3948 }
3949
3950 /* Terminate attribute value in output area */
3951 *output_ptr++ = '\0';
3952
3953 fieldno++;
3954 /* Done if we hit EOL instead of a delim */
3955 if (!found_delim)
3956 break;
3957 }
3958
3959 /* Clean up state of attribute_buf */
3960 output_ptr--;
3961 Assert(*output_ptr == '\0');
3962 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
3963
3964 return fieldno;
3965 }
3966
3967 /*
3968 * Parse the current line into separate attributes (fields),
3969 * performing de-escaping as needed. This has exactly the same API as
3970 * CopyReadAttributesText, except we parse the fields according to
3971 * "standard" (i.e. common) CSV usage.
3972 */
3973 static int
CopyReadAttributesCSV(CopyState cstate)3974 CopyReadAttributesCSV(CopyState cstate)
3975 {
3976 char delimc = cstate->delim[0];
3977 char quotec = cstate->quote[0];
3978 char escapec = cstate->escape[0];
3979 int fieldno;
3980 char *output_ptr;
3981 char *cur_ptr;
3982 char *line_end_ptr;
3983
3984 /*
3985 * We need a special case for zero-column tables: check that the input
3986 * line is empty, and return.
3987 */
3988 if (cstate->max_fields <= 0)
3989 {
3990 if (cstate->line_buf.len != 0)
3991 ereport(ERROR,
3992 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3993 errmsg("extra data after last expected column")));
3994 return 0;
3995 }
3996
3997 resetStringInfo(&cstate->attribute_buf);
3998
3999 /*
4000 * The de-escaped attributes will certainly not be longer than the input
4001 * data line, so we can just force attribute_buf to be large enough and
4002 * then transfer data without any checks for enough space. We need to do
4003 * it this way because enlarging attribute_buf mid-stream would invalidate
4004 * pointers already stored into cstate->raw_fields[].
4005 */
4006 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4007 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4008 output_ptr = cstate->attribute_buf.data;
4009
4010 /* set pointer variables for loop */
4011 cur_ptr = cstate->line_buf.data;
4012 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4013
4014 /* Outer loop iterates over fields */
4015 fieldno = 0;
4016 for (;;)
4017 {
4018 bool found_delim = false;
4019 bool saw_quote = false;
4020 char *start_ptr;
4021 char *end_ptr;
4022 int input_len;
4023
4024 /* Make sure there is enough space for the next value */
4025 if (fieldno >= cstate->max_fields)
4026 {
4027 cstate->max_fields *= 2;
4028 cstate->raw_fields =
4029 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4030 }
4031
4032 /* Remember start of field on both input and output sides */
4033 start_ptr = cur_ptr;
4034 cstate->raw_fields[fieldno] = output_ptr;
4035
4036 /*
4037 * Scan data for field,
4038 *
4039 * The loop starts in "not quote" mode and then toggles between that
4040 * and "in quote" mode. The loop exits normally if it is in "not
4041 * quote" mode and a delimiter or line end is seen.
4042 */
4043 for (;;)
4044 {
4045 char c;
4046
4047 /* Not in quote */
4048 for (;;)
4049 {
4050 end_ptr = cur_ptr;
4051 if (cur_ptr >= line_end_ptr)
4052 goto endfield;
4053 c = *cur_ptr++;
4054 /* unquoted field delimiter */
4055 if (c == delimc)
4056 {
4057 found_delim = true;
4058 goto endfield;
4059 }
4060 /* start of quoted field (or part of field) */
4061 if (c == quotec)
4062 {
4063 saw_quote = true;
4064 break;
4065 }
4066 /* Add c to output string */
4067 *output_ptr++ = c;
4068 }
4069
4070 /* In quote */
4071 for (;;)
4072 {
4073 end_ptr = cur_ptr;
4074 if (cur_ptr >= line_end_ptr)
4075 ereport(ERROR,
4076 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4077 errmsg("unterminated CSV quoted field")));
4078
4079 c = *cur_ptr++;
4080
4081 /* escape within a quoted field */
4082 if (c == escapec)
4083 {
4084 /*
4085 * peek at the next char if available, and escape it if it
4086 * is an escape char or a quote char
4087 */
4088 if (cur_ptr < line_end_ptr)
4089 {
4090 char nextc = *cur_ptr;
4091
4092 if (nextc == escapec || nextc == quotec)
4093 {
4094 *output_ptr++ = nextc;
4095 cur_ptr++;
4096 continue;
4097 }
4098 }
4099 }
4100
4101 /*
4102 * end of quoted field. Must do this test after testing for
4103 * escape in case quote char and escape char are the same
4104 * (which is the common case).
4105 */
4106 if (c == quotec)
4107 break;
4108
4109 /* Add c to output string */
4110 *output_ptr++ = c;
4111 }
4112 }
4113 endfield:
4114
4115 /* Terminate attribute value in output area */
4116 *output_ptr++ = '\0';
4117
4118 /* Check whether raw input matched null marker */
4119 input_len = end_ptr - start_ptr;
4120 if (!saw_quote && input_len == cstate->null_print_len &&
4121 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4122 cstate->raw_fields[fieldno] = NULL;
4123
4124 fieldno++;
4125 /* Done if we hit EOL instead of a delim */
4126 if (!found_delim)
4127 break;
4128 }
4129
4130 /* Clean up state of attribute_buf */
4131 output_ptr--;
4132 Assert(*output_ptr == '\0');
4133 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4134
4135 return fieldno;
4136 }
4137
4138
4139 /*
4140 * Read a binary attribute
4141 */
4142 static Datum
CopyReadBinaryAttribute(CopyState cstate,int column_no,FmgrInfo * flinfo,Oid typioparam,int32 typmod,bool * isnull)4143 CopyReadBinaryAttribute(CopyState cstate,
4144 int column_no, FmgrInfo *flinfo,
4145 Oid typioparam, int32 typmod,
4146 bool *isnull)
4147 {
4148 int32 fld_size;
4149 Datum result;
4150
4151 if (!CopyGetInt32(cstate, &fld_size))
4152 ereport(ERROR,
4153 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4154 errmsg("unexpected EOF in COPY data")));
4155 if (fld_size == -1)
4156 {
4157 *isnull = true;
4158 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4159 }
4160 if (fld_size < 0)
4161 ereport(ERROR,
4162 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4163 errmsg("invalid field size")));
4164
4165 /* reset attribute_buf to empty, and load raw data in it */
4166 resetStringInfo(&cstate->attribute_buf);
4167
4168 enlargeStringInfo(&cstate->attribute_buf, fld_size);
4169 if (CopyGetData(cstate, cstate->attribute_buf.data,
4170 fld_size, fld_size) != fld_size)
4171 ereport(ERROR,
4172 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4173 errmsg("unexpected EOF in COPY data")));
4174
4175 cstate->attribute_buf.len = fld_size;
4176 cstate->attribute_buf.data[fld_size] = '\0';
4177
4178 /* Call the column type's binary input converter */
4179 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4180 typioparam, typmod);
4181
4182 /* Trouble if it didn't eat the whole buffer */
4183 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4184 ereport(ERROR,
4185 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4186 errmsg("incorrect binary data format")));
4187
4188 *isnull = false;
4189 return result;
4190 }
4191
4192 /*
4193 * Send text representation of one attribute, with conversion and escaping
4194 */
/*
 * DUMPSOFAR - flush the pending literal run [start, ptr) with one
 * CopySendData call, if that run is non-empty.  It expects variables named
 * cstate, start and ptr to be in scope where it is expanded.
 */
#define DUMPSOFAR() \
	do { \
		if (start < ptr) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)
4200
4201 static void
CopyAttributeOutText(CopyState cstate,char * string)4202 CopyAttributeOutText(CopyState cstate, char *string)
4203 {
4204 char *ptr;
4205 char *start;
4206 char c;
4207 char delimc = cstate->delim[0];
4208
4209 if (cstate->need_transcoding)
4210 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4211 else
4212 ptr = string;
4213
4214 /*
4215 * We have to grovel through the string searching for control characters
4216 * and instances of the delimiter character. In most cases, though, these
4217 * are infrequent. To avoid overhead from calling CopySendData once per
4218 * character, we dump out all characters between escaped characters in a
4219 * single call. The loop invariant is that the data from "start" to "ptr"
4220 * can be sent literally, but hasn't yet been.
4221 *
4222 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4223 * in valid backend encodings, extra bytes of a multibyte character never
4224 * look like ASCII. This loop is sufficiently performance-critical that
4225 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4226 * of the normal safe-encoding path.
4227 */
4228 if (cstate->encoding_embeds_ascii)
4229 {
4230 start = ptr;
4231 while ((c = *ptr) != '\0')
4232 {
4233 if ((unsigned char) c < (unsigned char) 0x20)
4234 {
4235 /*
4236 * \r and \n must be escaped, the others are traditional. We
4237 * prefer to dump these using the C-like notation, rather than
4238 * a backslash and the literal character, because it makes the
4239 * dump file a bit more proof against Microsoftish data
4240 * mangling.
4241 */
4242 switch (c)
4243 {
4244 case '\b':
4245 c = 'b';
4246 break;
4247 case '\f':
4248 c = 'f';
4249 break;
4250 case '\n':
4251 c = 'n';
4252 break;
4253 case '\r':
4254 c = 'r';
4255 break;
4256 case '\t':
4257 c = 't';
4258 break;
4259 case '\v':
4260 c = 'v';
4261 break;
4262 default:
4263 /* If it's the delimiter, must backslash it */
4264 if (c == delimc)
4265 break;
4266 /* All ASCII control chars are length 1 */
4267 ptr++;
4268 continue; /* fall to end of loop */
4269 }
4270 /* if we get here, we need to convert the control char */
4271 DUMPSOFAR();
4272 CopySendChar(cstate, '\\');
4273 CopySendChar(cstate, c);
4274 start = ++ptr; /* do not include char in next run */
4275 }
4276 else if (c == '\\' || c == delimc)
4277 {
4278 DUMPSOFAR();
4279 CopySendChar(cstate, '\\');
4280 start = ptr++; /* we include char in next run */
4281 }
4282 else if (IS_HIGHBIT_SET(c))
4283 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4284 else
4285 ptr++;
4286 }
4287 }
4288 else
4289 {
4290 start = ptr;
4291 while ((c = *ptr) != '\0')
4292 {
4293 if ((unsigned char) c < (unsigned char) 0x20)
4294 {
4295 /*
4296 * \r and \n must be escaped, the others are traditional. We
4297 * prefer to dump these using the C-like notation, rather than
4298 * a backslash and the literal character, because it makes the
4299 * dump file a bit more proof against Microsoftish data
4300 * mangling.
4301 */
4302 switch (c)
4303 {
4304 case '\b':
4305 c = 'b';
4306 break;
4307 case '\f':
4308 c = 'f';
4309 break;
4310 case '\n':
4311 c = 'n';
4312 break;
4313 case '\r':
4314 c = 'r';
4315 break;
4316 case '\t':
4317 c = 't';
4318 break;
4319 case '\v':
4320 c = 'v';
4321 break;
4322 default:
4323 /* If it's the delimiter, must backslash it */
4324 if (c == delimc)
4325 break;
4326 /* All ASCII control chars are length 1 */
4327 ptr++;
4328 continue; /* fall to end of loop */
4329 }
4330 /* if we get here, we need to convert the control char */
4331 DUMPSOFAR();
4332 CopySendChar(cstate, '\\');
4333 CopySendChar(cstate, c);
4334 start = ++ptr; /* do not include char in next run */
4335 }
4336 else if (c == '\\' || c == delimc)
4337 {
4338 DUMPSOFAR();
4339 CopySendChar(cstate, '\\');
4340 start = ptr++; /* we include char in next run */
4341 }
4342 else
4343 ptr++;
4344 }
4345 }
4346
4347 DUMPSOFAR();
4348 }
4349
4350 /*
4351 * Send text representation of one attribute, with conversion and
4352 * CSV-style escaping
4353 */
4354 static void
CopyAttributeOutCSV(CopyState cstate,char * string,bool use_quote,bool single_attr)4355 CopyAttributeOutCSV(CopyState cstate, char *string,
4356 bool use_quote, bool single_attr)
4357 {
4358 char *ptr;
4359 char *start;
4360 char c;
4361 char delimc = cstate->delim[0];
4362 char quotec = cstate->quote[0];
4363 char escapec = cstate->escape[0];
4364
4365 /* force quoting if it matches null_print (before conversion!) */
4366 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4367 use_quote = true;
4368
4369 if (cstate->need_transcoding)
4370 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4371 else
4372 ptr = string;
4373
4374 /*
4375 * Make a preliminary pass to discover if it needs quoting
4376 */
4377 if (!use_quote)
4378 {
4379 /*
4380 * Because '\.' can be a data value, quote it if it appears alone on a
4381 * line so it is not interpreted as the end-of-data marker.
4382 */
4383 if (single_attr && strcmp(ptr, "\\.") == 0)
4384 use_quote = true;
4385 else
4386 {
4387 char *tptr = ptr;
4388
4389 while ((c = *tptr) != '\0')
4390 {
4391 if (c == delimc || c == quotec || c == '\n' || c == '\r')
4392 {
4393 use_quote = true;
4394 break;
4395 }
4396 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4397 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4398 else
4399 tptr++;
4400 }
4401 }
4402 }
4403
4404 if (use_quote)
4405 {
4406 CopySendChar(cstate, quotec);
4407
4408 /*
4409 * We adopt the same optimization strategy as in CopyAttributeOutText
4410 */
4411 start = ptr;
4412 while ((c = *ptr) != '\0')
4413 {
4414 if (c == quotec || c == escapec)
4415 {
4416 DUMPSOFAR();
4417 CopySendChar(cstate, escapec);
4418 start = ptr; /* we include char in next run */
4419 }
4420 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4421 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4422 else
4423 ptr++;
4424 }
4425 DUMPSOFAR();
4426
4427 CopySendChar(cstate, quotec);
4428 }
4429 else
4430 {
4431 /* If it doesn't need quoting, we can just dump it as-is */
4432 CopySendString(cstate, ptr);
4433 }
4434 }
4435
4436 /*
4437 * CopyGetAttnums - build an integer list of attnums to be copied
4438 *
4439 * The input attnamelist is either the user-specified column list,
4440 * or NIL if there was none (in which case we want all the non-dropped
4441 * columns).
4442 *
4443 * rel can be NULL ... it's only used for error reports.
4444 */
4445 static List *
CopyGetAttnums(TupleDesc tupDesc,Relation rel,List * attnamelist)4446 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4447 {
4448 List *attnums = NIL;
4449
4450 if (attnamelist == NIL)
4451 {
4452 /* Generate default column list */
4453 Form_pg_attribute *attr = tupDesc->attrs;
4454 int attr_count = tupDesc->natts;
4455 int i;
4456
4457 for (i = 0; i < attr_count; i++)
4458 {
4459 if (attr[i]->attisdropped)
4460 continue;
4461 attnums = lappend_int(attnums, i + 1);
4462 }
4463 }
4464 else
4465 {
4466 /* Validate the user-supplied list and extract attnums */
4467 ListCell *l;
4468
4469 foreach(l, attnamelist)
4470 {
4471 char *name = strVal(lfirst(l));
4472 int attnum;
4473 int i;
4474
4475 /* Lookup column name */
4476 attnum = InvalidAttrNumber;
4477 for (i = 0; i < tupDesc->natts; i++)
4478 {
4479 if (tupDesc->attrs[i]->attisdropped)
4480 continue;
4481 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4482 {
4483 attnum = tupDesc->attrs[i]->attnum;
4484 break;
4485 }
4486 }
4487 if (attnum == InvalidAttrNumber)
4488 {
4489 if (rel != NULL)
4490 ereport(ERROR,
4491 (errcode(ERRCODE_UNDEFINED_COLUMN),
4492 errmsg("column \"%s\" of relation \"%s\" does not exist",
4493 name, RelationGetRelationName(rel))));
4494 else
4495 ereport(ERROR,
4496 (errcode(ERRCODE_UNDEFINED_COLUMN),
4497 errmsg("column \"%s\" does not exist",
4498 name)));
4499 }
4500 /* Check for duplicates */
4501 if (list_member_int(attnums, attnum))
4502 ereport(ERROR,
4503 (errcode(ERRCODE_DUPLICATE_COLUMN),
4504 errmsg("column \"%s\" specified more than once",
4505 name)));
4506 attnums = lappend_int(attnums, attnum);
4507 }
4508 }
4509
4510 return attnums;
4511 }
4512
4513
4514 /*
4515 * copy_dest_startup --- executor startup
4516 */
4517 static void
copy_dest_startup(DestReceiver * self,int operation,TupleDesc typeinfo)4518 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4519 {
4520 /* no-op */
4521 }
4522
4523 /*
4524 * copy_dest_receive --- receive one tuple
4525 */
4526 static bool
copy_dest_receive(TupleTableSlot * slot,DestReceiver * self)4527 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4528 {
4529 DR_copy *myState = (DR_copy *) self;
4530 CopyState cstate = myState->cstate;
4531
4532 /* Make sure the tuple is fully deconstructed */
4533 slot_getallattrs(slot);
4534
4535 /* And send the data */
4536 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4537 myState->processed++;
4538
4539 return true;
4540 }
4541
4542 /*
4543 * copy_dest_shutdown --- executor end
4544 */
4545 static void
copy_dest_shutdown(DestReceiver * self)4546 copy_dest_shutdown(DestReceiver *self)
4547 {
4548 /* no-op */
4549 }
4550
4551 /*
4552 * copy_dest_destroy --- release DestReceiver object
4553 */
4554 static void
copy_dest_destroy(DestReceiver * self)4555 copy_dest_destroy(DestReceiver *self)
4556 {
4557 pfree(self);
4558 }
4559
4560 /*
4561 * CreateCopyDestReceiver -- create a suitable DestReceiver object
4562 */
4563 DestReceiver *
CreateCopyDestReceiver(void)4564 CreateCopyDestReceiver(void)
4565 {
4566 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4567
4568 self->pub.receiveSlot = copy_dest_receive;
4569 self->pub.rStartup = copy_dest_startup;
4570 self->pub.rShutdown = copy_dest_shutdown;
4571 self->pub.rDestroy = copy_dest_destroy;
4572 self->pub.mydest = DestCopyOut;
4573
4574 self->cstate = NULL; /* will be set later */
4575 self->processed = 0;
4576
4577 return (DestReceiver *) self;
4578 }
4579