diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 105da98c9b..200e93a9e9 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -589,7 +589,9 @@ logicalrep_partmap_init(void) * logicalrep_partition_open * * Returned entry reuses most of the values of the root table's entry, save - * the attribute map, which can be different for the partition. + * the attribute map, which can be different for the partition. However, + * we must physically copy all the data, in case the root table's entry + * gets freed/rebuilt. * * Note there's no logicalrep_partition_close, because the caller closes the * the component relation. @@ -625,7 +627,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, part_entry->partoid = partOid; - /* Remote relation is used as-is from the root entry. */ + /* Remote relation is copied as-is from the root entry. */ entry = &part_entry->relmapentry; entry->remoterel.remoteid = remoterel->remoteid; entry->remoterel.nspname = pstrdup(remoterel->nspname); @@ -668,7 +670,12 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, } } else - entry->attrmap = attrmap; + { + /* Lacking copy_attmap, do this the hard way. */ + entry->attrmap = make_attrmap(attrmap->maplen); + memcpy(entry->attrmap->attnums, attrmap->attnums, + attrmap->maplen * sizeof(AttrNumber)); + } entry->updatable = root->updatable; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eb013c4608..da748668ec 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -898,12 +898,13 @@ apply_handle_update_internal(ResultRelInfo *relinfo, else { /* - * The tuple to be updated could not be found. + * The tuple to be updated could not be found. Do nothing except for + * emitting a log message. * - * TODO what to do here, change the log level to LOG perhaps? + * XXX should this be promoted to ereport(LOG) perhaps? */ elog(DEBUG1, - "logical replication did not find row for update " + "logical replication did not find row to be updated " "in replication target relation \"%s\"", RelationGetRelationName(localrel)); } @@ -1001,9 +1002,14 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, } else { - /* The tuple to be deleted could not be found. */ + /* + * The tuple to be deleted could not be found. Do nothing except for + * emitting a log message. + * + * XXX should this be promoted to ereport(LOG) perhaps? + */ elog(DEBUG1, - "logical replication could not find row for delete " + "logical replication did not find row to be deleted " "in replication target relation \"%s\"", RelationGetRelationName(localrel)); } @@ -1145,30 +1151,31 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, found = FindReplTupleInLocalRel(estate, partrel, &part_entry->remoterel, remoteslot_part, &localslot); - - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - if (found) - { - /* Apply the update. */ - slot_modify_cstrings(remoteslot_part, localslot, - part_entry, - newtup->values, newtup->changed); - MemoryContextSwitchTo(oldctx); - } - else + if (!found) { /* - * The tuple to be updated could not be found. + * The tuple to be updated could not be found. Do nothing + * except for emitting a log message. * - * TODO what to do here, change the log level to LOG - * perhaps? + * XXX should this be promoted to ereport(LOG) perhaps? */ elog(DEBUG1, - "logical replication did not find row for update " - "in replication target relation \"%s\"", + "logical replication did not find row to be updated " + "in replication target relation's partition \"%s\"", RelationGetRelationName(partrel)); + return; } + /* + * Apply the update to the local tuple, putting the result in + * remoteslot_part. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_cstrings(remoteslot_part, localslot, + part_entry, + newtup->values, newtup->changed); + MemoryContextSwitchTo(oldctx); + /* * Does the updated tuple still satisfy the current * partition's constraint? diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d9e45bab4a..5b33b31515 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -452,8 +452,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Convert tuples if needed. */ if (relentry->map) { - oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); - newtuple = execute_attr_map_tuple(newtuple, relentry->map); + if (oldtuple) + oldtuple = execute_attr_map_tuple(oldtuple, + relentry->map); + newtuple = execute_attr_map_tuple(newtuple, + relentry->map); } } diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 3d90f81e7e..33a773843f 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 24; +use Test::More tests => 27; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -115,7 +115,7 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)"); $node_publisher->safe_psql('postgres', - "INSERT INTO tab_full_pk VALUES (1, 'foo')"); + "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_nothing VALUES (generate_series(1,20))"); @@ -197,6 +197,38 @@ is( $result, qq(local|1.1|baz|1 local|2.2|bar|2), 'update works with different column order and subscriber local values'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_full_pk ORDER BY a"); +is( $result, qq(1|bar +2|baz), + 'update works with REPLICA IDENTITY FULL and a primary key'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +my $logfile = slurp_file($node_subscriber->logfile()); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + 'update target row is missing'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + 'delete target row is missing'); + +$node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber->reload; + # check behavior with toasted values $node_publisher->safe_psql('postgres', diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 046e18c700..52b1f3a003 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 56; +use Test::More tests => 62; # setup @@ -344,6 +344,46 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = debug1"); +$node_subscriber1->reload; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$node_subscriber1->safe_psql('postgres', "DELETE FROM tab1"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET b = 'quux' WHERE a = 4"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $logfile = slurp_file($node_subscriber1->logfile()); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + 'update target row is missing in tab1_2_2'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + 'delete target row is missing in tab1_1'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + 'delete target row is missing in tab1_2_2'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + 'delete target row is missing in tab1_def'); + +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + # Tests for replication using root table identity and schema # publisher @@ -647,3 +687,32 @@ is( $result, qq(pub_tab2|1|xxx pub_tab2|3|yyy pub_tab2|5|zzz xxx_c|6|aaa), 'inserts into tab2 replicated'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = debug1"); +$node_subscriber1->reload; + +$node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET b = 'quux' WHERE a = 5"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$logfile = slurp_file($node_subscriber1->logfile()); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + 'update target row is missing in tab2_1'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + 'delete target row is missing in tab2_1'); + +# No need for this until more tests are added. +# $node_subscriber1->append_conf('postgresql.conf', +# "log_min_messages = warning"); +# $node_subscriber1->reload;