diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index cc9e39c4a5..9746998751 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8608,6 +8608,39 @@ select tableoid::regclass, * FROM remp1;
remp1 | 1 | bar
(2 rows)
+delete from ctrtest;
+-- Test copy tuple routing with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+copy ctrtest from stdin;
+select tableoid::regclass, * FROM ctrtest;
+ tableoid | a | b
+----------+---+-------
+ remp1 | 1 | foo
+ remp1 | 1 | bar
+ remp1 | 1 | test1
+ remp2 | 2 | baz
+ remp2 | 2 | qux
+ remp2 | 2 | test2
+(6 rows)
+
+select tableoid::regclass, * FROM remp1;
+ tableoid | a | b
+----------+---+-------
+ remp1 | 1 | foo
+ remp1 | 1 | bar
+ remp1 | 1 | test1
+(3 rows)
+
+select tableoid::regclass, * FROM remp2;
+ tableoid | b | a
+----------+-------+---
+ remp2 | baz | 2
+ remp2 | qux | 2
+ remp2 | test2 | 2
+(3 rows)
+
+delete from ctrtest;
+alter server loopback options (drop batch_size);
drop table ctrtest;
drop table loct1;
drop table loct2;
@@ -8771,6 +8804,78 @@ select * from rem3;
drop foreign table rem3;
drop table loc3;
+-- Test COPY FROM with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+-- Test basic functionality
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2
+----+-----
+ 1 | foo
+ 2 | bar
+ 3 | baz
+(3 rows)
+
+delete from rem2;
+-- Test check constraints
+alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
+alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
+-- check constraint is enforced on the remote side, not locally
+copy rem2 from stdin;
+copy rem2 from stdin; -- ERROR
+ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
+DETAIL: Failing row contains (-1, xyzzy).
+CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
+COPY rem2
+select * from rem2;
+ f1 | f2
+----+-----
+ 1 | foo
+ 2 | bar
+ 3 | baz
+(3 rows)
+
+alter foreign table rem2 drop constraint rem2_f1positive;
+alter table loc2 drop constraint loc2_f1positive;
+delete from rem2;
+-- Test remote triggers
+create trigger trig_row_before_insert before insert on loc2
+ for each row execute procedure trig_row_before_insupdate();
+-- The new values are concatenated with ' triggered !'
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2
+----+-----------------
+ 1 | foo triggered !
+ 2 | bar triggered !
+ 3 | baz triggered !
+(3 rows)
+
+drop trigger trig_row_before_insert on loc2;
+delete from rem2;
+create trigger trig_null before insert on loc2
+ for each row execute procedure trig_null();
+-- Nothing happens
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2
+----+----
+(0 rows)
+
+drop trigger trig_null on loc2;
+delete from rem2;
+-- Check with zero-column foreign table; batch insert will be disabled
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(3 rows)
+
+delete from rem2;
+alter server loopback options (drop batch_size);
-- ===================================================================
-- test for TRUNCATE
-- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 8d013f5b1a..d98709e5e8 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2057,6 +2057,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
return 1;
+ /*
+ * If the foreign table has no columns, disable batching as the INSERT
+ * syntax doesn't allow batching multiple empty rows into a zero-column
+ * table in a single statement. This is needed for COPY FROM, in which
+ * case fmstate must be non-NULL.
+ */
+ if (fmstate && list_length(fmstate->target_attrs) == 0)
+ return 1;
+
/*
* Otherwise use the batch size specified for server/table. The number of
* parameters in a batch is limited to 65535 (uint16), so make sure we
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e48ccd286b..1962051e54 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2373,6 +2373,28 @@ copy remp1 from stdin;
select tableoid::regclass, * FROM remp1;
+delete from ctrtest;
+
+-- Test copy tuple routing with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+
+copy ctrtest from stdin;
+1 foo
+1 bar
+2 baz
+2 qux
+1 test1
+2 test2
+\.
+
+select tableoid::regclass, * FROM ctrtest;
+select tableoid::regclass, * FROM remp1;
+select tableoid::regclass, * FROM remp2;
+
+delete from ctrtest;
+
+alter server loopback options (drop batch_size);
+
drop table ctrtest;
drop table loct1;
drop table loct2;
@@ -2527,6 +2549,86 @@ select * from rem3;
drop foreign table rem3;
drop table loc3;
+-- Test COPY FROM with the batch_size option enabled
+alter server loopback options (add batch_size '2');
+
+-- Test basic functionality
+copy rem2 from stdin;
+1 foo
+2 bar
+3 baz
+\.
+select * from rem2;
+
+delete from rem2;
+
+-- Test check constraints
+alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
+alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
+
+-- check constraint is enforced on the remote side, not locally
+copy rem2 from stdin;
+1 foo
+2 bar
+3 baz
+\.
+copy rem2 from stdin; -- ERROR
+-1 xyzzy
+\.
+select * from rem2;
+
+alter foreign table rem2 drop constraint rem2_f1positive;
+alter table loc2 drop constraint loc2_f1positive;
+
+delete from rem2;
+
+-- Test remote triggers
+create trigger trig_row_before_insert before insert on loc2
+ for each row execute procedure trig_row_before_insupdate();
+
+-- The new values are concatenated with ' triggered !'
+copy rem2 from stdin;
+1 foo
+2 bar
+3 baz
+\.
+select * from rem2;
+
+drop trigger trig_row_before_insert on loc2;
+
+delete from rem2;
+
+create trigger trig_null before insert on loc2
+ for each row execute procedure trig_null();
+
+-- Nothing happens
+copy rem2 from stdin;
+1 foo
+2 bar
+3 baz
+\.
+select * from rem2;
+
+drop trigger trig_null on loc2;
+
+delete from rem2;
+
+-- Check with zero-column foreign table; batch insert will be disabled
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+
+\.
+select * from rem2;
+
+delete from rem2;
+
+alter server loopback options (drop batch_size);
+
-- ===================================================================
-- test for TRUNCATE
-- ===================================================================
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index d0b5951019..94263c628f 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -665,7 +665,9 @@ ExecForeignBatchInsert(EState *estate,
Note that this function is also called when inserting routed tuples into
- a foreign-table partition. See the callback functions
+ a foreign-table partition or executing COPY FROM on
+ a foreign table, in which case it is called in a different way than it
+ is in the INSERT case. See the callback functions
described below that allow the FDW to support that.
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index bfd344cdc0..527f4deaaa 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -398,6 +398,10 @@ OPTIONS (ADD password_required 'false');
exceeds the limit, the batch_size will be adjusted to
avoid an error.
+
+
+ This option also applies when copying into foreign tables.
+
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 175aa837f2..a079c70152 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -78,7 +78,8 @@ typedef struct CopyMultiInsertBuffer
{
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
- BulkInsertState bistate; /* BulkInsertState for this rel */
+ BulkInsertState bistate; /* BulkInsertState for this rel if plain
+ * table; NULL if foreign table */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
@@ -116,6 +117,12 @@ CopyFromErrorCallback(void *arg)
{
CopyFromState cstate = (CopyFromState) arg;
+ if (cstate->relname_only)
+ {
+ errcontext("COPY %s",
+ cstate->cur_relname);
+ return;
+ }
if (cstate->opts.binary)
{
/* can't usefully display the data */
@@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
buffer->resultRelInfo = rri;
- buffer->bistate = GetBulkInsertState();
+ buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
return buffer;
@@ -299,83 +306,171 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
*/
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
- CopyMultiInsertBuffer *buffer)
+ CopyMultiInsertBuffer *buffer,
+ int64 *processed)
{
- MemoryContext oldcontext;
- int i;
- uint64 save_cur_lineno;
CopyFromState cstate = miinfo->cstate;
EState *estate = miinfo->estate;
- CommandId mycid = miinfo->mycid;
- int ti_options = miinfo->ti_options;
- bool line_buf_valid = cstate->line_buf_valid;
int nused = buffer->nused;
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
TupleTableSlot **slots = buffer->slots;
+ int i;
- /*
- * Print error context information correctly, if one of the operations
- * below fails.
- */
- cstate->line_buf_valid = false;
- save_cur_lineno = cstate->cur_lineno;
-
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
-
- for (i = 0; i < nused; i++)
+ if (resultRelInfo->ri_FdwRoutine)
{
- /*
- * If there are any indexes, update them for all the inserted tuples,
- * and run AFTER ROW INSERT triggers.
- */
- if (resultRelInfo->ri_NumIndices > 0)
- {
- List *recheckIndexes;
+ int batch_size = resultRelInfo->ri_BatchSize;
+ int sent = 0;
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(resultRelInfo,
- buffer->slots[i], estate, false, false,
- NULL, NIL);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
- }
+ Assert(buffer->bistate == NULL);
+
+ /* Ensure that the FDW supports batching and it's enabled */
+ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
+ Assert(batch_size > 1);
/*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
+ * We suppress error context information other than the relation name,
+ * if one of the operations below fails.
*/
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ Assert(!cstate->relname_only);
+ cstate->relname_only = true;
+
+ while (sent < nused)
{
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL, cstate->transition_capture);
+ int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
+ int inserted = size;
+ TupleTableSlot **rslots;
+
+ /* insert into foreign table: let the FDW do it */
+ rslots =
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+ resultRelInfo,
+ &slots[sent],
+ NULL,
+ &inserted);
+
+ sent += size;
+
+ /* No need to do anything if there are no inserted rows */
+ if (inserted <= 0)
+ continue;
+
+ /* Triggers on foreign tables should not have transition tables */
+ Assert(resultRelInfo->ri_TrigDesc == NULL ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
+
+ /* Run AFTER ROW INSERT triggers */
+ if (resultRelInfo->ri_TrigDesc != NULL &&
+ resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+ {
+ Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+ for (i = 0; i < inserted; i++)
+ {
+ TupleTableSlot *slot = rslots[i];
+
+ /*
+ * AFTER ROW Triggers might reference the tableoid column,
+ * so (re-)initialize tts_tableOid before evaluating them.
+ */
+ slot->tts_tableOid = relid;
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ cstate->transition_capture);
+ }
+ }
+
+ /* Update the row counter and progress of the COPY command */
+ *processed += inserted;
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ *processed);
}
- ExecClearTuple(slots[i]);
+ for (i = 0; i < nused; i++)
+ ExecClearTuple(slots[i]);
+
+ /* reset relname_only */
+ cstate->relname_only = false;
+ }
+ else
+ {
+ CommandId mycid = miinfo->mycid;
+ int ti_options = miinfo->ti_options;
+ bool line_buf_valid = cstate->line_buf_valid;
+ uint64 save_cur_lineno = cstate->cur_lineno;
+ MemoryContext oldcontext;
+
+ Assert(buffer->bistate != NULL);
+
+ /*
+ * Print error context information correctly, if one of the operations
+ * below fails.
+ */
+ cstate->line_buf_valid = false;
+
+ /*
+ * table_multi_insert may leak memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ table_multi_insert(resultRelInfo->ri_RelationDesc,
+ slots,
+ nused,
+ mycid,
+ ti_options,
+ buffer->bistate);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < nused; i++)
+ {
+ /*
+ * If there are any indexes, update them for all the inserted
+ * tuples, and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[i];
+ recheckIndexes =
+ ExecInsertIndexTuples(resultRelInfo,
+ buffer->slots[i], estate, false,
+ false, NULL, NIL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], recheckIndexes,
+ cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[i];
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], NIL,
+ cstate->transition_capture);
+ }
+
+ ExecClearTuple(slots[i]);
+ }
+
+ /* Update the row counter and progress of the COPY command */
+ *processed += nused;
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ *processed);
+
+ /* reset cur_lineno and line_buf_valid to what they were */
+ cstate->line_buf_valid = line_buf_valid;
+ cstate->cur_lineno = save_cur_lineno;
}
/* Mark that all slots are free */
buffer->nused = 0;
-
- /* reset cur_lineno and line_buf_valid to what they were */
- cstate->line_buf_valid = line_buf_valid;
- cstate->cur_lineno = save_cur_lineno;
}
/*
@@ -387,22 +482,30 @@ static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
CopyMultiInsertBuffer *buffer)
{
+ ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
int i;
/* Ensure buffer was flushed */
Assert(buffer->nused == 0);
/* Remove back-link to ourself */
- buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+ resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
- FreeBulkInsertState(buffer->bistate);
+ if (resultRelInfo->ri_FdwRoutine == NULL)
+ {
+ Assert(buffer->bistate != NULL);
+ FreeBulkInsertState(buffer->bistate);
+ }
+ else
+ Assert(buffer->bistate == NULL);
/* Since we only create slots on demand, just drop the non-null ones. */
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
- table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
+ if (resultRelInfo->ri_FdwRoutine == NULL)
+ table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
+ miinfo->ti_options);
pfree(buffer);
}
@@ -418,7 +521,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
* 'curr_rri'.
*/
static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
+ int64 *processed)
{
ListCell *lc;
@@ -426,7 +530,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
{
CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
- CopyMultiInsertBufferFlush(miinfo, buffer);
+ CopyMultiInsertBufferFlush(miinfo, buffer, processed);
}
miinfo->bufferedTuples = 0;
@@ -679,6 +783,23 @@ CopyFrom(CopyFromState cstate)
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
resultRelInfo);
+ /*
+ * Also, if the named relation is a foreign table, determine if the FDW
+ * supports batch insert and determine the batch size (a FDW may support
+ * batching, but it may be disabled for the server/table).
+ *
+ * If the FDW does not support batching, we set the batch size to 1.
+ */
+ if (resultRelInfo->ri_FdwRoutine != NULL &&
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+ resultRelInfo->ri_BatchSize =
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+ else
+ resultRelInfo->ri_BatchSize = 1;
+
+ Assert(resultRelInfo->ri_BatchSize >= 1);
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -708,10 +829,11 @@ CopyFrom(CopyFromState cstate)
/*
* It's generally more efficient to prepare a bunch of tuples for
- * insertion, and insert them in one table_multi_insert() call, than call
- * table_tuple_insert() separately for every tuple. However, there are a
- * number of reasons why we might not be able to do this. These are
- * explained below.
+ * insertion, and insert them in one
+ * table_multi_insert()/ExecForeignBatchInsert() call, than call
+ * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
+ * However, there are a number of reasons why we might not be able to do
+ * this. These are explained below.
*/
if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -725,6 +847,15 @@ CopyFrom(CopyFromState cstate)
*/
insertMethod = CIM_SINGLE;
}
+ else if (resultRelInfo->ri_FdwRoutine != NULL &&
+ resultRelInfo->ri_BatchSize == 1)
+ {
+ /*
+ * Can't support multi-inserts to a foreign table if the FDW does not
+ * support batching, or it's disabled for the server or foreign table.
+ */
+ insertMethod = CIM_SINGLE;
+ }
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
resultRelInfo->ri_TrigDesc->trig_insert_new_table)
{
@@ -737,14 +868,12 @@ CopyFrom(CopyFromState cstate)
*/
insertMethod = CIM_SINGLE;
}
- else if (resultRelInfo->ri_FdwRoutine != NULL ||
- cstate->volatile_defexprs)
+ else if (cstate->volatile_defexprs)
{
/*
- * Can't support multi-inserts to foreign tables or if there are any
- * volatile default expressions in the table. Similarly to the
- * trigger case above, such expressions may query the table we're
- * inserting into.
+ * Can't support multi-inserts if there are any volatile default
+ * expressions in the table. Similarly to the trigger case above,
+ * such expressions may query the table we're inserting into.
*
* Note: It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
@@ -767,13 +896,14 @@ CopyFrom(CopyFromState cstate)
* For partitioned tables, we may still be able to perform bulk
* inserts. However, the possibility of this depends on which types
* of triggers exist on the partition. We must disable bulk inserts
- * if the partition is a foreign table or it has any before row insert
- * or insert instead triggers (same as we checked above for the parent
- * table). Since the partition's resultRelInfos are initialized only
- * when we actually need to insert the first tuple into them, we must
- * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
- * flag that we must later determine if we can use bulk-inserts for
- * the partition being inserted into.
+ * if the partition is a foreign table that can't use batching or it
+ * has any before row insert or insert instead triggers (same as we
+ * checked above for the parent table). Since the partition's
+ * resultRelInfos are initialized only when we actually need to insert
+ * the first tuple into them, we must have the intermediate insert
+ * method of CIM_MULTI_CONDITIONAL to flag that we must later
+ * determine if we can use bulk-inserts for the partition being
+ * inserted into.
*/
if (proute)
insertMethod = CIM_MULTI_CONDITIONAL;
@@ -910,12 +1040,14 @@ CopyFrom(CopyFromState cstate)
/*
* Disable multi-inserts when the partition has BEFORE/INSTEAD
- * OF triggers, or if the partition is a foreign partition.
+ * OF triggers, or if the partition is a foreign table that
+ * can't use batching.
*/
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
!has_before_insert_row_trig &&
!has_instead_insert_row_trig &&
- resultRelInfo->ri_FdwRoutine == NULL;
+ (resultRelInfo->ri_FdwRoutine == NULL ||
+ resultRelInfo->ri_BatchSize > 1);
/* Set the multi-insert buffer to use for this partition. */
if (leafpart_use_multi_insert)
@@ -931,7 +1063,9 @@ CopyFrom(CopyFromState cstate)
* Flush pending inserts if this partition can't use
* batching, so rows are visible to triggers etc.
*/
- CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+ CopyMultiInsertInfoFlush(&multiInsertInfo,
+ resultRelInfo,
+ &processed);
}
if (bistate != NULL)
@@ -1067,7 +1201,17 @@ CopyFrom(CopyFromState cstate)
* buffers out to their tables.
*/
if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
- CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+ CopyMultiInsertInfoFlush(&multiInsertInfo,
+ resultRelInfo,
+ &processed);
+
+ /*
+ * We delay updating the row counter and progress of the
+ * COPY command until after writing the tuples stored in
+ * the buffer out to the table, as in single insert mode.
+ * See CopyMultiInsertBufferFlush().
+ */
+ continue; /* next tuple please */
}
else
{
@@ -1130,7 +1274,7 @@ CopyFrom(CopyFromState cstate)
if (insertMethod != CIM_SINGLE)
{
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
- CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+ CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
}
/* Done, clean up */
@@ -1348,6 +1492,7 @@ BeginCopyFrom(ParseState *pstate,
cstate->cur_lineno = 0;
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
+ cstate->relname_only = false;
/*
* Allocate buffers for the input pipeline.
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37c6032ae..8d9cc5accd 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -40,13 +40,16 @@ typedef enum EolType
} EolType;
/*
- * Represents the heap insert method to be used during COPY FROM.
+ * Represents the insert method to be used during COPY FROM.
*/
typedef enum CopyInsertMethod
{
- CIM_SINGLE, /* use table_tuple_insert or fdw routine */
- CIM_MULTI, /* always use table_multi_insert */
- CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
+ CIM_SINGLE, /* use table_tuple_insert or
+ * ExecForeignInsert */
+ CIM_MULTI, /* always use table_multi_insert or
+ * ExecForeignBatchInsert */
+ CIM_MULTI_CONDITIONAL /* use table_multi_insert or
+ * ExecForeignBatchInsert only if valid */
} CopyInsertMethod;
/*
@@ -81,6 +84,7 @@ typedef struct CopyFromStateData
uint64 cur_lineno; /* line number for error messages */
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
+ bool relname_only; /* don't output line number, att, etc. */
/*
* Working state