diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fafde03921..1dd9207ef0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -363,13 +363,19 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, } /* - * Modify slot with user data provided as C strings. + * Replace selected columns with user data provided as C strings. * This is somewhat similar to heap_modify_tuple but also calls the type - * input function on the user data as the input is the text representation - * of the types. + * input functions on the user data. + * "slot" is filled with a copy of the tuple in "srcslot", with + * columns selected by the "replaces" array replaced with data values + * from "values". + * Caution: unreplaced pass-by-ref columns in "slot" will point into the + * storage for "srcslot". This is OK for current usage, but someday we may + * need to materialize "slot" at the end to make it independent of "srcslot". */ static void -slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, +slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, char **values, bool *replaces) { int natts = slot->tts_tupleDescriptor->natts; @@ -377,10 +383,19 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, SlotErrCallbackArg errarg; ErrorContextCallback errcallback; - slot_getallattrs(slot); + /* We'll fill "slot" with a virtual tuple, so we must start with ... */ ExecClearTuple(slot); - /* Push callback + info on the error context stack */ + /* + * Copy all the column data from srcslot, so that we'll have valid values + * for unreplaced columns. + */ + Assert(natts == srcslot->tts_tupleDescriptor->natts); + slot_getallattrs(srcslot); + memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum)); + memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool)); + + /* For error reporting, push callback + info on the error context stack */ errarg.rel = rel; errarg.local_attnum = -1; errarg.remote_attnum = -1; @@ -428,6 +443,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, /* Pop the error context stack */ error_context_stack = errcallback.previous; + /* And finally, declare that "slot" contains a valid virtual tuple */ ExecStoreVirtualTuple(slot); } @@ -740,8 +756,8 @@ apply_handle_update(StringInfo s) { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - ExecCopySlot(remoteslot, localslot); - slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed); + slot_modify_cstrings(remoteslot, localslot, rel, + newtup.values, newtup.changed); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 40e306a810..f2be0e21ea 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 17; +use Test::More tests => 20; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -28,9 +28,9 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary key)"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab_mixed (a int primary key, b text)"); + "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)"); $node_publisher->safe_psql('postgres', - "INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')"); + "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" ); @@ -45,7 +45,8 @@ $node_subscriber->safe_psql('postgres', # different column count and order than on publisher $node_subscriber->safe_psql('postgres', - "CREATE TABLE tab_mixed (c text, b text, a int primary key)"); + "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)" +); # replication of the table with included index $node_subscriber->safe_psql('postgres', @@ -94,7 +95,7 @@ $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20"); $node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); $node_publisher->safe_psql('postgres', - "INSERT INTO tab_mixed VALUES (2, 'bar')"); + "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_include SELECT generate_series(1,50)"); @@ -112,10 +113,9 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); -$result = - $node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed"); -is( $result, qq(|foo|1 -|bar|2), 'check replicated changes with different column order'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed"); +is( $result, qq(local|1.1|foo|1 +local|2.2|bar|2), 'check replicated changes with different column order'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_include"); @@ -139,11 +139,14 @@ $node_publisher->safe_psql('postgres', "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); $node_subscriber->safe_psql('postgres', "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +# tab_mixed can use DEFAULT, since it has a primary key # and do the updates $node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a"); $node_publisher->safe_psql('postgres', "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = 'baz' WHERE a = 1"); $node_publisher->wait_for_catchup('tap_sub'); @@ -159,6 +162,40 @@ bb bb), 'update works with REPLICA IDENTITY FULL and text datums'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(local|1.1|baz|1 +local|2.2|bar|2), + 'update works with different column order and subscriber local values'); + +# check behavior with dropped columns + +$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 11.11 WHERE a = 1"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(local|11.11|baz|1 +local|2.2|bar|2), + 'update works with dropped publisher column'); + +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_mixed DROP COLUMN d"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 22.22 WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(11.11|baz|1 +22.22|bar|2), + 'update works with dropped subscriber column'); + # check that change of connection string and/or publication list causes # restart of subscription workers. Not all of these are registered as tests # as we need to poll for a change but the test suite will fail none the less