postgresql/src/backend/commands/copyfrom.c

1620 lines
49 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* copyfrom.c
* COPY <table> FROM file/program/client
*
* This file contains routines needed to efficiently load tuples into a
* table. That includes looking up the correct partition, firing triggers,
* calling the table AM function to insert the data, and updating indexes.
* Reading data from the input file or client and parsing it into Datums
* is handled in copyfromparse.c.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/copyfrom.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "executor/tuptable.h"
#include "foreign/fdwapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
/*
* No more than this many tuples per CopyMultiInsertBuffer. This is the
* size of the per-buffer 'slots' and 'linenos' arrays, and it is also the
* total number of buffered tuples (summed over all buffers) at which
* CopyMultiInsertInfoIsFull() reports that a flush is due.
*
* Caution: Don't make this too big, as we could end up with this many
* CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
* multiInsertBuffers list. Increasing this can cause quadratic growth in
* memory requirements during copies into partitioned tables with a large
* number of partitions.
*/
#define MAX_BUFFERED_TUPLES 1000
/*
* Flush buffers if there are >= this many bytes, as counted by the input
* size, of tuples stored. The count is the running total kept in
* CopyMultiInsertInfo's bufferedBytes.
*/
#define MAX_BUFFERED_BYTES 65535
/*
* Trim the list of buffers back down to this number after flushing; see
* CopyMultiInsertInfoFlush().
*/
#define MAX_PARTITION_BUFFERS 32
/*
* Stores multi-insert data related to a single relation in CopyFrom.
*
* One of these exists per target relation that tuples are being batched
* for. Tuples accumulate in 'slots' until the buffer is flushed with
* CopyMultiInsertBufferFlush(), which resets 'nused' to zero but keeps the
* slots themselves for reuse.
*/
typedef struct CopyMultiInsertBuffer
{
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Buffered tuples; entries
* are created on demand and
* are NULL until first used */
ResultRelInfo *resultRelInfo; /* ResultRelInfo of the relation this buffer
* inserts into */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of each buffered tuple
* in the copy stream, used for
* error context at flush time */
} CopyMultiInsertBuffer;
/*
* Stores one or many CopyMultiInsertBuffers and details about the size and
* number of tuples which are stored in them. This allows multiple buffers to
* exist at once when COPYing into a partitioned table.
*
* The two counters are totals across every tracked buffer; they are bumped
* by CopyMultiInsertInfoStore() and zeroed by CopyMultiInsertInfoFlush().
*/
typedef struct CopyMultiInsertInfo
{
List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
int bufferedTuples; /* number of tuples buffered over all buffers */
int bufferedBytes; /* number of bytes from all buffered tuples */
CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
EState *estate; /* Executor state used for COPY */
CommandId mycid; /* Command Id used for COPY */
int ti_options; /* table insert options, passed to
* table_multi_insert() and
* table_finish_bulk_insert() */
} CopyMultiInsertInfo;
/* non-export function prototypes */
static char *limit_printout_length(const char *str);
static void ClosePipeFromProgram(CopyFromState cstate);
/*
 * Error context callback for COPY FROM.
 *
 * 'arg' must be the CopyFromState of the COPY in progress.  Every message
 * names the relation and the current input line number; where available it
 * also names the column being processed and shows a length-limited excerpt
 * of the offending value or line.
 */
void
CopyFromErrorCallback(void *arg)
{
	CopyFromState cstate = (CopyFromState) arg;
	char		lineno_text[32];

	snprintf(lineno_text, sizeof(lineno_text), UINT64_FORMAT,
			 cstate->cur_lineno);

	if (cstate->opts.binary)
	{
		/* Binary data can't usefully be displayed; report position only. */
		if (cstate->cur_attname == NULL)
			errcontext("COPY %s, line %s",
					   cstate->cur_relname, lineno_text);
		else
			errcontext("COPY %s, line %s, column %s",
					   cstate->cur_relname, lineno_text,
					   cstate->cur_attname);
		return;
	}

	if (cstate->cur_attname != NULL)
	{
		/* The error belongs to one particular column. */
		if (cstate->cur_attval != NULL)
		{
			char	   *shown = limit_printout_length(cstate->cur_attval);

			errcontext("COPY %s, line %s, column %s: \"%s\"",
					   cstate->cur_relname, lineno_text,
					   cstate->cur_attname, shown);
			pfree(shown);
		}
		else
		{
			/* The column's value is NULL, so there is no text to show. */
			errcontext("COPY %s, line %s, column %s: null input",
					   cstate->cur_relname, lineno_text,
					   cstate->cur_attname);
		}
		return;
	}

	/*
	 * The error belongs to the line as a whole.  Show the line only if
	 * line_buf still holds the right one.
	 */
	if (cstate->line_buf_valid)
	{
		char	   *shown = limit_printout_length(cstate->line_buf.data);

		errcontext("COPY %s, line %s: \"%s\"",
				   cstate->cur_relname, lineno_text, shown);
		pfree(shown);
	}
	else
		errcontext("COPY %s, line %s",
				   cstate->cur_relname, lineno_text);
}
/*
 * Keep COPY data shown in a message down to a reasonable size.
 *
 * Always returns a freshly palloc'd string that the caller should pfree:
 * either a plain copy of 'str', or, when 'str' is longer than
 * MAX_COPY_DATA_DISPLAY bytes, a prefix clipped by pg_mbcliplen() with
 * "..." appended to show that it was cut.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			input_len = strlen(str);
	int			keep_len;
	char	   *clipped;

	if (input_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Too long: apply encoding-dependent truncation. */
		keep_len = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept prefix, the three dots, and the terminator. */
		clipped = (char *) palloc(keep_len + 4);
		memcpy(clipped, str, keep_len);
		memcpy(clipped + keep_len, "...", 4);

		return clipped;
	}

	/* Short enough to show as-is. */
	return pstrdup(str);
}
/*
 * Build a new, empty CopyMultiInsertBuffer for the given ResultRelInfo.
 *
 * The buffer starts with no slots allocated (all entries NULL), no tuples
 * stored, and its own BulkInsertState.  The 'linenos' array is left
 * uninitialized; entries are written as tuples are stored.
 */
static CopyMultiInsertBuffer *
CopyMultiInsertBufferInit(ResultRelInfo *rri)
{
	CopyMultiInsertBuffer *newbuf;

	newbuf = (CopyMultiInsertBuffer *) palloc(sizeof(*newbuf));

	/* 'slots' is an array member, so sizeof covers the whole array. */
	memset(newbuf->slots, 0, sizeof(newbuf->slots));
	newbuf->resultRelInfo = rri;
	newbuf->bistate = GetBulkInsertState();
	newbuf->nused = 0;

	return newbuf;
}
/*
 * Create a buffer for this ResultRelInfo and start tracking it in 'miinfo'.
 */
static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
							   ResultRelInfo *rri)
{
	CopyMultiInsertBuffer *newbuf = CopyMultiInsertBufferInit(rri);

	/* Back-link from the relation, so the buffer is easy to find again. */
	rri->ri_CopyMultiInsertBuffer = newbuf;

	/* Add it to the list of buffers we are responsible for. */
	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, newbuf);
}
/*
 * Fill in a caller-allocated CopyMultiInsertInfo.
 *
 * All counters start at zero and the buffer list starts empty.  When 'rri'
 * is not a partitioned table, a CopyMultiInsertBuffer is created for it
 * right away.
 */
static void
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
						CopyFromState cstate, EState *estate, CommandId mycid,
						int ti_options)
{
	/* Settings handed in by the caller. */
	miinfo->cstate = cstate;
	miinfo->estate = estate;
	miinfo->mycid = mycid;
	miinfo->ti_options = ti_options;

	/* Nothing buffered yet. */
	miinfo->multiInsertBuffers = NIL;
	miinfo->bufferedTuples = 0;
	miinfo->bufferedBytes = 0;

	/*
	 * For a partitioned table there is nothing more to do here: buffers are
	 * set up later, the first time tuples need to be sent to each
	 * partition.
	 */
	if (rri->ri_RelationDesc->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		return;

	CopyMultiInsertInfoSetupBuffer(miinfo, rri);
}
/*
 * Report whether the buffers are full, i.e. whether the total number of
 * buffered tuples has reached MAX_BUFFERED_TUPLES or the total number of
 * buffered bytes has reached MAX_BUFFERED_BYTES.
 */
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
	return miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES;
}
/*
 * Report whether no tuples at all are currently buffered.
 */
static inline bool
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
{
	if (miinfo->bufferedTuples != 0)
		return false;

	return true;
}
/*
 * Insert every tuple held in 'buffer' into its relation, then do the
 * per-row follow-up work: index entries and AFTER ROW INSERT triggers.
 *
 * While this runs, cstate's line_buf_valid is forced off and cur_lineno is
 * set to the input line of the tuple being processed, so that the COPY
 * error context reports the right line.  Both are restored before
 * returning.  On return the buffer holds no tuples; its slots have been
 * cleared but not dropped.
 */
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
						   CopyMultiInsertBuffer *buffer)
{
	CopyFromState cstate = miinfo->cstate;
	EState	   *estate = miinfo->estate;
	ResultRelInfo *rri = buffer->resultRelInfo;
	TupleTableSlot **tupslots = buffer->slots;
	int			ntuples = buffer->nused;
	bool		saved_line_buf_valid = cstate->line_buf_valid;
	uint64		saved_lineno = cstate->cur_lineno;
	MemoryContext callercxt;
	int			idx;

	/*
	 * Make sure error context information is printed correctly if one of
	 * the operations below fails: the line buffer no longer matches the
	 * tuples being processed here.
	 */
	cstate->line_buf_valid = false;

	/*
	 * table_multi_insert may leak memory, so run it in the short-lived
	 * per-tuple context.
	 */
	callercxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	table_multi_insert(rri->ri_RelationDesc,
					   tupslots,
					   ntuples,
					   miinfo->mycid,
					   miinfo->ti_options,
					   buffer->bistate);
	MemoryContextSwitchTo(callercxt);

	for (idx = 0; idx < ntuples; idx++)
	{
		TupleTableSlot *slot = tupslots[idx];

		if (rri->ri_NumIndices > 0)
		{
			/*
			 * The relation has indexes: add entries for this tuple, then
			 * run AFTER ROW INSERT triggers.
			 */
			List	   *recheck;

			cstate->cur_lineno = buffer->linenos[idx];
			recheck = ExecInsertIndexTuples(rri,
											slot, estate, false, false,
											NULL, NIL);
			ExecARInsertTriggers(estate, rri,
								 slot, recheck,
								 cstate->transition_capture);
			list_free(recheck);
		}
		else if (rri->ri_TrigDesc != NULL &&
				 (rri->ri_TrigDesc->trig_insert_after_row ||
				  rri->ri_TrigDesc->trig_insert_new_table))
		{
			/*
			 * There are no indexes, but AFTER ROW INSERT triggers still
			 * need to be run.
			 */
			cstate->cur_lineno = buffer->linenos[idx];
			ExecARInsertTriggers(estate, rri,
								 slot, NIL, cstate->transition_capture);
		}

		ExecClearTuple(slot);
	}

	/* Every slot is free again. */
	buffer->nused = 0;

	/* Put line_buf_valid and cur_lineno back the way we found them. */
	cstate->line_buf_valid = saved_line_buf_valid;
	cstate->cur_lineno = saved_lineno;
}
/*
 * Release everything owned by 'buffer', and the buffer itself.
 *
 * The buffer must already have been flushed.
 */
static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
							 CopyMultiInsertBuffer *buffer)
{
	ResultRelInfo *rri = buffer->resultRelInfo;
	int			slotno = 0;

	/* Nothing may still be waiting to be inserted. */
	Assert(buffer->nused == 0);

	/* The relation must no longer point back at this buffer. */
	rri->ri_CopyMultiInsertBuffer = NULL;

	FreeBulkInsertState(buffer->bistate);

	/*
	 * Slots are only created on demand, so drop entries up to the first
	 * NULL one.
	 */
	while (slotno < MAX_BUFFERED_TUPLES)
	{
		TupleTableSlot *slot = buffer->slots[slotno];

		if (slot == NULL)
			break;
		ExecDropSingleTupleTableSlot(slot);
		slotno++;
	}

	table_finish_bulk_insert(rri->ri_RelationDesc, miinfo->ti_options);

	pfree(buffer);
}
/*
 * Write all stored tuples in all buffers out to their tables.
 *
 * After flushing, the list of tracked buffers is trimmed back to
 * MAX_PARTITION_BUFFERS entries, removing the earliest-created buffers
 * first.
 *
 * Callers should pass 'curr_rri' as the ResultRelInfo that's currently
 * being used.  Its buffer is never removed during trimming.
 */
static inline void
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
{
	ListCell   *cell;

	foreach(cell, miinfo->multiInsertBuffers)
		CopyMultiInsertBufferFlush(miinfo,
								   (CopyMultiInsertBuffer *) lfirst(cell));

	/* Everything has been written out, so nothing is buffered any more. */
	miinfo->bufferedTuples = 0;
	miinfo->bufferedBytes = 0;

	/*
	 * Trim the list of tracked buffers if it has grown past the limit,
	 * starting with the buffers created first.  Those older ones seem less
	 * likely to be needed again than the ones just created.
	 */
	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
	{
		CopyMultiInsertBuffer *victim;

		victim = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);

		if (victim->resultRelInfo == curr_rri)
		{
			/*
			 * This is the buffer currently in use, which must survive:
			 * move it to the end of the list and take the next-oldest
			 * buffer instead.
			 */
			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, victim);
			victim = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
		}

		CopyMultiInsertBufferCleanup(miinfo, victim);
		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
	}
}
/*
 * Clean up every tracked buffer, then free the list that tracked them.
 */
static inline void
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
{
	ListCell   *cell;

	foreach(cell, miinfo->multiInsertBuffers)
	{
		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(cell);

		CopyMultiInsertBufferCleanup(miinfo, buffer);
	}

	list_free(miinfo->multiInsertBuffers);
}
/*
 * Get the TupleTableSlot that the next tuple should be stored in.
 *
 * Returns the first unused slot of the buffer attached to 'rri', creating
 * the slot if this position has never been used before.  The slot is only
 * reserved here; CopyMultiInsertInfoStore() is what marks it as used.
 *
 * Callers must ensure that 'rri' has a buffer and that the buffer is not
 * full.
 *
 * Note: 'miinfo' is unused but has been included for consistency with the
 * other functions in this area.
 */
static inline TupleTableSlot *
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
								ResultRelInfo *rri)
{
	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
	int			nused;

	/*
	 * Check the buffer pointer before reading through it, so that a missing
	 * buffer trips the assertion rather than a NULL dereference.
	 */
	Assert(buffer != NULL);
	Assert(buffer->nused < MAX_BUFFERED_TUPLES);

	nused = buffer->nused;

	/* Slots are created lazily, the first time each position is needed. */
	if (buffer->slots[nused] == NULL)
		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);

	return buffer->slots[nused];
}
/*
 * Mark the slot previously handed out by CopyMultiInsertInfoNextFreeSlot
 * as now holding a tuple.
 *
 * 'tuplen' is added to the running byte total, and 'lineno' is remembered
 * so that errors raised at flush time can report the right input line.
 */
static inline void
CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
						 TupleTableSlot *slot, int tuplen, uint64 lineno)
{
	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;

	Assert(buffer != NULL);
	Assert(slot == buffer->slots[buffer->nused]);

	/* Remember the tuple's line number and count the slot as used. */
	buffer->linenos[buffer->nused++] = lineno;

	/* Keep the totals across all buffers up to date. */
	miinfo->bufferedTuples += 1;
	miinfo->bufferedBytes += tuplen;
}
/*
* Copy FROM file to relation.
*/
uint64
CopyFrom(CopyFromState cstate)
{
ResultRelInfo *resultRelInfo;
ResultRelInfo *target_resultRelInfo;
ResultRelInfo *prevResultRelInfo = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ModifyTableState *mtstate;
ExprContext *econtext;
TupleTableSlot *singleslot = NULL;
MemoryContext oldcontext = CurrentMemoryContext;
PartitionTupleRouting *proute = NULL;
ErrorContextCallback errcallback;
CommandId mycid = GetCurrentCommandId(true);
int ti_options = 0; /* start with default options for insert */
BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
int64 processed = 0;
int64 excluded = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
Assert(cstate->rel);
Assert(list_length(cstate->range_table) == 1);
/*
* The target must be a plain, foreign, or partitioned relation, or have
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
* allowed on views, so we only hint about them in the view case.)
*/
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
!(cstate->rel->trigdesc &&
cstate->rel->trigdesc->trig_insert_instead_row))
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
RelationGetRelationName(cstate->rel)),
errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to materialized view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to sequence \"%s\"",
RelationGetRelationName(cstate->rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
/*
* If the target file is new-in-transaction, we assume that checking FSM
* for free space is a waste of time. This could possibly be wrong, but
* it's unlikely.
*/
if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
ti_options |= TABLE_INSERT_SKIP_FSM;
/*
* Optimize if new relfilenode was created in this subxact or one of its
* committed children and we won't see those rows later as part of an
* earlier scan or command. The subxact test ensures that if this subxact
* aborts then the frozen rows won't be visible after xact cleanup. Note
* that the stronger test of exactly which subtransaction created it is
* crucial for correctness of this optimization. The test for an earlier
* scan or command tolerates false negatives. FREEZE causes other sessions
* to see rows they would not see under MVCC, and a false negative merely
* spreads that anomaly to the current session.
*/
if (cstate->opts.freeze)
{
/*
* We currently disallow COPY FREEZE on partitioned tables. The
* reason for this is that we've simply not yet opened the partitions
* to determine if the optimization can be applied to them. We could
* go and open them all here, but doing so may be quite a costly
* overhead for small copies. In any case, we may just end up routing
* tuples to a small number of partitions. It seems better just to
* raise an ERROR for partitioned tables.
*/
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform COPY FREEZE on a partitioned table")));
}
/*
* Tolerate one registration for the benefit of FirstXactSnapshot.
* Scan-bearing queries generally create at least two registrations,
* though relying on that is fragile, as is ignoring ActiveSnapshot.
* Clear CatalogSnapshot to avoid counting its registration. We'll
* still detect ongoing catalog scans, each of which separately
* registers the snapshot it uses.
*/
InvalidateCatalogSnapshot();
if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
ti_options |= TABLE_INSERT_FROZEN;
}
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
* here that basically duplicated execUtils.c ...)
*/
ExecInitRangeTable(estate, cstate->range_table);
resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
ExecInitResultRelation(estate, resultRelInfo, 1);
/* Verify the named relation is a valid target for INSERT */
CheckValidResultRel(resultRelInfo, CMD_INSERT);
ExecOpenIndices(resultRelInfo, false);
/*
* Set up a ModifyTableState so we can let FDW(s) init themselves for
* foreign-table result relation(s).
*/
mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = CMD_INSERT;
Rework planning and execution of UPDATE and DELETE. This patch makes two closely related sets of changes: 1. For UPDATE, the subplan of the ModifyTable node now only delivers the new values of the changed columns (i.e., the expressions computed in the query's SET clause) plus row identity information such as CTID. ModifyTable must re-fetch the original tuple to merge in the old values of any unchanged columns. The core advantage of this is that the changed columns are uniform across all tables of an inherited or partitioned target relation, whereas the other columns might not be. A secondary advantage, when the UPDATE involves joins, is that less data needs to pass through the plan tree. The disadvantage of course is an extra fetch of each tuple to be updated. However, that seems to be very nearly free in context; even worst-case tests don't show it to add more than a couple percent to the total query cost. At some point it might be interesting to combine the re-fetch with the tuple access that ModifyTable must do anyway to mark the old tuple dead; but that would require a good deal of refactoring and it seems it wouldn't buy all that much, so this patch doesn't attempt it. 2. For inherited UPDATE/DELETE, instead of generating a separate subplan for each target relation, we now generate a single subplan that is just exactly like a SELECT's plan, then stick ModifyTable on top of that. To let ModifyTable know which target relation a given incoming row refers to, a tableoid junk column is added to the row identity information. This gets rid of the horrid hack that was inheritance_planner(), eliminating O(N^2) planning cost and memory consumption in cases where there were many unprunable target relations. Point 2 of course requires point 1, so that there is a uniform definition of the non-junk columns to be returned by the subplan. 
We can't insist on uniform definition of the row identity junk columns however, if we want to keep the ability to have both plain and foreign tables in a partitioning hierarchy. Since it wouldn't scale very far to have every child table have its own row identity column, this patch includes provisions to merge similar row identity columns into one column of the subplan result. In particular, we can merge the whole-row Vars typically used as row identity by FDWs into one column by pretending they are type RECORD. (It's still okay for the actual composite Datums to be labeled with the table's rowtype OID, though.) There is more that can be done to file down residual inefficiencies in this patch, but it seems to be committable now. FDW authors should note several API changes: * The argument list for AddForeignUpdateTargets() has changed, and so has the method it must use for adding junk columns to the query. Call add_row_identity_var() instead of manipulating the parse tree directly. You might want to reconsider exactly what you're adding, too. * PlanDirectModify() must now work a little harder to find the ForeignScan plan node; if the foreign table is part of a partitioning hierarchy then the ForeignScan might not be the direct child of ModifyTable. See postgres_fdw for sample code. * To check whether a relation is a target relation, it's no longer sufficient to compare its relid to root->parse->resultRelation. Instead, check it against all_result_relids or leaf_result_relids, as appropriate. Amit Langote and Tom Lane Discussion: https://postgr.es/m/CA+HiwqHpHdqdDn48yCEhynnniahH78rwcrv1rEX65-fsZGBOLQ@mail.gmail.com
2021-03-31 17:52:34 +02:00
mtstate->mt_nrels = 1;
mtstate->resultRelInfo = resultRelInfo;
Fix permission checks on constraint violation errors on partitions. If a cross-partition UPDATE violates a constraint on the target partition, and the columns in the new partition are in different physical order than in the parent, the error message can reveal columns that the user does not have SELECT permission on. A similar bug was fixed earlier in commit 804b6b6db4. The cause of the bug is that the callers of the ExecBuildSlotValueDescription() function got confused when constructing the list of modified columns. If the tuple was routed from a parent, we converted the tuple to the parent's format, but the list of modified columns was grabbed directly from the child's RTE entry. ExecUpdateLockMode() had a similar issue. That lead to confusion on which columns are key columns, leading to wrong tuple lock being taken on tables referenced by foreign keys, when a row is updated with INSERT ON CONFLICT UPDATE. A new isolation test is added for that corner case. With this patch, the ri_RangeTableIndex field is no longer set for partitions that don't have an entry in the range table. Previously, it was set to the RTE entry of the parent relation, but that was confusing. NOTE: This modifies the ResultRelInfo struct, replacing the ri_PartitionRoot field with ri_RootResultRelInfo. That's a bit risky to backpatch, because it breaks any extensions accessing the field. The change that ri_RangeTableIndex is not set for partitions could potentially break extensions, too. The ResultRelInfos are visible to FDWs at least, and this patch required small changes to postgres_fdw. Nevertheless, this seem like the least bad option. I don't think these fields widely used in extensions; I don't think there are FDWs out there that uses the FDW "direct update" API, other than postgres_fdw. If there is, you will get a compilation error, so hopefully it is caught quickly. Backpatch to 11, where support for both cross-partition UPDATEs, and unique indexes on partitioned tables, were added. 
Reviewed-by: Amit Langote Security: CVE-2021-3393
2021-02-08 10:01:51 +01:00
mtstate->rootResultRelInfo = resultRelInfo;
if (resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
resultRelInfo);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
/*
* If there are any triggers with transition tables on the named relation,
* we need to be prepared to capture transition tuples.
*
* Because partition tuple routing would like to know about whether
* transition capture is active, we also set it in mtstate, which is
* passed to ExecFindPartition() below.
*/
cstate->transition_capture = mtstate->mt_transition_capture =
MakeTransitionCaptureState(cstate->rel->trigdesc,
RelationGetRelid(cstate->rel),
CMD_INSERT);
/*
* If the named relation is a partitioned table, initialize state for
* CopyFrom tuple routing.
*/
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
Postpone some stuff out of ExecInitModifyTable. Arrange to do some things on-demand, rather than immediately during executor startup, because there's a fair chance of never having to do them at all: * Don't open result relations' indexes until needed. * Don't initialize partition tuple routing, nor the child-to-root tuple conversion map, until needed. This wins in UPDATEs on partitioned tables when only some of the partitions will actually receive updates; with larger partition counts the savings is quite noticeable. Also, we can remove some sketchy heuristics in ExecInitModifyTable about whether to set up tuple routing. Also, remove execPartition.c's private hash table tracking which partitions were already opened by the ModifyTable node. Instead use the hash added to ModifyTable itself by commit 86dc90056. To allow lazy computation of the conversion maps, we now set ri_RootResultRelInfo in all child ResultRelInfos. We formerly set it only in some, not terribly well-defined, cases. This has user-visible side effects in that now more error messages refer to the root relation instead of some partition (and provide error data in the root's column order, too). It looks to me like this is a strict improvement in consistency, so I don't have a problem with the output changes visible in this commit. Extracted from a larger patch, which seemed to me to be too messy to push in one commit. Amit Langote, reviewed at different times by Heikki Linnakangas and myself Discussion: https://postgr.es/m/CA+HiwqG7ZruBmmih3wPsBZ4s0H2EhywrnXEduckY5Hr3fWzPWA@mail.gmail.com
2021-04-06 21:56:55 +02:00
proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
if (cstate->whereClause)
cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
&mtstate->ps);
/*
* It's generally more efficient to prepare a bunch of tuples for
* insertion, and insert them in one table_multi_insert() call, than call
* table_tuple_insert() separately for every tuple. However, there are a
* number of reasons why we might not be able to do this. These are
* explained below.
*/
if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
{
/*
* Can't support multi-inserts when there are any BEFORE/INSTEAD OF
* triggers on the table. Such triggers might query the table we're
* inserting into and act differently if the tuples that have already
* been processed and prepared for insertion are not there.
*/
insertMethod = CIM_SINGLE;
}
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
resultRelInfo->ri_TrigDesc->trig_insert_new_table)
{
/*
* For partitioned tables we can't support multi-inserts when there
* are any statement level insert triggers. It might be possible to
* allow partitioned tables with such triggers in the future, but for
* now, CopyMultiInsertInfoFlush expects that any before row insert
* and statement level insert triggers are on the same relation.
*/
insertMethod = CIM_SINGLE;
}
else if (resultRelInfo->ri_FdwRoutine != NULL ||
cstate->volatile_defexprs)
{
/*
* Can't support multi-inserts to foreign tables or if there are any
* volatile default expressions in the table. Similarly to the
* trigger case above, such expressions may query the table we're
* inserting into.
*
* Note: It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
* COPY command.
*/
insertMethod = CIM_SINGLE;
}
else if (contain_volatile_functions(cstate->whereClause))
{
/*
* Can't support multi-inserts if there are any volatile function
* expressions in WHERE clause. Similarly to the trigger case above,
* such expressions may query the table we're inserting into.
*/
insertMethod = CIM_SINGLE;
}
else
{
/*
* For partitioned tables, we may still be able to perform bulk
* inserts. However, the possibility of this depends on which types
* of triggers exist on the partition. We must disable bulk inserts
* if the partition is a foreign table or it has any before row insert
* or insert instead triggers (same as we checked above for the parent
* table). Since the partition's resultRelInfos are initialized only
* when we actually need to insert the first tuple into them, we must
* have the intermediate insert method of CIM_MULTI_CONDITIONAL to
* flag that we must later determine if we can use bulk-inserts for
* the partition being inserted into.
*/
if (proute)
insertMethod = CIM_MULTI_CONDITIONAL;
else
insertMethod = CIM_MULTI;
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
estate, mycid, ti_options);
}
/*
* If not using batch mode (which allocates slots as needed) set up a
* tuple slot too. When inserting into a partitioned table, we also need
* one, even if we might batch insert, to read the tuple in the root
* partition's form.
*/
if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
{
singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
&estate->es_tupleTable);
bistate = GetBulkInsertState();
}
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
/*
* Check BEFORE STATEMENT insertion triggers. It's debatable whether we
* should do this for COPY, since it's not really an "INSERT" statement as
* such. However, executing these triggers maintains consistency with the
* EACH ROW triggers that we already fire on COPY.
*/
ExecBSInsertTriggers(estate, resultRelInfo);
econtext = GetPerTupleExprContext(estate);
/* Set up callback to identify error line number */
errcallback.callback = CopyFromErrorCallback;
errcallback.arg = (void *) cstate;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
for (;;)
{
TupleTableSlot *myslot;
bool skip_tuple;
CHECK_FOR_INTERRUPTS();
/*
* Reset the per-tuple exprcontext. We do this after every tuple, to
* clean-up after expression evaluations etc.
*/
ResetPerTupleExprContext(estate);
/* select slot to (initially) load row into */
if (insertMethod == CIM_SINGLE || proute)
{
myslot = singleslot;
Assert(myslot != NULL);
}
else
{
Assert(resultRelInfo == target_resultRelInfo);
Assert(insertMethod == CIM_MULTI);
myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
resultRelInfo);
}
/*
* Switch to per-tuple context before calling NextCopyFrom, which does
* evaluate default expressions etc. and requires per-tuple context.
*/
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
ExecClearTuple(myslot);
/* Directly store the values/nulls array in the slot */
if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
break;
ExecStoreVirtualTuple(myslot);
/*
* Constraints and where clause might reference the tableoid column,
* so (re-)initialize tts_tableOid before evaluating them.
*/
myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);
if (cstate->whereClause)
{
econtext->ecxt_scantuple = myslot;
/* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext))
{
/*
* Report that this tuple was filtered out by the WHERE
* clause.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
++excluded);
continue;
}
}
/* Determine the partition to insert the tuple into */
if (proute)
{
TupleConversionMap *map;
/*
* Attempt to find a partition suitable for this tuple.
* ExecFindPartition() will raise an error if none can be found or
* if the found partition is not suitable for INSERTs.
*/
resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
proute, myslot, estate);
if (prevResultRelInfo != resultRelInfo)
{
/* Determine which triggers exist on this partition */
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
/*
* Disable multi-inserts when the partition has BEFORE/INSTEAD
* OF triggers, or if the partition is a foreign partition.
*/
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
!has_before_insert_row_trig &&
!has_instead_insert_row_trig &&
resultRelInfo->ri_FdwRoutine == NULL;
/* Set the multi-insert buffer to use for this partition. */
if (leafpart_use_multi_insert)
{
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
resultRelInfo);
}
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
{
/*
* Flush pending inserts if this partition can't use
* batching, so rows are visible to triggers etc.
*/
CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
}
if (bistate != NULL)
ReleaseBulkInsertStatePin(bistate);
prevResultRelInfo = resultRelInfo;
}
/*
* If we're capturing transition tuples, we might need to convert
* from the partition rowtype to root rowtype. But if there are no
* BEFORE triggers on the partition that could change the tuple,
* we can just remember the original unconverted tuple to avoid a
* needless round trip conversion.
*/
if (cstate->transition_capture != NULL)
cstate->transition_capture->tcs_original_insert_tuple =
!has_before_insert_row_trig ? myslot : NULL;
/*
* We might need to convert from the root rowtype to the partition
* rowtype.
*/
map = resultRelInfo->ri_RootToPartitionMap;
if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
{
/* non batch insert */
if (map != NULL)
{
TupleTableSlot *new_slot;
new_slot = resultRelInfo->ri_PartitionTupleSlot;
myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
}
}
else
{
/*
* Prepare to queue up tuple for later batch insert into
* current partition.
*/
TupleTableSlot *batchslot;
/* no other path available for partitioned table */
Assert(insertMethod == CIM_MULTI_CONDITIONAL);
batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
resultRelInfo);
if (map != NULL)
myslot = execute_attr_map_slot(map->attrMap, myslot,
batchslot);
else
{
/*
* This looks more expensive than it is (Believe me, I
* optimized it away. Twice.). The input is in virtual
* form, and we'll materialize the slot below - for most
* slot types the copy performs the work materialization
* would later require anyway.
*/
ExecCopySlot(batchslot, myslot);
myslot = batchslot;
}
}
/* ensure that triggers etc see the right relation */
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (has_before_insert_row_trig)
{
if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
skip_tuple = true; /* "do nothing" */
}
if (!skip_tuple)
{
/*
* If there is an INSTEAD OF INSERT ROW trigger, let it handle the
* tuple. Otherwise, proceed with inserting the tuple into the
* table or foreign table.
*/
if (has_instead_insert_row_trig)
{
ExecIRInsertTriggers(estate, resultRelInfo, myslot);
}
else
{
/* Compute stored generated columns */
if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
CMD_INSERT);
/*
* If the target is a plain table, check the constraints of
* the tuple.
*/
if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, myslot, estate);
/*
* Also check the tuple against the partition constraint, if
* there is one; except that if we got here via tuple-routing,
* we don't need to if there's no BR trigger defined on the
* partition.
*/
if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
(proute == NULL || has_before_insert_row_trig))
ExecPartitionCheck(resultRelInfo, myslot, estate, true);
/* Store the slot in the multi-insert buffer, when enabled. */
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
{
/*
* The slot previously might point into the per-tuple
* context. For batching it needs to be longer lived.
*/
ExecMaterializeSlot(myslot);
/* Add this tuple to the tuple buffer */
CopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo, myslot,
cstate->line_buf.len,
cstate->cur_lineno);
/*
* If enough inserts have queued up, then flush all
* buffers out to their tables.
*/
if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
}
else
{
List *recheckIndexes = NIL;
/* OK, store the tuple */
if (resultRelInfo->ri_FdwRoutine != NULL)
{
myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
resultRelInfo,
myslot,
NULL);
if (myslot == NULL) /* "do nothing" */
continue; /* next tuple please */
/*
* AFTER ROW Triggers might reference the tableoid
* column, so (re-)initialize tts_tableOid before
* evaluating them.
*/
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
else
{
/* OK, store the tuple and create index entries for it */
table_tuple_insert(resultRelInfo->ri_RelationDesc,
myslot, mycid, ti_options, bistate);
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
myslot,
estate,
false,
false,
NULL,
NIL);
}
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, myslot,
recheckIndexes, cstate->transition_capture);
list_free(recheckIndexes);
}
}
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++processed);
}
}
/* Flush any remaining buffered tuples */
if (insertMethod != CIM_SINGLE)
{
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
}
/* Done, clean up */
error_context_stack = errcallback.previous;
if (bistate != NULL)
FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
/* Handle queued AFTER triggers */
AfterTriggerEndQuery(estate);
ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */
if (target_resultRelInfo->ri_FdwRoutine != NULL &&
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
target_resultRelInfo);
/* Tear down the multi-insert buffer data */
if (insertMethod != CIM_SINGLE)
CopyMultiInsertInfoCleanup(&multiInsertInfo);
/* Close all the partitioned tables, leaf partitions, and their indices */
if (proute)
ExecCleanupTupleRouting(mtstate, proute);
/* Close the result relations, including any trigger target relations */
ExecCloseResultRelations(estate);
ExecCloseRangeTableRelations(estate);
FreeExecutorState(estate);
return processed;
}
/*
 * Setup to read tuples from a file for COPY FROM.
 *
 * 'pstate': parse state handed to option processing; when it is non-NULL its
 *			 range table is remembered in the returned state for CopyFrom
 * 'rel': Used as a template for the tuples
 * 'whereClause': WHERE clause from the COPY FROM command
 * 'filename': Name of server-local file to read, NULL for STDIN
 * 'is_program': true if 'filename' is program to execute
 * 'data_source_cb': callback that provides the input data; when set it takes
 *			 precedence over 'filename'
 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
 *
 * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
 *
 * The state struct itself is allocated in the caller's memory context; all
 * other allocations made here go into a dedicated "COPY" context that is a
 * child of the caller's context.  The caller's context is current again on
 * return.
 */
CopyFromState
BeginCopyFrom(ParseState *pstate,
			  Relation rel,
			  Node *whereClause,
			  const char *filename,
			  bool is_program,
			  copy_data_source_cb data_source_cb,
			  List *attnamelist,
			  List *options)
{
	CopyFromState cstate;
	bool		from_pipe = (filename == NULL);
	TupleDesc	tupDesc;
	AttrNumber	num_phys_attrs,
				num_defaults;
	FmgrInfo   *in_functions;
	Oid		   *typioparams;
	int			attnum;
	Oid			in_func_oid;
	int		   *defmap;
	ExprState **defexprs;
	MemoryContext oldcontext;
	bool		volatile_defexprs;

	/* Progress columns reported in one batch once the source is known */
	const int	progress_cols[] = {
		PROGRESS_COPY_COMMAND,
		PROGRESS_COPY_TYPE,
		PROGRESS_COPY_BYTES_TOTAL
	};
	int64		progress_vals[] = {
		PROGRESS_COPY_COMMAND_FROM,
		0,						/* source type, filled in below */
		0						/* total bytes, known only for plain files */
	};

	/* Allocate workspace and zero all fields */
	cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));

	/*
	 * We allocate everything used by a cstate in a new memory context. This
	 * avoids memory leaks during repeated use of COPY in a query.
	 */
	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
												"COPY",
												ALLOCSET_DEFAULT_SIZES);

	/* Stay in copycontext until the final switch back before returning. */
	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	/* Extract options from the statement node tree */
	ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);

	/* Process the target relation */
	cstate->rel = rel;

	tupDesc = RelationGetDescr(cstate->rel);

	/* process common options or initialization */

	/* Generate or convert list of attributes to process */
	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

	num_phys_attrs = tupDesc->natts;

	/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
	cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->opts.force_notnull)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);

		foreach(cur, attnums)
		{
			int			colnum = lfirst_int(cur);
			Form_pg_attribute attr = TupleDescAttr(tupDesc, colnum - 1);

			if (!list_member_int(cstate->attnumlist, colnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
								NameStr(attr->attname))));
			cstate->opts.force_notnull_flags[colnum - 1] = true;
		}
	}

	/* Convert FORCE_NULL name list to per-column flags, check validity */
	cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
	if (cstate->opts.force_null)
	{
		List	   *attnums;
		ListCell   *cur;

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);

		foreach(cur, attnums)
		{
			int			colnum = lfirst_int(cur);
			Form_pg_attribute attr = TupleDescAttr(tupDesc, colnum - 1);

			if (!list_member_int(cstate->attnumlist, colnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
								NameStr(attr->attname))));
			cstate->opts.force_null_flags[colnum - 1] = true;
		}
	}

	/* Convert convert_selectively name list to per-column flags */
	if (cstate->opts.convert_selectively)
	{
		List	   *attnums;
		ListCell   *cur;

		cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));

		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);

		foreach(cur, attnums)
		{
			int			colnum = lfirst_int(cur);
			Form_pg_attribute attr = TupleDescAttr(tupDesc, colnum - 1);

			if (!list_member_int(cstate->attnumlist, colnum))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
						 errmsg_internal("selected column \"%s\" not referenced by COPY",
										 NameStr(attr->attname))));
			cstate->convert_select_flags[colnum - 1] = true;
		}
	}

	/* Use client encoding when ENCODING option is not specified. */
	if (cstate->opts.file_encoding < 0)
		cstate->file_encoding = pg_get_client_encoding();
	else
		cstate->file_encoding = cstate->opts.file_encoding;

	/*
	 * Look up encoding conversion function.  No conversion is set up when the
	 * file and database encodings match or when either one is SQL_ASCII.
	 */
	if (cstate->file_encoding == GetDatabaseEncoding() ||
		cstate->file_encoding == PG_SQL_ASCII ||
		GetDatabaseEncoding() == PG_SQL_ASCII)
	{
		cstate->need_transcoding = false;
	}
	else
	{
		cstate->need_transcoding = true;
		cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
															GetDatabaseEncoding());
	}

	cstate->copy_src = COPY_FILE;	/* default */

	cstate->whereClause = whereClause;

	/* Initialize state variables */
	cstate->eol_type = EOL_UNKNOWN;
	cstate->cur_relname = RelationGetRelationName(cstate->rel);
	cstate->cur_lineno = 0;
	cstate->cur_attname = NULL;
	cstate->cur_attval = NULL;

	/*
	 * Allocate buffers for the input pipeline.
	 *
	 * attribute_buf and raw_buf are used in both text and binary modes, but
	 * input_buf and line_buf only in text mode.
	 */
	cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
	cstate->raw_buf_index = cstate->raw_buf_len = 0;
	cstate->raw_reached_eof = false;

	if (!cstate->opts.binary)
	{
		/*
		 * If encoding conversion is needed, we need another buffer to hold
		 * the converted input data.  Otherwise, we can just point input_buf
		 * to the same buffer as raw_buf.
		 */
		if (cstate->need_transcoding)
		{
			cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
			cstate->input_buf_index = cstate->input_buf_len = 0;
		}
		else
			cstate->input_buf = cstate->raw_buf;
		cstate->input_reached_eof = false;

		initStringInfo(&cstate->line_buf);
	}

	initStringInfo(&cstate->attribute_buf);

	/* Assign range table, we'll need it in CopyFrom. */
	if (pstate)
		cstate->range_table = pstate->p_rtable;

	num_defaults = 0;
	volatile_defexprs = false;

	/*
	 * Pick up the required catalog information for each attribute in the
	 * relation, including the input function, the element type (to pass to
	 * the input function), and info about defaults and constraints. (Which
	 * input function we use depends on text/binary format choice.)
	 *
	 * The arrays are indexed by physical attribute position; entries for
	 * dropped attributes are left uninitialized.
	 */
	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	for (attnum = 1; attnum <= num_phys_attrs; attnum++)
	{
		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);

		/* We don't need info for dropped attributes */
		if (att->attisdropped)
			continue;

		/* Fetch the input function and typioparam info */
		if (cstate->opts.binary)
			getTypeBinaryInputInfo(att->atttypid,
								   &in_func_oid, &typioparams[attnum - 1]);
		else
			getTypeInputInfo(att->atttypid,
							 &in_func_oid, &typioparams[attnum - 1]);
		fmgr_info(in_func_oid, &in_functions[attnum - 1]);

		/* Get default info if needed */
		if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
		{
			/* attribute is NOT to be copied from input */
			/* use default value if one exists */
			Expr	   *defexpr = (Expr *) build_column_default(cstate->rel,
																attnum);

			if (defexpr != NULL)
			{
				/* Run the expression through planner */
				defexpr = expression_planner(defexpr);

				/* Initialize executable expression in copycontext */
				defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
				defmap[num_defaults] = attnum - 1;
				num_defaults++;

				/*
				 * If a default expression looks at the table being loaded,
				 * then it could give the wrong answer when using
				 * multi-insert. Since database access can be dynamic this is
				 * hard to test for exactly, so we use the much wider test of
				 * whether the default expression is volatile. We allow for
				 * the special case of when the default expression is the
				 * nextval() of a sequence which in this specific case is
				 * known to be safe for use with the multi-insert
				 * optimization. Hence we use this special case function
				 * checker rather than the standard check for
				 * contain_volatile_functions().
				 */
				if (!volatile_defexprs)
					volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
			}
		}
	}

	/* initialize progress */
	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
	cstate->bytes_processed = 0;

	/* We keep those variables in cstate. */
	cstate->in_functions = in_functions;
	cstate->typioparams = typioparams;
	cstate->defmap = defmap;
	cstate->defexprs = defexprs;
	cstate->volatile_defexprs = volatile_defexprs;
	cstate->num_defaults = num_defaults;
	cstate->is_program = is_program;

	/*
	 * Open the data source.  A callback wins over everything else; failing
	 * that, a NULL filename means the client connection or stdin; otherwise
	 * the name is either a command to run or a server-side file to open.
	 */
	if (data_source_cb)
	{
		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
		cstate->copy_src = COPY_CALLBACK;
		cstate->data_source_cb = data_source_cb;
	}
	else if (from_pipe)
	{
		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
		Assert(!is_program);	/* the grammar does not allow this */
		if (whereToSendOutput == DestRemote)
			ReceiveCopyBegin(cstate);
		else
			cstate->copy_file = stdin;
	}
	else
	{
		cstate->filename = pstrdup(filename);

		if (cstate->is_program)
		{
			progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
			if (cstate->copy_file == NULL)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not execute command \"%s\": %m",
								cstate->filename)));
		}
		else
		{
			struct stat st;

			progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
			if (cstate->copy_file == NULL)
			{
				/* copy errno because ereport subfunctions might change it */
				int			save_errno = errno;

				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\" for reading: %m",
								cstate->filename),
						 (save_errno == ENOENT || save_errno == EACCES) ?
						 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
								 "You may want a client-side facility such as psql's \\copy.") : 0));
			}

			if (fstat(fileno(cstate->copy_file), &st))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m",
								cstate->filename)));

			if (S_ISDIR(st.st_mode))
				ereport(ERROR,
						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
						 errmsg("\"%s\" is a directory", cstate->filename)));

			/* Only a plain file gives us a total size to report */
			progress_vals[2] = st.st_size;
		}
	}

	pgstat_progress_update_multi_param(3, progress_cols, progress_vals);

	if (cstate->opts.binary)
	{
		/* Read and verify binary header */
		ReceiveCopyBinaryHeader(cstate);
	}

	/* create workspace for CopyReadAttributes results */
	if (!cstate->opts.binary)
	{
		AttrNumber	attr_count = list_length(cstate->attnumlist);

		cstate->max_fields = attr_count;
		cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
	}

	MemoryContextSwitchTo(oldcontext);

	return cstate;
}
/*
 * Finish a COPY FROM operation.
 *
 * Closes the input source where there is one to close, ends progress
 * reporting for the command, and then releases the COPY memory context
 * followed by the state struct itself.
 */
void
EndCopyFrom(CopyFromState cstate)
{
	if (cstate->is_program)
	{
		/* Input came from an external program; close its pipe. */
		ClosePipeFromProgram(cstate);
	}
	else if (cstate->filename != NULL)
	{
		/* A named file was opened for us; a failed close is an error. */
		if (FreeFile(cstate->copy_file) != 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not close file \"%s\": %m",
							cstate->filename)));
	}

	pgstat_progress_end_command();

	/* The state struct is freed separately from its memory context. */
	MemoryContextDelete(cstate->copycontext);
	pfree(cstate);
}
/*
 * Close the pipe reading from an external program and examine the result
 * reported by ClosePipeStream().
 *
 * A zero result is success.  A result of -1 is reported as a failure to
 * close the pipe.  Any other result is reported as a failure of the program,
 * except for the SIGPIPE case described below.
 */
static void
ClosePipeFromProgram(CopyFromState cstate)
{
	int			rc;

	Assert(cstate->is_program);

	rc = ClosePipeStream(cstate->copy_file);

	/* Clean exit: nothing to report. */
	if (rc == 0)
		return;

	if (rc == -1)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close pipe to external command: %m")));

	/*
	 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
	 * expectable for the called program to fail with SIGPIPE, and we should
	 * not report that as an error.  Otherwise, SIGPIPE indicates a problem.
	 */
	if (!cstate->raw_reached_eof &&
		wait_result_is_signal(rc, SIGPIPE))
		return;

	ereport(ERROR,
			(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
			 errmsg("program \"%s\" failed",
					cstate->filename),
			 errdetail_internal("%s", wait_result_to_str(rc))));
}