diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 40432fe640..3ea80253d3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -114,6 +114,7 @@ static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = - AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn); + AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, + need_full_snapshot); ctx->reorder->private_data = ctx; @@ -297,7 +299,8 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - read_page, prepare_write, do_write); + need_full_snapshot, read_page, prepare_write, + do_write); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -386,7 +389,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, } ctx = StartupDecodingContext(output_plugin_options, - start_lsn, InvalidTransactionId, + start_lsn, InvalidTransactionId, false, read_page, prepare_write, do_write); /* call output plugin initialization callback */ diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 727ef77853..9f0796a0ec 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -164,6 +164,9 @@ struct SnapBuild */ TransactionId initial_xmin_horizon; + /* Indicates if we are building full snapshot or just catalog one .*/ + bool building_full_snapshot; + /* * Snapshot that's valid to see the catalog state seen at this moment. */ @@ -280,7 +283,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, - XLogRecPtr start_lsn) + XLogRecPtr start_lsn, + bool need_full_snapshot) { MemoryContext context; MemoryContext oldcontext; @@ -307,6 +311,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; + builder->building_full_snapshot = need_full_snapshot; MemoryContextSwitchTo(oldcontext); @@ -1227,7 +1232,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * * a) There were no running transactions when the xl_running_xacts record * was inserted, jump to CONSISTENT immediately. We might find such a - * state we were waiting for b) and c). + * state we were waiting for b) or c). * * b) Wait for all toplevel transactions that were running to end. We * simply track the number of in-progress toplevel transactions and @@ -1242,7 +1247,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * at all. * * c) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. + * snapshot to disk that we can use. Can't use this method for the + * initial snapshot when slot is being created and needs full snapshot + * for export or direct use, as that snapshot will only contain catalog + * modifying transactions. * --- */ @@ -1297,8 +1305,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* c) valid on disk state */ - else if (SnapBuildRestore(builder, lsn)) + /* c) valid on disk state and not building full snapshot */ + else if (!builder->building_full_snapshot && + SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ return false; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index df229a895c..1df4f37634 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -54,7 +54,8 @@ struct xl_running_xacts; extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, - TransactionId xmin_horizon, XLogRecPtr start_lsn); + TransactionId xmin_horizon, XLogRecPtr start_lsn, + bool need_full_snapshot); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap);