diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 2e5a01bd73..438be44afc 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,7 +37,7 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared +REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared regresscheck: all | submake-regress submake-test_decoding $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out new file mode 100644 index 0000000000..2671258f5d --- /dev/null +++ b/contrib/test_decoding/expected/decoding_into_rel.out @@ -0,0 +1,84 @@ +-- test that we can insert the result of a get_changes call into a +-- logged relation. That's really not a good idea in practical terms, +-- but provides a nice test. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- create some changes +CREATE TABLE somechange(id serial primary key); +INSERT INTO somechange DEFAULT VALUES; +CREATE TABLE changeresult AS + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT * FROM changeresult; + data +------------------------------------------------ + BEGIN + table public.somechange: INSERT: id[integer]:1 + COMMIT +(3 rows) + +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT * FROM changeresult; + data +-------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.somechange: INSERT: id[integer]:1 + COMMIT + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT +(20 rows) + +DROP TABLE changeresult; +DROP TABLE somechange; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT +(14 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql new file mode 100644 index 0000000000..3704821bcc --- /dev/null +++ b/contrib/test_decoding/sql/decoding_into_rel.sql @@ -0,0 +1,27 @@ +-- test that we can insert the result of a get_changes call into a +-- logged relation. That's really not a good idea in practical terms, +-- but provides a nice test. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- create some changes +CREATE TABLE somechange(id serial primary key); +INSERT INTO somechange DEFAULT VALUES; + +CREATE TABLE changeresult AS + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT * FROM changeresult; + +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +INSERT INTO changeresult + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT * FROM changeresult; +DROP TABLE changeresult; +DROP TABLE somechange; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ece1bc8064..7d8f40738d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1264,8 +1264,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, volatile CommandId command_id = FirstCommandId; volatile Snapshot snapshot_now = NULL; - volatile bool txn_started = false; - volatile bool subtxn_started = false; + volatile bool using_subtxn = false; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -1305,7 +1304,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { - txn_started = false; /* * Decoding needs access to syscaches et al., which in turn use @@ -1317,16 +1315,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * When we're called via the SQL SRF there's already a transaction * started, so start an explicit subtransaction there. */ - if (IsTransactionOrTransactionBlock()) - { + using_subtxn = IsTransactionOrTransactionBlock(); + + if (using_subtxn) BeginInternalSubTransaction("replay"); - subtxn_started = true; - } else - { StartTransactionCommand(); - txn_started = true; - } rb->begin(rb, txn); @@ -1489,22 +1483,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, elog(ERROR, "output plugin used XID %u", GetCurrentTransactionId()); - /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); - /* cleanup */ TeardownHistoricSnapshot(false); /* - * Abort subtransaction or the transaction as a whole has the right + * Aborting the current (sub-)transaction as a whole has the right * semantics. We want all locks acquired in here to be released, not * reassigned to the parent and we do not want any database access * have persistent effects. */ - if (subtxn_started) + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + + if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - else if (txn_started) - AbortCurrentTransaction(); if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); @@ -1520,20 +1514,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, TeardownHistoricSnapshot(true); + /* + * Force cache invalidation to happen outside of a valid transaction + * to prevent catalog access as we just caught an error. + */ + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); - if (subtxn_started) - RollbackAndReleaseCurrentSubTransaction(); - else if (txn_started) - AbortCurrentTransaction(); - - /* - * Invalidations in an aborted transactions aren't allowed to do - * catalog access, so we don't need to still have the snapshot setup. - */ - ReorderBufferExecuteInvalidations(rb, txn); - /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); @@ -1645,20 +1640,24 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) */ if (txn->base_snapshot != NULL && txn->ninvalidations > 0) { - /* setup snapshot to perform the invalidations in */ - SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash); - PG_TRY(); - { - ReorderBufferExecuteInvalidations(rb, txn); - TeardownHistoricSnapshot(false); - } - PG_CATCH(); - { - /* cleanup */ - TeardownHistoricSnapshot(true); - PG_RE_THROW(); - } - PG_END_TRY(); + bool use_subtxn = IsTransactionOrTransactionBlock(); + + if (use_subtxn) + BeginInternalSubTransaction("replay"); + + /* + * Force invalidations to happen outside of a valid transaction - that + * way entries will just be marked as invalid without accessing the + * catalog. That's advantageous because we don't need to setup the + * full state necessary for catalog access. + */ + if (use_subtxn) + AbortCurrentTransaction(); + + ReorderBufferExecuteInvalidations(rb, txn); + + if (use_subtxn) + RollbackAndReleaseCurrentSubTransaction(); } else Assert(txn->ninvalidations == 0);