postgresql/src/backend/commands/copyfromparse.c

1616 lines
44 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* copyfromparse.c
* Parse CSV/text/binary format for COPY FROM.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/copyfromparse.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "port/pg_bswap.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
* They are macros because they often do continue/break control and to avoid
* function call overhead in tight COPY loops.
*
* We must use "if (1)" because the usual "do {...} while(0)" wrapper would
* prevent the continue/break processing from working. We end the "if (1)"
* with "else ((void) 0)" to ensure the "if" does not unintentionally match
* any "else" in the calling code, and to avoid any compiler warnings about
* empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
*/
/*
* This keeps the character read at the top of the loop in the buffer
* even if there is more than one read-ahead.
*/
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
{ \
raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
need_data = true; \
continue; \
} \
} else ((void) 0)
/* This consumes the remainder of the buffer and breaks */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
{ \
if (extralen) \
raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
/* backslash just before EOF, treat as data char */ \
result = true; \
break; \
} \
} else ((void) 0)
/*
* Transfer any approved data to line_buf; must do this to be sure
* there is some room in raw_buf.
*/
#define REFILL_LINEBUF \
if (1) \
{ \
if (raw_buf_ptr > cstate->raw_buf_index) \
{ \
appendBinaryStringInfo(&cstate->line_buf, \
cstate->raw_buf + cstate->raw_buf_index, \
raw_buf_ptr - cstate->raw_buf_index); \
cstate->raw_buf_index = raw_buf_ptr; \
} \
} else ((void) 0)
/* Undo any read-ahead and jump out of the block. */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
raw_buf_ptr = prev_raw_ptr + 1; \
goto not_end_of_copy; \
} else ((void) 0)
/* NOTE: there's a copy of this in copyto.c */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static bool CopyReadLine(CopyFromState cstate);
static bool CopyReadLineText(CopyFromState cstate);
static int CopyReadAttributesText(CopyFromState cstate);
static int CopyReadAttributesCSV(CopyFromState cstate);
static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
Oid typioparam, int32 typmod,
bool *isnull);
/* Low-level communications functions */
static int CopyGetData(CopyFromState cstate, void *databuf,
int minread, int maxread);
static inline bool CopyGetInt32(CopyFromState cstate, int32 *val);
static inline bool CopyGetInt16(CopyFromState cstate, int16 *val);
static bool CopyLoadRawBuf(CopyFromState cstate);
static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
void
ReceiveCopyBegin(CopyFromState cstate)
{
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
/* new way */
StringInfoData buf;
int natts = list_length(cstate->attnumlist);
int16 format = (cstate->opts.binary ? 1 : 0);
int i;
pq_beginmessage(&buf, 'G');
pq_sendbyte(&buf, format); /* overall format */
pq_sendint16(&buf, natts);
for (i = 0; i < natts; i++)
pq_sendint16(&buf, format); /* per-column formats */
pq_endmessage(&buf);
cstate->copy_src = COPY_NEW_FE;
cstate->fe_msgbuf = makeStringInfo();
}
else
{
/* old way */
if (cstate->opts.binary)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY BINARY is not supported to stdout or from stdin")));
pq_putemptymessage('G');
/* any error in old protocol will make us lose sync */
pq_startmsgread();
cstate->copy_src = COPY_OLD_FE;
}
/* We *must* flush here to ensure FE knows it can send. */
pq_flush();
}
void
ReceiveCopyBinaryHeader(CopyFromState cstate)
{
char readSig[11];
int32 tmp;
/* Signature */
if (CopyReadBinaryData(cstate, readSig, 11) != 11 ||
memcmp(readSig, BinarySignature, 11) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("COPY file signature not recognized")));
/* Flags field */
if (!CopyGetInt32(cstate, &tmp))
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing flags)")));
if ((tmp & (1 << 16)) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (WITH OIDS)")));
tmp &= ~(1 << 16);
if ((tmp >> 16) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unrecognized critical flags in COPY file header")));
/* Header extension length */
if (!CopyGetInt32(cstate, &tmp) ||
tmp < 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing length)")));
/* Skip extension header, if present */
while (tmp-- > 0)
{
if (CopyReadBinaryData(cstate, readSig, 1) != 1)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (wrong length)")));
}
}
/*
* CopyGetData reads data from the source (file or frontend)
*
* We attempt to read at least minread, and at most maxread, bytes from
* the source. The actual number of bytes read is returned; if this is
* less than minread, EOF was detected.
*
* Note: when copying from the frontend, we expect a proper EOF mark per
* protocol; if the frontend simply drops the connection, we raise error.
* It seems unwise to allow the COPY IN to complete normally in that case.
*
* NB: no data conversion is applied here.
*/
static int
CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
{
int bytesread = 0;
switch (cstate->copy_src)
{
case COPY_FILE:
bytesread = fread(databuf, 1, maxread, cstate->copy_file);
if (ferror(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from COPY file: %m")));
if (bytesread == 0)
cstate->reached_eof = true;
break;
case COPY_OLD_FE:
/*
* We cannot read more than minread bytes (which in practice is 1)
* because old protocol doesn't have any clear way of separating
* the COPY stream from following data. This is slow, but not any
* slower than the code path was originally, and we don't care
* much anymore about the performance of old protocol.
*/
if (pq_getbytes((char *) databuf, minread))
{
/* Only a \. terminator is legal EOF in old protocol */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection with an open transaction")));
}
bytesread = minread;
break;
case COPY_NEW_FE:
while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
{
int avail;
while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
{
/* Try to receive another message */
int mtype;
readmessage:
HOLD_CANCEL_INTERRUPTS();
pq_startmsgread();
mtype = pq_getbyte();
if (mtype == EOF)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection with an open transaction")));
if (pq_getmessage(cstate->fe_msgbuf, 0))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection with an open transaction")));
RESUME_CANCEL_INTERRUPTS();
switch (mtype)
{
case 'd': /* CopyData */
break;
case 'c': /* CopyDone */
/* COPY IN correctly terminated by frontend */
cstate->reached_eof = true;
return bytesread;
case 'f': /* CopyFail */
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
pq_getmsgstring(cstate->fe_msgbuf))));
break;
case 'H': /* Flush */
case 'S': /* Sync */
/*
* Ignore Flush/Sync for the convenience of client
* libraries (such as libpq) that may send those
* without noticing that the command they just
* sent was COPY.
*/
goto readmessage;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected message type 0x%02X during COPY from stdin",
mtype)));
break;
}
}
avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
if (avail > maxread)
avail = maxread;
pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
databuf = (void *) ((char *) databuf + avail);
maxread -= avail;
bytesread += avail;
}
break;
case COPY_CALLBACK:
bytesread = cstate->data_source_cb(databuf, minread, maxread);
break;
}
return bytesread;
}
/*
* These functions do apply some data conversion
*/
/*
* CopyGetInt32 reads an int32 that appears in network byte order
*
* Returns true if OK, false if EOF
*/
static inline bool
CopyGetInt32(CopyFromState cstate, int32 *val)
{
uint32 buf;
if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
{
*val = 0; /* suppress compiler warning */
return false;
}
*val = (int32) pg_ntoh32(buf);
return true;
}
/*
* CopyGetInt16 reads an int16 that appears in network byte order
*/
static inline bool
CopyGetInt16(CopyFromState cstate, int16 *val)
{
uint16 buf;
if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
{
*val = 0; /* suppress compiler warning */
return false;
}
*val = (int16) pg_ntoh16(buf);
return true;
}
/*
* CopyLoadRawBuf loads some more data into raw_buf
*
* Returns true if able to obtain at least one more byte, else false.
*
* If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start
* of the buffer and then we load more data after that. This case occurs only
* when a multibyte character crosses a bufferload boundary.
*/
static bool
CopyLoadRawBuf(CopyFromState cstate)
{
int nbytes = RAW_BUF_BYTES(cstate);
int inbytes;
/* Copy down the unprocessed data if any. */
if (nbytes > 0)
memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
nbytes);
inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
1, RAW_BUF_SIZE - nbytes);
nbytes += inbytes;
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
cstate->raw_buf_len = nbytes;
return (inbytes > 0);
}
/*
* CopyReadBinaryData
*
* Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf
* and writes them to 'dest'. Returns the number of bytes read (which
* would be less than 'nbytes' only if we reach EOF).
*/
static int
CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
{
int copied_bytes = 0;
if (RAW_BUF_BYTES(cstate) >= nbytes)
{
/* Enough bytes are present in the buffer. */
memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, nbytes);
cstate->raw_buf_index += nbytes;
copied_bytes = nbytes;
}
else
{
/*
* Not enough bytes in the buffer, so must read from the file. Need
* to loop since 'nbytes' could be larger than the buffer size.
*/
do
{
int copy_bytes;
/* Load more data if buffer is empty. */
if (RAW_BUF_BYTES(cstate) == 0)
{
if (!CopyLoadRawBuf(cstate))
break; /* EOF */
}
/* Transfer some bytes. */
copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate));
memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, copy_bytes);
cstate->raw_buf_index += copy_bytes;
dest += copy_bytes;
copied_bytes += copy_bytes;
} while (copied_bytes < nbytes);
}
return copied_bytes;
}
/*
* Read raw fields in the next line for COPY FROM in text or csv mode.
* Return false if no more lines.
*
* An internal temporary buffer is returned via 'fields'. It is valid until
* the next call of the function. Since the function returns all raw fields
* in the input file, 'nfields' could be different from the number of columns
* in the relation.
*
* NOTE: force_not_null option are not applied to the returned fields.
*/
bool
NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
{
int fldct;
bool done;
/* only available for text or csv input */
Assert(!cstate->opts.binary);
/* on input just throw the header line away */
if (cstate->cur_lineno == 0 && cstate->opts.header_line)
{
cstate->cur_lineno++;
if (CopyReadLine(cstate))
return false; /* done */
}
cstate->cur_lineno++;
/* Actually read the line into memory here */
done = CopyReadLine(cstate);
/*
* EOF at start of line means we're done. If we see EOF after some
* characters, we act as though it was newline followed by EOF, ie,
* process the line and then exit loop on next iteration.
*/
if (done && cstate->line_buf.len == 0)
return false;
/* Parse the line into de-escaped field values */
if (cstate->opts.csv_mode)
fldct = CopyReadAttributesCSV(cstate);
else
fldct = CopyReadAttributesText(cstate);
*fields = cstate->raw_fields;
*nfields = fldct;
return true;
}
/*
* Read next tuple from file for COPY FROM. Return false if no more tuples.
*
* 'econtext' is used to evaluate default expression for each columns not
* read from the file. It can be NULL when no default values are used, i.e.
* when all columns are read from the file.
*
* 'values' and 'nulls' arrays must be the same length as columns of the
* relation passed to BeginCopyFrom. This function fills the arrays.
*/
bool
NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls)
{
TupleDesc tupDesc;
AttrNumber num_phys_attrs,
attr_count,
num_defaults = cstate->num_defaults;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
int i;
int *defmap = cstate->defmap;
ExprState **defexprs = cstate->defexprs;
tupDesc = RelationGetDescr(cstate->rel);
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
if (!cstate->opts.binary)
{
char **field_strings;
ListCell *cur;
int fldct;
int fieldno;
char *string;
/* read raw fields in the next line */
if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
return false;
/* check for overflowing fields */
if (attr_count > 0 && fldct > attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
fieldno = 0;
/* Loop to read the user attributes on the line. */
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(att->attname))));
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
!cstate->convert_select_flags[m])
{
/* ignore input field, leaving column as NULL */
continue;
}
if (cstate->opts.csv_mode)
{
if (string == NULL &&
cstate->opts.force_notnull_flags[m])
{
/*
* FORCE_NOT_NULL option is set and column is NULL -
* convert it to the NULL string.
*/
string = cstate->opts.null_print;
}
else if (string != NULL && cstate->opts.force_null_flags[m]
&& strcmp(string, cstate->opts.null_print) == 0)
{
/*
* FORCE_NULL option is set and column matches the NULL
* string. It must have been quoted, or otherwise the
* string would already have been set to NULL. Convert it
* to NULL as specified.
*/
string = NULL;
}
}
cstate->cur_attname = NameStr(att->attname);
cstate->cur_attval = string;
values[m] = InputFunctionCall(&in_functions[m],
string,
typioparams[m],
att->atttypmod);
if (string != NULL)
nulls[m] = false;
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
}
Assert(fieldno == attr_count);
}
else
{
/* binary */
int16 fld_count;
ListCell *cur;
cstate->cur_lineno++;
if (!CopyGetInt16(cstate, &fld_count))
{
/* EOF detected (end of file, or protocol-level EOF) */
return false;
}
if (fld_count == -1)
{
/*
* Received EOF marker. In a V3-protocol copy, wait for the
* protocol-level EOF, and complain if it doesn't come
* immediately. This ensures that we correctly handle CopyFail,
* if client chooses to send that now.
*
* Note that we MUST NOT try to read more data in an old-protocol
* copy, since there is no protocol-level EOF marker then. We
* could go either way for copy from file, but choose to throw
* error if there's data after the EOF marker, for consistency
* with the new-protocol case.
*/
char dummy;
if (cstate->copy_src != COPY_OLD_FE &&
CopyReadBinaryData(cstate, &dummy, 1) > 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("received copy data after EOF marker")));
return false;
}
if (fld_count != attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("row field count is %d, expected %d",
(int) fld_count, attr_count)));
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
cstate->cur_attname = NameStr(att->attname);
values[m] = CopyReadBinaryAttribute(cstate,
&in_functions[m],
typioparams[m],
att->atttypmod,
&nulls[m]);
cstate->cur_attname = NULL;
}
}
/*
* Now compute and insert any defaults available for the columns not
* provided by the input data. Anything not processed here or above will
* remain NULL.
*/
for (i = 0; i < num_defaults; i++)
{
/*
* The caller must supply econtext and have switched into the
* per-tuple memory context in it.
*/
Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&nulls[defmap[i]]);
}
return true;
}
/*
* Read the next input line and stash it in line_buf, with conversion to
* server encoding.
*
* Result is true if read was terminated by EOF, false if terminated
* by newline. The terminating newline or EOF marker is not included
* in the final value of line_buf.
*/
static bool
CopyReadLine(CopyFromState cstate)
{
bool result;
resetStringInfo(&cstate->line_buf);
cstate->line_buf_valid = true;
/* Mark that encoding conversion hasn't occurred yet */
cstate->line_buf_converted = false;
/* Parse data and transfer into line_buf */
result = CopyReadLineText(cstate);
if (result)
{
/*
* Reached EOF. In protocol version 3, we should ignore anything
* after \. up to the protocol end of copy data. (XXX maybe better
* not to treat \. as special?)
*/
if (cstate->copy_src == COPY_NEW_FE)
{
do
{
cstate->raw_buf_index = cstate->raw_buf_len;
} while (CopyLoadRawBuf(cstate));
}
}
else
{
/*
* If we didn't hit EOF, then we must have transferred the EOL marker
* to line_buf along with the data. Get rid of it.
*/
switch (cstate->eol_type)
{
case EOL_NL:
Assert(cstate->line_buf.len >= 1);
Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
cstate->line_buf.len--;
cstate->line_buf.data[cstate->line_buf.len] = '\0';
break;
case EOL_CR:
Assert(cstate->line_buf.len >= 1);
Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
cstate->line_buf.len--;
cstate->line_buf.data[cstate->line_buf.len] = '\0';
break;
case EOL_CRNL:
Assert(cstate->line_buf.len >= 2);
Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
cstate->line_buf.len -= 2;
cstate->line_buf.data[cstate->line_buf.len] = '\0';
break;
case EOL_UNKNOWN:
/* shouldn't get here */
Assert(false);
break;
}
}
/* Done reading the line. Convert it to server encoding. */
if (cstate->need_transcoding)
{
char *cvt;
cvt = pg_any_to_server(cstate->line_buf.data,
cstate->line_buf.len,
cstate->file_encoding);
if (cvt != cstate->line_buf.data)
{
/* transfer converted data back to line_buf */
resetStringInfo(&cstate->line_buf);
appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
pfree(cvt);
}
}
/* Now it's safe to use the buffer in error messages */
cstate->line_buf_converted = true;
return result;
}
/*
* CopyReadLineText - inner loop of CopyReadLine for text mode
*/
static bool
CopyReadLineText(CopyFromState cstate)
{
char *copy_raw_buf;
int raw_buf_ptr;
int copy_buf_len;
bool need_data = false;
bool hit_eof = false;
bool result = false;
char mblen_str[2];
/* CSV variables */
bool first_char_in_line = true;
bool in_quote = false,
last_was_esc = false;
char quotec = '\0';
char escapec = '\0';
if (cstate->opts.csv_mode)
{
quotec = cstate->opts.quote[0];
escapec = cstate->opts.escape[0];
/* ignore special escape processing if it's the same as quotec */
if (quotec == escapec)
escapec = '\0';
}
mblen_str[1] = '\0';
/*
* The objective of this loop is to transfer the entire next input line
* into line_buf. Hence, we only care for detecting newlines (\r and/or
* \n) and the end-of-copy marker (\.).
*
* In CSV mode, \r and \n inside a quoted field are just part of the data
* value and are put in line_buf. We keep just enough state to know if we
* are currently in a quoted field or not.
*
* These four characters, and the CSV escape and quote characters, are
* assumed the same in frontend and backend encodings.
*
* For speed, we try to move data from raw_buf to line_buf in chunks
* rather than one character at a time. raw_buf_ptr points to the next
* character to examine; any characters from raw_buf_index to raw_buf_ptr
* have been determined to be part of the line, but not yet transferred to
* line_buf.
*
* For a little extra speed within the loop, we copy raw_buf and
* raw_buf_len into local variables.
*/
copy_raw_buf = cstate->raw_buf;
raw_buf_ptr = cstate->raw_buf_index;
copy_buf_len = cstate->raw_buf_len;
for (;;)
{
int prev_raw_ptr;
char c;
/*
* Load more data if needed. Ideally we would just force four bytes
* of read-ahead and avoid the many calls to
* IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
* does not allow us to read too far ahead or we might read into the
* next data, so we read-ahead only as far we know we can. One
* optimization would be to read-ahead four byte here if
* cstate->copy_src != COPY_OLD_FE, but it hardly seems worth it,
* considering the size of the buffer.
*/
if (raw_buf_ptr >= copy_buf_len || need_data)
{
REFILL_LINEBUF;
/*
* Try to read some more data. This will certainly reset
* raw_buf_index to zero, and raw_buf_ptr must go with it.
*/
if (!CopyLoadRawBuf(cstate))
hit_eof = true;
raw_buf_ptr = 0;
copy_buf_len = cstate->raw_buf_len;
/*
* If we are completely out of data, break out of the loop,
* reporting EOF.
*/
if (copy_buf_len <= 0)
{
result = true;
break;
}
need_data = false;
}
/* OK to fetch a character */
prev_raw_ptr = raw_buf_ptr;
c = copy_raw_buf[raw_buf_ptr++];
if (cstate->opts.csv_mode)
{
/*
* If character is '\\' or '\r', we may need to look ahead below.
* Force fetch of the next character if we don't already have it.
* We need to do this before changing CSV state, in case one of
* these characters is also the quote or escape character.
*
* Note: old-protocol does not like forced prefetch, but it's OK
* here since we cannot validly be at EOF.
*/
if (c == '\\' || c == '\r')
{
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
}
/*
* Dealing with quotes and escapes here is mildly tricky. If the
* quote char is also the escape char, there's no problem - we
* just use the char as a toggle. If they are different, we need
* to ensure that we only take account of an escape inside a
* quoted field and immediately preceding a quote char, and not
* the second in an escape-escape sequence.
*/
if (in_quote && c == escapec)
last_was_esc = !last_was_esc;
if (c == quotec && !last_was_esc)
in_quote = !in_quote;
if (c != escapec)
last_was_esc = false;
/*
* Updating the line count for embedded CR and/or LF chars is
* necessarily a little fragile - this test is probably about the
* best we can do. (XXX it's arguable whether we should do this
* at all --- is cur_lineno a physical or logical count?)
*/
if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
cstate->cur_lineno++;
}
/* Process \r */
if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
{
/* Check for \r\n on first line, _and_ handle \r\n. */
if (cstate->eol_type == EOL_UNKNOWN ||
cstate->eol_type == EOL_CRNL)
{
/*
* If need more data, go back to loop top to load it.
*
* Note that if we are at EOF, c will wind up as '\0' because
* of the guaranteed pad of raw_buf.
*/
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* get next char */
c = copy_raw_buf[raw_buf_ptr];
if (c == '\n')
{
raw_buf_ptr++; /* eat newline */
cstate->eol_type = EOL_CRNL; /* in case not set yet */
}
else
{
/* found \r, but no \n */
if (cstate->eol_type == EOL_CRNL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ?
errmsg("literal carriage return found in data") :
errmsg("unquoted carriage return found in data"),
!cstate->opts.csv_mode ?
errhint("Use \"\\r\" to represent carriage return.") :
errhint("Use quoted CSV field to represent carriage return.")));
/*
* if we got here, it is the first line and we didn't find
* \n, so don't consume the peeked character
*/
cstate->eol_type = EOL_CR;
}
}
else if (cstate->eol_type == EOL_NL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ?
errmsg("literal carriage return found in data") :
errmsg("unquoted carriage return found in data"),
!cstate->opts.csv_mode ?
errhint("Use \"\\r\" to represent carriage return.") :
errhint("Use quoted CSV field to represent carriage return.")));
/* If reach here, we have found the line terminator */
break;
}
/* Process \n */
if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
{
if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
!cstate->opts.csv_mode ?
errmsg("literal newline found in data") :
errmsg("unquoted newline found in data"),
!cstate->opts.csv_mode ?
errhint("Use \"\\n\" to represent newline.") :
errhint("Use quoted CSV field to represent newline.")));
cstate->eol_type = EOL_NL; /* in case not set yet */
/* If reach here, we have found the line terminator */
break;
}
/*
* In CSV mode, we only recognize \. alone on a line. This is because
* \. is a valid CSV data value.
*/
if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line))
{
char c2;
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
IF_NEED_REFILL_AND_EOF_BREAK(0);
/* -----
* get next character
* Note: we do not change c so if it isn't \., we can fall
* through and continue processing for file encoding.
* -----
*/
c2 = copy_raw_buf[raw_buf_ptr];
if (c2 == '.')
{
raw_buf_ptr++; /* consume the '.' */
/*
* Note: if we loop back for more data here, it does not
* matter that the CSV state change checks are re-executed; we
* will come back here with no important state changed.
*/
if (cstate->eol_type == EOL_CRNL)
{
/* Get the next character */
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* if hit_eof, c2 will become '\0' */
c2 = copy_raw_buf[raw_buf_ptr++];
if (c2 == '\n')
{
if (!cstate->opts.csv_mode)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker does not match previous newline style")));
else
NO_END_OF_COPY_GOTO;
}
else if (c2 != '\r')
{
if (!cstate->opts.csv_mode)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker corrupt")));
else
NO_END_OF_COPY_GOTO;
}
}
/* Get the next character */
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* if hit_eof, c2 will become '\0' */
c2 = copy_raw_buf[raw_buf_ptr++];
if (c2 != '\r' && c2 != '\n')
{
if (!cstate->opts.csv_mode)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker corrupt")));
else
NO_END_OF_COPY_GOTO;
}
if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
(cstate->eol_type == EOL_CR && c2 != '\r'))
{
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker does not match previous newline style")));
}
/*
* Transfer only the data before the \. into line_buf, then
* discard the data and the \. sequence.
*/
if (prev_raw_ptr > cstate->raw_buf_index)
appendBinaryStringInfo(&cstate->line_buf,
cstate->raw_buf + cstate->raw_buf_index,
prev_raw_ptr - cstate->raw_buf_index);
cstate->raw_buf_index = raw_buf_ptr;
result = true; /* report EOF */
break;
}
else if (!cstate->opts.csv_mode)
/*
* If we are here, it means we found a backslash followed by
* something other than a period. In non-CSV mode, anything
* after a backslash is special, so we skip over that second
* character too. If we didn't do that \\. would be
* considered an eof-of copy, while in non-CSV mode it is a
* literal backslash followed by a period. In CSV mode,
* backslashes are not special, so we want to process the
* character after the backslash just like a normal character,
* so we don't increment in those cases.
*/
raw_buf_ptr++;
}
/*
* This label is for CSV cases where \. appears at the start of a
* line, but there is more text after it, meaning it was a data value.
* We are more strict for \. in CSV mode because \. could be a data
* value, while in non-CSV mode, \. cannot be a data value.
*/
not_end_of_copy:
/*
* Process all bytes of a multi-byte character as a group.
*
* We only support multi-byte sequences where the first byte has the
* high-bit set, so as an optimization we can avoid this block
* entirely if it is not set.
*/
if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
{
int mblen;
/*
* It is enough to look at the first byte in all our encodings, to
* get the length. (GB18030 is a bit special, but still works for
* our purposes; see comment in pg_gb18030_mblen())
*/
mblen_str[0] = c;
mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
raw_buf_ptr += mblen - 1;
}
first_char_in_line = false;
} /* end of outer loop */
/*
* Transfer any still-uncopied data to line_buf.
*/
REFILL_LINEBUF;
return result;
}
/*
* Return decimal value for a hexadecimal digit
*/
static int
GetDecimalFromHex(char hex)
{
if (isdigit((unsigned char) hex))
return hex - '0';
else
return tolower((unsigned char) hex) - 'a' + 10;
}
/*
* Parse the current line into separate attributes (fields),
* performing de-escaping as needed.
*
* The input is in line_buf. We use attribute_buf to hold the result
* strings. cstate->raw_fields[k] is set to point to the k'th attribute
* string, or NULL when the input matches the null marker string.
* This array is expanded as necessary.
*
* (Note that the caller cannot check for nulls since the returned
* string would be the post-de-escaping equivalent, which may look
* the same as some valid data string.)
*
* delim is the column delimiter string (must be just one byte for now).
* null_print is the null marker string. Note that this is compared to
* the pre-de-escaped input string.
*
* The return value is the number of fields actually read.
*/
static int
CopyReadAttributesText(CopyFromState cstate)
{
char delimc = cstate->opts.delim[0];
int fieldno;
char *output_ptr;
char *cur_ptr;
char *line_end_ptr;
/*
* We need a special case for zero-column tables: check that the input
* line is empty, and return.
*/
if (cstate->max_fields <= 0)
{
if (cstate->line_buf.len != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
return 0;
}
resetStringInfo(&cstate->attribute_buf);
/*
* The de-escaped attributes will certainly not be longer than the input
* data line, so we can just force attribute_buf to be large enough and
* then transfer data without any checks for enough space. We need to do
* it this way because enlarging attribute_buf mid-stream would invalidate
* pointers already stored into cstate->raw_fields[].
*/
if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
output_ptr = cstate->attribute_buf.data;
/* set pointer variables for loop */
cur_ptr = cstate->line_buf.data;
line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
/* Outer loop iterates over fields */
fieldno = 0;
for (;;)
{
bool found_delim = false;
char *start_ptr;
char *end_ptr;
int input_len;
bool saw_non_ascii = false;
/* Make sure there is enough space for the next value */
if (fieldno >= cstate->max_fields)
{
cstate->max_fields *= 2;
cstate->raw_fields =
repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
}
/* Remember start of field on both input and output sides */
start_ptr = cur_ptr;
cstate->raw_fields[fieldno] = output_ptr;
/*
* Scan data for field.
*
* Note that in this loop, we are scanning to locate the end of field
* and also speculatively performing de-escaping. Once we find the
* end-of-field, we can match the raw field contents against the null
* marker string. Only after that comparison fails do we know that
* de-escaping is actually the right thing to do; therefore we *must
* not* throw any syntax errors before we've done the null-marker
* check.
*/
for (;;)
{
char c;
end_ptr = cur_ptr;
if (cur_ptr >= line_end_ptr)
break;
c = *cur_ptr++;
if (c == delimc)
{
found_delim = true;
break;
}
if (c == '\\')
{
if (cur_ptr >= line_end_ptr)
break;
c = *cur_ptr++;
switch (c)
{
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
{
/* handle \013 */
int val;
val = OCTVALUE(c);
if (cur_ptr < line_end_ptr)
{
c = *cur_ptr;
if (ISOCTAL(c))
{
cur_ptr++;
val = (val << 3) + OCTVALUE(c);
if (cur_ptr < line_end_ptr)
{
c = *cur_ptr;
if (ISOCTAL(c))
{
cur_ptr++;
val = (val << 3) + OCTVALUE(c);
}
}
}
}
c = val & 0377;
if (c == '\0' || IS_HIGHBIT_SET(c))
saw_non_ascii = true;
}
break;
case 'x':
/* Handle \x3F */
if (cur_ptr < line_end_ptr)
{
char hexchar = *cur_ptr;
if (isxdigit((unsigned char) hexchar))
{
int val = GetDecimalFromHex(hexchar);
cur_ptr++;
if (cur_ptr < line_end_ptr)
{
hexchar = *cur_ptr;
if (isxdigit((unsigned char) hexchar))
{
cur_ptr++;
val = (val << 4) + GetDecimalFromHex(hexchar);
}
}
c = val & 0xff;
if (c == '\0' || IS_HIGHBIT_SET(c))
saw_non_ascii = true;
}
}
break;
case 'b':
c = '\b';
break;
case 'f':
c = '\f';
break;
case 'n':
c = '\n';
break;
case 'r':
c = '\r';
break;
case 't':
c = '\t';
break;
case 'v':
c = '\v';
break;
/*
* in all other cases, take the char after '\'
* literally
*/
}
}
/* Add c to output string */
*output_ptr++ = c;
}
/* Check whether raw input matched null marker */
input_len = end_ptr - start_ptr;
if (input_len == cstate->opts.null_print_len &&
strncmp(start_ptr, cstate->opts.null_print, input_len) == 0)
cstate->raw_fields[fieldno] = NULL;
else
{
/*
* At this point we know the field is supposed to contain data.
*
* If we de-escaped any non-7-bit-ASCII chars, make sure the
* resulting string is valid data for the db encoding.
*/
if (saw_non_ascii)
{
char *fld = cstate->raw_fields[fieldno];
pg_verifymbstr(fld, output_ptr - fld, false);
}
}
/* Terminate attribute value in output area */
*output_ptr++ = '\0';
fieldno++;
/* Done if we hit EOL instead of a delim */
if (!found_delim)
break;
}
/* Clean up state of attribute_buf */
output_ptr--;
Assert(*output_ptr == '\0');
cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
return fieldno;
}
/*
* Parse the current line into separate attributes (fields),
* performing de-escaping as needed. This has exactly the same API as
* CopyReadAttributesText, except we parse the fields according to
* "standard" (i.e. common) CSV usage.
*/
static int
CopyReadAttributesCSV(CopyFromState cstate)
{
char delimc = cstate->opts.delim[0];
char quotec = cstate->opts.quote[0];
char escapec = cstate->opts.escape[0];
int fieldno;
char *output_ptr;
char *cur_ptr;
char *line_end_ptr;
/*
* We need a special case for zero-column tables: check that the input
* line is empty, and return.
*/
if (cstate->max_fields <= 0)
{
if (cstate->line_buf.len != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
return 0;
}
resetStringInfo(&cstate->attribute_buf);
/*
* The de-escaped attributes will certainly not be longer than the input
* data line, so we can just force attribute_buf to be large enough and
* then transfer data without any checks for enough space. We need to do
* it this way because enlarging attribute_buf mid-stream would invalidate
* pointers already stored into cstate->raw_fields[].
*/
if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
output_ptr = cstate->attribute_buf.data;
/* set pointer variables for loop */
cur_ptr = cstate->line_buf.data;
line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
/* Outer loop iterates over fields */
fieldno = 0;
for (;;)
{
bool found_delim = false;
bool saw_quote = false;
char *start_ptr;
char *end_ptr;
int input_len;
/* Make sure there is enough space for the next value */
if (fieldno >= cstate->max_fields)
{
cstate->max_fields *= 2;
cstate->raw_fields =
repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
}
/* Remember start of field on both input and output sides */
start_ptr = cur_ptr;
cstate->raw_fields[fieldno] = output_ptr;
/*
* Scan data for field,
*
* The loop starts in "not quote" mode and then toggles between that
* and "in quote" mode. The loop exits normally if it is in "not
* quote" mode and a delimiter or line end is seen.
*/
for (;;)
{
char c;
/* Not in quote */
for (;;)
{
end_ptr = cur_ptr;
if (cur_ptr >= line_end_ptr)
goto endfield;
c = *cur_ptr++;
/* unquoted field delimiter */
if (c == delimc)
{
found_delim = true;
goto endfield;
}
/* start of quoted field (or part of field) */
if (c == quotec)
{
saw_quote = true;
break;
}
/* Add c to output string */
*output_ptr++ = c;
}
/* In quote */
for (;;)
{
end_ptr = cur_ptr;
if (cur_ptr >= line_end_ptr)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unterminated CSV quoted field")));
c = *cur_ptr++;
/* escape within a quoted field */
if (c == escapec)
{
/*
* peek at the next char if available, and escape it if it
* is an escape char or a quote char
*/
if (cur_ptr < line_end_ptr)
{
char nextc = *cur_ptr;
if (nextc == escapec || nextc == quotec)
{
*output_ptr++ = nextc;
cur_ptr++;
continue;
}
}
}
/*
* end of quoted field. Must do this test after testing for
* escape in case quote char and escape char are the same
* (which is the common case).
*/
if (c == quotec)
break;
/* Add c to output string */
*output_ptr++ = c;
}
}
endfield:
/* Terminate attribute value in output area */
*output_ptr++ = '\0';
/* Check whether raw input matched null marker */
input_len = end_ptr - start_ptr;
if (!saw_quote && input_len == cstate->opts.null_print_len &&
strncmp(start_ptr, cstate->opts.null_print, input_len) == 0)
cstate->raw_fields[fieldno] = NULL;
fieldno++;
/* Done if we hit EOL instead of a delim */
if (!found_delim)
break;
}
/* Clean up state of attribute_buf */
output_ptr--;
Assert(*output_ptr == '\0');
cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
return fieldno;
}
/*
* Read a binary attribute
*/
static Datum
CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
Oid typioparam, int32 typmod,
bool *isnull)
{
int32 fld_size;
Datum result;
if (!CopyGetInt32(cstate, &fld_size))
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unexpected EOF in COPY data")));
if (fld_size == -1)
{
*isnull = true;
return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
}
if (fld_size < 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid field size")));
/* reset attribute_buf to empty, and load raw data in it */
resetStringInfo(&cstate->attribute_buf);
enlargeStringInfo(&cstate->attribute_buf, fld_size);
if (CopyReadBinaryData(cstate, cstate->attribute_buf.data,
fld_size) != fld_size)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unexpected EOF in COPY data")));
cstate->attribute_buf.len = fld_size;
cstate->attribute_buf.data[fld_size] = '\0';
/* Call the column type's binary input converter */
result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
typioparam, typmod);
/* Trouble if it didn't eat the whole buffer */
if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format")));
*isnull = false;
return result;
}