/*-------------------------------------------------------------------------
 *
 * copy.c
 *		Implements the COPY utility command
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/commands/copy.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

/*
 * NOTE(review): the three directives below carry no header name in this
 * copy of the file.  Presumably they were angle-bracket system includes
 * whose names were lost in extraction -- restore them from the upstream
 * file before building.
 */
#include
#include
#include

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/dependency.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "foreign/fdwapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"

/*
 * ISOCTAL tests whether character c lies in the range '0'..'7'.  Note that
 * the argument is evaluated twice, so it must not have side effects.
 */
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
/* OCTVALUE converts a digit character to its numeric value (c - '0'). */
#define OCTVALUE(c) ((c) - '0')

/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file (or a piped program) */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE,				/* to/from frontend (3.0 protocol) */
	COPY_CALLBACK				/* to/from callback function */
} CopyDest;

/*
 * Represents the end-of-line terminator type of the input
 *
 * EOL_UNKNOWN is the first enumerator and therefore has the value 0.
 */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;

/*
 *
Represents the heap insert method to be used during COPY FROM.
 */
typedef enum CopyInsertMethod
{
	CIM_SINGLE,					/* use heap_insert or fdw routine */
	CIM_MULTI,					/* always use heap_multi_insert */
	CIM_MULTI_CONDITIONAL		/* use heap_multi_insert only if valid */
} CopyInsertMethod;

/*
 * This struct contains all the state variables used throughout a COPY
 * operation. For simplicity, we use the same struct for all variants of COPY,
 * even though some fields are used in only some cases.
 *
 * Multi-byte encodings: all supported client-side encodings encode multi-byte
 * characters by having the first byte's high bit set. Subsequent bytes of the
 * character can have the high bit not set. When scanning data in such an
 * encoding to look for a match to a single-byte (ie ASCII) character, we must
 * use the full pg_encoding_mblen() machinery to skip over multibyte
 * characters, else we might find a false match to a trailing byte. In
 * supported server encodings, there is no possibility of a false match, and
 * it's faster to make useless comparisons to trailing bytes than it is to
 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
 * when we have to do it the hard way.
 *
 * The struct is laid out in groups: low-level I/O state, parameters taken
 * from the COPY command, error-reporting context, per-direction working
 * state, and finally the three text-mode input buffers (attribute_buf,
 * line_buf, raw_buf).
 */
typedef struct CopyStateData
{
	/* low-level state data */
	CopyDest	copy_dest;		/* type of copy source/destination */
	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
	StringInfo	fe_msgbuf;		/* used for all dests during COPY TO, only for
								 * dest == COPY_NEW_FE in COPY FROM */
	bool		is_copy_from;	/* COPY TO, or COPY FROM? */
	bool		reached_eof;	/* true if we read to end of copy data (not
								 * all copy_dest types maintain this) */
	EolType		eol_type;		/* EOL type of input */
	int			file_encoding;	/* file or remote side's character encoding */
	bool		need_transcoding;	/* file encoding diff from server? */
	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */

	/* parameters from the COPY command */
	Relation	rel;			/* relation to copy to or from */
	QueryDesc  *queryDesc;		/* executable query to copy from */
	List	   *attnumlist;		/* integer list of attnums to copy */
	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
	bool		is_program;		/* is 'filename' a program to popen? */
	copy_data_source_cb data_source_cb; /* function for reading data */
	bool		binary;			/* binary format? */
	bool		freeze;			/* freeze rows on loading? */
	bool		csv_mode;		/* Comma Separated Value format? */
	bool		header_line;	/* CSV header line? */
	char	   *null_print;		/* NULL marker string (server encoding!) */
	int			null_print_len; /* length of same */
	char	   *null_print_client;	/* same converted to file encoding */
	char	   *delim;			/* column delimiter (must be 1 byte) */
	char	   *quote;			/* CSV quote char (must be 1 byte) */
	char	   *escape;			/* CSV escape char (must be 1 byte) */
	List	   *force_quote;	/* list of column names */
	bool		force_quote_all;	/* FORCE_QUOTE *? */
	bool	   *force_quote_flags;	/* per-column CSV FQ flags */
	List	   *force_notnull;	/* list of column names */
	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
	List	   *force_null;		/* list of column names */
	bool	   *force_null_flags;	/* per-column CSV FN flags */
	bool		convert_selectively;	/* do selective binary conversion? */
	List	   *convert_select; /* list of column names (can be NIL) */
	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */

	/* these are just for error messages, see CopyFromErrorCallback */
	const char *cur_relname;	/* table name for error messages */
	uint64		cur_lineno;		/* line number for error messages */
	const char *cur_attname;	/* current att for error messages */
	const char *cur_attval;		/* current att value for error messages */

	/*
	 * Working state for COPY TO/FROM
	 */
	MemoryContext copycontext;	/* per-copy execution context */

	/*
	 * Working state for COPY TO
	 */
	FmgrInfo   *out_functions;	/* lookup info for output functions */
	MemoryContext rowcontext;	/* per-row evaluation context */

	/*
	 * Working state for COPY FROM
	 */
	AttrNumber	num_defaults;	/* presumably the number of entries in
								 * defmap/defexprs -- TODO confirm against
								 * the COPY FROM setup code */
	FmgrInfo	oid_in_function;	/* NOTE(review): looks like the input
									 * function used for an OID column --
									 * confirm */
	Oid			oid_typioparam; /* presumably the typioparam that goes with
								 * oid_in_function -- confirm */
	FmgrInfo   *in_functions;	/* array of input functions for each attrs */
	Oid		   *typioparams;	/* array of element types for in_functions */
	int		   *defmap;			/* array of default att numbers */
	ExprState **defexprs;		/* array of default att expressions */
	bool		volatile_defexprs;	/* is any of defexprs volatile? */
	List	   *range_table;	/* not documented here; set up outside this
								 * part of the file */
	TransitionCaptureState *transition_capture; /* not documented here; set
												 * up outside this part of
												 * the file */

	/*
	 * These variables are used to reduce overhead in textual COPY FROM.
	 *
	 * attribute_buf holds the separated, de-escaped text for each field of
	 * the current line.  The CopyReadAttributes functions return arrays of
	 * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
	 * the buffer on each cycle.
	 */
	StringInfoData attribute_buf;

	/* field raw data pointers found by COPY FROM */

	int			max_fields;		/* presumably the allocated length of
								 * raw_fields -- TODO confirm */
	char	  **raw_fields;

	/*
	 * Similarly, line_buf holds the whole input line being processed. The
	 * input cycle is first to read the whole line into line_buf, convert it
	 * to server encoding there, and then extract the individual attribute
	 * fields into attribute_buf.  line_buf is preserved unmodified so that we
	 * can display it in error messages if appropriate.
	 */
	StringInfoData line_buf;
	bool		line_buf_converted; /* converted to server encoding? */
	bool		line_buf_valid; /* contains the row being processed? */

	/*
	 * Finally, raw_buf holds raw data read from the data source (file or
	 * client connection).  CopyReadLine parses this data sufficiently to
	 * locate line boundaries, then transfers the data to line_buf and
	 * converts it.  Note: we guarantee that there is a \0 at
	 * raw_buf[raw_buf_len].
	 */
#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
	char	   *raw_buf;
	int			raw_buf_index;	/* next byte to process */
	int			raw_buf_len;	/* total # of bytes stored */
} CopyStateData;

/* DestReceiver for COPY (query) TO */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	CopyState	cstate;			/* CopyStateData for the command */
	uint64		processed;		/* # of tuples processed */
} DR_copy;


/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
 * prevent the continue/break processing from working.  We end the "if (1)"
 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
 * any "else" in the calling code, and to avoid any compiler warnings about
 * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
*/
/*
 * Must be expanded inside a loop (it uses "continue") in a scope that
 * provides the locals raw_buf_ptr, prev_raw_ptr, copy_buf_len, hit_eof and
 * need_data.  If fewer than extralen+1 bytes remain at raw_buf_ptr and EOF
 * has not been hit, it rewinds raw_buf_ptr to prev_raw_ptr, sets need_data
 * and restarts the loop iteration.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)

/*
 * This consumes the remainder of the buffer and breaks
 *
 * Counterpart of the macro above for the case where EOF has already been
 * hit: when fewer than extralen+1 bytes remain, it sets result = true and
 * leaves the enclosing loop.  It expects the locals raw_buf_ptr,
 * copy_buf_len, hit_eof and result in the expanding scope.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)

/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Appends the bytes raw_buf[raw_buf_index .. raw_buf_ptr) to
 * cstate->line_buf and then advances cstate->raw_buf_index to raw_buf_ptr.
 * It does nothing when raw_buf_ptr has not moved past raw_buf_index.
 * Expects cstate and raw_buf_ptr in the expanding scope.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							 cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)

/* Undo any read-ahead and jump out of the block.
*/ #define NO_END_OF_COPY_GOTO \ if (1) \ { \ raw_buf_ptr = prev_raw_ptr + 1; \ goto not_end_of_copy; \ } else ((void) 0) static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options); static void EndCopy(CopyState cstate); static void ClosePipeToProgram(CopyState cstate); static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options); static void EndCopyTo(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls); static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, uint64 firstBufferedLineNo); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); static int CopyReadAttributesCSV(CopyState cstate); static Datum CopyReadBinaryAttribute(CopyState cstate, int column_no, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull); static void CopyAttributeOutText(CopyState cstate, char *string); static void CopyAttributeOutCSV(CopyState cstate, char *string, bool use_quote, bool single_attr); static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static char *limit_printout_length(const char *str); /* Low-level communications functions */ static void SendCopyBegin(CopyState cstate); static void ReceiveCopyBegin(CopyState cstate); static void SendCopyEnd(CopyState cstate); static void CopySendData(CopyState cstate, const void *databuf, int datasize); static void CopySendString(CopyState 
cstate, const char *str); static void CopySendChar(CopyState cstate, char c); static void CopySendEndOfRow(CopyState cstate); static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread); static void CopySendInt32(CopyState cstate, int32 val); static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. */ static void SendCopyBegin(CopyState cstate) { if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 1 : 0); int i; pq_beginmessage(&buf, 'H'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint16(&buf, natts); for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); cstate->copy_dest = COPY_NEW_FE; } else { /* old way */ if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('H'); /* grottiness needed for old COPY OUT protocol */ pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; } } static void ReceiveCopyBegin(CopyState cstate) { if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 
1 : 0); int i; pq_beginmessage(&buf, 'G'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint16(&buf, natts); for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); cstate->copy_dest = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } else { /* old way */ if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('G'); /* any error in old protocol will make us lose sync */ pq_startmsgread(); cstate->copy_dest = COPY_OLD_FE; } /* We *must* flush here to ensure FE knows it can send. */ pq_flush(); } static void SendCopyEnd(CopyState cstate) { if (cstate->copy_dest == COPY_NEW_FE) { /* Shouldn't have any unsent data */ Assert(cstate->fe_msgbuf->len == 0); /* Send Copy Done message */ pq_putemptymessage('c'); } else { CopySendData(cstate, "\\.", 2); /* Need to flush out the trailer (this also appends a newline) */ CopySendEndOfRow(cstate); pq_endcopyout(false); } } /*---------- * CopySendData sends output data to the destination (file or frontend) * CopySendString does the same for null-terminated strings * CopySendChar does the same for single characters * CopySendEndOfRow does the appropriate thing at end of each data row * (data is not actually flushed except by CopySendEndOfRow) * * NB: no data conversion is applied by these functions *---------- */ static void CopySendData(CopyState cstate, const void *databuf, int datasize) { appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); } static void CopySendString(CopyState cstate, const char *str) { appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str)); } static void CopySendChar(CopyState cstate, char c) { appendStringInfoCharMacro(cstate->fe_msgbuf, c); } static void CopySendEndOfRow(CopyState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; switch (cstate->copy_dest) { case COPY_FILE: if (!cstate->binary) { /* Default line termination depends 
on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); #else CopySendString(cstate, "\r\n"); #endif } if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) { if (cstate->is_program) { if (errno == EPIPE) { /* * The pipe will be closed automatically on error at * the end of transaction, but we might get a better * error message from the subprocess' exit code than * just "Broken Pipe" */ ClosePipeToProgram(cstate); /* * If ClosePipeToProgram() didn't throw an error, the * program terminated normally, but closed the pipe * first. Restore errno, and throw an error. */ errno = EPIPE; } ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to COPY program: %m"))); } else ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); } break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary) CopySendChar(cstate, '\n'); if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) { /* no hope of recovering connection sync, so FATAL */ ereport(FATAL, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("connection lost during COPY to stdout"))); } break; case COPY_NEW_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary) CopySendChar(cstate, '\n'); /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; case COPY_CALLBACK: Assert(false); /* Not yet supported. */ break; } resetStringInfo(fe_msgbuf); } /* * CopyGetData reads data from the source (file or frontend) * * We attempt to read at least minread, and at most maxread, bytes from * the source. The actual number of bytes read is returned; if this is * less than minread, EOF was detected. * * Note: when copying from the frontend, we expect a proper EOF mark per * protocol; if the frontend simply drops the connection, we raise error. * It seems unwise to allow the COPY IN to complete normally in that case. 
*
 * NB: no data conversion is applied here.
 *
 * Per-source behavior, as implemented below:
 *	COPY_FILE		one fread() of up to maxread bytes; reached_eof is set
 *					only when that call returns zero bytes.
 *	COPY_OLD_FE		reads exactly minread bytes; maxread is not consulted.
 *	COPY_NEW_FE		drains CopyData messages through fe_msgbuf until
 *					minread bytes are gathered, maxread is used up, or a
 *					CopyDone message sets reached_eof.
 *	COPY_CALLBACK	returns whatever cstate->data_source_cb returns.
 */
static int
CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
{
	int			bytesread = 0;

	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			bytesread = fread(databuf, 1, maxread, cstate->copy_file);
			if (ferror(cstate->copy_file))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read from COPY file: %m")));
			if (bytesread == 0)
				cstate->reached_eof = true;
			break;
		case COPY_OLD_FE:

			/*
			 * We cannot read more than minread bytes (which in practice is 1)
			 * because old protocol doesn't have any clear way of separating
			 * the COPY stream from following data.  This is slow, but not any
			 * slower than the code path was originally, and we don't care
			 * much anymore about the performance of old protocol.
			 */
			if (pq_getbytes((char *) databuf, minread))
			{
				/* Only a \. terminator is legal EOF in old protocol */
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection with an open transaction")));
			}
			bytesread = minread;
			break;
		case COPY_NEW_FE:
			while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
			{
				int			avail;

				/* Loop until fe_msgbuf holds unread bytes (cursor < len) */
				while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
				{
					/* Try to receive another message */
					int			mtype;

			readmessage:
					/* Type byte and message body are read with cancel interrupts held */
					HOLD_CANCEL_INTERRUPTS();
					pq_startmsgread();
					mtype = pq_getbyte();
					if (mtype == EOF)
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
								 errmsg("unexpected EOF on client connection with an open transaction")));
					if (pq_getmessage(cstate->fe_msgbuf, 0))
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
								 errmsg("unexpected EOF on client connection with an open transaction")));
					RESUME_CANCEL_INTERRUPTS();
					switch (mtype)
					{
						case 'd':	/* CopyData */
							break;
						case 'c':	/* CopyDone */
							/* COPY IN correctly terminated by frontend */
							cstate->reached_eof = true;
							/* may return fewer than minread bytes: that signals EOF */
							return bytesread;
						case 'f':	/* CopyFail */
							ereport(ERROR,
									(errcode(ERRCODE_QUERY_CANCELED),
									 errmsg("COPY from stdin failed: %s",
											pq_getmsgstring(cstate->fe_msgbuf))));
							break;
						case 'H':	/* Flush */
						case 'S':	/* Sync */

							/*
							 * Ignore Flush/Sync for the convenience of client
							 * libraries (such as libpq) that may send those
							 * without noticing that the command they just
							 * sent was COPY.
							 */
							goto readmessage;
						default:
							ereport(ERROR,
									(errcode(ERRCODE_PROTOCOL_VIOLATION),
									 errmsg("unexpected message type 0x%02X during COPY from stdin",
											mtype)));
							break;
					}
				}
				/* Hand over as much of the current message as the caller can take */
				avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
				if (avail > maxread)
					avail = maxread;
				pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
				databuf = (void *) ((char *) databuf + avail);
				maxread -= avail;
				bytesread += avail;
			}
			break;
		case COPY_CALLBACK:
			bytesread = cstate->data_source_cb(databuf, minread, maxread);
			break;
	}

	return bytesread;
}


/*
 * These functions do apply some data conversion
 */

/*
 * CopySendInt32 sends an int32 in network byte order
 *
 * The value is converted with pg_hton32() and appended to the output
 * buffer via CopySendData().
 */
static void
CopySendInt32(CopyState cstate, int32 val)
{
	uint32		buf;

	buf = pg_hton32((uint32) val);
	CopySendData(cstate, &buf, sizeof(buf));
}

/*
 * CopyGetInt32 reads an int32 that appears in network byte order
 *
 * Returns true if OK, false if EOF
 *
 * On a short read *val is set to 0 before returning false.
 */
static bool
CopyGetInt32(CopyState cstate, int32 *val)
{
	uint32		buf;

	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
	{
		*val = 0;				/* suppress compiler warning */
		return false;
	}
	*val = (int32) pg_ntoh32(buf);
	return true;
}

/*
 * CopySendInt16 sends an int16 in network byte order
 *
 * The value is converted with pg_hton16() and appended to the output
 * buffer via CopySendData().
 */
static void
CopySendInt16(CopyState cstate, int16 val)
{
	uint16		buf;

	buf = pg_hton16((uint16) val);
	CopySendData(cstate, &buf, sizeof(buf));
}

/*
 * CopyGetInt16 reads an int16 that appears in network byte order
 *
 * Returns true if a full value was read; on a short read *val is set to 0
 * and false is returned.
 */
static bool
CopyGetInt16(CopyState cstate, int16 *val)
{
	uint16		buf;

	if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
	{
		*val = 0;				/* suppress compiler warning */
		return false;
	}
	*val = (int16) pg_ntoh16(buf);
	return true;
}


/*
 * CopyLoadRawBuf loads some more data into raw_buf
 *
 * Returns true if able to obtain at least one more byte, else false.
* * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred * down to the start of the buffer and then we load more data after that. * This case is used only when a frontend multibyte character crosses a * bufferload boundary. */ static bool CopyLoadRawBuf(CopyState cstate) { int nbytes; int inbytes; if (cstate->raw_buf_index < cstate->raw_buf_len) { /* Copy down the unprocessed data */ nbytes = cstate->raw_buf_len - cstate->raw_buf_index; memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, nbytes); } else nbytes = 0; /* no data need be saved */ inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, 1, RAW_BUF_SIZE - nbytes); nbytes += inbytes; cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; cstate->raw_buf_len = nbytes; return (inbytes > 0); } /* * DoCopy executes the SQL COPY statement * * Either unload or reload contents of table , depending on . * ( = true means we are inserting into the table.) In the "TO" case * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE * or DELETE query. * * If is false, transfer is between the table and the file named * . Otherwise, transfer is between the table and our regular * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * * Do not allow a Postgres user without the 'pg_access_server_files' role to * read from or write to a file. * * Do not allow the copy if user doesn't have proper permission to access * the table or the specifically requested columns. */ void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed) { CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); Relation rel; Oid relid; RawStmt *query = NULL; /* * Disallow COPY to/from file or program except to users with the * appropriate role. 
*/ if (!pipe) { if (stmt->is_program) { if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); } else { if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); } } if (stmt->relation) { LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock; RangeTblEntry *rte; TupleDesc tupDesc; List *attnums; ListCell *cur; Assert(!stmt->query); /* Open and lock the relation, using the appropriate lock type. */ rel = heap_openrv(stmt->relation, lockmode); relid = RelationGetRelid(rel); rte = addRangeTableEntryForRelation(pstate, rel, lockmode, NULL, false, false); rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT); tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); foreach(cur, attnums) { int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber; if (is_from) rte->insertedCols = bms_add_member(rte->insertedCols, attno); else rte->selectedCols = bms_add_member(rte->selectedCols, attno); } ExecCheckRTPerms(pstate->p_rtable, true); /* * Permission check for row security policies. 
* * check_enable_rls will ereport(ERROR) if the user has requested * something invalid and will otherwise indicate if we should enable * RLS (returns RLS_ENABLED) or not for this COPY statement. * * If the relation has a row security policy and we are to apply it * then perform a "query" copy and allow the normal query processing * to handle the policies. * * If RLS is not enabled for this, then just fall through to the * normal non-filtering relation handling. */ if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED) { SelectStmt *select; ColumnRef *cr; ResTarget *target; RangeVar *from; List *targetList = NIL; if (is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY FROM not supported with row-level security"), errhint("Use INSERT statements instead."))); /* * Build target list * * If no columns are specified in the attribute list of the COPY * command, then the target list is 'all' columns. Therefore, '*' * should be used as the target list for the resulting SELECT * statement. * * In the case that columns are specified in the attribute list, * create a ColumnRef and ResTarget for each column and add them * to the target list for the resulting SELECT statement. */ if (!stmt->attlist) { cr = makeNode(ColumnRef); cr->fields = list_make1(makeNode(A_Star)); cr->location = -1; target = makeNode(ResTarget); target->name = NULL; target->indirection = NIL; target->val = (Node *) cr; target->location = -1; targetList = list_make1(target); } else { ListCell *lc; foreach(lc, stmt->attlist) { /* * Build the ColumnRef for each column. The ColumnRef * 'fields' property is a String 'Value' node (see * nodes/value.h) that corresponds to the column name * respectively. */ cr = makeNode(ColumnRef); cr->fields = list_make1(lfirst(lc)); cr->location = -1; /* Build the ResTarget and add the ColumnRef to it. 
*/ target = makeNode(ResTarget); target->name = NULL; target->indirection = NIL; target->val = (Node *) cr; target->location = -1; /* Add each column to the SELECT statement's target list */ targetList = lappend(targetList, target); } } /* * Build RangeVar for from clause, fully qualified based on the * relation which we have opened and locked. */ from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), pstrdup(RelationGetRelationName(rel)), -1); /* Build query */ select = makeNode(SelectStmt); select->targetList = targetList; select->fromClause = list_make1(from); query = makeNode(RawStmt); query->stmt = (Node *) select; query->stmt_location = stmt_location; query->stmt_len = stmt_len; /* * Close the relation for now, but keep the lock on it to prevent * changes between now and when we start the query-based COPY. * * We'll reopen it later as part of the query-based COPY. */ heap_close(rel, NoLock); rel = NULL; } } else { Assert(stmt->query); query = makeNode(RawStmt); query->stmt = stmt->query; query->stmt_location = stmt_location; query->stmt_len = stmt_len; relid = InvalidOid; rel = NULL; } if (is_from) { Assert(rel); /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } else { cstate = BeginCopyTo(pstate, rel, query, relid, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); } /* * Close the relation. If reading, we can release the AccessShareLock we * got; if writing, we should hold the lock until end of transaction to * ensure that updates will be committed before lock is released. */ if (rel != NULL) heap_close(rel, (is_from ? 
NoLock : AccessShareLock)); } /* * Process the statement option list for COPY. * * Scan the options list (a list of DefElem) and transpose the information * into cstate, applying appropriate error checking. * * cstate is assumed to be filled with zeroes initially. * * This is exported so that external users of the COPY API can sanity-check * a list of options. In that usage, cstate should be passed as NULL * (since external users don't know sizeof(CopyStateData)) and the collected * data is just leaked until CurrentMemoryContext is reset. * * Note that additional checking, such as whether column names listed in FORCE * QUOTE actually exist, has to be applied later. This just checks for * self-consistency of the options list. */ void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options) { bool format_specified = false; ListCell *option; /* Support external use for option sanity checking */ if (cstate == NULL) cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); cstate->is_copy_from = is_from; cstate->file_encoding = -1; /* Extract options from the statement node tree */ foreach(option, options) { DefElem *defel = lfirst_node(DefElem, option); if (strcmp(defel->defname, "format") == 0) { char *fmt = defGetString(defel); if (format_specified) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); format_specified = true; if (strcmp(fmt, "text") == 0) /* default format */ ; else if (strcmp(fmt, "csv") == 0) cstate->csv_mode = true; else if (strcmp(fmt, "binary") == 0) cstate->binary = true; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY format \"%s\" not recognized", fmt), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "freeze") == 0) { if (cstate->freeze) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, 
defel->location))); cstate->freeze = defGetBoolean(defel); } else if (strcmp(defel->defname, "delimiter") == 0) { if (cstate->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->delim = defGetString(defel); } else if (strcmp(defel->defname, "null") == 0) { if (cstate->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->null_print = defGetString(defel); } else if (strcmp(defel->defname, "header") == 0) { if (cstate->header_line) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->header_line = defGetBoolean(defel); } else if (strcmp(defel->defname, "quote") == 0) { if (cstate->quote) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->quote = defGetString(defel); } else if (strcmp(defel->defname, "escape") == 0) { if (cstate->escape) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->escape = defGetString(defel); } else if (strcmp(defel->defname, "force_quote") == 0) { if (cstate->force_quote || cstate->force_quote_all) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, A_Star)) cstate->force_quote_all = true; else if (defel->arg && IsA(defel->arg, List)) cstate->force_quote = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "force_not_null") == 0) { if 
(cstate->force_notnull) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, List)) cstate->force_notnull = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "force_null") == 0) { if (cstate->force_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); if (defel->arg && IsA(defel->arg, List)) cstate->force_null = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "convert_selectively") == 0) { /* * Undocumented, not-accessible-from-SQL option: convert only the * named columns to binary form, storing the rest as NULLs. It's * allowed for the column list to be NIL. 
*/ if (cstate->convert_selectively) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->convert_selectively = true; if (defel->arg == NULL || IsA(defel->arg, List)) cstate->convert_select = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "encoding") == 0) { if (cstate->file_encoding >= 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cstate->file_encoding = pg_char_to_encoding(defGetString(defel)); if (cstate->file_encoding < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a valid encoding name", defel->defname), parser_errposition(pstate, defel->location))); } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("option \"%s\" not recognized", defel->defname), parser_errposition(pstate, defel->location))); } /* * Check for incompatible options (must do these two before inserting * defaults) */ if (cstate->binary && cstate->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DELIMITER in BINARY mode"))); if (cstate->binary && cstate->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); /* Set defaults for omitted options */ if (!cstate->delim) cstate->delim = cstate->csv_mode ? "," : "\t"; if (!cstate->null_print) cstate->null_print = cstate->csv_mode ? "" : "\\N"; cstate->null_print_len = strlen(cstate->null_print); if (cstate->csv_mode) { if (!cstate->quote) cstate->quote = "\""; if (!cstate->escape) cstate->escape = cstate->quote; } /* Only single-byte delimiter strings are supported. 
*/ if (strlen(cstate->delim) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must be a single one-byte character"))); /* Disallow end-of-line characters */ if (strchr(cstate->delim, '\r') != NULL || strchr(cstate->delim, '\n') != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter cannot be newline or carriage return"))); if (strchr(cstate->null_print, '\r') != NULL || strchr(cstate->null_print, '\n') != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY null representation cannot use newline or carriage return"))); /* * Disallow unsafe delimiter characters in non-CSV mode. We can't allow * backslash because it would be ambiguous. We can't allow the other * cases because data characters matching the delimiter must be * backslashed, and certain backslash combinations are interpreted * non-literally by COPY IN. Disallowing all lower case ASCII letters is * more than strictly necessary, but seems best for consistency and * future-proofing. Likewise we disallow all digits though only octal * digits are actually dangerous. 
*/ if (!cstate->csv_mode && strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", cstate->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter cannot be \"%s\"", cstate->delim))); /* Check header */ if (!cstate->csv_mode && cstate->header_line) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY HEADER available only in CSV mode"))); /* Check quote */ if (!cstate->csv_mode && cstate->quote != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote available only in CSV mode"))); if (cstate->csv_mode && strlen(cstate->quote) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote must be a single one-byte character"))); if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0]) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter and quote must be different"))); /* Check escape */ if (!cstate->csv_mode && cstate->escape != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape available only in CSV mode"))); if (cstate->csv_mode && strlen(cstate->escape) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape must be a single one-byte character"))); /* Check force_quote */ if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote available only in CSV mode"))); if ((cstate->force_quote || cstate->force_quote_all) && is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote only available using COPY TO"))); /* Check force_notnull */ if (!cstate->csv_mode && cstate->force_notnull != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null available only in CSV mode"))); if (cstate->force_notnull != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null only available using 
COPY FROM"))); /* Check force_null */ if (!cstate->csv_mode && cstate->force_null != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force null available only in CSV mode"))); if (cstate->force_null != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force null only available using COPY FROM"))); /* Don't allow the delimiter to appear in the null string. */ if (strchr(cstate->null_print, cstate->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must not appear in the NULL specification"))); /* Don't allow the CSV quote char to appear in the null string. */ if (cstate->csv_mode && strchr(cstate->null_print, cstate->quote[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); } /* * Common setup routines used by BeginCopyFrom and BeginCopyTo. * * Iff , unload or reload in the binary format, as opposed to the * more wasteful but more robust and portable text format. * * Iff , unload or reload the format that includes OID information. * On input, we accept OIDs whether or not the table has an OID column, * but silently drop them if it does not. On output, we report an error * if the user asks for OIDs in a table that has none (not providing an * OID column might seem friendlier, but could seriously confuse programs). * * If in the text format, delimit columns with delimiter and print * NULL values as . */ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options) { CopyState cstate; TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); /* * We allocate everything used by a cstate in a new memory context. This * avoids memory leaks during repeated use of COPY in a query. 
*/ cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, "COPY", ALLOCSET_DEFAULT_SIZES); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, cstate, is_from, options); /* Process the source/target relation or query */ if (rel) { Assert(!raw_query); cstate->rel = rel; tupDesc = RelationGetDescr(cstate->rel); } else { List *rewritten; Query *query; PlannedStmt *plan; DestReceiver *dest; Assert(!is_from); cstate->rel = NULL; /* * Run parse analysis and rewrite. Note this also acquires sufficient * locks on the source table(s). * * Because the parser and planner tend to scribble on their input, we * make a preliminary copy of the source querytree. This prevents * problems in the case that the COPY is in a portal or plpgsql * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ rewritten = pg_analyze_and_rewrite(copyObject(raw_query), pstate->p_sourcetext, NULL, 0, NULL); /* check that we got back something we can work with */ if (rewritten == NIL) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("DO INSTEAD NOTHING rules are not supported for COPY"))); } else if (list_length(rewritten) > 1) { ListCell *lc; /* examine queries to determine which error message to issue */ foreach(lc, rewritten) { Query *q = lfirst_node(Query, lc); if (q->querySource == QSRC_QUAL_INSTEAD_RULE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("conditional DO INSTEAD rules are not supported for COPY"))); if (q->querySource == QSRC_NON_INSTEAD_RULE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("DO ALSO rules are not supported for the COPY"))); } ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("multi-statement DO INSTEAD rules are not supported for COPY"))); } query = linitial_node(Query, rewritten); /* The grammar allows SELECT INTO, but we don't support that */ if 
(query->utilityStmt != NULL && IsA(query->utilityStmt, CreateTableAsStmt)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY (SELECT INTO) is not supported"))); Assert(query->utilityStmt == NULL); /* * Similarly the grammar doesn't enforce the presence of a RETURNING * clause, but this is required here. */ if (query->commandType != CMD_SELECT && query->returningList == NIL) { Assert(query->commandType == CMD_INSERT || query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY query must have a RETURNING clause"))); } /* plan the query */ plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL); /* * With row level security and a user using "COPY relation TO", we * have to convert the "COPY relation TO" to a query-based COPY (eg: * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add * in any RLS clauses. * * When this happens, we are passed in the relid of the originally * found relation (which we have locked). As the planner will look up * the relation again, we double-check here to make sure it found the * same one that we have locked. */ if (queryRelId != InvalidOid) { /* * Note that with RLS involved there may be multiple relations, * and while the one we need is almost certainly first, we don't * make any guarantees of that in the planner, so check the whole * list and make sure we find the original relation. */ if (!list_member_oid(plan->relationOids, queryRelId)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("relation referenced by COPY statement has changed"))); } /* * Use a snapshot with an updated command ID to ensure this query sees * results of any previously executed queries. 
*/ PushCopiedSnapshot(GetActiveSnapshot()); UpdateActiveSnapshotCommandId(); /* Create dest receiver for COPY OUT */ dest = CreateDestReceiver(DestCopyOut); ((DR_copy *) dest)->cstate = cstate; /* Create a QueryDesc requesting no output */ cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext, GetActiveSnapshot(), InvalidSnapshot, dest, NULL, NULL, 0); /* * Call ExecutorStart to prepare the plan for execution. * * ExecutorStart computes a result tupdesc for us */ ExecutorStart(cstate->queryDesc, 0); tupDesc = cstate->queryDesc->tupDesc; } /* Generate or convert list of attributes to process */ cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); num_phys_attrs = tupDesc->natts; /* Convert FORCE_QUOTE name list to per-column flags, check validity */ cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (cstate->force_quote_all) { int i; for (i = 0; i < num_phys_attrs; i++) cstate->force_quote_flags[i] = true; } else if (cstate->force_quote) { List *attnums; ListCell *cur; attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote); foreach(cur, attnums) { int attnum = lfirst_int(cur); Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY", NameStr(attr->attname)))); cstate->force_quote_flags[attnum - 1] = true; } } /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (cstate->force_notnull) { List *attnums; ListCell *cur; attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull); foreach(cur, attnums) { int attnum = lfirst_int(cur); Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE_NOT_NULL 
column \"%s\" not referenced by COPY", NameStr(attr->attname)))); cstate->force_notnull_flags[attnum - 1] = true; } } /* Convert FORCE_NULL name list to per-column flags, check validity */ cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (cstate->force_null) { List *attnums; ListCell *cur; attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null); foreach(cur, attnums) { int attnum = lfirst_int(cur); Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE_NULL column \"%s\" not referenced by COPY", NameStr(attr->attname)))); cstate->force_null_flags[attnum - 1] = true; } } /* Convert convert_selectively name list to per-column flags */ if (cstate->convert_selectively) { List *attnums; ListCell *cur; cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select); foreach(cur, attnums) { int attnum = lfirst_int(cur); Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg_internal("selected column \"%s\" not referenced by COPY", NameStr(attr->attname)))); cstate->convert_select_flags[attnum - 1] = true; } } /* Use client encoding when ENCODING option is not specified. */ if (cstate->file_encoding < 0) cstate->file_encoding = pg_get_client_encoding(); /* * Set up encoding conversion info. Even if the file and server encodings * are the same, we must apply pg_any_to_server() to validate data in * multibyte encodings. 
*/ cstate->need_transcoding = (cstate->file_encoding != GetDatabaseEncoding() || pg_database_encoding_max_length() > 1); /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); cstate->copy_dest = COPY_FILE; /* default */ MemoryContextSwitchTo(oldcontext); return cstate; } /* * Closes the pipe to an external program, checking the pclose() return code. */ static void ClosePipeToProgram(CopyState cstate) { int pclose_rc; Assert(cstate->is_program); pclose_rc = ClosePipeStream(cstate->copy_file); if (pclose_rc == -1) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close pipe to external command: %m"))); else if (pclose_rc != 0) { /* * If we ended a COPY FROM PROGRAM before reaching EOF, then it's * expectable for the called program to fail with SIGPIPE, and we * should not report that as an error. Otherwise, SIGPIPE indicates a * problem. */ if (cstate->is_copy_from && !cstate->reached_eof && wait_result_is_signal(pclose_rc, SIGPIPE)) return; ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("program \"%s\" failed", cstate->filename), errdetail_internal("%s", wait_result_to_str(pclose_rc)))); } } /* * Release resources allocated in a cstate for COPY TO/FROM. */ static void EndCopy(CopyState cstate) { if (cstate->is_program) { ClosePipeToProgram(cstate); } else { if (cstate->filename != NULL && FreeFile(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", cstate->filename))); } MemoryContextDelete(cstate->copycontext); pfree(cstate); } /* * Setup CopyState to read tuples from a table or a query for COPY TO. 
*/ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options) { CopyState cstate; bool pipe = (filename == NULL); MemoryContext oldcontext; if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) { if (rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from view \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_MATVIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from materialized view \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from foreign table \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from sequence \"%s\"", RelationGetRelationName(rel)))); else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from partitioned table \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) 
TO variant."))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from non-table relation \"%s\"", RelationGetRelationName(rel)))); } cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist, options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); if (pipe) { Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } else { cstate->filename = pstrdup(filename); cstate->is_program = is_program; if (is_program) { cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not execute command \"%s\": %m", cstate->filename))); } else { mode_t oumask; /* Pre-existing umask value */ struct stat st; /* * Prevent write to relative path ... too easy to shoot oneself in * the foot by overwriting a database file ... */ if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask(S_IWGRP | S_IWOTH); PG_TRY(); { cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); } PG_CATCH(); { umask(oumask); PG_RE_THROW(); } PG_END_TRY(); umask(oumask); if (cstate->copy_file == NULL) { /* copy errno because ereport subfunctions might change it */ int save_errno = errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for writing: %m", cstate->filename), (save_errno == ENOENT || save_errno == EACCES) ? errhint("COPY TO instructs the PostgreSQL server process to write a file. 
" "You may want a client-side facility such as psql's \\copy.") : 0)); } if (fstat(fileno(cstate->copy_file), &st)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", cstate->filename))); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", cstate->filename))); } } MemoryContextSwitchTo(oldcontext); return cstate; } /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". */ static uint64 DoCopyTo(CopyState cstate) { bool pipe = (cstate->filename == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); uint64 processed; PG_TRY(); { if (fe_copy) SendCopyBegin(cstate); processed = CopyTo(cstate); if (fe_copy) SendCopyEnd(cstate); } PG_CATCH(); { /* * Make sure we turn off old-style COPY OUT mode upon error. It is * okay to do this in all cases, since it does nothing if the mode is * not on. */ pq_endcopyout(true); PG_RE_THROW(); } PG_END_TRY(); return processed; } /* * Clean up storage and release resources for COPY TO. */ static void EndCopyTo(CopyState cstate) { if (cstate->queryDesc != NULL) { /* Close down the query and free resources. */ ExecutorFinish(cstate->queryDesc); ExecutorEnd(cstate->queryDesc); FreeQueryDesc(cstate->queryDesc); PopActiveSnapshot(); } /* Clean up storage */ EndCopy(cstate); } /* * Copy from relation or query TO file. */ static uint64 CopyTo(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; ListCell *cur; uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); else tupDesc = cstate->queryDesc->tupDesc; num_phys_attrs = tupDesc->natts; cstate->null_print_client = cstate->null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); /* Get info about the columns we need to process. 
*/ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); Oid out_func_oid; bool isvarlena; Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (cstate->binary) getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); else getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); } /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside * datatype output routines, and should be faster than retail pfree's * anyway. (We don't need a whole econtext as CopyFrom does.) */ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, "COPY TO", ALLOCSET_DEFAULT_SIZES); if (cstate->binary) { /* Generate header for a binary copy */ int32 tmp; /* Signature */ CopySendData(cstate, BinarySignature, 11); /* Flags field */ tmp = 0; CopySendInt32(cstate, tmp); /* No header extension */ tmp = 0; CopySendInt32(cstate, tmp); } else { /* * For non-binary copy, we need to convert null_print to file * encoding, because it will be sent directly with CopySendString. 
*/ if (cstate->need_transcoding) cstate->null_print_client = pg_server_to_any(cstate->null_print, cstate->null_print_len, cstate->file_encoding); /* if a header has been requested send the line */ if (cstate->header_line) { bool hdr_delim = false; foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); char *colname; if (hdr_delim) CopySendChar(cstate, cstate->delim[0]); hdr_delim = true; colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); CopyAttributeOutCSV(cstate, colname, false, list_length(cstate->attnumlist) == 1); } CopySendEndOfRow(cstate); } } if (cstate->rel) { Datum *values; bool *nulls; HeapScanDesc scandesc; HeapTuple tuple; values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); nulls = (bool *) palloc(num_phys_attrs * sizeof(bool)); scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); processed = 0; while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { CHECK_FOR_INTERRUPTS(); /* Deconstruct the tuple ... faster than repeated heap_getattr */ heap_deform_tuple(tuple, tupDesc, values, nulls); /* Format and send the data */ CopyOneRowTo(cstate, values, nulls); processed++; } heap_endscan(scandesc); pfree(values); pfree(nulls); } else { /* run the plan --- the dest receiver will send tuples */ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } if (cstate->binary) { /* Generate trailer for a binary copy */ CopySendInt16(cstate, -1); /* Need to flush out the trailer */ CopySendEndOfRow(cstate); } MemoryContextDelete(cstate->rowcontext); return processed; } /* * Emit one row during CopyTo(). 
*/
/*
 * CopyOneRowTo
 *
 * values[] and nulls[] are indexed by attribute number minus one; only the
 * columns listed in cstate->attnumlist are sent.  All per-row allocations
 * are made in cstate->rowcontext, which is reset at the start of each call.
 *
 * Binary format: a 16-bit column count, then per column a 32-bit length
 * (-1 for NULL) followed by the send-function output.  Text/CSV format:
 * columns separated by the delimiter, NULLs printed as null_print_client.
 */
static void
CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
{
	bool		wrote_column = false;
	MemoryContext caller_context;
	ListCell   *lc;

	MemoryContextReset(cstate->rowcontext);
	caller_context = MemoryContextSwitchTo(cstate->rowcontext);

	/* Binary per-tuple header */
	if (cstate->binary)
		CopySendInt16(cstate, list_length(cstate->attnumlist));

	foreach(lc, cstate->attnumlist)
	{
		int			attidx = lfirst_int(lc) - 1;
		Datum		datum = values[attidx];
		bool		is_null = nulls[attidx];

		if (cstate->binary)
		{
			bytea	   *payload;

			if (is_null)
			{
				/* a length word of -1 marks a NULL column */
				CopySendInt32(cstate, -1);
				continue;
			}

			payload = SendFunctionCall(&cstate->out_functions[attidx], datum);
			CopySendInt32(cstate, VARSIZE(payload) - VARHDRSZ);
			CopySendData(cstate, VARDATA(payload),
						 VARSIZE(payload) - VARHDRSZ);
			continue;
		}

		/* text or CSV: delimiter before every column except the first */
		if (wrote_column)
			CopySendChar(cstate, cstate->delim[0]);
		wrote_column = true;

		if (is_null)
		{
			CopySendString(cstate, cstate->null_print_client);
		}
		else
		{
			char	   *text;

			text = OutputFunctionCall(&cstate->out_functions[attidx], datum);

			if (!cstate->csv_mode)
				CopyAttributeOutText(cstate, text);
			else
				CopyAttributeOutCSV(cstate, text,
									cstate->force_quote_flags[attidx],
									list_length(cstate->attnumlist) == 1);
		}
	}

	CopySendEndOfRow(cstate);

	MemoryContextSwitchTo(caller_context);
}

/*
 * error context callback for COPY FROM
 *
 * The argument for the error context must be CopyState.
*/
/*
 * CopyFromErrorCallback
 *
 * Adds one errcontext() line naming the relation and current input line,
 * plus the column name and (length-limited) value or line text when those
 * can be shown.  In binary mode no data is ever displayed.
 */
void
CopyFromErrorCallback(void *arg)
{
	CopyState	cstate = (CopyState) arg;
	char		lineno_buf[32];

	snprintf(lineno_buf, sizeof(lineno_buf),
			 UINT64_FORMAT, cstate->cur_lineno);

	if (cstate->binary)
	{
		/* can't usefully display the data */
		if (cstate->cur_attname)
			errcontext("COPY %s, line %s, column %s",
					   cstate->cur_relname, lineno_buf,
					   cstate->cur_attname);
		else
			errcontext("COPY %s, line %s",
					   cstate->cur_relname, lineno_buf);
		return;
	}

	if (cstate->cur_attname)
	{
		if (cstate->cur_attval)
		{
			/* error is relevant to a particular column */
			char	   *shown = limit_printout_length(cstate->cur_attval);

			errcontext("COPY %s, line %s, column %s: \"%s\"",
					   cstate->cur_relname, lineno_buf,
					   cstate->cur_attname, shown);
			pfree(shown);
		}
		else
		{
			/* error is relevant to a particular column, value is NULL */
			errcontext("COPY %s, line %s, column %s: null input",
					   cstate->cur_relname, lineno_buf,
					   cstate->cur_attname);
		}
		return;
	}

	/*
	 * Error is relevant to a particular line.
	 *
	 * If line_buf still contains the correct line, and it's already
	 * transcoded, print it. If it's still in a foreign encoding, it's quite
	 * likely that the error is precisely a failure to do encoding conversion
	 * (ie, bad data). We dare not try to convert it, and at present there's
	 * no way to regurgitate it without conversion. So we have to punt and
	 * just report the line number.
	 */
	if (cstate->line_buf_valid &&
		(cstate->line_buf_converted || !cstate->need_transcoding))
	{
		char	   *shown = limit_printout_length(cstate->line_buf.data);

		errcontext("COPY %s, line %s: \"%s\"",
				   cstate->cur_relname, lineno_buf, shown);
		pfree(shown);
	}
	else
	{
		errcontext("COPY %s, line %s",
				   cstate->cur_relname, lineno_buf);
	}
}

/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * It would seem a lot easier to just use the sprintf "precision" limit to
 * truncate the string.
However, some versions of glibc have a bug/misfeature
 * that vsnprintf will always fail (return -1) if it is asked to truncate
 * a string that contains invalid byte sequences for the current encoding.
 * So, do our own truncation.  We return a pstrdup'd copy of the input.
 *
 * Inputs of at most MAX_COPY_DATA_DISPLAY bytes are copied unchanged; longer
 * inputs are clipped with pg_mbcliplen() and get "..." appended.  Either way
 * the result is newly allocated, and the callers in this file pfree() it.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			slen = strlen(str);
	int			len;
	char	   *res;

	/* Fast path if definitely okay */
	if (slen <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/*
	 * Apply encoding-dependent truncation.  NOTE(review): presumably
	 * pg_mbcliplen() returns a byte count that does not split a multibyte
	 * character -- confirm against its definition.
	 */
	len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);

	/*
	 * Truncate, and add "..." to show we truncated the input.  The buffer
	 * holds len data bytes, the three dots, and the terminating NUL.
	 */
	res = (char *) palloc(len + 4);
	memcpy(res, str, len);
	strcpy(res + len, "...");

	return res;
}

/*
 * Copy FROM file to relation.
 */
uint64
CopyFrom(CopyState cstate)
{
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum	   *values;
	bool	   *nulls;
	ResultRelInfo *resultRelInfo;
	ResultRelInfo *target_resultRelInfo;
	ResultRelInfo *prevResultRelInfo = NULL;
	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
	ModifyTableState *mtstate;
	ExprContext *econtext;
	TupleTableSlot *myslot;
	MemoryContext oldcontext = CurrentMemoryContext;

	PartitionTupleRouting *proute = NULL;
	ExprContext *secondaryExprContext = NULL;
	ErrorContextCallback errcallback;
	CommandId	mycid = GetCurrentCommandId(true);
	int			hi_options = 0; /* start with default heap_insert options */
	BulkInsertState bistate;
	CopyInsertMethod insertMethod;
	uint64		processed = 0;
	int			nBufferedTuples = 0;
	bool		has_before_insert_row_trig;
	bool		has_instead_insert_row_trig;
	bool		leafpart_use_multi_insert = false;

#define MAX_BUFFERED_TUPLES 1000
#define RECHECK_MULTI_INSERT_THRESHOLD 1000
	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
	Size		bufferedTuplesSize = 0;
	uint64		firstBufferedLineNo = 0;
	uint64		lastPartitionSampleLineNo = 0;
	uint64		nPartitionChanges = 0;
	double		avgTuplesPerPartChange = 0;

	/* COPY FROM always has a target relation */
	Assert(cstate->rel);

	/*
	 * The target must be a plain, foreign, or
partitioned relation, or have
	 * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
	 * allowed on views, so we only hint about them in the view case.)
	 */
	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
		cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
		!(cstate->rel->trigdesc &&
		  cstate->rel->trigdesc->trig_insert_instead_row))
	{
		if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to view \"%s\"",
							RelationGetRelationName(cstate->rel)),
					 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
		else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to materialized view \"%s\"",
							RelationGetRelationName(cstate->rel))));
		else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to sequence \"%s\"",
							RelationGetRelationName(cstate->rel))));
		else
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy to non-table relation \"%s\"",
							RelationGetRelationName(cstate->rel))));
	}

	tupDesc = RelationGetDescr(cstate->rel);

	/*----------
	 * Check to see if we can avoid writing WAL
	 *
	 * If archive logging/streaming is not enabled *and* either
	 *	- table was created in same transaction as this COPY
	 *	- data is being written to relfilenode created in this transaction
	 * then we can skip writing WAL.  It's safe because if the transaction
	 * doesn't commit, we'll discard the table (or the new relfilenode file).
	 * If it does commit, we'll have done the heap_sync at the bottom of this
	 * routine first.
	 *
	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
	 * can be cleared before the end of the transaction. The exact case is
	 * when a relation sets a new relfilenode twice in same transaction, yet
	 * the second one fails in an aborted subtransaction, e.g.
	 *
	 * BEGIN;
	 * TRUNCATE t;
	 * SAVEPOINT save;
	 * TRUNCATE t;
	 * ROLLBACK TO save;
	 * COPY ...
	 *
	 * Also, if the target file is new-in-transaction, we assume that checking
	 * FSM for free space is a waste of time, even if we must use WAL because
	 * of archiving.  This could possibly be wrong, but it's unlikely.
	 *
	 * The comments for heap_insert and RelationGetBufferForTuple specify that
	 * skipping WAL logging is only safe if we ensure that our tuples do not
	 * go into pages containing tuples from any other transactions --- but this
	 * must be the case if we have a new table or new relfilenode, so we need
	 * no additional work to enforce that.
	 *
	 * We currently don't support this optimization if the COPY target is a
	 * partitioned table as we currently only lazily initialize partition
	 * information when routing the first tuple to the partition.  We cannot
	 * know at this stage if we can perform this optimization.  It should be
	 * possible to improve on this, but it does mean maintaining heap insert
	 * option flags per partition and setting them when we first open the
	 * partition.
	 *
	 * This optimization is not supported for relation types which do not
	 * have any physical storage, with foreign tables and views using
	 * INSTEAD OF triggers entering in this category.  Partitioned tables
	 * are not supported as per the description above.
	 *----------
	 */
	/* createSubid is creation check, newRelfilenodeSubid is truncation check */
	if (RELKIND_CAN_HAVE_STORAGE(cstate->rel->rd_rel->relkind) &&
		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
		 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
	{
		hi_options |= HEAP_INSERT_SKIP_FSM;
		if (!XLogIsNeeded())
			hi_options |= HEAP_INSERT_SKIP_WAL;
	}

	/*
	 * Optimize if new relfilenode was created in this subxact or one of its
	 * committed children and we won't see those rows later as part of an
	 * earlier scan or command. The subxact test ensures that if this subxact
	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
	 * that the stronger test of exactly which subtransaction created it is
	 * crucial for correctness of this optimization. The test for an earlier
	 * scan or command tolerates false negatives. FREEZE causes other sessions
	 * to see rows they would not see under MVCC, and a false negative merely
	 * spreads that anomaly to the current session.
	 */
	if (cstate->freeze)
	{
		/*
		 * We currently disallow COPY FREEZE on partitioned tables.  The
		 * reason for this is that we've simply not yet opened the partitions
		 * to determine if the optimization can be applied to them.  We could
		 * go and open them all here, but doing so may be quite a costly
		 * overhead for small copies.  In any case, we may just end up routing
		 * tuples to a small number of partitions.  It seems better just to
		 * raise an ERROR for partitioned tables.
		 */
		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot perform FREEZE on a partitioned table")));
		}

		/*
		 * Tolerate one registration for the benefit of FirstXactSnapshot.
		 * Scan-bearing queries generally create at least two registrations,
		 * though relying on that is fragile, as is ignoring ActiveSnapshot.
		 * Clear CatalogSnapshot to avoid counting its registration.  We'll
		 * still detect ongoing catalog scans, each of which separately
		 * registers the snapshot it uses.
		 */
		InvalidateCatalogSnapshot();
		if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
					 errmsg("cannot perform FREEZE because of prior transaction activity")));

		if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
			cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));

		hi_options |= HEAP_INSERT_FROZEN;
	}

	/*
	 * We need a ResultRelInfo so we can use the regular executor's
	 * index-entry-making machinery.  (There used to be a huge amount of code
	 * here that basically duplicated execUtils.c ...)
	 */
	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo,
					  cstate->rel,
					  1,		/* must match rel's position in range_table */
					  NULL,
					  0);
	/* Remember the COPY target; resultRelInfo is re-pointed per partition */
	target_resultRelInfo = resultRelInfo;

	/* Verify the named relation is a valid target for INSERT */
	CheckValidResultRel(resultRelInfo, CMD_INSERT);

	ExecOpenIndices(resultRelInfo, false);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	ExecInitRangeTable(estate, cstate->range_table);

	/* Set up a tuple slot too */
	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
									&TTSOpsHeapTuple);
	/* Triggers might need a slot as well */
	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL,
														&TTSOpsHeapTuple);

	/*
	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
	 * foreign-table result relation(s).
	 */
	mtstate = makeNode(ModifyTableState);
	mtstate->ps.plan = NULL;
	mtstate->ps.state = estate;
	mtstate->operation = CMD_INSERT;
	mtstate->resultRelInfo = estate->es_result_relations;

	if (resultRelInfo->ri_FdwRoutine != NULL &&
		resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
		resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
														 resultRelInfo);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	/*
	 * If there are any triggers with transition tables on the named relation,
	 * we need to be prepared to capture transition tuples.
	 *
	 * Because partition tuple routing would like to know about whether
	 * transition capture is active, we also set it in mtstate, which is
	 * passed to ExecFindPartition() below.
	 */
	cstate->transition_capture = mtstate->mt_transition_capture =
		MakeTransitionCaptureState(cstate->rel->trigdesc,
								   RelationGetRelid(cstate->rel),
								   CMD_INSERT);

	/*
	 * If the named relation is a partitioned table, initialize state for
	 * CopyFrom tuple routing.
	 */
	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		proute = ExecSetupPartitionTupleRouting(NULL, cstate->rel);

	/*
	 * It's more efficient to prepare a bunch of tuples for insertion, and
	 * insert them in one heap_multi_insert() call, than call heap_insert()
	 * separately for every tuple. However, there are a number of reasons why
	 * we might not be able to do this.  These are explained below.
	 */
	if (resultRelInfo->ri_TrigDesc != NULL &&
		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
		 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
	{
		/*
		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
		 * triggers on the table. Such triggers might query the table we're
		 * inserting into and act differently if the tuples that have already
		 * been processed and prepared for insertion are not there.
		 */
		insertMethod = CIM_SINGLE;
	}
	else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
			 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
	{
		/*
		 * For partitioned tables we can't support multi-inserts when there
		 * are any statement level insert triggers. It might be possible to
		 * allow partitioned tables with such triggers in the future, but for
		 * now, CopyFromInsertBatch expects that any before row insert and
		 * statement level insert triggers are on the same relation.
		 */
		insertMethod = CIM_SINGLE;
	}
	else if (resultRelInfo->ri_FdwRoutine != NULL ||
			 cstate->volatile_defexprs)
	{
		/*
		 * Can't support multi-inserts to foreign tables or if there are any
		 * volatile default expressions in the table.  Similarly to the
		 * trigger case above, such expressions may query the table we're
		 * inserting into.
		 *
		 * Note: It does not matter if any partitions have any volatile
		 * default expressions as we use the defaults from the target of the
		 * COPY command.
		 */
		insertMethod = CIM_SINGLE;
	}
	else
	{
		/*
		 * For partitioned tables, we may still be able to perform bulk
		 * inserts for sets of consecutive tuples which belong to the same
		 * partition.  However, the possibility of this depends on which types
		 * of triggers exist on the partition.  We must disable bulk inserts
		 * if the partition is a foreign table or it has any before row insert
		 * or insert instead triggers (same as we checked above for the parent
		 * table).  Since the partition's resultRelInfos are initialized only
		 * when we actually need to insert the first tuple into them, we must
		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
		 * flag that we must later determine if we can use bulk-inserts for
		 * the partition being inserted into.
		 *
		 * Normally, when performing bulk inserts we just flush the insert
		 * buffer whenever it becomes full, but for the partitioned table
		 * case, we flush it whenever the current tuple does not belong to the
		 * same partition as the previous tuple, and since we flush the
		 * previous partition's buffer once the new tuple has already been
		 * built, we're unable to reset the estate since we'd free the memory
		 * in which the new tuple is stored.  To work around this we maintain
		 * a secondary expression context and alternate between these when the
		 * partition changes.  This does mean we do store the first new tuple
		 * in a different context than subsequent tuples, but that does not
		 * matter, providing we don't free anything while it's still needed.
		 */
		if (proute)
		{
			insertMethod = CIM_MULTI_CONDITIONAL;
			secondaryExprContext = CreateExprContext(estate);
		}
		else
			insertMethod = CIM_MULTI;

		/*
		 * Room for MAX_BUFFERED_TUPLES tuple pointers; only allocated on the
		 * two multi-insert paths, so it stays NULL for CIM_SINGLE.
		 */
		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
	}

	/*
	 * Trigger flags of the COPY target itself.  When tuples are routed to
	 * partitions these are recomputed below each time the destination
	 * partition changes.
	 */
	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
								  resultRelInfo->ri_TrigDesc->trig_insert_before_row);

	has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
								   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);

	/*
	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
	 * should do this for COPY, since it's not really an "INSERT" statement as
	 * such. However, executing these triggers maintains consistency with the
	 * EACH ROW triggers that we already fire on COPY.
	 */
	ExecBSInsertTriggers(estate, resultRelInfo);

	/* Per-row work arrays passed to NextCopyFrom(), one entry per attribute */
	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));

	bistate = GetBulkInsertState();
	econtext = GetPerTupleExprContext(estate);

	/* Set up callback to identify error line number */
	errcallback.callback = CopyFromErrorCallback;
	errcallback.arg = (void *) cstate;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Main loop: one input row per iteration, until NextCopyFrom() is false */
	for (;;)
	{
		TupleTableSlot *slot;
		bool		skip_tuple;

		CHECK_FOR_INTERRUPTS();

		if (nBufferedTuples == 0)
		{
			/*
			 * Reset the per-tuple exprcontext. We can only do this if the
			 * tuple buffer is empty. (Calling the context the per-tuple
			 * memory context is a bit of a misnomer now.)
			 */
			ResetPerTupleExprContext(estate);
		}

		/* Switch into its memory context */
		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

		if (!NextCopyFrom(cstate, econtext, values, nulls))
			break;

		/* And now we can form the input tuple. */
		tuple = heap_form_tuple(tupDesc, values, nulls);

		/*
		 * Constraints might reference the tableoid column, so initialize
		 * t_tableOid before evaluating them.
		 */
		tuple->t_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);

		/* Triggers and stuff need to be invoked in query context. */
		MemoryContextSwitchTo(oldcontext);

		/* Place tuple in tuple slot --- but slot shouldn't free it */
		slot = myslot;
		ExecStoreHeapTuple(tuple, slot, false);

		/* Determine the partition to heap_insert the tuple into */
		if (proute)
		{
			TupleConversionMap *map;

			/*
			 * Attempt to find a partition suitable for this tuple.
			 * ExecFindPartition() will raise an error if none can be found or
			 * if the found partition is not suitable for INSERTs.
			 */
			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
											  proute, slot, estate);

			if (prevResultRelInfo != resultRelInfo)
			{
				/* Check if we can multi-insert into this partition */
				if (insertMethod == CIM_MULTI_CONDITIONAL)
				{
					/*
					 * When performing bulk-inserts into partitioned tables we
					 * must insert the tuples seen so far to the heap whenever
					 * the partition changes.
					 */
					if (nBufferedTuples > 0)
					{
						ExprContext *swapcontext;

						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
											prevResultRelInfo, myslot, bistate,
											nBufferedTuples, bufferedTuples,
											firstBufferedLineNo);
						nBufferedTuples = 0;
						bufferedTuplesSize = 0;

						Assert(secondaryExprContext);

						/*
						 * Normally we reset the per-tuple context whenever
						 * the bufferedTuples array is empty at the beginning
						 * of the loop, however, it is possible since we flush
						 * the buffer here that the buffer is never empty at
						 * the start of the loop.  To prevent the per-tuple
						 * context from never being reset we maintain a second
						 * context and alternate between them when the
						 * partition changes.  We can now reset
						 * secondaryExprContext as this is no longer needed,
						 * since we just flushed any tuples stored in it.  We
						 * also now switch over to the other context.  This
						 * does mean that the first tuple in the buffer won't
						 * be in the same context as the others, but that does
						 * not matter since we only reset it after the flush.
						 */
						ReScanExprContext(secondaryExprContext);

						swapcontext = secondaryExprContext;
						secondaryExprContext = estate->es_per_tuple_exprcontext;
						estate->es_per_tuple_exprcontext = swapcontext;
					}

					nPartitionChanges++;

					/*
					 * Here we adaptively enable multi-inserts based on the
					 * average number of tuples from recent multi-insert
					 * batches.  We recalculate the average every
					 * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
					 * the average over the whole copy.  This allows us to
					 * enable multi-inserts when we get periods in the copy
					 * stream that have tuples commonly belonging to the same
					 * partition, and disable when the partition is changing
					 * too often.
					 *
					 * NOTE(review): if cur_lineno is unsigned (it is copied
					 * into a uint64 in CopyFromInsertBatch; confirm its
					 * declared type), the subtraction below wraps while
					 * cur_lineno is less than RECHECK_MULTI_INSERT_THRESHOLD;
					 * the second test keeps the overall condition false in
					 * that case.
					 */
					if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
															   RECHECK_MULTI_INSERT_THRESHOLD)
								 && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
					{
						avgTuplesPerPartChange =
							(cstate->cur_lineno - lastPartitionSampleLineNo) /
							(double) nPartitionChanges;

						lastPartitionSampleLineNo = cstate->cur_lineno;
						nPartitionChanges = 0;
					}
				}

				/* Determine which triggers exist on this partition */
				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);

				has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
											   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);

				/*
				 * Tests have shown that using multi-inserts when the
				 * partition changes on every tuple slightly decreases the
				 * performance, however, there are benefits even when only
				 * some batches have just 2 tuples, so let's enable
				 * multi-inserts even when the average is quite low.
				 */
				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
					avgTuplesPerPartChange >= 1.3 &&
					!has_before_insert_row_trig &&
					!has_instead_insert_row_trig &&
					resultRelInfo->ri_FdwRoutine == NULL;

				/*
				 * We'd better make the bulk insert mechanism gets a new
				 * buffer when the partition being inserted into changes.
				 */
				ReleaseBulkInsertStatePin(bistate);
				prevResultRelInfo = resultRelInfo;
			}

			/*
			 * For ExecInsertIndexTuples() to work on the partition's indexes
			 */
			estate->es_result_relation_info = resultRelInfo;

			/*
			 * If we're capturing transition tuples, we might need to convert
			 * from the partition rowtype to root rowtype.
			 */
			if (cstate->transition_capture != NULL)
			{
				if (has_before_insert_row_trig)
				{
					/*
					 * If there are any BEFORE triggers on the partition,
					 * we'll have to be ready to convert their result back to
					 * tuplestore format.
					 */
					cstate->transition_capture->tcs_original_insert_tuple = NULL;
					cstate->transition_capture->tcs_map =
						resultRelInfo->ri_PartitionInfo->pi_PartitionToRootMap;
				}
				else
				{
					/*
					 * Otherwise, just remember the original unconverted
					 * tuple, to avoid a needless round trip conversion.
					 */
					cstate->transition_capture->tcs_original_insert_tuple = tuple;
					cstate->transition_capture->tcs_map = NULL;
				}
			}

			/*
			 * We might need to convert from the root rowtype to the partition
			 * rowtype.
			 */
			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
			if (map != NULL)
			{
				TupleTableSlot *new_slot;
				/* NB: shadows the function-level oldcontext inside this block */
				MemoryContext oldcontext;

				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
				Assert(new_slot != NULL);

				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);

				/*
				 * Get the tuple in the per-tuple context, so that it will be
				 * freed after each batch insert.
				 */
				oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
				tuple = ExecCopySlotHeapTuple(slot);
				MemoryContextSwitchTo(oldcontext);
			}

			/* The tuple now belongs to the partition, not the COPY target */
			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
		}

		skip_tuple = false;

		/* BEFORE ROW INSERT Triggers */
		if (has_before_insert_row_trig)
		{
			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);

			if (slot == NULL)	/* "do nothing" */
				skip_tuple = true;
			else				/* trigger might have changed tuple */
				tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
		}

		if (!skip_tuple)
		{
			if (has_instead_insert_row_trig)
			{
				/* Pass the data to the INSTEAD ROW INSERT trigger */
				ExecIRInsertTriggers(estate, resultRelInfo, slot);
			}
			else
			{
				/*
				 * If the target is a plain table, check the constraints of
				 * the tuple.
				 */
				if (resultRelInfo->ri_FdwRoutine == NULL &&
					resultRelInfo->ri_RelationDesc->rd_att->constr)
					ExecConstraints(resultRelInfo, slot, estate);

				/*
				 * Also check the tuple against the partition constraint, if
				 * there is one; except that if we got here via tuple-routing,
				 * we don't need to if there's no BR trigger defined on the
				 * partition.
				 */
				if (resultRelInfo->ri_PartitionCheck &&
					(proute == NULL || has_before_insert_row_trig))
					ExecPartitionCheck(resultRelInfo, slot, estate, true);

				/*
				 * Perform multi-inserts when enabled, or when loading a
				 * partitioned table that can support multi-inserts as
				 * determined above.
				 */
				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
				{
					/* Add this tuple to the tuple buffer */
					if (nBufferedTuples == 0)
						firstBufferedLineNo = cstate->cur_lineno;
					bufferedTuples[nBufferedTuples++] = tuple;
					bufferedTuplesSize += tuple->t_len;

					/*
					 * If the buffer filled up, flush it.  Also flush if the
					 * total size of all the tuples in the buffer becomes
					 * large, to avoid using large amounts of memory for the
					 * buffer when the tuples are exceptionally wide.
					 * (65535 is the limit applied to the summed t_len.)
					 */
					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
						bufferedTuplesSize > 65535)
					{
						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
											resultRelInfo, myslot, bistate,
											nBufferedTuples, bufferedTuples,
											firstBufferedLineNo);
						nBufferedTuples = 0;
						bufferedTuplesSize = 0;
					}
				}
				else
				{
					List	   *recheckIndexes = NIL;

					/* OK, store the tuple */
					if (resultRelInfo->ri_FdwRoutine != NULL)
					{
						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
																			   resultRelInfo,
																			   slot,
																			   NULL);

						/*
						 * The continue below bypasses index insertion, AFTER
						 * ROW triggers and the processed++ for this row.
						 */
						if (slot == NULL)	/* "do nothing" */
							continue;	/* next tuple please */

						/* FDW might have changed tuple */
						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);

						/*
						 * AFTER ROW Triggers might reference the tableoid
						 * column, so initialize t_tableOid before evaluating
						 * them.
						 */
						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
					}
					else
						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
									mycid, hi_options, bistate);

					/* And create index entries for it */
					if (resultRelInfo->ri_NumIndices > 0)
						recheckIndexes = ExecInsertIndexTuples(slot,
															   &(tuple->t_self),
															   estate,
															   false,
															   NULL,
															   NIL);

					/* AFTER ROW INSERT Triggers */
					ExecARInsertTriggers(estate, resultRelInfo, tuple,
										 recheckIndexes, cstate->transition_capture);

					list_free(recheckIndexes);
				}
			}

			/*
			 * We count only tuples not suppressed by a BEFORE INSERT trigger
			 * or FDW; this is the same definition used by nodeModifyTable.c
			 * for counting tuples inserted by an INSERT command.
			 */
			processed++;
		}
	}

	/*
	 * Flush any remaining buffered tuples.  With CIM_MULTI_CONDITIONAL the
	 * buffered tuples belong to prevResultRelInfo, the partition recorded at
	 * the last partition switch.
	 */
	if (nBufferedTuples > 0)
	{
		if (insertMethod == CIM_MULTI_CONDITIONAL)
		{
			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
								prevResultRelInfo, myslot, bistate,
								nBufferedTuples, bufferedTuples,
								firstBufferedLineNo);
		}
		else
			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
								resultRelInfo, myslot, bistate,
								nBufferedTuples, bufferedTuples,
								firstBufferedLineNo);
	}

	/* Done, clean up */
	error_context_stack = errcallback.previous;

	FreeBulkInsertState(bistate);

	MemoryContextSwitchTo(oldcontext);

	/*
	 * In the old protocol, tell pqcomm that we can process normal protocol
	 * messages again.
*/ if (cstate->copy_dest == COPY_OLD_FE) pq_endmsgread(); /* Execute AFTER STATEMENT insertion triggers */ ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture); /* Handle queued AFTER triggers */ AfterTriggerEndQuery(estate); pfree(values); pfree(nulls); ExecResetTupleTable(estate->es_tupleTable, false); /* Allow the FDW to shut down */ if (target_resultRelInfo->ri_FdwRoutine != NULL && target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, target_resultRelInfo); ExecCloseIndices(target_resultRelInfo); /* Close all the partitioned tables, leaf partitions, and their indices */ if (proute) ExecCleanupTupleRouting(mtstate, proute); /* Close any trigger target relations */ ExecCleanUpTriggerState(estate); FreeExecutorState(estate); /* * If we skipped writing WAL, then we need to sync the heap (but not * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) heap_sync(cstate->rel); return processed; } /* * A subroutine of CopyFrom, to write the current batch of buffered heap * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT * triggers. */ static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, uint64 firstBufferedLineNo) { MemoryContext oldcontext; int i; uint64 save_cur_lineno; bool line_buf_valid = cstate->line_buf_valid; /* * Print error context information correctly, if one of the operations * below fail. */ cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; /* * heap_multi_insert leaks memory, so switch to short-lived memory context * before calling it. 
*/ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); heap_multi_insert(resultRelInfo->ri_RelationDesc, bufferedTuples, nBufferedTuples, mycid, hi_options, bistate); MemoryContextSwitchTo(oldcontext); /* * If there are any indexes, update them for all the inserted tuples, and * run AFTER ROW INSERT triggers. */ if (resultRelInfo->ri_NumIndices > 0) { for (i = 0; i < nBufferedTuples; i++) { List *recheckIndexes; cstate->cur_lineno = firstBufferedLineNo + i; ExecStoreHeapTuple(bufferedTuples[i], myslot, false); recheckIndexes = ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self), estate, false, NULL, NIL); ExecARInsertTriggers(estate, resultRelInfo, bufferedTuples[i], recheckIndexes, cstate->transition_capture); list_free(recheckIndexes); } } /* * There's no indexes, but see if we need to run AFTER ROW INSERT triggers * anyway. */ else if (resultRelInfo->ri_TrigDesc != NULL && (resultRelInfo->ri_TrigDesc->trig_insert_after_row || resultRelInfo->ri_TrigDesc->trig_insert_new_table)) { for (i = 0; i < nBufferedTuples; i++) { cstate->cur_lineno = firstBufferedLineNo + i; ExecARInsertTriggers(estate, resultRelInfo, bufferedTuples[i], NIL, cstate->transition_capture); } } /* reset cur_lineno and line_buf_valid to what they were */ cstate->line_buf_valid = line_buf_valid; cstate->cur_lineno = save_cur_lineno; } /* * Setup to read tuples from a file for COPY FROM. * * 'rel': Used as a template for the tuples * 'filename': Name of server-local file to read * 'attnamelist': List of char *, columns to include. NIL selects all cols. * 'options': List of DefElem. See copy_opt_item in gram.y for selections. * * Returns a CopyState, to be passed to NextCopyFrom and related functions. 
*/ CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options) { CopyState cstate; bool pipe = (filename == NULL); TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; FmgrInfo *in_functions; Oid *typioparams; int attnum; Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; bool volatile_defexprs; cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Initialize state variables */ cstate->reached_eof = false; cstate->eol_type = EOL_UNKNOWN; cstate->cur_relname = RelationGetRelationName(cstate->rel); cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; /* Set up variables to avoid per-attribute overhead. */ initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; /* Assign range table, we'll need it in CopyFrom. */ if (pstate) cstate->range_table = pstate->p_rtable; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; num_defaults = 0; volatile_defexprs = false; /* * Pick up the required catalog information for each attribute in the * relation, including the input function, the element type (to pass to * the input function), and info about defaults and constraints. (Which * input function we use depends on text/binary format choice.) 
*/ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); for (attnum = 1; attnum <= num_phys_attrs; attnum++) { Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); /* We don't need info for dropped attributes */ if (att->attisdropped) continue; /* Fetch the input function and typioparam info */ if (cstate->binary) getTypeBinaryInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); else getTypeInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); fmgr_info(in_func_oid, &in_functions[attnum - 1]); /* Get default info if needed */ if (!list_member_int(cstate->attnumlist, attnum)) { /* attribute is NOT to be copied from input */ /* use default value if one exists */ Expr *defexpr = (Expr *) build_column_default(cstate->rel, attnum); if (defexpr != NULL) { /* Run the expression through planner */ defexpr = expression_planner(defexpr); /* Initialize executable expression in copycontext */ defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); defmap[num_defaults] = attnum - 1; num_defaults++; /* * If a default expression looks at the table being loaded, * then it could give the wrong answer when using * multi-insert. Since database access can be dynamic this is * hard to test for exactly, so we use the much wider test of * whether the default expression is volatile. We allow for * the special case of when the default expression is the * nextval() of a sequence which in this specific case is * known to be safe for use with the multi-insert * optimization. Hence we use this special case function * checker rather than the standard check for * contain_volatile_functions(). */ if (!volatile_defexprs) volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); } } } /* We keep those variables in cstate. 
*/ cstate->in_functions = in_functions; cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; cstate->is_program = is_program; if (data_source_cb) { cstate->copy_dest = COPY_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) { Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); else cstate->copy_file = stdin; } else { cstate->filename = pstrdup(filename); if (cstate->is_program) { cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not execute command \"%s\": %m", cstate->filename))); } else { struct stat st; cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); if (cstate->copy_file == NULL) { /* copy errno because ereport subfunctions might change it */ int save_errno = errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for reading: %m", cstate->filename), (save_errno == ENOENT || save_errno == EACCES) ? errhint("COPY FROM instructs the PostgreSQL server process to read a file. 
" "You may want a client-side facility such as psql's \\copy.") : 0)); } if (fstat(fileno(cstate->copy_file), &st)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", cstate->filename))); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", cstate->filename))); } } if (cstate->binary) { /* Read and verify binary header */ char readSig[11]; int32 tmp; /* Signature */ if (CopyGetData(cstate, readSig, 11, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("COPY file signature not recognized"))); /* Flags field */ if (!CopyGetInt32(cstate, &tmp)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); if ((tmp & (1 << 16)) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (WITH OIDS)"))); tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unrecognized critical flags in COPY file header"))); /* Header extension length */ if (!CopyGetInt32(cstate, &tmp) || tmp < 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing length)"))); /* Skip extension header, if present */ while (tmp-- > 0) { if (CopyGetData(cstate, readSig, 1, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); } } /* create workspace for CopyReadAttributes results */ if (!cstate->binary) { AttrNumber attr_count = list_length(cstate->attnumlist); cstate->max_fields = attr_count; cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); } MemoryContextSwitchTo(oldcontext); return cstate; } /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines. * * An internal temporary buffer is returned via 'fields'. 
It is valid until
 * the next call of the function. Since the function returns all raw fields
 * in the input file, 'nfields' could be different from the number of columns
 * in the relation.
 *
 * NOTE: force_not_null option is not applied to the returned fields.
 */
bool
NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
{
	bool		hit_eof;
	int			nparsed;

	/* only available for text or csv input */
	Assert(!cstate->binary);

	/* A header line, when requested, is read and thrown away first */
	if (cstate->cur_lineno == 0 && cstate->header_line)
	{
		cstate->cur_lineno++;
		if (CopyReadLine(cstate))
			return false;		/* input ended with the header */
	}

	cstate->cur_lineno++;

	/* Pull the next data line into line_buf */
	hit_eof = CopyReadLine(cstate);

	/*
	 * EOF with an empty line buffer means we are done.  EOF after some
	 * characters is treated like newline followed by EOF: the line is
	 * processed now and the caller sees the end on its next call.
	 */
	if (hit_eof && cstate->line_buf.len == 0)
		return false;

	/* Split the line into de-escaped field values */
	nparsed = cstate->csv_mode ?
		CopyReadAttributesCSV(cstate) :
		CopyReadAttributesText(cstate);

	*fields = cstate->raw_fields;
	*nfields = nparsed;
	return true;
}

/*
 * Read next tuple from file for COPY FROM. Return false if no more tuples.
 *
 * 'econtext' is used to evaluate the default expression of each column not
 * read from the file. It can be NULL when no default values are used, i.e.
 * when all columns are read from the file.
 *
 * 'values' and 'nulls' arrays must be the same length as columns of the
 * relation passed to BeginCopyFrom. This function fills the arrays.
*/
bool
NextCopyFrom(CopyState cstate, ExprContext *econtext,
			 Datum *values, bool *nulls)
{
	/*
	 * Overview: in text/CSV mode the raw fields of one line are fetched and
	 * each is converted with the column type's input function; in binary
	 * mode a field count is read and each attribute is decoded with
	 * CopyReadBinaryAttribute.  Afterwards the default expressions listed in
	 * defmap/defexprs are evaluated for the remaining columns.
	 */
	TupleDesc	tupDesc;
	AttrNumber	num_phys_attrs,
				attr_count,
				num_defaults = cstate->num_defaults;
	FmgrInfo   *in_functions = cstate->in_functions;
	Oid		   *typioparams = cstate->typioparams;
	int			i;
	int		   *defmap = cstate->defmap;
	ExprState **defexprs = cstate->defexprs;

	tupDesc = RelationGetDescr(cstate->rel);
	num_phys_attrs = tupDesc->natts;
	attr_count = list_length(cstate->attnumlist);

	/* Initialize all values for row to NULL */
	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
	MemSet(nulls, true, num_phys_attrs * sizeof(bool));

	if (!cstate->binary)
	{
		char	  **field_strings;
		ListCell   *cur;
		int			fldct;
		int			fieldno;
		char	   *string;

		/* read raw fields in the next line */
		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
			return false;

		/* check for overflowing fields */
		if (attr_count > 0 && fldct > attr_count)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));

		fieldno = 0;

		/* Loop to read the user attributes on the line. */
		foreach(cur, cstate->attnumlist)
		{
			int			attnum = lfirst_int(cur);
			int			m = attnum - 1; /* zero-based index into tupDesc,
										 * values[] and nulls[] */
			Form_pg_attribute att = TupleDescAttr(tupDesc, m);

			/* Fewer fields on the line than columns requested */
			if (fieldno >= fldct)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("missing data for column \"%s\"",
								NameStr(att->attname))));
			string = field_strings[fieldno++];

			if (cstate->convert_select_flags &&
				!cstate->convert_select_flags[m])
			{
				/* ignore input field, leaving column as NULL */
				continue;
			}

			if (cstate->csv_mode)
			{
				if (string == NULL &&
					cstate->force_notnull_flags[m])
				{
					/*
					 * FORCE_NOT_NULL option is set and column is NULL -
					 * convert it to the NULL string.
					 */
					string = cstate->null_print;
				}
				else if (string != NULL && cstate->force_null_flags[m]
						 && strcmp(string, cstate->null_print) == 0)
				{
					/*
					 * FORCE_NULL option is set and column matches the NULL
					 * string. It must have been quoted, or otherwise the
					 * string would already have been set to NULL. Convert it
					 * to NULL as specified.
					 */
					string = NULL;
				}
			}

			/*
			 * cur_attname/cur_attval are set only for the duration of the
			 * input function call and cleared again right after it.
			 * NOTE(review): presumably they feed the COPY error context
			 * callback -- confirm against the callback, which is outside
			 * this part of the file.
			 *
			 * The input function is invoked even when string is NULL;
			 * nulls[m] is cleared only for non-NULL input.
			 */
			cstate->cur_attname = NameStr(att->attname);
			cstate->cur_attval = string;
			values[m] = InputFunctionCall(&in_functions[m],
										  string,
										  typioparams[m],
										  att->atttypmod);
			if (string != NULL)
				nulls[m] = false;
			cstate->cur_attname = NULL;
			cstate->cur_attval = NULL;
		}

		Assert(fieldno == attr_count);
	}
	else
	{
		/* binary */
		int16		fld_count;
		ListCell   *cur;

		cstate->cur_lineno++;

		if (!CopyGetInt16(cstate, &fld_count))
		{
			/* EOF detected (end of file, or protocol-level EOF) */
			return false;
		}

		if (fld_count == -1)
		{
			/*
			 * Received EOF marker.  In a V3-protocol copy, wait for the
			 * protocol-level EOF, and complain if it doesn't come
			 * immediately.  This ensures that we correctly handle CopyFail,
			 * if client chooses to send that now.
			 *
			 * Note that we MUST NOT try to read more data in an old-protocol
			 * copy, since there is no protocol-level EOF marker then.  We
			 * could go either way for copy from file, but choose to throw
			 * error if there's data after the EOF marker, for consistency
			 * with the new-protocol case.
			 */
			char		dummy;

			if (cstate->copy_dest != COPY_OLD_FE &&
				CopyGetData(cstate, &dummy, 1, 1) > 0)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 errmsg("received copy data after EOF marker")));
			return false;
		}

		/* Every binary row must carry exactly the requested column count */
		if (fld_count != attr_count)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("row field count is %d, expected %d",
							(int) fld_count, attr_count)));

		/* i is the 1-based ordinal of the field within the row */
		i = 0;
		foreach(cur, cstate->attnumlist)
		{
			int			attnum = lfirst_int(cur);
			int			m = attnum - 1;
			Form_pg_attribute att = TupleDescAttr(tupDesc, m);

			cstate->cur_attname = NameStr(att->attname);
			i++;
			values[m] = CopyReadBinaryAttribute(cstate,
												i,
												&in_functions[m],
												typioparams[m],
												att->atttypmod,
												&nulls[m]);
			cstate->cur_attname = NULL;
		}
	}

	/*
	 * Now compute and insert any defaults available for the columns not
	 * provided by the input data.  Anything not processed here or above will
	 * remain NULL.
	 */
	for (i = 0; i < num_defaults; i++)
	{
		/*
		 * The caller must supply econtext and have switched into the
		 * per-tuple memory context in it.
		 */
		Assert(econtext != NULL);
		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);

		values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
										 &nulls[defmap[i]]);
	}

	return true;
}

/*
 * Clean up storage and release resources for COPY FROM.
 *
 * This simply delegates to EndCopy().
 */
void
EndCopyFrom(CopyState cstate)
{
	/* No COPY FROM related resources except memory. */
	EndCopy(cstate);
}

/*
 * Read the next input line and stash it in line_buf, with conversion to
 * server encoding.
 *
 * Result is true if read was terminated by EOF, false if terminated
 * by newline.  The terminating newline or EOF marker is not included
 * in the final value of line_buf.
 *
 * line_buf_valid is set on entry; line_buf_converted is false while the raw
 * line is being collected and becomes true once any transcoding is done.
 */
static bool
CopyReadLine(CopyState cstate)
{
	bool		result;

	resetStringInfo(&cstate->line_buf);
	cstate->line_buf_valid = true;

	/* Mark that encoding conversion hasn't occurred yet */
	cstate->line_buf_converted = false;

	/* Parse data and transfer into line_buf */
	result = CopyReadLineText(cstate);

	if (result)
	{
		/*
		 * Reached EOF.  In protocol version 3, we should ignore anything
		 * after \. up to the protocol end of copy data.  (XXX maybe better
		 * not to treat \. as special?)
		 */
		if (cstate->copy_dest == COPY_NEW_FE)
		{
			/* Mark the raw buffer as consumed and keep loading until EOF */
			do
			{
				cstate->raw_buf_index = cstate->raw_buf_len;
			} while (CopyLoadRawBuf(cstate));
		}
	}
	else
	{
		/*
		 * If we didn't hit EOF, then we must have transferred the EOL marker
		 * to line_buf along with the data.  Get rid of it.
*/
		/*
		 * Each case asserts that line_buf really ends with the terminator
		 * implied by eol_type, then shortens the buffer and re-terminates
		 * it with a NUL byte.
		 */
		switch (cstate->eol_type)
		{
			case EOL_NL:
				Assert(cstate->line_buf.len >= 1);
				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
				cstate->line_buf.len--;
				cstate->line_buf.data[cstate->line_buf.len] = '\0';
				break;
			case EOL_CR:
				Assert(cstate->line_buf.len >= 1);
				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
				cstate->line_buf.len--;
				cstate->line_buf.data[cstate->line_buf.len] = '\0';
				break;
			case EOL_CRNL:
				Assert(cstate->line_buf.len >= 2);
				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
				cstate->line_buf.len -= 2;
				cstate->line_buf.data[cstate->line_buf.len] = '\0';
				break;
			case EOL_UNKNOWN:
				/* shouldn't get here */
				Assert(false);
				break;
		}
	}

	/* Done reading the line.  Convert it to server encoding. */
	if (cstate->need_transcoding)
	{
		char	   *cvt;

		cvt = pg_any_to_server(cstate->line_buf.data,
							   cstate->line_buf.len,
							   cstate->file_encoding);

		/* A different pointer means a converted copy was returned */
		if (cvt != cstate->line_buf.data)
		{
			/* transfer converted data back to line_buf */
			resetStringInfo(&cstate->line_buf);
			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
			pfree(cvt);
		}
	}

	/* Now it's safe to use the buffer in error messages */
	cstate->line_buf_converted = true;

	return result;
}

/*
 * CopyReadLineText - inner loop of CopyReadLine for text mode
 *
 * Moves the bytes of the next input line from raw_buf into line_buf.
 * Returns true when the input is exhausted or the end-of-copy marker (\.)
 * was recognized, and false when the loop stopped at a line terminator.
 *
 * NOTE(review): the IF_NEED_REFILL_*, REFILL_LINEBUF and NO_END_OF_COPY_GOTO
 * macros are defined outside this part of the file; judging by their names
 * and by how need_data/hit_eof are otherwise unused here, they presumably
 * set need_data and continue, break out of the loop, or jump to the
 * not_end_of_copy label -- confirm against their definitions.
 */
static bool
CopyReadLineText(CopyState cstate)
{
	char	   *copy_raw_buf;
	int			raw_buf_ptr;
	int			copy_buf_len;
	bool		need_data = false;
	bool		hit_eof = false;
	bool		result = false;
	char		mblen_str[2];

	/* CSV variables */
	bool		first_char_in_line = true;
	bool		in_quote = false,
				last_was_esc = false;
	char		quotec = '\0';
	char		escapec = '\0';

	if (cstate->csv_mode)
	{
		quotec = cstate->quote[0];
		escapec = cstate->escape[0];
		/* ignore special escape processing if it's the same as quotec */
		if (quotec == escapec)
			escapec = '\0';
	}

	/* mblen_str is a one-character, NUL-terminated scratch string */
	mblen_str[1] = '\0';

	/*
	 * The objective of this loop is to transfer the entire next input line
	 * into line_buf.  Hence, we only care for detecting newlines (\r and/or
	 * \n) and the end-of-copy marker (\.).
	 *
	 * In CSV mode, \r and \n inside a quoted field are just part of the data
	 * value and are put in line_buf.  We keep just enough state to know if we
	 * are currently in a quoted field or not.
	 *
	 * These four characters, and the CSV escape and quote characters, are
	 * assumed the same in frontend and backend encodings.
	 *
	 * For speed, we try to move data from raw_buf to line_buf in chunks
	 * rather than one character at a time.  raw_buf_ptr points to the next
	 * character to examine; any characters from raw_buf_index to raw_buf_ptr
	 * have been determined to be part of the line, but not yet transferred to
	 * line_buf.
	 *
	 * For a little extra speed within the loop, we copy raw_buf and
	 * raw_buf_len into local variables.
	 */
	copy_raw_buf = cstate->raw_buf;
	raw_buf_ptr = cstate->raw_buf_index;
	copy_buf_len = cstate->raw_buf_len;

	for (;;)
	{
		int			prev_raw_ptr;
		char		c;

		/*
		 * Load more data if needed.  Ideally we would just force four bytes
		 * of read-ahead and avoid the many calls to
		 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
		 * does not allow us to read too far ahead or we might read into the
		 * next data, so we read-ahead only as far we know we can.  One
		 * optimization would be to read-ahead four byte here if
		 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
		 * considering the size of the buffer.
		 */
		if (raw_buf_ptr >= copy_buf_len || need_data)
		{
			REFILL_LINEBUF;

			/*
			 * Try to read some more data.  This will certainly reset
			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
			 */
			if (!CopyLoadRawBuf(cstate))
				hit_eof = true;
			raw_buf_ptr = 0;
			copy_buf_len = cstate->raw_buf_len;

			/*
			 * If we are completely out of data, break out of the loop,
			 * reporting EOF.
			 */
			if (copy_buf_len <= 0)
			{
				result = true;
				break;
			}
			need_data = false;
		}

		/* OK to fetch a character */
		prev_raw_ptr = raw_buf_ptr;
		c = copy_raw_buf[raw_buf_ptr++];

		if (cstate->csv_mode)
		{
			/*
			 * If character is '\\' or '\r', we may need to look ahead below.
			 * Force fetch of the next character if we don't already have it.
			 * We need to do this before changing CSV state, in case one of
			 * these characters is also the quote or escape character.
			 *
			 * Note: old-protocol does not like forced prefetch, but it's OK
			 * here since we cannot validly be at EOF.
			 */
			if (c == '\\' || c == '\r')
			{
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			}

			/*
			 * Dealing with quotes and escapes here is mildly tricky. If the
			 * quote char is also the escape char, there's no problem - we
			 * just use the char as a toggle. If they are different, we need
			 * to ensure that we only take account of an escape inside a
			 * quoted field and immediately preceding a quote char, and not
			 * the second in an escape-escape sequence.
			 */
			if (in_quote && c == escapec)
				last_was_esc = !last_was_esc;
			if (c == quotec && !last_was_esc)
				in_quote = !in_quote;
			if (c != escapec)
				last_was_esc = false;

			/*
			 * Updating the line count for embedded CR and/or LF chars is
			 * necessarily a little fragile - this test is probably about the
			 * best we can do.  (XXX it's arguable whether we should do this
			 * at all --- is cur_lineno a physical or logical count?)
			 */
			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
				cstate->cur_lineno++;
		}

		/* Process \r */
		if (c == '\r' && (!cstate->csv_mode || !in_quote))
		{
			/* Check for \r\n on first line, _and_ handle \r\n. */
			if (cstate->eol_type == EOL_UNKNOWN ||
				cstate->eol_type == EOL_CRNL)
			{
				/*
				 * If need more data, go back to loop top to load it.
				 *
				 * Note that if we are at EOF, c will wind up as '\0' because
				 * of the guaranteed pad of raw_buf.
				 */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);

				/* get next char */
				c = copy_raw_buf[raw_buf_ptr];

				if (c == '\n')
				{
					raw_buf_ptr++;	/* eat newline */
					cstate->eol_type = EOL_CRNL;	/* in case not set yet */
				}
				else
				{
					/* found \r, but no \n */
					if (cstate->eol_type == EOL_CRNL)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 !cstate->csv_mode ?
								 errmsg("literal carriage return found in data") :
								 errmsg("unquoted carriage return found in data"),
								 !cstate->csv_mode ?
								 errhint("Use \"\\r\" to represent carriage return.") :
								 errhint("Use quoted CSV field to represent carriage return.")));

					/*
					 * if we got here, it is the first line and we didn't find
					 * \n, so don't consume the peeked character
					 */
					cstate->eol_type = EOL_CR;
				}
			}
			else if (cstate->eol_type == EOL_NL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal carriage return found in data") :
						 errmsg("unquoted carriage return found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\r\" to represent carriage return.") :
						 errhint("Use quoted CSV field to represent carriage return.")));
			/* If reach here, we have found the line terminator */
			break;
		}

		/* Process \n */
		if (c == '\n' && (!cstate->csv_mode || !in_quote))
		{
			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal newline found in data") :
						 errmsg("unquoted newline found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\n\" to represent newline.") :
						 errhint("Use quoted CSV field to represent newline.")));
			cstate->eol_type = EOL_NL;	/* in case not set yet */
			/* If reach here, we have found the line terminator */
			break;
		}

		/*
		 * In CSV mode, we only recognize \. alone on a line.  This is because
		 * \. is a valid CSV data value.
		 */
		if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
		{
			char		c2;

			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			IF_NEED_REFILL_AND_EOF_BREAK(0);

			/* -----
			 * get next character
			 * Note: we do not change c so if it isn't \., we can fall
			 * through and continue processing for file encoding.
			 * -----
			 */
			c2 = copy_raw_buf[raw_buf_ptr];

			if (c2 == '.')
			{
				raw_buf_ptr++;	/* consume the '.' */

				/*
				 * Note: if we loop back for more data here, it does not
				 * matter that the CSV state change checks are re-executed; we
				 * will come back here with no important state changed.
				 */
				if (cstate->eol_type == EOL_CRNL)
				{
					/* Get the next character */
					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
					/* if hit_eof, c2 will become '\0' */
					c2 = copy_raw_buf[raw_buf_ptr++];

					if (c2 == '\n')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker does not match previous newline style")));
						else
							NO_END_OF_COPY_GOTO;
					}
					else if (c2 != '\r')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker corrupt")));
						else
							NO_END_OF_COPY_GOTO;
					}
				}

				/* Get the next character */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
				/* if hit_eof, c2 will become '\0' */
				c2 = copy_raw_buf[raw_buf_ptr++];

				if (c2 != '\r' && c2 != '\n')
				{
					if (!cstate->csv_mode)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 errmsg("end-of-copy marker corrupt")));
					else
						NO_END_OF_COPY_GOTO;
				}

				/* The marker's terminator must match the established style */
				if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
					(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
					(cstate->eol_type == EOL_CR && c2 != '\r'))
				{
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("end-of-copy marker does not match previous newline style")));
				}

				/*
				 * Transfer only the data before the \. into line_buf, then
				 * discard the data and the \. sequence.
				 */
				if (prev_raw_ptr > cstate->raw_buf_index)
					appendBinaryStringInfo(&cstate->line_buf,
										   cstate->raw_buf + cstate->raw_buf_index,
										   prev_raw_ptr - cstate->raw_buf_index);
				cstate->raw_buf_index = raw_buf_ptr;
				result = true;	/* report EOF */
				break;
			}
			else if (!cstate->csv_mode)

				/*
				 * If we are here, it means we found a backslash followed by
				 * something other than a period.  In non-CSV mode, anything
				 * after a backslash is special, so we skip over that second
				 * character too.  If we didn't do that \\. would be
				 * considered an eof-of copy, while in non-CSV mode it is a
				 * literal backslash followed by a period.  In CSV mode,
				 * backslashes are not special, so we want to process the
				 * character after the backslash just like a normal character,
				 * so we don't increment in those cases.
				 */
				raw_buf_ptr++;
		}

		/*
		 * This label is for CSV cases where \. appears at the start of a
		 * line, but there is more text after it, meaning it was a data value.
		 * We are more strict for \. in CSV mode because \. could be a data
		 * value, while in non-CSV mode, \. cannot be a data value.
		 */
not_end_of_copy:

		/*
		 * Process all bytes of a multi-byte character as a group.
		 *
		 * We only support multi-byte sequences where the first byte has the
		 * high-bit set, so as an optimization we can avoid this block
		 * entirely if it is not set.
		 */
		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
		{
			int			mblen;

			mblen_str[0] = c;
			/* All our encodings only read the first byte to get the length */
			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
			/* skip the trailing bytes of the character in one step */
			raw_buf_ptr += mblen - 1;
		}
		first_char_in_line = false;
	}							/* end of outer loop */

	/*
	 * Transfer any still-uncopied data to line_buf.
*/
	REFILL_LINEBUF;

	return result;
}

/*
 * Return decimal value for a hexadecimal digit
 *
 * No validation is done here: any character that is not a decimal digit is
 * treated as a letter digit.  The callers visible in this file test the
 * character with isxdigit() before calling.
 */
static int
GetDecimalFromHex(char hex)
{
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}

/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.
 *
 * The input is in line_buf.  We use attribute_buf to hold the result
 * strings.  cstate->raw_fields[k] is set to point to the k'th attribute
 * string, or NULL when the input matches the null marker string.
 * This array is expanded as necessary.
 *
 * (Note that the caller cannot check for nulls since the returned
 * string would be the post-de-escaping equivalent, which may look
 * the same as some valid data string.)
 *
 * delim is the column delimiter string (must be just one byte for now).
 * null_print is the null marker string.  Note that this is compared to
 * the pre-de-escaped input string.
 *
 * The return value is the number of fields actually read.
 */
static int
CopyReadAttributesText(CopyState cstate)
{
	char		delimc = cstate->delim[0];
	int			fieldno;
	char	   *output_ptr;
	char	   *cur_ptr;
	char	   *line_end_ptr;

	/*
	 * We need a special case for zero-column tables: check that the input
	 * line is empty, and return.
	 */
	if (cstate->max_fields <= 0)
	{
		if (cstate->line_buf.len != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));
		return 0;
	}

	resetStringInfo(&cstate->attribute_buf);

	/*
	 * The de-escaped attributes will certainly not be longer than the input
	 * data line, so we can just force attribute_buf to be large enough and
	 * then transfer data without any checks for enough space.  We need to do
	 * it this way because enlarging attribute_buf mid-stream would invalidate
	 * pointers already stored into cstate->raw_fields[].
	 */
	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
	output_ptr = cstate->attribute_buf.data;

	/* set pointer variables for loop */
	cur_ptr = cstate->line_buf.data;
	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

	/* Outer loop iterates over fields */
	fieldno = 0;
	for (;;)
	{
		bool		found_delim = false;
		char	   *start_ptr;
		char	   *end_ptr;
		int			input_len;
		bool		saw_non_ascii = false;

		/* Make sure there is enough space for the next value */
		if (fieldno >= cstate->max_fields)
		{
			cstate->max_fields *= 2;
			cstate->raw_fields =
				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
		}

		/* Remember start of field on both input and output sides */
		start_ptr = cur_ptr;
		cstate->raw_fields[fieldno] = output_ptr;

		/*
		 * Scan data for field.
		 *
		 * Note that in this loop, we are scanning to locate the end of field
		 * and also speculatively performing de-escaping.  Once we find the
		 * end-of-field, we can match the raw field contents against the null
		 * marker string.  Only after that comparison fails do we know that
		 * de-escaping is actually the right thing to do; therefore we *must
		 * not* throw any syntax errors before we've done the null-marker
		 * check.
		 */
		for (;;)
		{
			char		c;

			/*
			 * end_ptr is refreshed before each character is consumed, so
			 * after the loop exits it marks the end of the raw field text,
			 * excluding the character that ended the scan.
			 */
			end_ptr = cur_ptr;
			if (cur_ptr >= line_end_ptr)
				break;
			c = *cur_ptr++;
			if (c == delimc)
			{
				found_delim = true;
				break;
			}
			if (c == '\\')
			{
				/* a backslash that is the last byte of the line ends the field */
				if (cur_ptr >= line_end_ptr)
					break;
				c = *cur_ptr++;
				switch (c)
				{
					case '0':
					case '1':
					case '2':
					case '3':
					case '4':
					case '5':
					case '6':
					case '7':
						{
							/* handle \013 */
							/* up to three octal digits are consumed */
							int			val;

							val = OCTVALUE(c);
							if (cur_ptr < line_end_ptr)
							{
								c = *cur_ptr;
								if (ISOCTAL(c))
								{
									cur_ptr++;
									val = (val << 3) + OCTVALUE(c);
									if (cur_ptr < line_end_ptr)
									{
										c = *cur_ptr;
										if (ISOCTAL(c))
										{
											cur_ptr++;
											val = (val << 3) + OCTVALUE(c);
										}
									}
								}
							}
							c = val & 0377;
							if (c == '\0' || IS_HIGHBIT_SET(c))
								saw_non_ascii = true;
						}
						break;
					case 'x':
						/* Handle \x3F */
						/* one or two hex digits; a bare \x yields 'x' itself */
						if (cur_ptr < line_end_ptr)
						{
							char		hexchar = *cur_ptr;

							if (isxdigit((unsigned char) hexchar))
							{
								int			val = GetDecimalFromHex(hexchar);

								cur_ptr++;
								if (cur_ptr < line_end_ptr)
								{
									hexchar = *cur_ptr;
									if (isxdigit((unsigned char) hexchar))
									{
										cur_ptr++;
										val = (val << 4) + GetDecimalFromHex(hexchar);
									}
								}
								c = val & 0xff;
								if (c == '\0' || IS_HIGHBIT_SET(c))
									saw_non_ascii = true;
							}
						}
						break;
					case 'b':
						c = '\b';
						break;
					case 'f':
						c = '\f';
						break;
					case 'n':
						c = '\n';
						break;
					case 'r':
						c = '\r';
						break;
					case 't':
						c = '\t';
						break;
					case 'v':
						c = '\v';
						break;

						/*
						 * in all other cases, take the char after '\'
						 * literally
						 */
				}
			}

			/* Add c to output string */
			*output_ptr++ = c;
		}

		/* Check whether raw input matched null marker */
		input_len = end_ptr - start_ptr;
		if (input_len == cstate->null_print_len &&
			strncmp(start_ptr, cstate->null_print, input_len) == 0)
			cstate->raw_fields[fieldno] = NULL;
		else
		{
			/*
			 * At this point we know the field is supposed to contain data.
			 *
			 * If we de-escaped any non-7-bit-ASCII chars, make sure the
			 * resulting string is valid data for the db encoding.
*/
			if (saw_non_ascii)
			{
				char	   *fld = cstate->raw_fields[fieldno];

				pg_verifymbstr(fld, output_ptr - fld, false);
			}
		}

		/* Terminate attribute value in output area */
		*output_ptr++ = '\0';

		fieldno++;
		/* Done if we hit EOL instead of a delim */
		if (!found_delim)
			break;
	}

	/* Clean up state of attribute_buf */
	/* back up over the last terminator so that len excludes it */
	output_ptr--;
	Assert(*output_ptr == '\0');
	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

	return fieldno;
}

/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.  This has exactly the same API as
 * CopyReadAttributesText, except we parse the fields according to
 * "standard" (i.e. common) CSV usage.
 *
 * A field is reported as NULL only when it contained no quote character and
 * its raw text equals the null marker string.
 */
static int
CopyReadAttributesCSV(CopyState cstate)
{
	char		delimc = cstate->delim[0];
	char		quotec = cstate->quote[0];
	char		escapec = cstate->escape[0];
	int			fieldno;
	char	   *output_ptr;
	char	   *cur_ptr;
	char	   *line_end_ptr;

	/*
	 * We need a special case for zero-column tables: check that the input
	 * line is empty, and return.
	 */
	if (cstate->max_fields <= 0)
	{
		if (cstate->line_buf.len != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));
		return 0;
	}

	resetStringInfo(&cstate->attribute_buf);

	/*
	 * The de-escaped attributes will certainly not be longer than the input
	 * data line, so we can just force attribute_buf to be large enough and
	 * then transfer data without any checks for enough space.  We need to do
	 * it this way because enlarging attribute_buf mid-stream would invalidate
	 * pointers already stored into cstate->raw_fields[].
	 */
	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
	output_ptr = cstate->attribute_buf.data;

	/* set pointer variables for loop */
	cur_ptr = cstate->line_buf.data;
	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

	/* Outer loop iterates over fields */
	fieldno = 0;
	for (;;)
	{
		bool		found_delim = false;
		bool		saw_quote = false;
		char	   *start_ptr;
		char	   *end_ptr;
		int			input_len;

		/* Make sure there is enough space for the next value */
		if (fieldno >= cstate->max_fields)
		{
			cstate->max_fields *= 2;
			cstate->raw_fields =
				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
		}

		/* Remember start of field on both input and output sides */
		start_ptr = cur_ptr;
		cstate->raw_fields[fieldno] = output_ptr;

		/*
		 * Scan data for field,
		 *
		 * The loop starts in "not quote" mode and then toggles between that
		 * and "in quote" mode. The loop exits normally if it is in "not
		 * quote" mode and a delimiter or line end is seen.
		 */
		for (;;)
		{
			char		c;

			/* Not in quote */
			for (;;)
			{
				end_ptr = cur_ptr;
				if (cur_ptr >= line_end_ptr)
					goto endfield;
				c = *cur_ptr++;
				/* unquoted field delimiter */
				if (c == delimc)
				{
					found_delim = true;
					goto endfield;
				}
				/* start of quoted field (or part of field) */
				if (c == quotec)
				{
					saw_quote = true;
					break;
				}
				/* Add c to output string */
				*output_ptr++ = c;
			}

			/* In quote */
			for (;;)
			{
				end_ptr = cur_ptr;
				/* the line ended while a quote was still open */
				if (cur_ptr >= line_end_ptr)
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("unterminated CSV quoted field")));

				c = *cur_ptr++;

				/* escape within a quoted field */
				if (c == escapec)
				{
					/*
					 * peek at the next char if available, and escape it if it
					 * is an escape char or a quote char
					 */
					if (cur_ptr < line_end_ptr)
					{
						char		nextc = *cur_ptr;

						if (nextc == escapec || nextc == quotec)
						{
							*output_ptr++ = nextc;
							cur_ptr++;
							continue;
						}
					}
				}

				/*
				 * end of quoted field. Must do this test after testing for
				 * escape in case quote char and escape char are the same
				 * (which is the common case).
				 */
				if (c == quotec)
					break;

				/* Add c to output string */
				*output_ptr++ = c;
			}
		}
endfield:

		/* Terminate attribute value in output area */
		*output_ptr++ = '\0';

		/* Check whether raw input matched null marker */
		input_len = end_ptr - start_ptr;
		if (!saw_quote && input_len == cstate->null_print_len &&
			strncmp(start_ptr, cstate->null_print, input_len) == 0)
			cstate->raw_fields[fieldno] = NULL;

		fieldno++;
		/* Done if we hit EOL instead of a delim */
		if (!found_delim)
			break;
	}

	/* Clean up state of attribute_buf */
	/* back up over the last terminator so that len excludes it */
	output_ptr--;
	Assert(*output_ptr == '\0');
	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

	return fieldno;
}


/*
 * Read a binary attribute
 *
 * Reads a 32-bit field length followed by that many bytes of data, and
 * converts the data with the type's receive function 'flinfo'.  A length
 * of -1 denotes NULL: *isnull is set to true and the receive function is
 * called with a NULL buffer.  Otherwise *isnull is set to false.
 *
 * Errors are raised for premature EOF, for a negative length other than -1,
 * and when the receive function does not consume the whole field.
 *
 * 'column_no' is not referenced in the body of this function.
 */
static Datum
CopyReadBinaryAttribute(CopyState cstate,
						int column_no, FmgrInfo *flinfo,
						Oid typioparam, int32 typmod,
						bool *isnull)
{
	int32		fld_size;
	Datum		result;

	if (!CopyGetInt32(cstate, &fld_size))
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("unexpected EOF in COPY data")));
	if (fld_size == -1)
	{
		*isnull = true;
		return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
	}
	if (fld_size < 0)
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("invalid field size")));

	/* reset attribute_buf to empty, and load raw data in it */
	resetStringInfo(&cstate->attribute_buf);

	enlargeStringInfo(&cstate->attribute_buf, fld_size);
	if (CopyGetData(cstate, cstate->attribute_buf.data,
					fld_size, fld_size) != fld_size)
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("unexpected EOF in COPY data")));

	/* keep the buffer NUL-terminated after the raw bytes */
	cstate->attribute_buf.len = fld_size;
	cstate->attribute_buf.data[fld_size] = '\0';

	/* Call the column type's binary input converter */
	result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
								 typioparam, typmod);

	/* Trouble if it didn't eat the whole buffer */
	if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
				 errmsg("incorrect binary data format")));

	*isnull = false;
	return result;
}

/*
 * Send text
representation of one attribute, with conversion and escaping
 */

/*
 * DUMPSOFAR sends the pending run of literal bytes [start, ptr) with
 * CopySendData, if that run is non-empty.  It expands in place and refers to
 * the invoking function's locals named cstate, start and ptr.
 */
#define DUMPSOFAR() \
	do { \
		if (ptr > start) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)

static void
CopyAttributeOutText(CopyState cstate, char *string)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->delim[0];

	/* Work on the file-encoding form of the string when transcoding */
	if (cstate->need_transcoding)
		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
	else
		ptr = string;

	/*
	 * We have to grovel through the string searching for control characters
	 * and instances of the delimiter character.  In most cases, though, these
	 * are infrequent.  To avoid overhead from calling CopySendData once per
	 * character, we dump out all characters between escaped characters in a
	 * single call.  The loop invariant is that the data from "start" to "ptr"
	 * can be sent literally, but hasn't yet been.
	 *
	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
	 * in valid backend encodings, extra bytes of a multibyte character never
	 * look like ASCII.  This loop is sufficiently performance-critical that
	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
	 * of the normal safe-encoding path.
	 */
	if (cstate->encoding_embeds_ascii)
	{
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				start = ptr++;	/* we include char in next run */
			}
			else if (IS_HIGHBIT_SET(c))
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
	}
	else
	{
		/* Same loop as above, minus the multibyte-length step */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, '\\');
				start = ptr++;	/* we include char in next run */
			}
			else
				ptr++;
		}
	}

	/* flush whatever literal run remains */
	DUMPSOFAR();
}

/*
 * Send text representation of one attribute, with conversion and
 * CSV-style escaping
 *
 * 'use_quote' forces the value to be quoted.  Quoting is also applied when
 * the value equals the null marker, when it contains the delimiter, the
 * quote character, \n or \r, or when 'single_attr' is true and the value is
 * exactly \. (backslash, period).
 */
static void
CopyAttributeOutCSV(CopyState cstate, char *string,
					bool use_quote, bool single_attr)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->delim[0];
	char		quotec = cstate->quote[0];
	char		escapec = cstate->escape[0];

	/* force quoting if it matches null_print (before conversion!) */
	if (!use_quote && strcmp(string, cstate->null_print) == 0)
		use_quote = true;

	if (cstate->need_transcoding)
		ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
	else
		ptr = string;

	/*
	 * Make a preliminary pass to discover if it needs quoting
	 */
	if (!use_quote)
	{
		/*
		 * Because '\.' can be a data value, quote it if it appears alone on a
		 * line so it is not interpreted as the end-of-data marker.
		 */
		if (single_attr && strcmp(ptr, "\\.") == 0)
			use_quote = true;
		else
		{
			char	   *tptr = ptr;

			while ((c = *tptr) != '\0')
			{
				if (c == delimc || c == quotec || c == '\n' || c == '\r')
				{
					use_quote = true;
					break;
				}
				if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
					tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
				else
					tptr++;
			}
		}
	}

	if (use_quote)
	{
		CopySendChar(cstate, quotec);

		/*
		 * We adopt the same optimization strategy as in CopyAttributeOutText
		 */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			/* quote and escape characters are prefixed with the escape */
			if (c == quotec || c == escapec)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr;	/* we include char in next run */
			}
			if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
		DUMPSOFAR();

		CopySendChar(cstate, quotec);
	}
	else
	{
		/* If it doesn't need quoting, we can just dump it as-is */
		CopySendString(cstate, ptr);
	}
}

/*
 * CopyGetAttnums - build an integer list of attnums to be copied
 *
 * The input attnamelist is either the user-specified column list,
 * or NIL if there was none (in which case we want all the non-dropped
 * columns).
 *
 * rel can be NULL ... it's only used for error reports.
*/ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) { List *attnums = NIL; if (attnamelist == NIL) { /* Generate default column list */ int attr_count = tupDesc->natts; int i; for (i = 0; i < attr_count; i++) { if (TupleDescAttr(tupDesc, i)->attisdropped) continue; attnums = lappend_int(attnums, i + 1); } } else { /* Validate the user-supplied list and extract attnums */ ListCell *l; foreach(l, attnamelist) { char *name = strVal(lfirst(l)); int attnum; int i; /* Lookup column name */ attnum = InvalidAttrNumber; for (i = 0; i < tupDesc->natts; i++) { Form_pg_attribute att = TupleDescAttr(tupDesc, i); if (att->attisdropped) continue; if (namestrcmp(&(att->attname), name) == 0) { attnum = att->attnum; break; } } if (attnum == InvalidAttrNumber) { if (rel != NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", name, RelationGetRelationName(rel)))); else ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" does not exist", name))); } /* Check for duplicates */ if (list_member_int(attnums, attnum)) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_COLUMN), errmsg("column \"%s\" specified more than once", name))); attnums = lappend_int(attnums, attnum); } } return attnums; } /* * copy_dest_startup --- executor startup */ static void copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) { /* no-op */ } /* * copy_dest_receive --- receive one tuple */ static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) { DR_copy *myState = (DR_copy *) self; CopyState cstate = myState->cstate; /* Make sure the tuple is fully deconstructed */ slot_getallattrs(slot); /* And send the data */ CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull); myState->processed++; return true; } /* * copy_dest_shutdown --- executor end */ static void copy_dest_shutdown(DestReceiver *self) { /* no-op */ } /* * copy_dest_destroy --- release DestReceiver object */ 
static void copy_dest_destroy(DestReceiver *self) { pfree(self); } /* * CreateCopyDestReceiver -- create a suitable DestReceiver object */ DestReceiver * CreateCopyDestReceiver(void) { DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy)); self->pub.receiveSlot = copy_dest_receive; self->pub.rStartup = copy_dest_startup; self->pub.rShutdown = copy_dest_shutdown; self->pub.rDestroy = copy_dest_destroy; self->pub.mydest = DestCopyOut; self->cstate = NULL; /* will be set later */ self->processed = 0; return (DestReceiver *) self; }