postgresql/src/backend/commands/copy.c

2799 lines
71 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* copy.c
* Implements the COPY utility command.
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.246 2005/06/28 05:08:53 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
1996-11-06 09:21:43 +01:00
#include <unistd.h>
1999-07-16 07:00:38 +02:00
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
1996-11-06 09:21:43 +01:00
#include "access/genam.h"
1999-07-16 07:00:38 +02:00
#include "access/heapam.h"
#include "access/printtup.h"
1999-07-16 07:00:38 +02:00
#include "catalog/index.h"
#include "catalog/namespace.h"
2000-06-15 05:33:12 +02:00
#include "catalog/pg_index.h"
1999-07-16 07:00:38 +02:00
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/trigger.h"
1999-07-16 07:00:38 +02:00
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
1999-07-16 07:00:38 +02:00
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/pquery.h"
1999-09-27 22:00:44 +02:00
#include "tcop/tcopprot.h"
1999-07-16 07:00:38 +02:00
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
1999-07-16 07:00:38 +02:00
#include "utils/syscache.h"
/*
 * Helpers for decoding octal digit characters.
 *
 * ISOCTAL(c)	- nonzero when c is one of the characters '0' through '7'.
 *				  The argument is evaluated twice, so do not pass an
 *				  expression with side effects.
 * OCTVALUE(c)	- numeric value of digit character c (e.g. '5' yields 5);
 *				  only meaningful when ISOCTAL(c) holds.
 */
#define ISOCTAL(c) (('0' <= (c)) && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
/*
* Represents the different source/dest cases we need to worry about at
* the bottom level
*/
/*
 * The current value lives in the file-level copy_dest variable.  DoCopy
 * defaults it to COPY_FILE; SendCopyBegin and ReceiveCopyBegin switch it to
 * one of the frontend values according to the protocol version in use.
 */
typedef enum CopyDest
{
COPY_FILE, /* to/from file */
COPY_OLD_FE, /* to/from frontend (2.0 protocol and older) */
COPY_NEW_FE /* to/from frontend (3.0 protocol) */
} CopyDest;
This patch implements holdable cursors, following the proposal (materialization into a tuple store) discussed on pgsql-hackers earlier. I've updated the documentation and the regression tests. Notes on the implementation: - I needed to change the tuple store API slightly -- it assumes that it won't be used to hold data across transaction boundaries, so the temp files that it uses for on-disk storage are automatically reclaimed at end-of-transaction. I added a flag to tuplestore_begin_heap() to control this behavior. Is changing the tuple store API in this fashion OK? - in order to store executor results in a tuple store, I added a new CommandDest. This works well for the most part, with one exception: the current DestFunction API doesn't provide enough information to allow the Executor to store results into an arbitrary tuple store (where the particular tuple store to use is chosen by the call site of ExecutorRun). To workaround this, I've temporarily hacked up a solution that works, but is not ideal: since the receiveTuple DestFunction is passed the portal name, we can use that to lookup the Portal data structure for the cursor and then use that to get at the tuple store the Portal is using. This unnecessarily ties the Portal code with the tupleReceiver code, but it works... The proper fix for this is probably to change the DestFunction API -- Tom suggested passing the full QueryDesc to the receiveTuple function. In that case, callers of ExecutorRun could "subclass" QueryDesc to add any additional fields that their particular CommandDest needed to get access to. This approach would work, but I'd like to think about it for a little bit longer before deciding which route to go. In the mean time, the code works fine, so I don't think a fix is urgent. - (semi-related) I added a NO SCROLL keyword to DECLARE CURSOR, and adjusted the behavior of SCROLL in accordance with the discussion on -hackers. 
- (unrelated) Cleaned up some SGML markup in sql.sgml, copy.sgml Neil Conway
2003-03-27 17:51:29 +01:00
/*
* State indicator showing what stopped CopyReadAttribute()
*/
/*
 * Reported through the "result" out-parameter of CopyReadAttribute() and
 * CopyReadAttributeCSV().
 */
typedef enum CopyReadResult
{
NORMAL_ATTR, /* NOTE(review): presumably the field ended at a delimiter -- confirm */
END_OF_LINE, /* NOTE(review): presumably the field was the last one on its line -- confirm */
UNTERMINATED_FIELD /* NOTE(review): presumably input ended inside a field -- confirm */
} CopyReadResult;
/*
* Represents the end-of-line terminator type of the input
*/
/*
 * The value for the current input is kept in the file-level eol_type
 * variable.
 * NOTE(review): the names suggest NL = "\n", CR = "\r", CRNL = "\r\n" and
 * UNKNOWN = not yet determined; the code that assigns them is outside this
 * part of the file -- confirm there.
 */
typedef enum EolType
{
EOL_UNKNOWN,
EOL_NL,
EOL_CR,
EOL_CRNL
} EolType;
/*
 * 11-byte signature string beginning with "PGCOPY".  The array is sized to
 * exactly the 11 characters written in the literal (the last one being an
 * explicit NUL), so the literal's implicit terminator is not stored.
 * NOTE(review): presumably this marks the start of binary-format COPY data;
 * its users are outside this part of the file -- confirm.
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/*
* Static communication variables ... pretty grotty, but COPY has
* never been reentrant...
*/
/*
 * DoCopy resets copy_dest, copy_file, copy_msgbuf and fe_eof, and looks up
 * both encodings, before it starts a transfer.
 */
static CopyDest copy_dest;
static FILE *copy_file; /* used if copy_dest == COPY_FILE */
static StringInfo copy_msgbuf; /* used if copy_dest == COPY_NEW_FE */
static bool fe_eof; /* true if detected end of copy data */
static EolType eol_type; /* EOL type of input */
static int client_encoding; /* remote side's character encoding */
static int server_encoding; /* local encoding */
/* these are just for error messages, see copy_in_error_callback */
static bool copy_binary; /* is it a binary copy? */
static const char *copy_relname; /* table name for error messages */
static int copy_lineno; /* line number for error messages */
static const char *copy_attname; /* current att for error messages */
/*
* These static variables are used to avoid incurring overhead for each
* attribute processed. attribute_buf is reused on each CopyReadAttribute
* call to hold the string being read in. Under normal use it will soon
* grow to a suitable size, and then we will avoid palloc/pfree overhead
* for subsequent attributes. Note that CopyReadAttribute returns a pointer
* to attribute_buf's data buffer!
*/
/* Set up with initStringInfo() in DoCopy before any attribute is read. */
static StringInfoData attribute_buf;
1999-05-25 18:15:34 +02:00
/*
* Similarly, line_buf holds the whole input line being processed (its
* cursor field points to the next character to be read by CopyReadAttribute).
* The input cycle is first to read the whole line into line_buf, convert it
* to server encoding, and then extract individual attribute fields into
* attribute_buf. (We used to have CopyReadAttribute read the input source
* directly, but that caused a lot of encoding issues and unnecessary logic
* complexity.)
*/
/* Set up with initStringInfo() in DoCopy. */
static StringInfoData line_buf;
/*
 * Cleared to false in DoCopy.
 * NOTE(review): presumably records whether line_buf currently holds data
 * already converted to the server encoding -- confirm against CopyReadLine.
 */
static bool line_buf_converted;
/* non-export function prototypes */
/*
 * Top-level workers.  DoCopy hands the parsed options to CopyFrom;
 * NOTE(review): the call sites of DoCopyTo/CopyTo are outside this part of
 * the file -- confirm.
 */
static void DoCopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
char *delim, char *null_print, bool csv_mode, char *quote,
char *escape, List *force_quote_atts, bool header_line, bool fe_copy);
static void CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
2004-08-29 07:07:03 +02:00
char *delim, char *null_print, bool csv_mode, char *quote, char *escape,
List *force_quote_atts, bool header_line);
static void CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
2004-08-29 07:07:03 +02:00
char *delim, char *null_print, bool csv_mode, char *quote, char *escape,
List *force_notnull_atts, bool header_line);
/*
 * Input helpers.  The attribute readers report why they stopped through
 * *result (a CopyReadResult) and flag NULL values through *isnull.
 */
static bool CopyReadLine(char * quote, char * escape);
static char *CopyReadAttribute(const char *delim, const char *null_print,
2004-08-29 07:07:03 +02:00
CopyReadResult *result, bool *isnull);
static char *CopyReadAttributeCSV(const char *delim, const char *null_print,
2004-08-29 07:07:03 +02:00
char *quote, char *escape,
CopyReadResult *result, bool *isnull);
static Datum CopyReadBinaryAttribute(int column_no, FmgrInfo *flinfo,
Oid typioparam, bool *isnull);
/* Output helpers; the CSV variant additionally takes quote/escape settings. */
static void CopyAttributeOut(char *string, char *delim);
static void CopyAttributeOutCSV(char *string, char *delim, char *quote,
2004-08-29 07:07:03 +02:00
char *escape, bool force_quote);
/* Turns a list of column names into attribute numbers (used by DoCopy). */
static List *CopyGetAttnums(Relation rel, List *attnamelist);
static void limit_printout_length(StringInfo buf);
/* Internal communications functions */
static void SendCopyBegin(bool binary, int natts);
static void ReceiveCopyBegin(bool binary, int natts);
static void SendCopyEnd(bool binary);
static void CopySendData(void *databuf, int datasize);
static void CopySendString(const char *str);
static void CopySendChar(char c);
static void CopySendEndOfRow(bool binary);
static void CopyGetData(void *databuf, int datasize);
static int CopyGetChar(void);
2003-08-04 02:43:34 +02:00
/* CopyGetEof() expands to the fe_eof flag that the Get routines set. */
#define CopyGetEof() (fe_eof)
/* A CopyPeekChar call must be followed by CopyDonePeek unless EOF was returned. */
static int CopyPeekChar(void);
static void CopyDonePeek(int c, bool pickup);
/* Fixed-width integers travel in network byte order. */
static void CopySendInt32(int32 val);
static int32 CopyGetInt32(void);
static void CopySendInt16(int16 val);
static int16 CopyGetInt16(void);
/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
*/
/*
 * SendCopyBegin
 *		Tell the frontend that COPY OUT data is about to follow, and set
 *		copy_dest to match the protocol in use.
 *
 * binary: whether the data will be in binary format.
 * natts:  number of columns; one format code per column is sent in the
 *		   3.0-protocol message.
 *
 * Protocols older than 3.0 cannot carry binary data here, so binary raises
 * ERROR for them.
 */
static void
SendCopyBegin(bool binary, int natts)
{
	int			proto_major = PG_PROTOCOL_MAJOR(FrontendProtocol);
	StringInfoData msg;
	int16		fmt;
	int			col;

	if (proto_major < 3)
	{
		/*
		 * Pre-3.0 protocols differ only in the announcement byte: 'H' for
		 * protocol 2, 'B' for anything older.
		 */
		char		begin_byte = (proto_major >= 2) ? 'H' : 'B';

		if (binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage(begin_byte);
		/* grottiness needed for old COPY OUT protocol */
		pq_startcopyout();
		copy_dest = COPY_OLD_FE;
		return;
	}

	/* 3.0 protocol: overall format, column count, then per-column formats */
	fmt = binary ? 1 : 0;
	pq_beginmessage(&msg, 'H');
	pq_sendbyte(&msg, fmt);
	pq_sendint(&msg, natts, 2);
	for (col = 0; col < natts; col++)
		pq_sendint(&msg, fmt, 2);
	pq_endmessage(&msg);

	copy_dest = COPY_NEW_FE;
	copy_msgbuf = makeStringInfo();
}
1999-05-25 18:15:34 +02:00
/*
 * ReceiveCopyBegin
 *		Tell the frontend that we are ready to receive COPY IN data, and set
 *		copy_dest to match the protocol in use.
 *
 * binary: whether the incoming data is expected in binary format.
 * natts:  number of columns; one format code per column is sent in the
 *		   3.0-protocol message.
 *
 * Protocols older than 3.0 cannot carry binary data here, so binary raises
 * ERROR for them.  The output is always flushed before returning so that
 * the frontend knows it may start sending.
 */
static void
ReceiveCopyBegin(bool binary, int natts)
{
	int			proto_major = PG_PROTOCOL_MAJOR(FrontendProtocol);

	if (proto_major < 3)
	{
		/*
		 * Pre-3.0 protocols differ only in the announcement byte: 'G' for
		 * protocol 2, 'D' for anything older.
		 */
		char		begin_byte = (proto_major >= 2) ? 'G' : 'D';

		if (binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage(begin_byte);
		copy_dest = COPY_OLD_FE;
	}
	else
	{
		/* 3.0 protocol: overall format, column count, per-column formats */
		StringInfoData msg;
		int16		fmt = binary ? 1 : 0;
		int			col;

		pq_beginmessage(&msg, 'G');
		pq_sendbyte(&msg, fmt);
		pq_sendint(&msg, natts, 2);
		for (col = 0; col < natts; col++)
			pq_sendint(&msg, fmt, 2);
		pq_endmessage(&msg);

		copy_dest = COPY_NEW_FE;
		copy_msgbuf = makeStringInfo();
	}

	/* We *must* flush here to ensure FE knows it can send. */
	pq_flush();
}
/*
 * SendCopyEnd
 *		Finish a COPY OUT to the frontend.
 *
 * For the 3.0 protocol a Copy Done message ('c') is sent, preceded in
 * binary mode by a final row flush that pushes out the file trailer.  For
 * any other destination the "\." end marker line is written and the old
 * copy-out state is ended.
 */
static void
SendCopyEnd(bool binary)
{
	if (copy_dest != COPY_NEW_FE)
	{
		/* The FE/BE protocol uses \n as newline for all platforms */
		CopySendData("\\.\n", 3);
		pq_endcopyout(false);
		return;
	}

	if (binary)
	{
		/* The trailer word is still sitting in copy_msgbuf; push it out */
		CopySendEndOfRow(true);
	}
	else
	{
		/* Text mode flushes after every row, so nothing may be pending */
		Assert(copy_msgbuf->len == 0);
	}

	/* Send Copy Done message */
	pq_putemptymessage('c');
}
/*----------
* CopySendData sends output data to the destination (file or frontend)
* CopySendString does the same for null-terminated strings
* CopySendChar does the same for single characters
* CopySendEndOfRow does the appropriate thing at end of each data row
*
* NB: no data conversion is applied by these functions
*----------
*/
static void
CopySendData(void *databuf, int datasize)
1999-05-25 18:15:34 +02:00
{
switch (copy_dest)
{
case COPY_FILE:
fwrite(databuf, datasize, 1, copy_file);
if (ferror(copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to COPY file: %m")));
break;
case COPY_OLD_FE:
if (pq_putbytes((char *) databuf, datasize))
{
/* no hope of recovering connection sync, so FATAL */
ereport(FATAL,
(errcode(ERRCODE_CONNECTION_FAILURE),
2003-08-04 02:43:34 +02:00
errmsg("connection lost during COPY to stdout")));
}
break;
case COPY_NEW_FE:
appendBinaryStringInfo(copy_msgbuf, (char *) databuf, datasize);
break;
}
}
/*
 * CopySendString
 *		Send a NUL-terminated string (without its terminator) via
 *		CopySendData.
 */
static void
CopySendString(const char *str)
{
	int			nbytes = strlen(str);

	CopySendData((void *) str, nbytes);
}
/*
 * CopySendChar
 *		Send a single character via CopySendData.
 */
static void
CopySendChar(char c)
{
	char		onebyte = c;

	CopySendData(&onebyte, 1);
}
/*
 * CopySendEndOfRow
 *		Finish the current output row.
 *
 * Text rows (binary == false) get a line terminator: the platform default
 * when writing to a file, always "\n" when talking to a frontend.  Binary
 * rows get no terminator.  For the 3.0 protocol the bytes accumulated in
 * copy_msgbuf are then shipped as one CopyData ('d') message and the buffer
 * is emptied.
 */
static void
CopySendEndOfRow(bool binary)
{
	if (!binary)
	{
		if (copy_dest == COPY_FILE)
		{
			/* Default line termination depends on platform */
#ifndef WIN32
			CopySendChar('\n');
#else
			CopySendString("\r\n");
#endif
		}
		else if (copy_dest == COPY_OLD_FE || copy_dest == COPY_NEW_FE)
		{
			/* The FE/BE protocol uses \n as newline for all platforms */
			CopySendChar('\n');
		}
	}

	if (copy_dest == COPY_NEW_FE)
	{
		/* Dump the accumulated row as one CopyData message */
		(void) pq_putmessage('d', copy_msgbuf->data, copy_msgbuf->len);

		/* Leave copy_msgbuf empty but still NUL-terminated */
		copy_msgbuf->len = 0;
		copy_msgbuf->data[0] = '\0';
	}
}
/*
* CopyGetData reads data from the source (file or frontend)
* CopyGetChar does the same for single characters
*
* CopyGetEof checks if EOF was detected by previous Get operation.
*
* Note: when copying from the frontend, we expect a proper EOF mark per
* protocol; if the frontend simply drops the connection, we raise error.
* It seems unwise to allow the COPY IN to complete normally in that case.
*
* NB: no data conversion is applied by these functions
*/
static void
CopyGetData(void *databuf, int datasize)
1999-05-25 18:15:34 +02:00
{
switch (copy_dest)
1999-05-25 18:15:34 +02:00
{
case COPY_FILE:
fread(databuf, datasize, 1, copy_file);
if (feof(copy_file))
fe_eof = true;
break;
case COPY_OLD_FE:
if (pq_getbytes((char *) databuf, datasize))
{
/* Only a \. terminator is legal EOF in old protocol */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection")));
}
break;
case COPY_NEW_FE:
while (datasize > 0 && !fe_eof)
{
2003-08-04 02:43:34 +02:00
int avail;
1999-05-25 18:15:34 +02:00
while (copy_msgbuf->cursor >= copy_msgbuf->len)
{
/* Try to receive another message */
int mtype;
2004-08-29 07:07:03 +02:00
readmessage:
mtype = pq_getbyte();
if (mtype == EOF)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
2003-08-04 02:43:34 +02:00
errmsg("unexpected EOF on client connection")));
if (pq_getmessage(copy_msgbuf, 0))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
2003-08-04 02:43:34 +02:00
errmsg("unexpected EOF on client connection")));
switch (mtype)
{
2003-08-04 02:43:34 +02:00
case 'd': /* CopyData */
break;
2003-08-04 02:43:34 +02:00
case 'c': /* CopyDone */
/* COPY IN correctly terminated by frontend */
fe_eof = true;
return;
2003-08-04 02:43:34 +02:00
case 'f': /* CopyFail */
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
2003-08-04 02:43:34 +02:00
pq_getmsgstring(copy_msgbuf))));
break;
case 'H': /* Flush */
case 'S': /* Sync */
2004-08-29 07:07:03 +02:00
/*
* Ignore Flush/Sync for the convenience of
* client libraries (such as libpq) that may
2004-08-29 07:07:03 +02:00
* send those without noticing that the
* command they just sent was COPY.
*/
goto readmessage;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected message type 0x%02X during COPY from stdin",
mtype)));
break;
}
}
avail = copy_msgbuf->len - copy_msgbuf->cursor;
if (avail > datasize)
avail = datasize;
pq_copymsgbytes(copy_msgbuf, databuf, avail);
databuf = (void *) ((char *) databuf + avail);
datasize -= avail;
}
break;
1999-05-25 18:15:34 +02:00
}
}
static int
CopyGetChar(void)
1999-05-25 18:15:34 +02:00
{
2003-08-04 02:43:34 +02:00
int ch;
switch (copy_dest)
{
case COPY_FILE:
ch = getc(copy_file);
break;
case COPY_OLD_FE:
ch = pq_getbyte();
if (ch == EOF)
{
/* Only a \. terminator is legal EOF in old protocol */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection")));
}
break;
case COPY_NEW_FE:
2003-08-04 02:43:34 +02:00
{
unsigned char cc;
2003-08-04 02:43:34 +02:00
CopyGetData(&cc, 1);
if (fe_eof)
ch = EOF;
else
ch = cc;
break;
}
default:
ch = EOF;
break;
}
if (ch == EOF)
fe_eof = true;
return ch;
}
/*
* CopyPeekChar reads a byte in "peekable" mode.
*
* after each call to CopyPeekChar, a call to CopyDonePeek _must_
* follow, unless EOF was returned.
*
* CopyDonePeek will either take the peeked char off the stream
* (if pickup is true) or leave it on the stream (if pickup is false).
*/
static int
CopyPeekChar(void)
1999-05-25 18:15:34 +02:00
{
2003-08-04 02:43:34 +02:00
int ch;
switch (copy_dest)
{
case COPY_FILE:
ch = getc(copy_file);
break;
case COPY_OLD_FE:
ch = pq_peekbyte();
if (ch == EOF)
{
/* Only a \. terminator is legal EOF in old protocol */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection")));
}
break;
case COPY_NEW_FE:
2003-08-04 02:43:34 +02:00
{
unsigned char cc;
2003-08-04 02:43:34 +02:00
CopyGetData(&cc, 1);
if (fe_eof)
ch = EOF;
else
ch = cc;
break;
}
default:
ch = EOF;
break;
}
if (ch == EOF)
fe_eof = true;
return ch;
}
/*
 * CopyDonePeek
 *		Complete a CopyPeekChar call.
 *
 * c:	   the byte CopyPeekChar returned.
 * pickup: true to consume the byte, false to leave it for the next read.
 *
 * Nothing is done once fe_eof is set, since an EOF cannot be pushed back.
 * What "consume" and "leave" require depends on how CopyPeekChar obtained
 * the byte from each kind of source.
 */
static void
CopyDonePeek(int c, bool pickup)
{
	if (fe_eof)
		return;					/* can't unget an EOF */

	if (copy_dest == COPY_FILE)
	{
		/* The peek already read the byte, so only "leave" needs work */
		if (!pickup)
			ungetc(c, copy_file);
	}
	else if (copy_dest == COPY_OLD_FE)
	{
		/* The peek left the byte in place, so only "consume" needs work */
		if (pickup)
			(void) pq_getbyte();
	}
	else if (copy_dest == COPY_NEW_FE)
	{
		/* The peek advanced the message cursor; step back to "leave" */
		if (!pickup)
			copy_msgbuf->cursor--;
	}
}
1999-05-25 18:15:34 +02:00
/*
* These functions do apply some data conversion
*/
/*
* CopySendInt32 sends an int32 in network byte order
*/
/*
 * CopySendInt32
 *		Convert val to network byte order and send its raw bytes.
 */
static void
CopySendInt32(int32 val)
{
	uint32		net_order = htonl((uint32) val);

	CopySendData(&net_order, sizeof(net_order));
}
/*
* CopyGetInt32 reads an int32 that appears in network byte order
*/
/*
 * CopyGetInt32
 *		Read four raw bytes and convert them from network byte order.
 *
 * CopyGetData can stop early at end of data without filling the buffer, so
 * the returned value is meaningful only if CopyGetEof() is false afterwards.
 */
static int32
CopyGetInt32(void)
{
	uint32		wire;

	CopyGetData(&wire, sizeof(wire));

	return (int32) ntohl(wire);
}
/*
* CopySendInt16 sends an int16 in network byte order
*/
/*
 * CopySendInt16
 *		Convert val to network byte order and send its raw bytes.
 */
static void
CopySendInt16(int16 val)
{
	uint16		net_order = htons((uint16) val);

	CopySendData(&net_order, sizeof(net_order));
}
/*
* CopyGetInt16 reads an int16 that appears in network byte order
*/
/*
 * CopyGetInt16
 *		Read two raw bytes and convert them from network byte order.
 *
 * CopyGetData can stop early at end of data without filling the buffer, so
 * the returned value is meaningful only if CopyGetEof() is false afterwards.
 */
static int16
CopyGetInt16(void)
{
	uint16		wire;

	CopyGetData(&wire, sizeof(wire));

	return (int16) ntohs(wire);
}
/*
* DoCopy executes the SQL COPY statement.
*
* Either unload or reload contents of table <relation>, depending on <from>.
* (<from> = TRUE means we are inserting into the table.)
*
* If <pipe> is false, transfer is between the table and the file named
* <filename>. Otherwise, transfer is between the table and our regular
* input/output stream. The latter could be either stdin/stdout or a
* socket, depending on whether we're running under Postmaster control.
*
* Iff <binary>, unload or reload in the binary format, as opposed to the
* more wasteful but more robust and portable text format.
*
* Iff <oids>, unload or reload the format that includes OID information.
* On input, we accept OIDs whether or not the table has an OID column,
* but silently drop them if it does not. On output, we report an error
* if the user asks for OIDs in a table that has none (not providing an
* OID column might seem friendlier, but could seriously confuse programs).
*
* If in the text format, delimit columns with delimiter <delim> and print
* NULL values as <null_print>.
*
* When loading in the text format from an input stream (as opposed to
* a file), recognize a "\." on a line by itself as EOF. Also recognize
* a stream EOF. When unloading in the text format to an output stream,
* write a "\." on a line by itself at the end of the data.
*
* Do not allow a Postgres user without superuser privilege to read from
* or write to a file.
*
* Do not allow the copy if user doesn't have proper permission to access
* the table.
*/
void
DoCopy(const CopyStmt *stmt)
{
2002-09-04 22:31:48 +02:00
RangeVar *relation = stmt->relation;
char *filename = stmt->filename;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
ListCell *option;
2002-09-04 22:31:48 +02:00
List *attnamelist = stmt->attlist;
List *attnumlist;
bool fe_copy = false;
2002-09-04 22:31:48 +02:00
bool binary = false;
bool oids = false;
2004-08-29 07:07:03 +02:00
bool csv_mode = false;
bool header_line = false;
2002-09-04 22:31:48 +02:00
char *delim = NULL;
char *quote = NULL;
char *escape = NULL;
2002-09-04 22:31:48 +02:00
char *null_print = NULL;
List *force_quote = NIL;
List *force_notnull = NIL;
List *force_quote_atts = NIL;
List *force_notnull_atts = NIL;
Relation rel;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
AclResult aclresult;
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "binary") == 0)
{
if (binary)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
binary = intVal(defel->arg);
}
else if (strcmp(defel->defname, "oids") == 0)
{
if (oids)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
oids = intVal(defel->arg);
}
else if (strcmp(defel->defname, "delimiter") == 0)
{
if (delim)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
delim = strVal(defel->arg);
}
else if (strcmp(defel->defname, "null") == 0)
{
if (null_print)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
null_print = strVal(defel->arg);
}
else if (strcmp(defel->defname, "csv") == 0)
{
if (csv_mode)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
csv_mode = intVal(defel->arg);
}
else if (strcmp(defel->defname, "header") == 0)
{
if (header_line)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
header_line = intVal(defel->arg);
}
else if (strcmp(defel->defname, "quote") == 0)
{
if (quote)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
quote = strVal(defel->arg);
}
else if (strcmp(defel->defname, "escape") == 0)
{
if (escape)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
escape = strVal(defel->arg);
}
else if (strcmp(defel->defname, "force_quote") == 0)
{
if (force_quote)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
2004-08-29 07:07:03 +02:00
force_quote = (List *) defel->arg;
}
else if (strcmp(defel->defname, "force_notnull") == 0)
{
if (force_notnull)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
2004-08-29 07:07:03 +02:00
force_notnull = (List *) defel->arg;
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
if (binary && delim)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify DELIMITER in BINARY mode")));
if (binary && csv_mode)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify CSV in BINARY mode")));
if (binary && null_print)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify NULL in BINARY mode")));
/* Set defaults */
if (!delim)
delim = csv_mode ? "," : "\t";
2004-08-29 07:07:03 +02:00
if (!null_print)
null_print = csv_mode ? "" : "\\N";
if (csv_mode)
{
if (!quote)
quote = "\"";
if (!escape)
escape = quote;
}
2004-08-29 07:07:03 +02:00
/* Only single-character delimiter strings are supported. */
if (strlen(delim) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY delimiter must be a single character")));
/* Check header */
if (!csv_mode && header_line)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY HEADER available only in CSV mode")));
/* Check quote */
if (!csv_mode && quote != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY quote available only in CSV mode")));
if (csv_mode && strlen(quote) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY quote must be a single character")));
/* Check escape */
if (!csv_mode && escape != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY escape available only in CSV mode")));
if (csv_mode && strlen(escape) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY escape must be a single character")));
/* Check force_quote */
if (!csv_mode && force_quote != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY force quote available only in CSV mode")));
if (force_quote != NIL && is_from)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2004-08-29 07:07:03 +02:00
errmsg("COPY force quote only available using COPY TO")));
/* Check force_notnull */
if (!csv_mode && force_notnull != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2004-08-29 07:07:03 +02:00
errmsg("COPY force not null available only in CSV mode")));
if (force_notnull != NIL && !is_from)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2004-08-29 07:07:03 +02:00
errmsg("COPY force not null only available using COPY FROM")));
/* Don't allow the delimiter to appear in the null string. */
if (strchr(null_print, delim[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY delimiter must not appear in the NULL specification")));
/* Don't allow the csv quote char to appear in the null string. */
if (csv_mode && strchr(null_print, quote[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
2002-09-04 22:31:48 +02:00
/* Open and lock the relation, using the appropriate lock type. */
rel = heap_openrv(relation, (is_from ? RowExclusiveLock : AccessShareLock));
/* check read-only transaction */
if (XactReadOnly && !is_from && !isTempNamespace(RelationGetNamespace(rel)))
ereport(ERROR,
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("transaction is read-only")));
/* Check permissions. */
aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
required_access);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(rel));
if (!pipe && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from a file"),
errhint("Anyone can COPY to stdout or from stdin. "
2003-08-04 02:43:34 +02:00
"psql's \\copy command also works for anyone.")));
2001-03-22 05:01:46 +01:00
/* Don't allow COPY w/ OIDs to or from a table without them */
if (oids && !rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(rel))));
/* Generate or convert list of attributes to process */
attnumlist = CopyGetAttnums(rel, attnamelist);
/* Check that FORCE QUOTE references valid COPY columns */
if (force_quote)
{
TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
ListCell *cur;
force_quote_atts = CopyGetAttnums(rel, force_quote);
foreach(cur, force_quote_atts)
{
int attnum = lfirst_int(cur);
if (!list_member_int(attnumlist, attnum))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2004-08-29 07:07:03 +02:00
errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
NameStr(attr[attnum - 1]->attname))));
}
}
2004-08-29 07:07:03 +02:00
/* Check that FORCE NOT NULL references valid COPY columns */
if (force_notnull)
{
ListCell *cur;
TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
force_notnull_atts = CopyGetAttnums(rel, force_notnull);
foreach(cur, force_notnull_atts)
{
int attnum = lfirst_int(cur);
if (!list_member_int(attnumlist, attnum))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
NameStr(attr[attnum - 1]->attname))));
}
}
2004-08-29 07:07:03 +02:00
/* Set up variables to avoid per-attribute overhead. */
initStringInfo(&attribute_buf);
initStringInfo(&line_buf);
line_buf_converted = false;
client_encoding = pg_get_client_encoding();
server_encoding = GetDatabaseEncoding();
copy_dest = COPY_FILE; /* default */
copy_file = NULL;
copy_msgbuf = NULL;
fe_eof = false;
if (is_from)
{ /* copy from file to database */
if (rel->rd_rel->relkind != RELKIND_RELATION)
{
if (rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
RelationGetRelationName(rel))));
else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to sequence \"%s\"",
RelationGetRelationName(rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2003-08-04 02:43:34 +02:00
errmsg("cannot copy to non-table relation \"%s\"",
RelationGetRelationName(rel))));
}
if (pipe)
{
if (whereToSendOutput == Remote)
ReceiveCopyBegin(binary, list_length(attnumlist));
else
copy_file = stdin;
}
else
{
struct stat st;
copy_file = AllocateFile(filename, PG_BINARY_R);
if (copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
2003-08-04 02:43:34 +02:00
errmsg("could not open file \"%s\" for reading: %m",
filename)));
fstat(fileno(copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", filename)));
}
}
CopyFrom(rel, attnumlist, binary, oids, delim, null_print, csv_mode,
quote, escape, force_notnull_atts, header_line);
}
else
{ /* copy from database to file */
if (rel->rd_rel->relkind != RELKIND_RELATION)
{
if (rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from view \"%s\"",
RelationGetRelationName(rel))));
else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from sequence \"%s\"",
RelationGetRelationName(rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2003-08-04 02:43:34 +02:00
errmsg("cannot copy from non-table relation \"%s\"",
RelationGetRelationName(rel))));
}
if (pipe)
{
if (whereToSendOutput == Remote)
fe_copy = true;
else
copy_file = stdout;
}
else
{
mode_t oumask; /* Pre-existing umask value */
struct stat st;
/*
2001-03-22 05:01:46 +01:00
* Prevent write to relative path ... too easy to shoot
* oneself in the foot by overwriting a database file ...
*/
if (!is_absolute_path(filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
2003-08-04 02:43:34 +02:00
errmsg("relative path not allowed for COPY to file")));
oumask = umask((mode_t) 022);
copy_file = AllocateFile(filename, PG_BINARY_W);
umask(oumask);
if (copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
2003-08-04 02:43:34 +02:00
errmsg("could not open file \"%s\" for writing: %m",
filename)));
fstat(fileno(copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", filename)));
}
}
DoCopyTo(rel, attnumlist, binary, oids, delim, null_print, csv_mode,
quote, escape, force_quote_atts, header_line, fe_copy);
}
if (!pipe)
{
/* we assume only the write case could fail here */
if (FreeFile(copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",
filename)));
}
pfree(attribute_buf.data);
pfree(line_buf.data);
/*
* Close the relation. If reading, we can release the AccessShareLock
* we got; if writing, we should hold the lock until end of
* transaction to ensure that updates will be committed before lock is
* released.
*/
heap_close(rel, (is_from ? NoLock : AccessShareLock));
}
/*
 * DoCopyTo
 *
 * Wrapper around CopyTo() that exists purely to confine the PG_TRY block
 * (and hence setjmp) to a function with no interesting local state, so
 * that no variables have to be marked "volatile".
 *
 * When fe_copy is true the copy is bracketed by SendCopyBegin() and
 * SendCopyEnd(); otherwise CopyTo() is simply invoked.  All other
 * arguments are passed through to CopyTo() unchanged.
 */
static void
DoCopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
		 char *delim, char *null_print, bool csv_mode, char *quote,
		 char *escape, List *force_quote_atts, bool header_line, bool fe_copy)
{
	PG_TRY();
	{
		if (!fe_copy)
		{
			CopyTo(rel, attnumlist, binary, oids, delim, null_print, csv_mode,
				   quote, escape, force_quote_atts, header_line);
		}
		else
		{
			SendCopyBegin(binary, list_length(attnumlist));
			CopyTo(rel, attnumlist, binary, oids, delim, null_print, csv_mode,
				   quote, escape, force_quote_atts, header_line);
			SendCopyEnd(binary);
		}
	}
	PG_CATCH();
	{
		/*
		 * On error, make sure old-style COPY OUT mode gets switched off.
		 * Doing this unconditionally is fine: the call is a no-op when the
		 * mode is not active.
		 */
		pq_endcopyout(true);
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
 * Copy from relation TO file.
 *
 * Scans "rel" with ActiveSnapshot and sends, for every tuple returned, the
 * columns listed in attnumlist (1-based attribute numbers) through the
 * CopySend* routines.
 *
 *	binary			  emit binary format: signature, flags word, zero-length
 *					  header extension, per-tuple field count, and a -1
 *					  trailer.  Otherwise emit text or CSV.
 *	oids			  also emit each tuple's OID ahead of the user columns.
 *	delim			  column delimiter; only delim[0] is sent from here.
 *	null_print		  string that represents a NULL value.
 *	csv_mode		  format values with CopyAttributeOutCSV() instead of
 *					  CopyAttributeOut().
 *	quote, escape	  handed to CopyAttributeOutCSV().
 *	force_quote_atts  attribute numbers whose values are always quoted in
 *					  CSV mode.
 *	header_line		  emit a first line containing the column names.
 *
 * null_print is needed in two forms.  It is written to the client with
 * CopySendString(), which does no conversion, so that copy must be in the
 * client encoding.  It is also compared against column names and datatype
 * output strings to decide whether a CSV value needs quoting; those
 * comparisons must use the unconverted string.  We therefore keep the
 * caller's string for comparisons and use a separate converted copy only
 * for sending.
 */
static void
CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
	   char *delim, char *null_print, bool csv_mode, char *quote,
	   char *escape, List *force_quote_atts, bool header_line)
{
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	HeapScanDesc scandesc;
	int			num_phys_attrs;
	int			attr_count;
	Form_pg_attribute *attr;
	FmgrInfo   *out_functions;
	bool	   *force_quote;
	char	   *string;
	char	   *null_print_client;
	ListCell   *cur;
	MemoryContext oldcontext;
	MemoryContext mycontext;

	tupDesc = rel->rd_att;
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;
	attr_count = list_length(attnumlist);

	/* Until converted below, the null marker to send is the caller's */
	null_print_client = null_print;

	/* Get info about the columns we need to process. */
	out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
	force_quote = (bool *) palloc(num_phys_attrs * sizeof(bool));
	foreach(cur, attnumlist)
	{
		int			attnum = lfirst_int(cur);
		Oid			out_func_oid;
		bool		isvarlena;

		if (binary)
			getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
									&out_func_oid,
									&isvarlena);
		else
			getTypeOutputInfo(attr[attnum - 1]->atttypid,
							  &out_func_oid,
							  &isvarlena);
		fmgr_info(out_func_oid, &out_functions[attnum - 1]);

		if (list_member_int(force_quote_atts, attnum))
			force_quote[attnum - 1] = true;
		else
			force_quote[attnum - 1] = false;
	}

	/*
	 * Create a temporary memory context that we can reset once per row to
	 * recover palloc'd memory.  This avoids any problems with leaks inside
	 * datatype output routines, and should be faster than retail pfree's
	 * anyway.  (We don't need a whole econtext as CopyFrom does.)
	 */
	mycontext = AllocSetContextCreate(CurrentMemoryContext,
									  "COPY TO",
									  ALLOCSET_DEFAULT_MINSIZE,
									  ALLOCSET_DEFAULT_INITSIZE,
									  ALLOCSET_DEFAULT_MAXSIZE);

	if (binary)
	{
		/* Generate header for a binary copy */
		int32		tmp;

		/* Signature */
		CopySendData((char *) BinarySignature, 11);
		/* Flags field: bit 16 says that OIDs are included */
		tmp = 0;
		if (oids)
			tmp |= (1 << 16);
		CopySendInt32(tmp);
		/* No header extension */
		tmp = 0;
		CopySendInt32(tmp);
	}
	else
	{
		/*
		 * For non-binary copy, we need a client-encoding version of
		 * null_print, because it will be sent directly with
		 * CopySendString.  The original string is left alone so that the
		 * strcmp() tests below keep comparing like with like.
		 */
		if (server_encoding != client_encoding)
			null_print_client = (char *)
				pg_server_to_client((unsigned char *) null_print,
									strlen(null_print));

		/* if a header has been requested send the line */
		if (header_line)
		{
			bool		hdr_delim = false;
			char	   *colname;

			foreach(cur, attnumlist)
			{
				int			attnum = lfirst_int(cur);

				if (hdr_delim)
					CopySendChar(delim[0]);
				hdr_delim = true;

				colname = NameStr(attr[attnum - 1]->attname);

				/* quote a column name that would read back as the null marker */
				CopyAttributeOutCSV(colname, delim, quote, escape,
									strcmp(colname, null_print) == 0);
			}

			CopySendEndOfRow(binary);
		}
	}

	scandesc = heap_beginscan(rel, ActiveSnapshot, 0, NULL);

	while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
	{
		bool		need_delim = false;

		CHECK_FOR_INTERRUPTS();

		/* Recover the previous row's memory, then work in the row context */
		MemoryContextReset(mycontext);
		oldcontext = MemoryContextSwitchTo(mycontext);

		if (binary)
		{
			/* Binary per-tuple header */
			CopySendInt16(attr_count);
			/* Send OID if wanted --- note attr_count doesn't include it */
			if (oids)
			{
				Oid			oid = HeapTupleGetOid(tuple);

				/* Hack --- assume Oid is same size as int32 */
				CopySendInt32(sizeof(int32));
				CopySendInt32(oid);
			}
		}
		else
		{
			/* Text format has no per-tuple header, but send OID if wanted */
			if (oids)
			{
				string = DatumGetCString(DirectFunctionCall1(oidout,
							  ObjectIdGetDatum(HeapTupleGetOid(tuple))));
				CopySendString(string);
				need_delim = true;
			}
		}

		foreach(cur, attnumlist)
		{
			int			attnum = lfirst_int(cur);
			Datum		value;
			bool		isnull;

			value = heap_getattr(tuple, attnum, tupDesc, &isnull);

			if (!binary)
			{
				if (need_delim)
					CopySendChar(delim[0]);
				need_delim = true;
			}

			if (isnull)
			{
				if (!binary)
					CopySendString(null_print_client);	/* null indicator */
				else
					CopySendInt32(-1);	/* null marker */
			}
			else
			{
				if (!binary)
				{
					string = DatumGetCString(FunctionCall1(&out_functions[attnum - 1],
														   value));
					if (csv_mode)
					{
						/*
						 * Quote when forced, or when the value would
						 * otherwise be indistinguishable from the null
						 * marker.
						 */
						CopyAttributeOutCSV(string, delim, quote, escape,
										  (strcmp(string, null_print) == 0 ||
										   force_quote[attnum - 1]));
					}
					else
						CopyAttributeOut(string, delim);
				}
				else
				{
					bytea	   *outputbytes;

					outputbytes = DatumGetByteaP(FunctionCall1(&out_functions[attnum - 1],
															   value));
					/* We assume the result will not have been toasted */
					CopySendInt32(VARSIZE(outputbytes) - VARHDRSZ);
					CopySendData(VARDATA(outputbytes),
								 VARSIZE(outputbytes) - VARHDRSZ);
				}
			}
		}

		CopySendEndOfRow(binary);

		MemoryContextSwitchTo(oldcontext);
	}

	heap_endscan(scandesc);

	if (binary)
	{
		/* Generate trailer for a binary copy */
		CopySendInt16(-1);
	}

	MemoryContextDelete(mycontext);

	pfree(out_functions);
	pfree(force_quote);
}
/*
 * error context callback for COPY FROM
 *
 * Adds a context line naming the relation and input line number and, when
 * an attribute is currently being processed (copy_attname is set), the
 * column.  In non-binary mode the offending data is shown as well, after
 * being shortened by limit_printout_length().  The "arg" parameter is not
 * used.
 */
static void
copy_in_error_callback(void *arg)
{
	if (copy_attname)
	{
		/* error is relevant to a particular column */
		if (copy_binary)
		{
			/* can't usefully display binary data */
			errcontext("COPY %s, line %d, column %s",
					   copy_relname, copy_lineno, copy_attname);
			return;
		}

		limit_printout_length(&attribute_buf);
		errcontext("COPY %s, line %d, column %s: \"%s\"",
				   copy_relname, copy_lineno, copy_attname,
				   attribute_buf.data);
		return;
	}

	/* error is relevant to a particular line */
	if (!copy_binary &&
		(line_buf_converted || client_encoding == server_encoding))
	{
		limit_printout_length(&line_buf);
		errcontext("COPY %s, line %d: \"%s\"",
				   copy_relname, copy_lineno,
				   line_buf.data);
		return;
	}

	/*
	 * Either the data is binary, or the line buffer is still in a foreign
	 * encoding.  In the latter case it is quite likely that the error is
	 * precisely a failure to do encoding conversion (ie, bad data).  We
	 * dare not try to convert it, and at present there's no way to
	 * regurgitate it without conversion.  So punt and report only the
	 * line number.
	 */
	errcontext("COPY %s, line %d", copy_relname, copy_lineno);
}
/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * Using the sprintf "precision" limit to truncate the string would look
 * simpler, but some versions of glibc have a bug/misfeature whereby
 * vsnprintf always fails (returns -1) when asked to truncate a string
 * containing byte sequences that are invalid in the current encoding.
 * So the truncation is done by hand here.
 *
 * The StringInfo is modified in place: when it is longer than
 * MAX_COPY_DATA_DISPLAY bytes it is cut at the length chosen by
 * pg_mbcliplen() and "..." is appended.
 */
static void
limit_printout_length(StringInfo buf)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			cliplen;

	/* Anything at or under the limit is left untouched */
	if (buf->len > MAX_COPY_DATA_DISPLAY)
	{
		/* Apply encoding-dependent truncation */
		cliplen = pg_mbcliplen(buf->data, buf->len, MAX_COPY_DATA_DISPLAY);

		if (cliplen < buf->len)
		{
			buf->len = cliplen;
			buf->data[cliplen] = '\0';

			/* Add "..." to show we truncated the input */
			appendStringInfoString(buf, "...");
		}
	}
}
/*
* Copy FROM file to relation.
*/
static void
2002-09-04 22:31:48 +02:00
CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
char *delim, char *null_print, bool csv_mode, char *quote,
char *escape, List *force_notnull_atts, bool header_line)
{
HeapTuple tuple;
TupleDesc tupDesc;
1998-09-01 05:29:17 +02:00
Form_pg_attribute *attr;
2002-09-04 22:31:48 +02:00
AttrNumber num_phys_attrs,
attr_count,
num_defaults;
FmgrInfo *in_functions;
FmgrInfo oid_in_function;
Oid *typioparams;
Oid oid_typioparam;
ExprState **constraintexprs;
bool *force_notnull;
bool hasConstraints = false;
int attnum;
int i;
Oid in_func_oid;
Datum *values;
char *nulls;
bool done = false;
bool isnull;
ResultRelInfo *resultRelInfo;
2001-03-22 05:01:46 +01:00
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
TupleTableSlot *slot;
bool file_has_oids;
int *defmap;
ExprState **defexprs; /* array of default att expressions */
2002-09-04 22:31:48 +02:00
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
1998-09-01 05:29:17 +02:00
tupDesc = RelationGetDescr(rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(attnumlist);
num_defaults = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
2001-03-22 05:01:46 +01:00
* index-entry-making machinery. (There used to be a huge amount of
* code here that basically duplicated execUtils.c ...)
*/
resultRelInfo = makeNode(ResultRelInfo);
2001-03-22 05:01:46 +01:00
resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
resultRelInfo->ri_RelationDesc = rel;
resultRelInfo->ri_TrigDesc = CopyTriggerDesc(rel->trigdesc);
if (resultRelInfo->ri_TrigDesc)
resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
resultRelInfo->ri_TrigInstrument = NULL;
ExecOpenIndices(resultRelInfo);
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
estate->es_result_relation_info = resultRelInfo;
/* Set up a tuple slot too */
slot = MakeSingleTupleTableSlot(tupDesc);
econtext = GetPerTupleExprContext(estate);
/*
* Pick up the required catalog information for each attribute in the
* relation, including the input function, the element type (to pass
* to the input function), and info about defaults and constraints.
* (Which input function we use depends on text/binary format choice.)
*/
in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
constraintexprs = (ExprState **) palloc0(num_phys_attrs * sizeof(ExprState *));
force_notnull = (bool *) palloc(num_phys_attrs * sizeof(bool));
for (attnum = 1; attnum <= num_phys_attrs; attnum++)
{
/* We don't need info for dropped attributes */
if (attr[attnum - 1]->attisdropped)
continue;
/* Fetch the input function and typioparam info */
if (binary)
getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2004-08-29 07:07:03 +02:00
&in_func_oid, &typioparams[attnum - 1]);
else
getTypeInputInfo(attr[attnum - 1]->atttypid,
&in_func_oid, &typioparams[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
if (list_member_int(force_notnull_atts, attnum))
force_notnull[attnum - 1] = true;
else
force_notnull[attnum - 1] = false;
2004-08-29 07:07:03 +02:00
/* Get default info if needed */
if (!list_member_int(attnumlist, attnum))
{
/* attribute is NOT to be copied from input */
/* use default value if one exists */
Node *defexpr = build_column_default(rel, attnum);
if (defexpr != NULL)
{
defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
estate);
defmap[num_defaults] = attnum - 1;
num_defaults++;
}
}
/* If it's a domain type, set up to check domain constraints */
if (get_typtype(attr[attnum - 1]->atttypid) == 'd')
{
Param *prm;
Node *node;
/*
* Easiest way to do this is to use parse_coerce.c to set up
* an expression that checks the constraints. (At present,
2003-08-04 02:43:34 +02:00
* the expression might contain a length-coercion-function
* call and/or CoerceToDomain nodes.) The bottom of the
* expression is a Param node so that we can fill in the
* actual datum during the data input loop.
*/
prm = makeNode(Param);
prm->paramkind = PARAM_EXEC;
prm->paramid = 0;
prm->paramtype = getBaseType(attr[attnum - 1]->atttypid);
node = coerce_to_domain((Node *) prm,
prm->paramtype,
attr[attnum - 1]->atttypid,
COERCE_IMPLICIT_CAST, false, false);
constraintexprs[attnum - 1] = ExecPrepareExpr((Expr *) node,
2004-08-29 07:07:03 +02:00
estate);
hasConstraints = true;
}
}
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
/*
2003-08-04 02:43:34 +02:00
* Check BEFORE STATEMENT insertion triggers. It's debateable whether
* we should do this for COPY, since it's not really an "INSERT"
* statement as such. However, executing these triggers maintains
* consistency with the EACH ROW triggers that we already fire on
* COPY.
*/
ExecBSInsertTriggers(estate, resultRelInfo);
if (!binary)
file_has_oids = oids; /* must rely on user to tell us this... */
else
{
/* Read and verify binary header */
char readSig[11];
int32 tmp;
/* Signature */
CopyGetData(readSig, 11);
if (CopyGetEof() || memcmp(readSig, BinarySignature, 11) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("COPY file signature not recognized")));
/* Flags field */
tmp = CopyGetInt32();
if (CopyGetEof())
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing flags)")));
file_has_oids = (tmp & (1 << 16)) != 0;
2001-03-22 05:01:46 +01:00
tmp &= ~(1 << 16);
if ((tmp >> 16) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2003-08-04 02:43:34 +02:00
errmsg("unrecognized critical flags in COPY file header")));
/* Header extension length */
tmp = CopyGetInt32();
if (CopyGetEof() || tmp < 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2003-08-04 02:43:34 +02:00
errmsg("invalid COPY file header (missing length)")));
/* Skip extension header, if present */
while (tmp-- > 0)
{
CopyGetData(readSig, 1);
if (CopyGetEof())
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2003-08-04 02:43:34 +02:00
errmsg("invalid COPY file header (wrong length)")));
}
}
if (file_has_oids && binary)
{
getTypeBinaryInputInfo(OIDOID,
&in_func_oid, &oid_typioparam);
fmgr_info(in_func_oid, &oid_in_function);
}
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (char *) palloc(num_phys_attrs * sizeof(char));
/* Make room for a PARAM_EXEC value for domain constraint checks */
if (hasConstraints)
econtext->ecxt_param_exec_vals = (ParamExecData *)
palloc0(sizeof(ParamExecData));
/* Initialize static variables */
fe_eof = false;
eol_type = EOL_UNKNOWN;
copy_binary = binary;
copy_relname = RelationGetRelationName(rel);
copy_lineno = 0;
copy_attname = NULL;
/* Set up callback to identify error line number */
errcontext.callback = copy_in_error_callback;
errcontext.arg = NULL;
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
/* on input just throw the header line away */
if (header_line)
{
copy_lineno++;
done = CopyReadLine(quote, escape) ;
}
while (!done)
{
2002-09-04 22:31:48 +02:00
bool skip_tuple;
Oid loaded_oid = InvalidOid;
CHECK_FOR_INTERRUPTS();
1999-09-27 22:00:44 +02:00
copy_lineno++;
2002-09-04 22:31:48 +02:00
/* Reset the per-tuple exprcontext */
ResetPerTupleExprContext(estate);
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, 'n', num_phys_attrs * sizeof(char));
if (!binary)
{
2002-09-04 22:31:48 +02:00
CopyReadResult result = NORMAL_ATTR;
char *string;
ListCell *cur;
/* Actually read the line into memory here */
done = csv_mode ?
CopyReadLine(quote, escape) : CopyReadLine(NULL, NULL);
/*
2004-08-29 07:07:03 +02:00
* EOF at start of line means we're done. If we see EOF after
* some characters, we act as though it was newline followed
* by EOF, ie, process the line and then exit loop on next
* iteration.
*/
if (done && line_buf.len == 0)
break;
if (file_has_oids)
{
/* can't be in CSV mode here */
string = CopyReadAttribute(delim, null_print,
&result, &isnull);
if (isnull)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("null OID in COPY data")));
else
{
copy_attname = "oid";
loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
2001-03-22 05:01:46 +01:00
CStringGetDatum(string)));
if (loaded_oid == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data")));
copy_attname = NULL;
}
}
2002-09-04 22:31:48 +02:00
/* Loop to read the user attributes on the line. */
foreach(cur, attnumlist)
{
int attnum = lfirst_int(cur);
2002-09-04 22:31:48 +02:00
int m = attnum - 1;
/*
* If prior attr on this line was ended by newline,
* complain.
*/
if (result != NORMAL_ATTR)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(attr[m]->attname))));
if (csv_mode)
{
string = CopyReadAttributeCSV(delim, null_print, quote,
2004-08-29 07:07:03 +02:00
escape, &result, &isnull);
if (result == UNTERMINATED_FIELD)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2004-08-29 07:07:03 +02:00
errmsg("unterminated CSV quoted field")));
}
else
2004-08-29 07:07:03 +02:00
string = CopyReadAttribute(delim, null_print,
&result, &isnull);
if (csv_mode && isnull && force_notnull[m])
{
2004-08-29 07:07:03 +02:00
string = null_print; /* set to NULL string */
isnull = false;
}
2004-08-29 07:07:03 +02:00
/* we read an SQL NULL, no need to do anything */
if (!isnull)
{
copy_attname = NameStr(attr[m]->attname);
values[m] = FunctionCall3(&in_functions[m],
CStringGetDatum(string),
2004-08-29 07:07:03 +02:00
ObjectIdGetDatum(typioparams[m]),
2002-09-04 22:31:48 +02:00
Int32GetDatum(attr[m]->atttypmod));
nulls[m] = ' ';
copy_attname = NULL;
}
}
/*
* Complain if there are more fields on the input line.
*
2003-08-04 02:43:34 +02:00
* Special case: if we're reading a zero-column table, we won't
* yet have called CopyReadAttribute() at all; so no error if
* line is empty.
*/
if (result == NORMAL_ATTR && line_buf.len != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2004-08-29 07:07:03 +02:00
errmsg("extra data after last expected column")));
}
else
{
/* binary */
int16 fld_count;
ListCell *cur;
fld_count = CopyGetInt16();
if (CopyGetEof() || fld_count == -1)
{
done = true;
break;
}
if (fld_count != attr_count)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("row field count is %d, expected %d",
(int) fld_count, attr_count)));
if (file_has_oids)
{
copy_attname = "oid";
loaded_oid =
DatumGetObjectId(CopyReadBinaryAttribute(0,
2004-08-29 07:07:03 +02:00
&oid_in_function,
oid_typioparam,
&isnull));
if (isnull || loaded_oid == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data")));
copy_attname = NULL;
}
i = 0;
foreach(cur, attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
copy_attname = NameStr(attr[m]->attname);
i++;
values[m] = CopyReadBinaryAttribute(i,
&in_functions[m],
typioparams[m],
&isnull);
nulls[m] = isnull ? 'n' : ' ';
copy_attname = NULL;
}
}
/*
2002-09-04 22:31:48 +02:00
* Now compute and insert any defaults available for the columns
* not provided by the input data. Anything not processed here or
* above will remain NULL.
*/
for (i = 0; i < num_defaults; i++)
{
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&isnull, NULL);
if (!isnull)
nulls[defmap[i]] = ' ';
}
/* Next apply any domain constraints */
if (hasConstraints)
{
ParamExecData *prmdata = &econtext->ecxt_param_exec_vals[0];
for (i = 0; i < num_phys_attrs; i++)
{
ExprState *exprstate = constraintexprs[i];
if (exprstate == NULL)
continue; /* no constraint for this attr */
/* Insert current row's value into the Param value */
prmdata->value = values[i];
prmdata->isnull = (nulls[i] == 'n');
/*
2003-08-04 02:43:34 +02:00
* Execute the constraint expression. Allow the
* expression to replace the value (consider e.g. a
* timestamp precision restriction).
*/
values[i] = ExecEvalExpr(exprstate, econtext,
&isnull, NULL);
nulls[i] = isnull ? 'n' : ' ';
}
}
/* And now we can form the input tuple. */
tuple = heap_formtuple(tupDesc, values, nulls);
2002-09-04 22:31:48 +02:00
if (oids && file_has_oids)
HeapTupleSetOid(tuple, loaded_oid);
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
{
HeapTuple newtuple;
newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
if (newtuple == NULL) /* "do nothing" */
skip_tuple = true;
else if (newtuple != tuple) /* modified by Trigger(s) */
{
heap_freetuple(tuple);
tuple = newtuple;
}
}
if (!skip_tuple)
{
/* Place tuple in tuple slot */
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
/* Check the constraints of the tuple */
if (rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/* OK, store the tuple and create index entries for it */
simple_heap_insert(rel, tuple);
if (resultRelInfo->ri_NumIndices > 0)
ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple);
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
MemoryContextSwitchTo(oldcontext);
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
/* Handle queued AFTER triggers */
AfterTriggerEndQuery(estate);
pfree(values);
1998-02-10 17:04:38 +01:00
pfree(nulls);
1999-05-25 18:15:34 +02:00
pfree(in_functions);
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
pfree(constraintexprs);
pfree(force_notnull);
ExecDropSingleTupleTableSlot(slot);
ExecCloseIndices(resultRelInfo);
FreeExecutorState(estate);
}
/*
* Read the next input line and stash it in line_buf, with conversion to
* server encoding.
*
* Result is true if read was terminated by EOF, false if terminated
* by newline.
*/
static bool
CopyReadLine(char * quote, char * escape)
{
bool result;
bool change_encoding = (client_encoding != server_encoding);
int c;
int mblen;
int j;
unsigned char s[2];
char *cvt;
bool in_quote = false, last_was_esc = false, csv_mode = false;
char quotec = '\0', escapec = '\0';
if (quote)
{
csv_mode = true;
quotec = quote[0];
escapec = escape[0];
/* ignore special escape processing if it's the same as quotec */
if (quotec == escapec)
escapec = '\0';
}
s[1] = 0;
/* reset line_buf to empty */
line_buf.len = 0;
line_buf.data[0] = '\0';
line_buf.cursor = 0;
/* mark that encoding conversion hasn't occurred yet */
line_buf_converted = false;
/* set default status */
result = false;
/*
2004-08-29 07:07:03 +02:00
* In this loop we only care for detecting newlines (\r and/or \n) and
* the end-of-copy marker (\.).
*
* In Text mode, for backwards compatibility we allow
2004-08-29 07:07:03 +02:00
* backslashes to escape newline characters. Backslashes other than
* the end marker get put into the line_buf, since CopyReadAttribute
* does its own escape processing.
*
* In CSV mode, CR and NL inside q quoted field are just part of the
* data value and are put in line_buf. We keep just enough state
* to know if we are currently in a quoted field or not.
*
* These four characters, and only these four, are assumed the same in
* frontend and backend encodings.
*
2004-08-29 07:07:03 +02:00
* We do not assume that second and later bytes of a frontend
* multibyte character couldn't look like ASCII characters.
*/
for (;;)
{
c = CopyGetChar();
if (c == EOF)
{
result = true;
break;
}
if (csv_mode)
{
/*
* Dealing with quotes and escapes here is mildly tricky. If the
* quote char is also the escape char, there's no problem - we
* just use the char as a toggle. If they are different, we need
* to ensure that we only take account of an escape inside a quoted
* field and immediately preceding a quote char, and not the
* second in a escape-escape sequence.
*/
if (in_quote && c == escapec)
last_was_esc = ! last_was_esc;
if (c == quotec && ! last_was_esc)
in_quote = ! in_quote;
if (c != escapec)
last_was_esc = false;
/*
* updating the line count for embedded CR and/or LF chars is
* necessarily a little fragile - this test is probably about
* the best we can do.
*/
if (in_quote && c == (eol_type == EOL_CR ? '\r' : '\n'))
copy_lineno++;
}
if (!in_quote && c == '\r')
{
if (eol_type == EOL_NL)
{
if (! csv_mode)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("literal carriage return found in data"),
errhint("Use \"\\r\" to represent carriage return.")));
else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unquoted carriage return found in CSV data"),
errhint("Use quoted CSV field to represent carriage return.")));
}
2003-08-04 02:43:34 +02:00
/* Check for \r\n on first line, _and_ handle \r\n. */
if (eol_type == EOL_UNKNOWN || eol_type == EOL_CRNL)
{
2003-08-04 02:43:34 +02:00
int c2 = CopyPeekChar();
if (c2 == '\n')
{
CopyDonePeek(c2, true); /* eat newline */
eol_type = EOL_CRNL;
}
else
{
/* found \r, but no \n */
if (eol_type == EOL_CRNL)
{
if (!csv_mode)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("literal carriage return found in data"),
errhint("Use \"\\r\" to represent carriage return.")));
else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unquoted carriage return found in data"),
errhint("Use quoted CSV field to represent carriage return.")));
}
2003-08-04 02:43:34 +02:00
/*
* if we got here, it is the first line and we didn't
* get \n, so put it back
*/
CopyDonePeek(c2, false);
eol_type = EOL_CR;
}
}
break;
}
if (!in_quote && c == '\n')
{
if (eol_type == EOL_CR || eol_type == EOL_CRNL)
{
if (!csv_mode)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("literal newline found in data"),
errhint("Use \"\\n\" to represent newline.")));
else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unquoted newline found in data"),
errhint("Use quoted CSV field to represent newline.")));
}
eol_type = EOL_NL;
break;
}
if ((line_buf.len == 0 || !csv_mode) && c == '\\')
{
int c2;
if (csv_mode)
c2 = CopyPeekChar();
else
c2 = c = CopyGetChar();
if (c2 == EOF)
{
result = true;
if (csv_mode)
CopyDonePeek(c2, true);
break;
}
if (c2 == '.')
{
if (csv_mode)
CopyDonePeek(c2, true); /* allow keep calling GetChar() */
if (eol_type == EOL_CRNL)
{
c = CopyGetChar();
if (c == '\n')
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker does not match previous newline style")));
if (c != '\r')
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker corrupt")));
}
c = CopyGetChar();
if (c != '\r' && c != '\n')
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker corrupt")));
if ((eol_type == EOL_NL && c != '\n') ||
(eol_type == EOL_CRNL && c != '\n') ||
(eol_type == EOL_CR && c != '\r'))
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("end-of-copy marker does not match previous newline style")));
/*
2004-08-29 07:07:03 +02:00
* In protocol version 3, we should ignore anything after
* \. up to the protocol end of copy data. (XXX maybe
* better not to treat \. as special?)
*/
if (copy_dest == COPY_NEW_FE)
{
while (c != EOF)
c = CopyGetChar();
}
result = true; /* report EOF */
break;
}
if (csv_mode)
CopyDonePeek(c2, false); /* not a dot, so put it back */
else
/* not EOF mark, so emit \ and following char literally */
appendStringInfoCharMacro(&line_buf, '\\');
}
appendStringInfoCharMacro(&line_buf, c);
/*
* When client encoding != server, must be careful to read the
2004-08-29 07:07:03 +02:00
* extra bytes of a multibyte character exactly, since the
* encoding might not ensure they don't look like ASCII. When the
* encodings are the same, we need not do this, since no server
* encoding we use has ASCII-like following bytes.
*/
if (change_encoding)
{
s[0] = c;
mblen = pg_encoding_mblen(client_encoding, s);
for (j = 1; j < mblen; j++)
{
c = CopyGetChar();
if (c == EOF)
{
result = true;
break;
}
appendStringInfoCharMacro(&line_buf, c);
}
if (result)
break; /* out of outer loop */
}
2004-08-29 07:07:03 +02:00
} /* end of outer loop */
/* Done reading the line. Convert it to server encoding. */
if (change_encoding)
{
cvt = (char *) pg_client_to_server((unsigned char *) line_buf.data,
line_buf.len);
if (cvt != line_buf.data)
{
/* transfer converted data back to line_buf */
line_buf.len = 0;
line_buf.data[0] = '\0';
appendBinaryStringInfo(&line_buf, cvt, strlen(cvt));
}
}
/* Now it's safe to use the buffer in error messages */
line_buf_converted = true;
return result;
}
/*
 * Return decimal value for a hexadecimal digit
 *
 * "hex" is expected to be one of 0-9, a-f or A-F; the result is then in
 * the range 0..15.  No validation is done here, so the result for any
 * other character is not meaningful.
 *
 * The argument is cast to unsigned char before being handed to the
 * <ctype.h> functions: plain char may be signed, and passing a negative
 * value (other than EOF) to isdigit()/tolower() is undefined behaviour.
 */
static
int GetDecimalFromHex(char hex)
{
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
/*----------
 * Read the value of a single attribute, performing de-escaping as needed.
 *
 * Input is taken from line_buf starting at line_buf.cursor; the de-escaped
 * value is accumulated in attribute_buf, whose data pointer is returned.
 *
 * delim is the column delimiter string (must be just one byte for now).
 * null_print is the null marker string.  Note that this is compared to
 * the pre-de-escaped input string.
 *
 * Recognized backslash sequences:
 *	\N, \NN, \NNN	one to three octal digits (value masked to 8 bits)
 *	\xH, \xHH		one or two hexadecimal digits; "\x" not followed by a
 *					hex digit yields a literal 'x'
 *	\b \f \n \r \t \v	the usual C control characters
 *	\<other>		the following character, taken literally
 * A lone backslash at the very end of the line ends the attribute and is
 * not emitted.
 *
 * *result is set to indicate what terminated the read:
 *		NORMAL_ATTR:	column delimiter
 *		END_OF_LINE:	end of line
 * In either case, the string read up to the terminator is returned.
 *
 * *isnull is set true or false depending on whether the input matched
 * the null marker.  Note that the caller cannot check this since the
 * returned string will be the post-de-escaping equivalent, which may
 * look the same as some valid data string.
 *----------
 */
static char *
CopyReadAttribute(const char *delim, const char *null_print,
				  CopyReadResult *result, bool *isnull)
{
	char		c;
	char		delimc = delim[0];
	int			start_cursor = line_buf.cursor;
	int			end_cursor;
	int			input_len;

	/* reset attribute_buf to empty */
	attribute_buf.len = 0;
	attribute_buf.data[0] = '\0';

	/* set default status */
	*result = END_OF_LINE;

	for (;;)
	{
		/* remember where the raw field ends, excluding any terminator */
		end_cursor = line_buf.cursor;
		if (line_buf.cursor >= line_buf.len)
			break;
		c = line_buf.data[line_buf.cursor++];
		if (c == delimc)
		{
			*result = NORMAL_ATTR;
			break;
		}
		if (c == '\\')
		{
			if (line_buf.cursor >= line_buf.len)
				break;
			c = line_buf.data[line_buf.cursor++];
			switch (c)
			{
				case '0':
				case '1':
				case '2':
				case '3':
				case '4':
				case '5':
				case '6':
				case '7':
					/* handle \013 */
					{
						int			val;

						val = OCTVALUE(c);
						if (line_buf.cursor < line_buf.len)
						{
							c = line_buf.data[line_buf.cursor];
							if (ISOCTAL(c))
							{
								line_buf.cursor++;
								val = (val << 3) + OCTVALUE(c);
								if (line_buf.cursor < line_buf.len)
								{
									c = line_buf.data[line_buf.cursor];
									if (ISOCTAL(c))
									{
										line_buf.cursor++;
										val = (val << 3) + OCTVALUE(c);
									}
								}
							}
						}
						c = val & 0377;
					}
					break;
				case 'x':
					/* Handle \x3F */
					if (line_buf.cursor < line_buf.len)
					{
						char		hexchar = line_buf.data[line_buf.cursor];

						/*
						 * Cast to unsigned char: hexchar is raw input and
						 * may be a high-bit-set byte, which is not a valid
						 * argument for <ctype.h> functions when char is
						 * signed.
						 */
						if (isxdigit((unsigned char) hexchar))
						{
							int			val = GetDecimalFromHex(hexchar);

							line_buf.cursor++;
							if (line_buf.cursor < line_buf.len)
							{
								hexchar = line_buf.data[line_buf.cursor];
								if (isxdigit((unsigned char) hexchar))
								{
									line_buf.cursor++;
									val = (val << 4) + GetDecimalFromHex(hexchar);
								}
							}
							c = val & 0xff;
						}
					}
					break;
				case 'b':
					c = '\b';
					break;
				case 'f':
					c = '\f';
					break;
				case 'n':
					c = '\n';
					break;
				case 'r':
					c = '\r';
					break;
				case 't':
					c = '\t';
					break;
				case 'v':
					c = '\v';
					break;
				default:

					/*
					 * in all other cases, take the char after '\'
					 * literally
					 */
					break;
			}
		}
		appendStringInfoCharMacro(&attribute_buf, c);
	}

	/* check whether raw input matched null marker */
	input_len = end_cursor - start_cursor;
	if (input_len == strlen(null_print) &&
		strncmp(&line_buf.data[start_cursor], null_print, input_len) == 0)
		*isnull = true;
	else
		*isnull = false;

	return attribute_buf.data;
}
/*----------
 * Read the value of a single attribute in CSV mode,
 * performing de-escaping as needed.  Escaping does not follow the normal
 * PostgreSQL text mode, but instead "standard" (i.e. common) CSV usage.
 *
 * Input is taken from line_buf starting at line_buf.cursor; the de-escaped
 * value is accumulated in attribute_buf, whose data pointer is returned.
 *
 * Quoted fields can span lines, in which case the line end is embedded
 * in the returned string.
 *
 * delim, quote and escape are strings of which only the first byte is used.
 * null_print is the null marker string.  Note that this is compared to
 * the pre-de-escaped input string (thus if it is quoted it is not a NULL).
 *
 * *result is set to indicate what terminated the read:
 *		NORMAL_ATTR:		column delimiter
 *		END_OF_LINE:		end of line
 *		UNTERMINATED_FIELD:	no quote detected at end of a quoted field
 *
 * In any case, the string read up to the terminator (or end of file)
 * is returned.
 *
 * *isnull is set true or false depending on whether the input matched
 * the null marker.  Note that the caller cannot check this since the
 * returned string will be the post-de-escaping equivalent, which may
 * look the same as some valid data string.
 *----------
 */
static char *
CopyReadAttributeCSV(const char *delim, const char *null_print, char *quote,
					 char *escape, CopyReadResult *result, bool *isnull)
{
	char		delimc = delim[0];
	char		quotec = quote[0];
	char		escapec = escape[0];
	char		c;
	int			start_cursor = line_buf.cursor;
	int			end_cursor = start_cursor;
	int			input_len;
	bool		in_quote = false;
	bool		saw_quote = false;

	/* reset attribute_buf to empty */
	attribute_buf.len = 0;
	attribute_buf.data[0] = '\0';

	/* set default status */
	*result = END_OF_LINE;

	for (;;)
	{
		/* remember where the raw field ends, excluding any terminator */
		end_cursor = line_buf.cursor;
		if (line_buf.cursor >= line_buf.len)
			break;
		c = line_buf.data[line_buf.cursor++];

		/* unquoted field delimiter */
		if (!in_quote && c == delimc)
		{
			*result = NORMAL_ATTR;
			break;
		}

		/* start of quoted field (or part of field) */
		if (!in_quote && c == quotec)
		{
			saw_quote = true;
			in_quote = true;
			continue;
		}

		/* escape within a quoted field */
		if (in_quote && c == escapec)
		{
			/*
			 * peek at the next char if available, and escape it if it is
			 * an escape char or a quote char.  The bound is strict so that
			 * we only ever look at real input bytes, never at the position
			 * just past the end of the line data.
			 */
			if (line_buf.cursor < line_buf.len)
			{
				char		nextc = line_buf.data[line_buf.cursor];

				if (nextc == escapec || nextc == quotec)
				{
					appendStringInfoCharMacro(&attribute_buf, nextc);
					line_buf.cursor++;
					continue;
				}
			}
		}

		/*
		 * end of quoted field.  Must do this test after testing for escape
		 * in case quote char and escape char are the same (which is the
		 * common case).
		 */
		if (in_quote && c == quotec)
		{
			in_quote = false;
			continue;
		}

		appendStringInfoCharMacro(&attribute_buf, c);
	}

	/* ran out of input while still inside a quoted section */
	if (in_quote)
		*result = UNTERMINATED_FIELD;

	/* check whether raw input matched null marker */
	input_len = end_cursor - start_cursor;
	if (!saw_quote && input_len == strlen(null_print) &&
		strncmp(&line_buf.data[start_cursor], null_print, input_len) == 0)
		*isnull = true;
	else
		*isnull = false;

	return attribute_buf.data;
}
/*
 * Read a binary attribute
 *
 * A 32-bit length word is fetched first.  A length of -1 sets *isnull to
 * true and returns a zero Datum; any other negative length is an error.
 * Otherwise that many bytes are loaded into attribute_buf and handed to
 * the function described by flinfo (with typioparam as second argument),
 * which must consume the whole buffer.  *isnull is then set to false and
 * the function's result is returned.
 */
static Datum
CopyReadBinaryAttribute(int column_no, FmgrInfo *flinfo, Oid typioparam,
						bool *isnull)
{
	int32		nbytes = CopyGetInt32();
	Datum		value;

	if (CopyGetEof())
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("unexpected EOF in COPY data")));

	/* a length word of -1 means there is no data for this field */
	if (nbytes == -1)
	{
		*isnull = true;
		return (Datum) 0;
	}

	if (nbytes < 0)
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("invalid field size")));

	/* empty attribute_buf, make room, and pull the raw bytes into it */
	attribute_buf.cursor = 0;
	attribute_buf.len = 0;
	attribute_buf.data[0] = '\0';
	enlargeStringInfo(&attribute_buf, nbytes);

	CopyGetData(attribute_buf.data, nbytes);
	if (CopyGetEof())
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("unexpected EOF in COPY data")));

	attribute_buf.data[nbytes] = '\0';
	attribute_buf.len = nbytes;

	/* Invoke the column type's binary input converter */
	value = FunctionCall2(flinfo,
						  PointerGetDatum(&attribute_buf),
						  ObjectIdGetDatum(typioparam));

	/* The converter must have read every byte we loaded */
	if (attribute_buf.cursor != attribute_buf.len)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
				 errmsg("incorrect binary data format")));

	*isnull = false;
	return value;
}
/*
* Send text representation of one attribute, with conversion and escaping
*/
static void
CopyAttributeOut(char *server_string, char *delim)
{
char *string;
1998-07-27 21:38:40 +02:00
char c;
char delimc = delim[0];
bool same_encoding;
int mblen;
int i;
same_encoding = (server_encoding == client_encoding);
if (!same_encoding)
string = (char *) pg_server_to_client((unsigned char *) server_string,
strlen(server_string));
else
string = server_string;
for (; (c = *string) != '\0'; string += mblen)
{
mblen = 1;
switch (c)
{
case '\b':
CopySendString("\\b");
break;
case '\f':
CopySendString("\\f");
break;
case '\n':
CopySendString("\\n");
break;
case '\r':
CopySendString("\\r");
break;
case '\t':
CopySendString("\\t");
break;
case '\v':
CopySendString("\\v");
break;
case '\\':
CopySendString("\\\\");
break;
default:
if (c == delimc)
CopySendChar('\\');
CopySendChar(c);
/*
* We can skip pg_encoding_mblen() overhead when encoding
* is same, because in valid backend encodings, extra
* bytes of a multibyte character never look like ASCII.
*/
if (!same_encoding)
{
/* send additional bytes of the char, if any */
mblen = pg_encoding_mblen(client_encoding, string);
for (i = 1; i < mblen; i++)
CopySendChar(string[i]);
}
break;
}
}
}
/*
 * Send CSV representation of one attribute, with conversion and
 * CSV type escaping
 *
 * Only the first byte of delim, quote and escape is used.  The value is
 * wrapped in quote characters if use_quote is passed as true, or if it
 * contains the delimiter, the quote character, a newline or a carriage
 * return.  Inside a quoted value, each quote or escape character is
 * preceded by the escape character.
 */
static void
CopyAttributeOutCSV(char *server_string, char *delim, char *quote,
					char *escape, bool use_quote)
{
	char		delimc = delim[0];
	char		quotec = quote[0];
	char		escapec = escape[0];
	bool		need_conversion = (server_encoding != client_encoding);
	char	   *out;
	char	   *scan;
	char		ch;
	int			charlen;
	int			k;

	if (need_conversion)
		out = (char *) pg_server_to_client((unsigned char *) server_string,
										   strlen(server_string));
	else
		out = server_string;

	/*
	 * Two passes over the string: this first one only decides whether the
	 * value needs quoting, and stops as soon as the answer is yes.
	 */
	scan = out;
	while (!use_quote && (ch = *scan) != '\0')
	{
		if (ch == delimc || ch == quotec || ch == '\n' || ch == '\r')
			use_quote = true;

		charlen = need_conversion ?
			pg_encoding_mblen(client_encoding, scan) : 1;
		scan += charlen;
	}

	/* Second pass: actually send the data, quoted if required */
	if (use_quote)
		CopySendChar(quotec);

	while ((ch = *out) != '\0')
	{
		if (use_quote && (ch == quotec || ch == escapec))
			CopySendChar(escapec);
		CopySendChar(ch);

		charlen = 1;
		if (need_conversion)
		{
			/* emit the remaining bytes of this character, if any */
			charlen = pg_encoding_mblen(client_encoding, out);
			for (k = 1; k < charlen; k++)
				CopySendChar(out[k]);
		}
		out += charlen;
	}

	if (use_quote)
		CopySendChar(quotec);
}
/*
 * CopyGetAttnums - build an integer list of attnums to be copied
 *
 * The input attnamelist is either the user-specified column list,
 * or NIL if there was none (in which case we want all the non-dropped
 * columns).  A column name appearing twice in the list is reported as
 * an error.
 */
static List *
CopyGetAttnums(Relation rel, List *attnamelist)
{
	List	   *result = NIL;
	ListCell   *cell;

	if (attnamelist == NIL)
	{
		/* No explicit list: take every column that has not been dropped */
		TupleDesc	desc = RelationGetDescr(rel);
		int			colno;

		for (colno = 1; colno <= desc->natts; colno++)
		{
			if (!desc->attrs[colno - 1]->attisdropped)
				result = lappend_int(result, colno);
		}
		return result;
	}

	/* Otherwise check each user-supplied name and collect its attnum */
	foreach(cell, attnamelist)
	{
		char	   *colname = strVal(lfirst(cell));
		int			attnum;

		/*
		 * Resolve the column name (ereport on failure).  Note we disallow
		 * system columns here.
		 */
		attnum = attnameAttNum(rel, colname, false);

		/* Reject a column that was already listed */
		if (list_member_int(result, attnum))
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_COLUMN),
					 errmsg("column \"%s\" specified more than once",
							colname)));

		result = lappend_int(result, attnum);
	}

	return result;
}