Do COPY FROM encoding conversion/verification in larger chunks.

This gives a small performance gain, by reducing the number of calls
to the conversion/verification function, and letting it work with
larger inputs. Also, reorganizing the input pipeline makes it easier
to parallelize the input parsing: after the input has been converted
to the database encoding, the next stage of finding the newlines can
be done in parallel, because there cannot be any newline chars
"embedded" in multi-byte characters in the encodings that we support
as server encodings.

This changes behavior in one corner case: if client and server
encodings are the same single-byte encoding (e.g. latin1), previously
the input would not be checked for zero bytes ('\0'). Any fields
containing zero bytes would be truncated at the zero. But if encoding
conversion was needed, the conversion routine would throw an error on
the zero. After this commit, the input is always checked for zeros.

Reviewed-by: John Naylor
Discussion: https://www.postgresql.org/message-id/e7861509-3960-538a-9025-b75a61188e01%40iki.fi
This commit is contained in:
Heikki Linnakangas 2021-04-01 12:23:40 +03:00
parent ea1b99a661
commit f82de5c46b
4 changed files with 503 additions and 185 deletions

View File

@ -3,6 +3,12 @@
* copyfrom.c
* COPY <table> FROM file/program/client
*
* This file contains routines needed to efficiently load tuples into a
* table. That includes looking up the correct partition, firing triggers,
* calling the table AM function to insert the data, and updating indexes.
* Reading data from the input file or client and parsing it into Datums
* is handled in copyfromparse.c.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
@ -23,6 +29,7 @@
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
@ -87,7 +94,7 @@ typedef struct CopyMultiInsertInfo
List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
int bufferedTuples; /* number of tuples buffered over all buffers */
int bufferedBytes; /* number of bytes from all buffered tuples */
CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
EState *estate; /* Executor state used for COPY */
CommandId mycid; /* Command Id used for COPY */
int ti_options; /* table insert options */
@ -107,7 +114,7 @@ static void ClosePipeFromProgram(CopyFromState cstate);
void
CopyFromErrorCallback(void *arg)
{
CopyFromState cstate = (CopyFromState) arg;
CopyFromState cstate = (CopyFromState) arg;
char curlineno_str[32];
snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
@ -149,15 +156,9 @@ CopyFromErrorCallback(void *arg)
/*
* Error is relevant to a particular line.
*
* If line_buf still contains the correct line, and it's already
* transcoded, print it. If it's still in a foreign encoding, it's
* quite likely that the error is precisely a failure to do
* encoding conversion (ie, bad data). We dare not try to convert
* it, and at present there's no way to regurgitate it without
* conversion. So we have to punt and just report the line number.
* If line_buf still contains the correct line, print it.
*/
if (cstate->line_buf_valid &&
(cstate->line_buf_converted || !cstate->need_transcoding))
if (cstate->line_buf_valid)
{
char *lineval;
@ -300,7 +301,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
MemoryContext oldcontext;
int i;
uint64 save_cur_lineno;
CopyFromState cstate = miinfo->cstate;
CopyFromState cstate = miinfo->cstate;
EState *estate = miinfo->estate;
CommandId mycid = miinfo->mycid;
int ti_options = miinfo->ti_options;
@ -1191,7 +1192,7 @@ BeginCopyFrom(ParseState *pstate,
List *attnamelist,
List *options)
{
CopyFromState cstate;
CopyFromState cstate;
bool pipe = (filename == NULL);
TupleDesc tupDesc;
AttrNumber num_phys_attrs,
@ -1229,7 +1230,7 @@ BeginCopyFrom(ParseState *pstate,
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Extract options from the statement node tree */
ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */, options);
ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
/* Process the target relation */
cstate->rel = rel;
@ -1320,15 +1321,20 @@ BeginCopyFrom(ParseState *pstate,
cstate->file_encoding = cstate->opts.file_encoding;
/*
* Set up encoding conversion info. Even if the file and server encodings
* are the same, we must apply pg_any_to_server() to validate data in
* multibyte encodings.
* Look up encoding conversion function.
*/
cstate->need_transcoding =
(cstate->file_encoding != GetDatabaseEncoding() ||
pg_database_encoding_max_length() > 1);
/* See Multibyte encoding comment above */
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
if (cstate->file_encoding == GetDatabaseEncoding() ||
cstate->file_encoding == PG_SQL_ASCII ||
GetDatabaseEncoding() == PG_SQL_ASCII)
{
cstate->need_transcoding = false;
}
else
{
cstate->need_transcoding = true;
cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
GetDatabaseEncoding());
}
cstate->copy_src = COPY_FILE; /* default */
@ -1339,7 +1345,6 @@ BeginCopyFrom(ParseState *pstate,
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Initialize state variables */
cstate->reached_eof = false;
cstate->eol_type = EOL_UNKNOWN;
cstate->cur_relname = RelationGetRelationName(cstate->rel);
cstate->cur_lineno = 0;
@ -1347,19 +1352,36 @@ BeginCopyFrom(ParseState *pstate,
cstate->cur_attval = NULL;
/*
* Set up variables to avoid per-attribute overhead. attribute_buf and
* raw_buf are used in both text and binary modes, but we use line_buf
* only in text mode.
* Allocate buffers for the input pipeline.
*
* attribute_buf and raw_buf are used in both text and binary modes, but
* input_buf and line_buf only in text mode.
*/
initStringInfo(&cstate->attribute_buf);
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->raw_reached_eof = false;
if (!cstate->opts.binary)
{
/*
* If encoding conversion is needed, we need another buffer to hold
* the converted input data. Otherwise, we can just point input_buf
* to the same buffer as raw_buf.
*/
if (cstate->need_transcoding)
{
cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
cstate->input_buf_index = cstate->input_buf_len = 0;
}
else
cstate->input_buf = cstate->raw_buf;
cstate->input_reached_eof = false;
initStringInfo(&cstate->line_buf);
cstate->line_buf_converted = false;
}
initStringInfo(&cstate->attribute_buf);
/* Assign range table, we'll need it in CopyFrom. */
if (pstate)
cstate->range_table = pstate->p_rtable;
@ -1584,7 +1606,7 @@ ClosePipeFromProgram(CopyFromState cstate)
* should not report that as an error. Otherwise, SIGPIPE indicates a
* problem.
*/
if (!cstate->reached_eof &&
if (!cstate->raw_reached_eof &&
wait_result_is_signal(pclose_rc, SIGPIPE))
return;

View File

@ -3,6 +3,50 @@
* copyfromparse.c
* Parse CSV/text/binary format for COPY FROM.
*
* This file contains routines to parse the text, CSV and binary input
* formats. The main entry point is NextCopyFrom(), which parses the
* next input line and returns it as Datums.
*
* In text/CSV mode, the parsing happens in multiple stages:
*
* [data source] --> raw_buf --> input_buf --> line_buf --> attribute_buf
* 1. 2. 3. 4.
*
* 1. CopyLoadRawBuf() reads raw data from the input file or client, and
* places it into 'raw_buf'.
*
* 2. CopyConvertBuf() calls the encoding conversion function to convert
* the data in 'raw_buf' from client to server encoding, placing the
* converted result in 'input_buf'.
*
* 3. CopyReadLine() parses the data in 'input_buf', one line at a time.
* It is responsible for finding the next newline marker, taking quote and
* escape characters into account according to the COPY options. The line
* is copied into 'line_buf', with quotes and escape characters still
* intact.
*
* 4. CopyReadAttributesText/CSV() function takes the input line from
* 'line_buf', and splits it into fields, unescaping the data as required.
* The fields are stored in 'attribute_buf', and 'raw_fields' array holds
* pointers to each field.
*
* If encoding conversion is not required, a shortcut is taken in step 2 to
* avoid copying the data unnecessarily. The 'input_buf' pointer is set to
* point directly to 'raw_buf', so that CopyLoadRawBuf() loads the raw data
* directly into 'input_buf'. CopyConvertBuf() then merely validates that
* the data is valid in the current encoding.
*
* In binary mode, the pipeline is much simpler. Input is loaded into
* into 'raw_buf', and encoding conversion is done in the datatype-specific
* receive functions, if required. 'input_buf' and 'line_buf' are not used,
* but 'attribute_buf' is used as a temporary buffer to hold one attribute's
* data when it's passed the receive function.
*
* 'raw_buf' is always 64 kB in size (RAW_BUF_SIZE). 'input_buf' is also
* 64 kB (INPUT_BUF_SIZE), if encoding conversion is required. 'line_buf'
* and 'attribute_buf' are expanded on demand, to hold the longest line
* encountered so far.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
@ -35,7 +79,7 @@
#define OCTVALUE(c) ((c) - '0')
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
* These macros centralize code used to process line_buf and input_buf buffers.
* They are macros because they often do continue/break control and to avoid
* function call overhead in tight COPY loops.
*
@ -53,9 +97,9 @@
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
if (input_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
{ \
raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
input_buf_ptr = prev_raw_ptr; /* undo fetch */ \
need_data = true; \
continue; \
} \
@ -65,10 +109,10 @@ if (1) \
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
if (input_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
{ \
if (extralen) \
raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
input_buf_ptr = copy_buf_len; /* consume the partial character */ \
/* backslash just before EOF, treat as data char */ \
result = true; \
break; \
@ -77,17 +121,17 @@ if (1) \
/*
* Transfer any approved data to line_buf; must do this to be sure
* there is some room in raw_buf.
* there is some room in input_buf.
*/
#define REFILL_LINEBUF \
if (1) \
{ \
if (raw_buf_ptr > cstate->raw_buf_index) \
if (input_buf_ptr > cstate->input_buf_index) \
{ \
appendBinaryStringInfo(&cstate->line_buf, \
cstate->raw_buf + cstate->raw_buf_index, \
raw_buf_ptr - cstate->raw_buf_index); \
cstate->raw_buf_index = raw_buf_ptr; \
cstate->input_buf + cstate->input_buf_index, \
input_buf_ptr - cstate->input_buf_index); \
cstate->input_buf_index = input_buf_ptr; \
} \
} else ((void) 0)
@ -95,7 +139,7 @@ if (1) \
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
raw_buf_ptr = prev_raw_ptr + 1; \
input_buf_ptr = prev_raw_ptr + 1; \
goto not_end_of_copy; \
} else ((void) 0)
@ -118,7 +162,7 @@ static int CopyGetData(CopyFromState cstate, void *databuf,
int minread, int maxread);
static inline bool CopyGetInt32(CopyFromState cstate, int32 *val);
static inline bool CopyGetInt16(CopyFromState cstate, int16 *val);
static bool CopyLoadRawBuf(CopyFromState cstate);
static void CopyLoadInputBuf(CopyFromState cstate);
static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
void
@ -210,10 +254,10 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
(errcode_for_file_access(),
errmsg("could not read from COPY file: %m")));
if (bytesread == 0)
cstate->reached_eof = true;
cstate->raw_reached_eof = true;
break;
case COPY_FRONTEND:
while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof)
{
int avail;
@ -241,7 +285,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
break;
case 'c': /* CopyDone */
/* COPY IN correctly terminated by frontend */
cstate->reached_eof = true;
cstate->raw_reached_eof = true;
return bytesread;
case 'f': /* CopyFail */
ereport(ERROR,
@ -327,34 +371,303 @@ CopyGetInt16(CopyFromState cstate, int16 *val)
/*
* CopyLoadRawBuf loads some more data into raw_buf
* Perform encoding conversion on data in 'raw_buf', writing the converted
* data into 'input_buf'.
*
* Returns true if able to obtain at least one more byte, else false.
*
* If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start
* of the buffer and then we load more data after that. This case occurs only
* when a multibyte character crosses a bufferload boundary.
* On entry, there must be some data to convert in 'raw_buf'.
*/
static bool
static void
CopyConvertBuf(CopyFromState cstate)
{
/*
* If the file and server encoding are the same, no encoding conversion is
* required. However, we still need to verify that the input is valid for
* the encoding.
*/
if (!cstate->need_transcoding)
{
/*
* When conversion is not required, input_buf and raw_buf are the
* same. raw_buf_len is the total number of bytes in the buffer, and
* input_buf_len tracks how many of those bytes have already been
* verified.
*/
int preverifiedlen = cstate->input_buf_len;
int unverifiedlen = cstate->raw_buf_len - cstate->input_buf_len;
int nverified;
if (unverifiedlen == 0)
{
/*
* If no more raw data is coming, report the EOF to the caller.
*/
if (cstate->raw_reached_eof)
cstate->input_reached_eof = true;
return;
}
/*
* Verify the new data, including any residual unverified bytes from
* previous round.
*/
nverified = pg_encoding_verifymbstr(cstate->file_encoding,
cstate->raw_buf + preverifiedlen,
unverifiedlen);
if (nverified == 0)
{
/*
* Could not verify anything.
*
* If there is no more raw input data coming, it means that there
* was an incomplete multi-byte sequence at the end. Also, if
* there's "enough" input left, we should be able to verify at
* least one character, and a failure to do so means that we've
* hit an invalid byte sequence.
*/
if (cstate->raw_reached_eof || unverifiedlen >= pg_database_encoding_max_length())
cstate->input_reached_error = true;
return;
}
cstate->input_buf_len += nverified;
}
else
{
/*
* Encoding conversion is needed.
*/
int nbytes;
unsigned char *src;
int srclen;
unsigned char *dst;
int dstlen;
int convertedlen;
if (RAW_BUF_BYTES(cstate) == 0)
{
/*
* If no more raw data is coming, report the EOF to the caller.
*/
if (cstate->raw_reached_eof)
cstate->input_reached_eof = true;
return;
}
/*
* First, copy down any unprocessed data.
*/
nbytes = INPUT_BUF_BYTES(cstate);
if (nbytes > 0 && cstate->input_buf_index > 0)
memmove(cstate->input_buf, cstate->input_buf + cstate->input_buf_index,
nbytes);
cstate->input_buf_index = 0;
cstate->input_buf_len = nbytes;
cstate->input_buf[nbytes] = '\0';
src = (unsigned char *) cstate->raw_buf + cstate->raw_buf_index;
srclen = cstate->raw_buf_len - cstate->raw_buf_index;
dst = (unsigned char *) cstate->input_buf + cstate->input_buf_len;
dstlen = INPUT_BUF_SIZE - cstate->input_buf_len + 1;
/*
* Do the conversion. This might stop short, if there is an invalid
* byte sequence in the input. We'll convert as much as we can in
* that case.
*
* Note: Even if we hit an invalid byte sequence, we don't report the
* error until all the valid bytes have been consumed. The input
* might contain an end-of-input marker (\.), and we don't want to
* report an error if the invalid byte sequence is after the
* end-of-input marker. We might unnecessarily convert some data
* after the end-of-input marker as long as it's valid for the
* encoding, but that's harmless.
*/
convertedlen = pg_do_encoding_conversion_buf(cstate->conversion_proc,
cstate->file_encoding,
GetDatabaseEncoding(),
src, srclen,
dst, dstlen,
true);
if (convertedlen == 0)
{
/*
* Could not convert anything. If there is no more raw input data
* coming, it means that there was an incomplete multi-byte
* sequence at the end. Also, if there is plenty of input left,
* we should be able to convert at least one character, so a
* failure to do so must mean that we've hit a byte sequence
* that's invalid.
*/
if (cstate->raw_reached_eof || srclen >= MAX_CONVERSION_INPUT_LENGTH)
cstate->input_reached_error = true;
return;
}
cstate->raw_buf_index += convertedlen;
cstate->input_buf_len += strlen((char *) dst);
}
}
/*
* Report an encoding or conversion error.
*/
static void
CopyConversionError(CopyFromState cstate)
{
Assert(cstate->raw_buf_len > 0);
Assert(cstate->input_reached_error);
if (!cstate->need_transcoding)
{
/*
* Everything up to input_buf_len was successfully verified, and
* input_buf_len points to the invalid or incomplete character.
*/
report_invalid_encoding(cstate->file_encoding,
cstate->raw_buf + cstate->input_buf_len,
cstate->raw_buf_len - cstate->input_buf_len);
}
else
{
/*
* raw_buf_index points to the invalid or untranslatable character. We
* let the conversion routine report the error, because it can provide
* a more specific error message than we could here. An earlier call
* to the conversion routine in CopyConvertBuf() detected that there
* is an error, now we call the conversion routine again with
* noError=false, to have it throw the error.
*/
unsigned char *src;
int srclen;
unsigned char *dst;
int dstlen;
src = (unsigned char *) cstate->raw_buf + cstate->raw_buf_index;
srclen = cstate->raw_buf_len - cstate->raw_buf_index;
dst = (unsigned char *) cstate->input_buf + cstate->input_buf_len;
dstlen = INPUT_BUF_SIZE - cstate->input_buf_len + 1;
(void) pg_do_encoding_conversion_buf(cstate->conversion_proc,
cstate->file_encoding,
GetDatabaseEncoding(),
src, srclen,
dst, dstlen,
false);
/*
* The conversion routine should have reported an error, so this
* should not be reached.
*/
elog(ERROR, "encoding conversion failed without error");
}
}
/*
* Load more data from data source to raw_buf.
*
* If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the
* beginning of the buffer, and we load new data after that.
*/
static void
CopyLoadRawBuf(CopyFromState cstate)
{
int nbytes = RAW_BUF_BYTES(cstate);
int nbytes;
int inbytes;
/* Copy down the unprocessed data if any. */
if (nbytes > 0)
/*
* In text mode, if encoding conversion is not required, raw_buf and
* input_buf point to the same buffer. Their len/index better agree, too.
*/
if (cstate->raw_buf == cstate->input_buf)
{
Assert(!cstate->need_transcoding);
Assert(cstate->raw_buf_index == cstate->input_buf_index);
Assert(cstate->input_buf_len <= cstate->raw_buf_len);
}
/*
* Copy down the unprocessed data if any.
*/
nbytes = RAW_BUF_BYTES(cstate);
if (nbytes > 0 && cstate->raw_buf_index > 0)
memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
nbytes);
cstate->raw_buf_len -= cstate->raw_buf_index;
cstate->raw_buf_index = 0;
inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
1, RAW_BUF_SIZE - nbytes);
/*
* If raw_buf and input_buf are in fact the same buffer, adjust the
* input_buf variables, too.
*/
if (cstate->raw_buf == cstate->input_buf)
{
cstate->input_buf_len -= cstate->input_buf_index;
cstate->input_buf_index = 0;
}
/* Load more data */
inbytes = CopyGetData(cstate, cstate->raw_buf + cstate->raw_buf_len,
1, RAW_BUF_SIZE - cstate->raw_buf_len);
nbytes += inbytes;
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
cstate->raw_buf_len = nbytes;
cstate->bytes_processed += inbytes;
pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
return (inbytes > 0);
if (inbytes == 0)
cstate->raw_reached_eof = true;
}
/*
* CopyLoadInputBuf loads some more data into input_buf
*
* On return, at least one more input character is loaded into
* input_buf, or input_reached_eof is set.
*
* If INPUT_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start
* of the buffer and then we load more data after that.
*/
static void
CopyLoadInputBuf(CopyFromState cstate)
{
int nbytes = INPUT_BUF_BYTES(cstate);
/*
* The caller has updated input_buf_index to indicate how much of the
* input has been consumed and isn't needed anymore. If input_buf is the
* same physical area as raw_buf, update raw_buf_index accordingly.
*/
if (cstate->raw_buf == cstate->input_buf)
{
Assert(!cstate->need_transcoding);
Assert(cstate->input_buf_index >= cstate->raw_buf_index);
cstate->raw_buf_index = cstate->input_buf_index;
}
for (;;)
{
/* If we now have some unconverted data, try to convert it */
CopyConvertBuf(cstate);
/* If we now have some more input bytes ready, return them */
if (INPUT_BUF_BYTES(cstate) > nbytes)
return;
/*
* If we reached an invalid byte sequence, or we're at an incomplete
* multi-byte character but there is no more raw input data, report
* conversion error.
*/
if (cstate->input_reached_error)
CopyConversionError(cstate);
/* no more input, and everything has been converted */
if (cstate->input_reached_eof)
break;
/* Try to load more raw data */
Assert(!cstate->raw_reached_eof);
CopyLoadRawBuf(cstate);
}
}
/*
@ -389,7 +702,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
/* Load more data if buffer is empty. */
if (RAW_BUF_BYTES(cstate) == 0)
{
if (!CopyLoadRawBuf(cstate))
CopyLoadRawBuf(cstate);
if (cstate->raw_reached_eof)
break; /* EOF */
}
@ -645,8 +959,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
}
/*
* Read the next input line and stash it in line_buf, with conversion to
* server encoding.
* Read the next input line and stash it in line_buf.
*
* Result is true if read was terminated by EOF, false if terminated
* by newline. The terminating newline or EOF marker is not included
@ -658,10 +971,7 @@ CopyReadLine(CopyFromState cstate)
bool result;
resetStringInfo(&cstate->line_buf);
cstate->line_buf_valid = true;
/* Mark that encoding conversion hasn't occurred yet */
cstate->line_buf_converted = false;
cstate->line_buf_valid = false;
/* Parse data and transfer into line_buf */
result = CopyReadLineText(cstate);
@ -675,10 +985,17 @@ CopyReadLine(CopyFromState cstate)
*/
if (cstate->copy_src == COPY_FRONTEND)
{
int inbytes;
do
{
cstate->raw_buf_index = cstate->raw_buf_len;
} while (CopyLoadRawBuf(cstate));
inbytes = CopyGetData(cstate, cstate->input_buf,
1, INPUT_BUF_SIZE);
} while (inbytes > 0);
cstate->input_buf_index = 0;
cstate->input_buf_len = 0;
cstate->raw_buf_index = 0;
cstate->raw_buf_len = 0;
}
}
else
@ -715,25 +1032,8 @@ CopyReadLine(CopyFromState cstate)
}
}
/* Done reading the line. Convert it to server encoding. */
if (cstate->need_transcoding)
{
char *cvt;
cvt = pg_any_to_server(cstate->line_buf.data,
cstate->line_buf.len,
cstate->file_encoding);
if (cvt != cstate->line_buf.data)
{
/* transfer converted data back to line_buf */
resetStringInfo(&cstate->line_buf);
appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
pfree(cvt);
}
}
/* Now it's safe to use the buffer in error messages */
cstate->line_buf_converted = true;
cstate->line_buf_valid = true;
return result;
}
@ -744,13 +1044,12 @@ CopyReadLine(CopyFromState cstate)
static bool
CopyReadLineText(CopyFromState cstate)
{
char *copy_raw_buf;
int raw_buf_ptr;
char *copy_input_buf;
int input_buf_ptr;
int copy_buf_len;
bool need_data = false;
bool hit_eof = false;
bool result = false;
char mblen_str[2];
/* CSV variables */
bool first_char_in_line = true;
@ -768,8 +1067,6 @@ CopyReadLineText(CopyFromState cstate)
escapec = '\0';
}
mblen_str[1] = '\0';
/*
* The objective of this loop is to transfer the entire next input line
* into line_buf. Hence, we only care for detecting newlines (\r and/or
@ -782,18 +1079,25 @@ CopyReadLineText(CopyFromState cstate)
* These four characters, and the CSV escape and quote characters, are
* assumed the same in frontend and backend encodings.
*
* For speed, we try to move data from raw_buf to line_buf in chunks
* rather than one character at a time. raw_buf_ptr points to the next
* character to examine; any characters from raw_buf_index to raw_buf_ptr
* have been determined to be part of the line, but not yet transferred to
* line_buf.
* The input has already been converted to the database encoding. All
* supported server encodings have the property that all bytes in a
* multi-byte sequence have the high bit set, so a multibyte character
* cannot contain any newline or escape characters embedded in the
* multibyte sequence. Therefore, we can process the input byte-by-byte,
* regardless of the encoding.
*
* For a little extra speed within the loop, we copy raw_buf and
* raw_buf_len into local variables.
* For speed, we try to move data from input_buf to line_buf in chunks
* rather than one character at a time. input_buf_ptr points to the next
* character to examine; any characters from input_buf_index to
* input_buf_ptr have been determined to be part of the line, but not yet
* transferred to line_buf.
*
* For a little extra speed within the loop, we copy input_buf and
* input_buf_len into local variables.
*/
copy_raw_buf = cstate->raw_buf;
raw_buf_ptr = cstate->raw_buf_index;
copy_buf_len = cstate->raw_buf_len;
copy_input_buf = cstate->input_buf;
input_buf_ptr = cstate->input_buf_index;
copy_buf_len = cstate->input_buf_len;
for (;;)
{
@ -810,24 +1114,21 @@ CopyReadLineText(CopyFromState cstate)
* cstate->copy_src != COPY_OLD_FE, but it hardly seems worth it,
* considering the size of the buffer.
*/
if (raw_buf_ptr >= copy_buf_len || need_data)
if (input_buf_ptr >= copy_buf_len || need_data)
{
REFILL_LINEBUF;
/*
* Try to read some more data. This will certainly reset
* raw_buf_index to zero, and raw_buf_ptr must go with it.
*/
if (!CopyLoadRawBuf(cstate))
hit_eof = true;
raw_buf_ptr = 0;
copy_buf_len = cstate->raw_buf_len;
CopyLoadInputBuf(cstate);
/* update our local variables */
hit_eof = cstate->input_reached_eof;
input_buf_ptr = cstate->input_buf_index;
copy_buf_len = cstate->input_buf_len;
/*
* If we are completely out of data, break out of the loop,
* reporting EOF.
*/
if (copy_buf_len <= 0)
if (INPUT_BUF_BYTES(cstate) <= 0)
{
result = true;
break;
@ -836,8 +1137,8 @@ CopyReadLineText(CopyFromState cstate)
}
/* OK to fetch a character */
prev_raw_ptr = raw_buf_ptr;
c = copy_raw_buf[raw_buf_ptr++];
prev_raw_ptr = input_buf_ptr;
c = copy_input_buf[input_buf_ptr++];
if (cstate->opts.csv_mode)
{
@ -891,16 +1192,16 @@ CopyReadLineText(CopyFromState cstate)
* If need more data, go back to loop top to load it.
*
* Note that if we are at EOF, c will wind up as '\0' because
* of the guaranteed pad of raw_buf.
* of the guaranteed pad of input_buf.
*/
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* get next char */
c = copy_raw_buf[raw_buf_ptr];
c = copy_input_buf[input_buf_ptr];
if (c == '\n')
{
raw_buf_ptr++; /* eat newline */
input_buf_ptr++; /* eat newline */
cstate->eol_type = EOL_CRNL; /* in case not set yet */
}
else
@ -967,14 +1268,14 @@ CopyReadLineText(CopyFromState cstate)
/* -----
* get next character
* Note: we do not change c so if it isn't \., we can fall
* through and continue processing for file encoding.
* through and continue processing.
* -----
*/
c2 = copy_raw_buf[raw_buf_ptr];
c2 = copy_input_buf[input_buf_ptr];
if (c2 == '.')
{
raw_buf_ptr++; /* consume the '.' */
input_buf_ptr++; /* consume the '.' */
/*
* Note: if we loop back for more data here, it does not
@ -986,7 +1287,7 @@ CopyReadLineText(CopyFromState cstate)
/* Get the next character */
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* if hit_eof, c2 will become '\0' */
c2 = copy_raw_buf[raw_buf_ptr++];
c2 = copy_input_buf[input_buf_ptr++];
if (c2 == '\n')
{
@ -1011,7 +1312,7 @@ CopyReadLineText(CopyFromState cstate)
/* Get the next character */
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* if hit_eof, c2 will become '\0' */
c2 = copy_raw_buf[raw_buf_ptr++];
c2 = copy_input_buf[input_buf_ptr++];
if (c2 != '\r' && c2 != '\n')
{
@ -1036,11 +1337,11 @@ CopyReadLineText(CopyFromState cstate)
* Transfer only the data before the \. into line_buf, then
* discard the data and the \. sequence.
*/
if (prev_raw_ptr > cstate->raw_buf_index)
if (prev_raw_ptr > cstate->input_buf_index)
appendBinaryStringInfo(&cstate->line_buf,
cstate->raw_buf + cstate->raw_buf_index,
prev_raw_ptr - cstate->raw_buf_index);
cstate->raw_buf_index = raw_buf_ptr;
cstate->input_buf + cstate->input_buf_index,
prev_raw_ptr - cstate->input_buf_index);
cstate->input_buf_index = input_buf_ptr;
result = true; /* report EOF */
break;
}
@ -1056,15 +1357,8 @@ CopyReadLineText(CopyFromState cstate)
* backslashes are not special, so we want to process the
* character after the backslash just like a normal character,
* so we don't increment in those cases.
*
* Set 'c' to skip whole character correctly in multi-byte
* encodings. If we don't have the whole character in the
* buffer yet, we might loop back to process it, after all,
* but that's OK because multi-byte characters cannot have any
* special meaning.
*/
raw_buf_ptr++;
c = c2;
input_buf_ptr++;
}
}
@ -1075,30 +1369,6 @@ CopyReadLineText(CopyFromState cstate)
* value, while in non-CSV mode, \. cannot be a data value.
*/
not_end_of_copy:
/*
* Process all bytes of a multi-byte character as a group.
*
* We only support multi-byte sequences where the first byte has the
* high-bit set, so as an optimization we can avoid this block
* entirely if it is not set.
*/
if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
{
int mblen;
/*
* It is enough to look at the first byte in all our encodings, to
* get the length. (GB18030 is a bit special, but still works for
* our purposes; see comment in pg_gb18030_mblen())
*/
mblen_str[0] = c;
mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
raw_buf_ptr += mblen - 1;
}
first_char_in_line = false;
} /* end of outer loop */

View File

@ -52,17 +52,6 @@ typedef enum CopyInsertMethod
/*
* This struct contains all the state variables used throughout a COPY FROM
* operation.
*
* Multi-byte encodings: all supported client-side encodings encode multi-byte
* characters by having the first byte's high bit set. Subsequent bytes of the
* character can have the high bit not set. When scanning data in such an
* encoding to look for a match to a single-byte (ie ASCII) character, we must
* use the full pg_encoding_mblen() machinery to skip over multibyte
* characters, else we might find a false match to a trailing byte. In
* supported server encodings, there is no possibility of a false match, and
* it's faster to make useless comparisons to trailing bytes than it is to
* invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
* when we have to do it the hard way.
*/
typedef struct CopyFromStateData
{
@ -70,13 +59,11 @@ typedef struct CopyFromStateData
CopySource copy_src; /* type of copy source */
FILE *copy_file; /* used if copy_src == COPY_FILE */
StringInfo fe_msgbuf; /* used if copy_src == COPY_NEW_FE */
bool reached_eof; /* true if we read to end of copy data (not
* all copy_src types maintain this) */
EolType eol_type; /* EOL type of input */
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
Oid conversion_proc; /* encoding conversion function */
/* parameters from the COPY command */
Relation rel; /* relation to copy from */
@ -131,31 +118,52 @@ typedef struct CopyFromStateData
/*
* Similarly, line_buf holds the whole input line being processed. The
* input cycle is first to read the whole line into line_buf, convert it
* to server encoding there, and then extract the individual attribute
* fields into attribute_buf. line_buf is preserved unmodified so that we
* can display it in error messages if appropriate. (In binary mode,
* line_buf is not used.)
* input cycle is first to read the whole line into line_buf, and then
* extract the individual attribute fields into attribute_buf. line_buf
* is preserved unmodified so that we can display it in error messages if
* appropriate. (In binary mode, line_buf is not used.)
*/
StringInfoData line_buf;
bool line_buf_converted; /* converted to server encoding? */
bool line_buf_valid; /* contains the row being processed? */
/*
* Finally, raw_buf holds raw data read from the data source (file or
* client connection). In text mode, CopyReadLine parses this data
* sufficiently to locate line boundaries, then transfers the data to
* line_buf and converts it. In binary mode, CopyReadBinaryData fetches
* appropriate amounts of data from this buffer. In both modes, we
* guarantee that there is a \0 at raw_buf[raw_buf_len].
* input_buf holds input data, already converted to database encoding.
*
* In text mode, CopyReadLine parses this data sufficiently to locate
* line boundaries, then transfers the data to line_buf. We guarantee
* that there is a \0 at input_buf[input_buf_len] at all times. (In
* binary mode, input_buf is not used.)
*
* If encoding conversion is not required, input_buf is not a separate
* buffer but points directly to raw_buf. In that case, input_buf_len
* tracks the number of bytes that have been verified as valid in the
* database encoding, and raw_buf_len is the total number of bytes
* stored in the buffer.
*/
#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */
char *input_buf;
int input_buf_index; /* next byte to process */
int input_buf_len; /* total # of bytes stored */
bool input_reached_eof; /* true if we reached EOF */
bool input_reached_error; /* true if a conversion error happened */
/* Shorthand for number of unconsumed bytes available in input_buf */
#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
/*
* raw_buf holds raw input data read from the data source (file or client
* connection), not yet converted to the database encoding. Like with
* 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
*/
#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
uint64 bytes_processed;/* number of bytes processed so far */
bool raw_reached_eof; /* true if we reached EOF */
/* Shorthand for number of unconsumed bytes available in raw_buf */
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
uint64 bytes_processed; /* number of bytes processed so far */
} CopyFromStateData;
extern void ReceiveCopyBegin(CopyFromState cstate);

View File

@ -306,15 +306,33 @@ typedef enum pg_enc
/*
* When converting strings between different encodings, we assume that space
* for converted result is 4-to-1 growth in the worst case. The rate for
* for converted result is 4-to-1 growth in the worst case. The rate for
* currently supported encoding pairs are within 3 (SJIS JIS X0201 half width
* kanna -> UTF8 is the worst case). So "4" should be enough for the moment.
* kana -> UTF8 is the worst case). So "4" should be enough for the moment.
*
* Note that this is not the same as the maximum character width in any
* particular encoding.
*/
#define MAX_CONVERSION_GROWTH 4
/*
* Maximum byte length of a string that's required in any encoding to convert
* at least one character to any other encoding. In other words, if you feed
* MAX_CONVERSION_INPUT_LENGTH bytes to any encoding conversion function, it
* is guaranteed to be able to convert something without needing more input
* (assuming the input is valid).
*
* Currently, the maximum case is the conversion UTF8 -> SJIS JIS X0201 half
* width kana, where a pair of UTF-8 characters is converted into a single
* SHIFT_JIS_2004 character (the reverse of the worst case for
* MAX_CONVERSION_GROWTH). It needs 6 bytes of input. In theory, a
* user-defined conversion function might have more complicated cases, although
* for the reverse mapping you would probably also need to bump up
* MAX_CONVERSION_GROWTH. But there is no need to be stingy here, so make it
* generous.
*/
#define MAX_CONVERSION_INPUT_LENGTH 16
/*
* Maximum byte length of the string equivalent to any one Unicode code point,
* in any backend encoding. The current value assumes that a 4-byte UTF-8