/*
 * Source file: postgresql/src/backend/commands/copyto.c
 *
 * This text was captured from a web "blame" view of the file (the original
 * listing was reported as 1311 lines, 34 KiB, C).
 */

/*-------------------------------------------------------------------------
*
* copyto.c
* COPY <table> TO file/program/client
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/copyto.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/progress.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
/*
 * Destination kinds that the bottom-level output code has to tell apart.
 * CopySendEndOfRow() switches on this value to decide how a completed row
 * leaves the backend.
 */
typedef enum CopyDest
{
COPY_FILE, /* written with fwrite() to copy_file (file or piped program) */
COPY_FRONTEND, /* sent to the frontend as a 'd' protocol message */
} CopyDest;
/*
 * This struct contains all the state variables used throughout a COPY TO
 * operation.
 *
 * Multi-byte encodings: all supported client-side encodings encode multi-byte
 * characters by having the first byte's high bit set. Subsequent bytes of the
 * character can have the high bit not set. When scanning data in such an
 * encoding to look for a match to a single-byte (ie ASCII) character, we must
 * use the full pg_encoding_mblen() machinery to skip over multibyte
 * characters, else we might find a false match to a trailing byte. In
 * supported server encodings, there is no possibility of a false match, and
 * it's faster to make useless comparisons to trailing bytes than it is to
 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
 * when we have to do it the hard way.
 */
typedef struct CopyToStateData
{
/* low-level state data */
CopyDest copy_dest; /* where completed rows are sent */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
StringInfo fe_msgbuf; /* per-row output buffer, used for all dests */
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* must output pass through pg_server_to_any()? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
/* parameters from the COPY command */
Relation rel; /* relation to copy from, or NULL for a query */
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDOUT */
bool is_program; /* is 'filename' a program to popen? */
CopyFormatOptions opts; /* options filled in by ProcessCopyOptions() */
Node *whereClause; /* WHERE condition (or NULL) */
/*
 * Working state
 */
MemoryContext copycontext; /* per-copy execution context */
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
uint64 bytes_processed; /* bytes flushed so far by CopySendEndOfRow() */
} CopyToStateData;
/*
 * DestReceiver for COPY (query) TO.
 *
 * The executor hands each result tuple to copy_dest_receive(), which
 * formats it through the attached CopyToState and counts it.
 */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyToState cstate; /* CopyToStateData for the command */
uint64 processed; /* # of tuples processed */
} DR_copy;
/*
 * Signature that opens a binary-format COPY stream; DoCopyTo() sends all
 * 11 bytes.  The explicit array size means only the bytes spelled out in
 * the literal are stored.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static void EndCopy(CopyToState cstate);
static void ClosePipeToProgram(CopyToState cstate);
static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
static void CopyAttributeOutText(CopyToState cstate, char *string);
static void CopyAttributeOutCSV(CopyToState cstate, char *string,
bool use_quote, bool single_attr);
/*
 * Low-level communications functions.  All of the CopySend* routines
 * append to cstate->fe_msgbuf; only CopySendEndOfRow() writes it out.
 */
static void SendCopyBegin(CopyToState cstate);
static void SendCopyEnd(CopyToState cstate);
static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
static void CopySendString(CopyToState cstate, const char *str);
static void CopySendChar(CopyToState cstate, char c);
static void CopySendEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
*/
/*
 * SendCopyBegin --- announce the start of a COPY OUT stream to the frontend.
 *
 * Builds and sends an 'H' message holding the overall format code (0 for
 * text, 1 for binary), the number of output columns, and the same format
 * code repeated once per column.  Afterwards the destination is switched
 * to COPY_FRONTEND so that subsequent rows are routed to the client.
 */
static void
SendCopyBegin(CopyToState cstate)
{
    StringInfoData msg;
    int         ncols = list_length(cstate->attnumlist);
    int16       fmt_code;
    int         col;

    if (cstate->opts.binary)
        fmt_code = 1;
    else
        fmt_code = 0;

    pq_beginmessage(&msg, 'H');
    pq_sendbyte(&msg, fmt_code);    /* format of the stream as a whole */
    pq_sendint16(&msg, ncols);
    col = 0;
    while (col < ncols)
    {
        pq_sendint16(&msg, fmt_code);   /* format of this column */
        col++;
    }
    pq_endmessage(&msg);

    cstate->copy_dest = COPY_FRONTEND;
}
/*
 * SendCopyEnd --- tell the frontend that the COPY OUT stream is finished.
 *
 * Only the body-less 'c' (Copy Done) message is sent here; every row is
 * expected to have been flushed already, which the assertion checks.
 */
static void
SendCopyEnd(CopyToState cstate)
{
    /* A non-empty row buffer would mean some data was never sent */
    Assert(cstate->fe_msgbuf->len == 0);

    pq_putemptymessage('c');
}
/*----------
* CopySendData sends output data to the destination (file or frontend)
* CopySendString does the same for null-terminated strings
* CopySendChar does the same for single characters
* CopySendEndOfRow does the appropriate thing at end of each data row
* (data is not actually flushed except by CopySendEndOfRow)
*
* NB: no data conversion is applied by these functions
*----------
*/
/* Append datasize raw bytes starting at databuf to the pending row buffer. */
static void
CopySendData(CopyToState cstate, const void *databuf, int datasize)
{
    StringInfo  rowbuf = cstate->fe_msgbuf;

    appendBinaryStringInfo(rowbuf, databuf, datasize);
}
/* Append a NUL-terminated string (without its terminator) to the row buffer. */
static void
CopySendString(CopyToState cstate, const char *str)
{
    StringInfo  rowbuf = cstate->fe_msgbuf;

    appendBinaryStringInfo(rowbuf, str, strlen(str));
}
/* Append a single character to the pending row buffer. */
static void
CopySendChar(CopyToState cstate, char c)
{
    StringInfo  rowbuf = cstate->fe_msgbuf;

    appendStringInfoCharMacro(rowbuf, c);
}
/*
 * CopySendEndOfRow --- terminate the current row and write it out.
 *
 * In text/CSV mode a line terminator is appended first.  The buffered row
 * is then either written to the file/program stream or sent to the
 * frontend as a single 'd' message.  Finally the byte counter and the
 * progress report are updated and the row buffer is emptied.
 *
 * A failed write to a file or program is reported with ereport(ERROR).
 */
static void
CopySendEndOfRow(CopyToState cstate)
{
    StringInfo  rowbuf = cstate->fe_msgbuf;

    if (cstate->copy_dest == COPY_FILE)
    {
        bool        write_failed;

        if (!cstate->opts.binary)
        {
            /* The default line terminator is platform-dependent */
#ifndef WIN32
            CopySendChar(cstate, '\n');
#else
            CopySendString(cstate, "\r\n");
#endif
        }

        write_failed = (fwrite(rowbuf->data, rowbuf->len, 1,
                               cstate->copy_file) != 1 ||
                        ferror(cstate->copy_file));

        if (write_failed)
        {
            if (!cstate->is_program)
            {
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not write to COPY file: %m")));
            }
            else
            {
                if (errno == EPIPE)
                {
                    /*
                     * Transaction cleanup would close the pipe anyway, but
                     * closing it now gives the subprocess' exit status a
                     * chance to produce a more useful error than a plain
                     * "Broken Pipe".
                     */
                    ClosePipeToProgram(cstate);

                    /*
                     * Still here: the program exited normally after
                     * closing its end of the pipe.  Put errno back so the
                     * report below describes the write failure.
                     */
                    errno = EPIPE;
                }
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not write to COPY program: %m")));
            }
        }
    }
    else if (cstate->copy_dest == COPY_FRONTEND)
    {
        /* On the wire a row always ends with \n, whatever the platform */
        if (!cstate->opts.binary)
            CopySendChar(cstate, '\n');

        /* The whole accumulated row travels as one CopyData message */
        (void) pq_putmessage('d', rowbuf->data, rowbuf->len);
    }

    /* Account for the bytes just emitted and publish the new total */
    cstate->bytes_processed += rowbuf->len;
    pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);

    resetStringInfo(rowbuf);
}
/*
* These functions do apply some data conversion
*/
/*
 * CopySendInt32 --- append a 32-bit integer to the row buffer after
 * converting it to network byte order.
 */
static inline void
CopySendInt32(CopyToState cstate, int32 val)
{
    uint32      netval = pg_hton32((uint32) val);

    CopySendData(cstate, &netval, sizeof(netval));
}
/*
 * CopySendInt16 --- append a 16-bit integer to the row buffer after
 * converting it to network byte order.
 */
static inline void
CopySendInt16(CopyToState cstate, int16 val)
{
    uint16      netval = pg_hton16((uint16) val);

    CopySendData(cstate, &netval, sizeof(netval));
}
/*
 * ClosePipeToProgram --- close the pipe feeding an external program and
 * turn a bad close status into an error.
 *
 * Returns normally only when ClosePipeStream() reports status 0.  A status
 * of -1 is reported as a file-access error; any other status is reported
 * as a failure of the program, with the decoded wait status as detail.
 */
static void
ClosePipeToProgram(CopyToState cstate)
{
    int         status;

    Assert(cstate->is_program);

    status = ClosePipeStream(cstate->copy_file);
    if (status == 0)
        return;

    if (status == -1)
    {
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not close pipe to external command: %m")));
    }
    else
    {
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                 errmsg("program \"%s\" failed",
                        cstate->filename),
                 errdetail_internal("%s", wait_result_to_str(status))));
    }
}
/*
 * EndCopy --- close the output stream and free the CopyToState.
 *
 * A program pipe is closed through ClosePipeToProgram(); a named file is
 * closed with FreeFile(), and failure to close it is an error.  Nothing is
 * closed when there is no filename.  Progress reporting is then ended, the
 * copy memory context deleted, and cstate itself freed, so the caller must
 * not touch cstate afterwards.
 */
static void
EndCopy(CopyToState cstate)
{
    if (cstate->is_program)
        ClosePipeToProgram(cstate);
    else if (cstate->filename != NULL)
    {
        if (FreeFile(cstate->copy_file) != 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not close file \"%s\": %m",
                            cstate->filename)));
    }

    pgstat_progress_end_command();

    MemoryContextDelete(cstate->copycontext);
    pfree(cstate);
}
/*
 * BeginCopyTo --- build and return the CopyToState for a COPY TO command.
 *
 * pstate       parse state; its source text is used for analysis/planning
 * rel          relation to copy from, or NULL when raw_query is supplied
 * raw_query    raw parse tree of the query to copy from (rel must be NULL)
 * queryRelId   if valid, OID of a relation the planned query must reference
 * filename     output file or program; NULL means STDOUT / the client
 * is_program   true if filename is a command to be run via a pipe
 * attnamelist  column name list, resolved with CopyGetAttnums()
 * options      COPY option list, processed by ProcessCopyOptions()
 *
 * For the query case the query is analyzed, rewritten, planned and started
 * with ExecutorStart(), and a copied active snapshot is pushed; EndCopyTo()
 * undoes both.  For a file or program destination the output stream is
 * opened here.  All problems are reported with ereport(ERROR).
 *
 * The setup work is done with cstate->copycontext as the current memory
 * context; the caller's context is restored before returning.
 */
CopyToState
BeginCopyTo(ParseState *pstate,
            Relation rel,
            RawStmt *raw_query,
            Oid queryRelId,
            const char *filename,
            bool is_program,
            List *attnamelist,
            List *options)
{
    CopyToState cstate;
    bool        pipe = (filename == NULL);
    TupleDesc   tupDesc;
    int         num_phys_attrs;
    MemoryContext oldcontext;
    const int   progress_cols[] = {
        PROGRESS_COPY_COMMAND,
        PROGRESS_COPY_TYPE
    };
    int64       progress_vals[] = {
        PROGRESS_COPY_COMMAND_TO,
        0
    };

    /* Only plain tables can be copied directly; explain why others can't */
    if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
    {
        if (rel->rd_rel->relkind == RELKIND_VIEW)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from view \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from materialized view \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from foreign table \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from sequence \"%s\"",
                            RelationGetRelationName(rel))));
        else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from partitioned table \"%s\"",
                            RelationGetRelationName(rel)),
                     errhint("Try the COPY (SELECT ...) TO variant.")));
        else
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("cannot copy from non-table relation \"%s\"",
                            RelationGetRelationName(rel))));
    }

    /* Allocate workspace and zero all fields */
    cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));

    /*
     * We allocate everything used by a cstate in a new memory context. This
     * avoids memory leaks during repeated use of COPY in a query.
     */
    cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
                                                "COPY",
                                                ALLOCSET_DEFAULT_SIZES);

    oldcontext = MemoryContextSwitchTo(cstate->copycontext);

    /* Extract options from the statement node tree */
    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);

    /* Process the source/target relation or query */
    if (rel)
    {
        Assert(!raw_query);

        cstate->rel = rel;

        tupDesc = RelationGetDescr(cstate->rel);
    }
    else
    {
        List       *rewritten;
        Query      *query;
        PlannedStmt *plan;
        DestReceiver *dest;

        cstate->rel = NULL;

        /*
         * Run parse analysis and rewrite. Note this also acquires sufficient
         * locks on the source table(s).
         */
        rewritten = pg_analyze_and_rewrite(raw_query,
                                           pstate->p_sourcetext, NULL, 0,
                                           NULL);

        /* check that we got back something we can work with */
        if (rewritten == NIL)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
        }
        else if (list_length(rewritten) > 1)
        {
            ListCell   *lc;

            /* examine queries to determine which error message to issue */
            foreach(lc, rewritten)
            {
                Query      *q = lfirst_node(Query, lc);

                if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("conditional DO INSTEAD rules are not supported for COPY")));
                if (q->querySource == QSRC_NON_INSTEAD_RULE)
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("DO ALSO rules are not supported for the COPY")));
            }

            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
        }

        query = linitial_node(Query, rewritten);

        /* The grammar allows SELECT INTO, but we don't support that */
        if (query->utilityStmt != NULL &&
            IsA(query->utilityStmt, CreateTableAsStmt))
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY (SELECT INTO) is not supported")));

        Assert(query->utilityStmt == NULL);

        /*
         * Similarly the grammar doesn't enforce the presence of a RETURNING
         * clause, but this is required here.
         */
        if (query->commandType != CMD_SELECT &&
            query->returningList == NIL)
        {
            Assert(query->commandType == CMD_INSERT ||
                   query->commandType == CMD_UPDATE ||
                   query->commandType == CMD_DELETE);

            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY query must have a RETURNING clause")));
        }

        /* plan the query */
        plan = pg_plan_query(query, pstate->p_sourcetext,
                             CURSOR_OPT_PARALLEL_OK, NULL);

        /*
         * With row-level security and a user using "COPY relation TO", we
         * have to convert the "COPY relation TO" to a query-based COPY (eg:
         * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
         * in any RLS clauses.
         *
         * When this happens, we are passed in the relid of the originally
         * found relation (which we have locked). As the planner will look up
         * the relation again, we double-check here to make sure it found the
         * same one that we have locked.
         */
        if (queryRelId != InvalidOid)
        {
            /*
             * Note that with RLS involved there may be multiple relations,
             * and while the one we need is almost certainly first, we don't
             * make any guarantees of that in the planner, so check the whole
             * list and make sure we find the original relation.
             */
            if (!list_member_oid(plan->relationOids, queryRelId))
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                         errmsg("relation referenced by COPY statement has changed")));
        }

        /*
         * Use a snapshot with an updated command ID to ensure this query sees
         * results of any previously executed queries.
         */
        PushCopiedSnapshot(GetActiveSnapshot());
        UpdateActiveSnapshotCommandId();

        /* Create dest receiver for COPY OUT and attach our state to it */
        dest = CreateDestReceiver(DestCopyOut);
        ((DR_copy *) dest)->cstate = cstate;

        /* Create a QueryDesc whose result tuples go to that receiver */
        cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                            GetActiveSnapshot(),
                                            InvalidSnapshot,
                                            dest, NULL, NULL, 0);

        /*
         * Call ExecutorStart to prepare the plan for execution.
         *
         * ExecutorStart computes a result tupdesc for us
         */
        ExecutorStart(cstate->queryDesc, 0);

        tupDesc = cstate->queryDesc->tupDesc;
    }

    /* Generate or convert list of attributes to process */
    cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

    num_phys_attrs = tupDesc->natts;

    /* Convert FORCE_QUOTE name list to per-column flags, check validity */
    cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->opts.force_quote_all)
    {
        int         i;

        for (i = 0; i < num_phys_attrs; i++)
            cstate->opts.force_quote_flags[i] = true;
    }
    else if (cstate->opts.force_quote)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
                                NameStr(attr->attname))));
            cstate->opts.force_quote_flags[attnum - 1] = true;
        }
    }

    /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->opts.force_notnull)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
                                NameStr(attr->attname))));
            cstate->opts.force_notnull_flags[attnum - 1] = true;
        }
    }

    /* Convert FORCE_NULL name list to per-column flags, check validity */
    cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->opts.force_null)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
                                NameStr(attr->attname))));
            cstate->opts.force_null_flags[attnum - 1] = true;
        }
    }

    /* Use client encoding when ENCODING option is not specified. */
    if (cstate->opts.file_encoding < 0)
        cstate->file_encoding = pg_get_client_encoding();
    else
        cstate->file_encoding = cstate->opts.file_encoding;

    /*
     * Set up encoding conversion info. Even if the file and server encodings
     * are the same, we must apply pg_any_to_server() to validate data in
     * multibyte encodings.
     */
    cstate->need_transcoding =
        (cstate->file_encoding != GetDatabaseEncoding() ||
         pg_database_encoding_max_length() > 1);
    /* See Multibyte encoding comment above */
    cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);

    cstate->copy_dest = COPY_FILE;  /* default */

    /*
     * Deliberately stay in copycontext while the destination is set up, so
     * that the copied filename is owned by the COPY's own context like the
     * rest of the state.  The caller's context is restored just before
     * returning.
     */
    if (pipe)
    {
        progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;

        Assert(!is_program);    /* the grammar does not allow this */
        if (whereToSendOutput != DestRemote)
            cstate->copy_file = stdout;
    }
    else
    {
        cstate->filename = pstrdup(filename);
        cstate->is_program = is_program;

        if (is_program)
        {
            progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
            cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
            if (cstate->copy_file == NULL)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not execute command \"%s\": %m",
                                cstate->filename)));
        }
        else
        {
            mode_t      oumask; /* Pre-existing umask value */
            struct stat st;

            progress_vals[1] = PROGRESS_COPY_TYPE_FILE;

            /*
             * Prevent write to relative path ... too easy to shoot oneself in
             * the foot by overwriting a database file ...
             */
            if (!is_absolute_path(filename))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_NAME),
                         errmsg("relative path not allowed for COPY to file")));

            /* Create the file without group/other write permission */
            oumask = umask(S_IWGRP | S_IWOTH);
            PG_TRY();
            {
                cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
            }
            PG_FINALLY();
            {
                /* Put the previous umask back whether or not an error occurred */
                umask(oumask);
            }
            PG_END_TRY();
            if (cstate->copy_file == NULL)
            {
                /* copy errno because ereport subfunctions might change it */
                int         save_errno = errno;

                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not open file \"%s\" for writing: %m",
                                cstate->filename),
                         (save_errno == ENOENT || save_errno == EACCES) ?
                         errhint("COPY TO instructs the PostgreSQL server process to write a file. "
                                 "You may want a client-side facility such as psql's \\copy.") : 0));
            }

            if (fstat(fileno(cstate->copy_file), &st))
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not stat file \"%s\": %m",
                                cstate->filename)));

            if (S_ISDIR(st.st_mode))
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                         errmsg("\"%s\" is a directory", cstate->filename)));
        }
    }

    /* initialize progress */
    pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
                                  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    pgstat_progress_update_multi_param(2, progress_cols, progress_vals);

    cstate->bytes_processed = 0;

    MemoryContextSwitchTo(oldcontext);

    return cstate;
}
/*
 * EndCopyTo --- shut down a COPY TO operation and free its state.
 *
 * When the copy was driven by a query, the executor is finished and ended,
 * the QueryDesc freed, and the active snapshot popped.  EndCopy() then
 * closes the output and frees cstate, so cstate is invalid on return.
 */
void
EndCopyTo(CopyToState cstate)
{
    QueryDesc  *qdesc = cstate->queryDesc;

    if (qdesc != NULL)
    {
        /* Tear the query down in the order the executor requires */
        ExecutorFinish(qdesc);
        ExecutorEnd(qdesc);
        FreeQueryDesc(qdesc);
        PopActiveSnapshot();
    }

    /* Close the destination and release all remaining storage */
    EndCopy(cstate);
}
/*
 * DoCopyTo --- copy the relation or query set up in cstate to its
 * destination.
 *
 * Sends the protocol start message when the output goes to the frontend,
 * looks up an output (or binary send) function for each selected column,
 * emits the binary header or the optional header line, and then emits
 * every row: by scanning the relation, or by running the prepared query,
 * whose DestReceiver calls CopyOneRowTo().  A binary copy is closed with a
 * trailer.
 *
 * Returns the number of rows copied.
 */
uint64
DoCopyTo(CopyToState cstate)
{
    bool        to_frontend;
    TupleDesc   tupDesc;
    int         natts_phys;
    ListCell   *lc;
    uint64      nrows;

    to_frontend = (cstate->filename == NULL && whereToSendOutput == DestRemote);
    if (to_frontend)
        SendCopyBegin(cstate);

    tupDesc = cstate->rel ? RelationGetDescr(cstate->rel)
        : cstate->queryDesc->tupDesc;
    natts_phys = tupDesc->natts;

    /* Until transcoding says otherwise, NULLs print as given */
    cstate->opts.null_print_client = cstate->opts.null_print;

    /* One row at a time is assembled in fe_msgbuf, whatever the destination */
    cstate->fe_msgbuf = makeStringInfo();

    /* Look up the conversion function of each column that will be output */
    cstate->out_functions = (FmgrInfo *) palloc(natts_phys * sizeof(FmgrInfo));
    foreach(lc, cstate->attnumlist)
    {
        int         attnum = lfirst_int(lc);
        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
        Oid         func_oid;
        bool        is_varlena;

        if (cstate->opts.binary)
            getTypeBinaryOutputInfo(attr->atttypid, &func_oid, &is_varlena);
        else
            getTypeOutputInfo(attr->atttypid, &func_oid, &is_varlena);

        fmgr_info(func_oid, &cstate->out_functions[attnum - 1]);
    }

    /*
     * Per-row scratch context.  It is reset before each row, which takes
     * care of anything the datatype output routines leave behind and is
     * cheaper than freeing allocations one by one.  (A full econtext, as
     * used by CopyFrom, is not needed here.)
     */
    cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
                                               "COPY TO",
                                               ALLOCSET_DEFAULT_SIZES);

    if (cstate->opts.binary)
    {
        /* Binary header: signature, flags word, header extension length */
        CopySendData(cstate, BinarySignature, 11);
        CopySendInt32(cstate, (int32) 0);   /* flags field */
        CopySendInt32(cstate, (int32) 0);   /* no header extension */
    }
    else
    {
        /*
         * null_print is sent verbatim with CopySendString, so it has to be
         * in the file encoding already.
         */
        if (cstate->need_transcoding)
            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
                                                              cstate->opts.null_print_len,
                                                              cstate->file_encoding);

        /* Emit the column-name line when one was requested */
        if (cstate->opts.header_line)
        {
            bool        only_one_col = (list_length(cstate->attnumlist) == 1);
            bool        first_col = true;

            foreach(lc, cstate->attnumlist)
            {
                int         attnum = lfirst_int(lc);
                char       *colname;

                if (!first_col)
                    CopySendChar(cstate, cstate->opts.delim[0]);
                first_col = false;

                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);

                CopyAttributeOutCSV(cstate, colname, false, only_one_col);
            }

            CopySendEndOfRow(cstate);
        }
    }

    if (cstate->rel)
    {
        TableScanDesc scan;
        TupleTableSlot *slot;

        scan = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
        slot = table_slot_create(cstate->rel, NULL);

        nrows = 0;
        for (;;)
        {
            if (!table_scan_getnextslot(scan, ForwardScanDirection, slot))
                break;

            CHECK_FOR_INTERRUPTS();

            /* Extract all column values from the tuple */
            slot_getallattrs(slot);

            /* Format the row and send it on its way */
            CopyOneRowTo(cstate, slot);

            /* Count the row and publish the new total */
            nrows++;
            pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
                                         nrows);
        }

        ExecDropSingleTupleTableSlot(slot);
        table_endscan(scan);
    }
    else
    {
        /* Running the plan makes the dest receiver emit each tuple */
        ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
        nrows = ((DR_copy *) cstate->queryDesc->dest)->processed;
    }

    if (cstate->opts.binary)
    {
        /* Binary trailer, which must be flushed explicitly */
        CopySendInt16(cstate, -1);
        CopySendEndOfRow(cstate);
    }

    MemoryContextDelete(cstate->rowcontext);

    if (to_frontend)
        SendCopyEnd(cstate);

    return nrows;
}
/*
 * CopyOneRowTo --- format one tuple and send it to the destination.
 *
 * Runs inside the per-row memory context, which is reset on entry.  In
 * binary mode the row is a 16-bit column count followed, per column, by a
 * 32-bit length (-1 for NULL) and the send-function output.  Otherwise
 * columns are separated by the delimiter; NULLs are written as
 * null_print_client and other values go through the CSV or text escaping
 * routine.  The finished row is flushed with CopySendEndOfRow().
 */
static void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
    FmgrInfo   *outfuncs = cstate->out_functions;
    bool        binary = cstate->opts.binary;
    int         ncols = list_length(cstate->attnumlist);
    bool        first_col = true;
    MemoryContext callercxt;
    ListCell   *lc;

    MemoryContextReset(cstate->rowcontext);
    callercxt = MemoryContextSwitchTo(cstate->rowcontext);

    /* A binary row starts with its field count */
    if (binary)
        CopySendInt16(cstate, ncols);

    /* All column values must be available in the slot's arrays */
    slot_getallattrs(slot);

    foreach(lc, cstate->attnumlist)
    {
        int         attidx = lfirst_int(lc) - 1;
        Datum       value = slot->tts_values[attidx];
        bool        isnull = slot->tts_isnull[attidx];

        if (binary)
        {
            if (isnull)
                CopySendInt32(cstate, -1);
            else
            {
                bytea      *wire = SendFunctionCall(&outfuncs[attidx], value);

                CopySendInt32(cstate, VARSIZE(wire) - VARHDRSZ);
                CopySendData(cstate, VARDATA(wire),
                             VARSIZE(wire) - VARHDRSZ);
            }
            continue;
        }

        /* Text and CSV: a delimiter goes before every column but the first */
        if (!first_col)
            CopySendChar(cstate, cstate->opts.delim[0]);
        first_col = false;

        if (isnull)
            CopySendString(cstate, cstate->opts.null_print_client);
        else
        {
            char       *text = OutputFunctionCall(&outfuncs[attidx], value);

            if (cstate->opts.csv_mode)
                CopyAttributeOutCSV(cstate, text,
                                    cstate->opts.force_quote_flags[attidx],
                                    ncols == 1);
            else
                CopyAttributeOutText(cstate, text);
        }
    }

    CopySendEndOfRow(cstate);

    MemoryContextSwitchTo(callercxt);
}
/*
* Send text representation of one attribute, with conversion and escaping
*/
/*
 * DUMPSOFAR --- send the pending literal run, i.e. the bytes from "start"
 * up to (not including) "ptr", if there are any.
 *
 * The macro relies on the calling function having variables named
 * "cstate", "start" and "ptr" in scope.
 */
#define DUMPSOFAR() \
do { \
if (ptr > start) \
CopySendData(cstate, start, ptr - start); \
} while (0)
/*
 * CopyAttributeOutText --- emit one non-null attribute in COPY text format.
 *
 * "string" is the attribute's text form.  When cstate->need_transcoding is
 * set, it is first converted to the file encoding with pg_server_to_any().
 * Backslash and the delimiter character are written with a preceding
 * backslash, and the control characters \b \f \n \r \t \v are written as
 * the matching two-character C-style escapes.  Every other byte is
 * appended to the row buffer unchanged.
 */
static void
CopyAttributeOutText(CopyToState cstate, char *string)
{
char *ptr;
char *start;
char c;
char delimc = cstate->opts.delim[0];
if (cstate->need_transcoding)
ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
else
ptr = string;
/*
 * We have to grovel through the string searching for control characters
 * and instances of the delimiter character. In most cases, though, these
 * are infrequent. To avoid overhead from calling CopySendData once per
 * character, we dump out all characters between escaped characters in a
 * single call. The loop invariant is that the data from "start" to "ptr"
 * can be sent literally, but hasn't yet been.
 *
 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
 * in valid backend encodings, extra bytes of a multibyte character never
 * look like ASCII. This loop is sufficiently performance-critical that
 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
 * of the normal safe-encoding path.
 */
if (cstate->encoding_embeds_ascii)
{
/* Careful path: step over whole multibyte characters */
start = ptr;
while ((c = *ptr) != '\0')
{
if ((unsigned char) c < (unsigned char) 0x20)
{
/*
 * \r and \n must be escaped, the others are traditional. We
 * prefer to dump these using the C-like notation, rather than
 * a backslash and the literal character, because it makes the
 * dump file a bit more proof against Microsoftish data
 * mangling.
 */
switch (c)
{
case '\b':
c = 'b';
break;
case '\f':
c = 'f';
break;
case '\n':
c = 'n';
break;
case '\r':
c = 'r';
break;
case '\t':
c = 't';
break;
case '\v':
c = 'v';
break;
default:
/* If it's the delimiter, must backslash it */
if (c == delimc)
break;
/* All ASCII control chars are length 1 */
ptr++;
continue; /* continues the while loop; char stays in the literal run */
}
/* if we get here, we need to convert the control char */
DUMPSOFAR();
CopySendChar(cstate, '\\');
CopySendChar(cstate, c);
start = ++ptr; /* do not include char in next run */
}
else if (c == '\\' || c == delimc)
{
DUMPSOFAR();
CopySendChar(cstate, '\\');
start = ptr++; /* we include char in next run */
}
else if (IS_HIGHBIT_SET(c))
ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
else
ptr++;
}
}
else
{
/* Fast path: same logic, but always advancing one byte at a time */
start = ptr;
while ((c = *ptr) != '\0')
{
if ((unsigned char) c < (unsigned char) 0x20)
{
/*
 * \r and \n must be escaped, the others are traditional. We
 * prefer to dump these using the C-like notation, rather than
 * a backslash and the literal character, because it makes the
 * dump file a bit more proof against Microsoftish data
 * mangling.
 */
switch (c)
{
case '\b':
c = 'b';
break;
case '\f':
c = 'f';
break;
case '\n':
c = 'n';
break;
case '\r':
c = 'r';
break;
case '\t':
c = 't';
break;
case '\v':
c = 'v';
break;
default:
/* If it's the delimiter, must backslash it */
if (c == delimc)
break;
/* All ASCII control chars are length 1 */
ptr++;
continue; /* continues the while loop; char stays in the literal run */
}
/* if we get here, we need to convert the control char */
DUMPSOFAR();
CopySendChar(cstate, '\\');
CopySendChar(cstate, c);
start = ++ptr; /* do not include char in next run */
}
else if (c == '\\' || c == delimc)
{
DUMPSOFAR();
CopySendChar(cstate, '\\');
start = ptr++; /* we include char in next run */
}
else
ptr++;
}
}
/* Flush whatever literal run is still pending */
DUMPSOFAR();
}
/*
 * CopyAttributeOutCSV --- emit one attribute in CSV format.
 *
 * string       the attribute's text form, before encoding conversion
 * use_quote    true to quote the value unconditionally
 * single_attr  true when the row consists of this one column only
 *
 * The value is quoted when the caller asks for it, when it equals the NULL
 * string (compared before conversion), when it is a lone "\." in a
 * single-column row, or when it contains the delimiter, the quote
 * character, a newline or a carriage return.  Inside quotes, each quote or
 * escape character is preceded by the escape character.
 */
static void
CopyAttributeOutCSV(CopyToState cstate, char *string,
                    bool use_quote, bool single_attr)
{
    const char  delimc = cstate->opts.delim[0];
    const char  quotec = cstate->opts.quote[0];
    const char  escapec = cstate->opts.escape[0];
    const bool  skip_multibyte = cstate->encoding_embeds_ascii;
    char       *ptr;
    char       *start;
    char        c;

    /* A value spelled like the NULL marker must be quoted (checked pre-conversion) */
    if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
        use_quote = true;

    ptr = cstate->need_transcoding
        ? pg_server_to_any(string, strlen(string), cstate->file_encoding)
        : string;

    /* First pass: decide whether the converted value needs quotes */
    if (!use_quote)
    {
        if (single_attr && strcmp(ptr, "\\.") == 0)
        {
            /*
             * A bare '\.' on a line of its own would read back as the
             * end-of-data marker, so it has to be quoted.
             */
            use_quote = true;
        }
        else
        {
            char       *scan = ptr;

            while (!use_quote && (c = *scan) != '\0')
            {
                if (c == delimc || c == quotec || c == '\n' || c == '\r')
                    use_quote = true;
                else if (skip_multibyte && IS_HIGHBIT_SET(c))
                    scan += pg_encoding_mblen(cstate->file_encoding, scan);
                else
                    scan++;
            }
        }
    }

    /* Unquoted values are sent exactly as they are */
    if (!use_quote)
    {
        CopySendString(cstate, ptr);
        return;
    }

    CopySendChar(cstate, quotec);

    /*
     * Second pass, batching output the same way CopyAttributeOutText does:
     * bytes between "start" and "ptr" are pending literal output.
     */
    start = ptr;
    for (c = *ptr; c != '\0'; c = *ptr)
    {
        if (c == quotec || c == escapec)
        {
            DUMPSOFAR();
            CopySendChar(cstate, escapec);
            start = ptr;        /* the character itself opens the next run */
        }

        if (skip_multibyte && IS_HIGHBIT_SET(c))
            ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
        else
            ptr++;
    }
    DUMPSOFAR();

    CopySendChar(cstate, quotec);
}
/*
 * copy_dest_startup --- executor startup
 *
 * Installed as the receiver's rStartup callback.  It intentionally does
 * nothing and ignores all of its arguments.
 */
static void
copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
/* no-op */
}
/*
 * copy_dest_receive --- receive one tuple
 *
 * Formats and sends the tuple through the CopyToState attached to the
 * receiver, then bumps the receiver's row counter and reports it as
 * progress.  Always returns true.
 */
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
    DR_copy    *receiver = (DR_copy *) self;

    /* Emit the row */
    CopyOneRowTo(receiver->cstate, slot);

    /* Count it and publish the new total */
    receiver->processed++;
    pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
                                 receiver->processed);

    return true;
}
/*
 * copy_dest_shutdown --- executor end
 *
 * Installed as the receiver's rShutdown callback.  It intentionally does
 * nothing.
 */
static void
copy_dest_shutdown(DestReceiver *self)
{
/* no-op */
}
/*
 * copy_dest_destroy --- release DestReceiver object
 *
 * Installed as the receiver's rDestroy callback.  Frees only the receiver
 * struct itself; the CopyToState it points to is left untouched.
 */
static void
copy_dest_destroy(DestReceiver *self)
{
pfree(self);
}
/*
* CreateCopyDestReceiver -- create a suitable DestReceiver object
*/
DestReceiver *
CreateCopyDestReceiver(void)
{
DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
self->pub.receiveSlot = copy_dest_receive;
self->pub.rStartup = copy_dest_startup;
self->pub.rShutdown = copy_dest_shutdown;
self->pub.rDestroy = copy_dest_destroy;
self->pub.mydest = DestCopyOut;
self->cstate = NULL; /* will be set later */
self->processed = 0;
return (DestReceiver *) self;
}