diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2bd61d9d20..32437202fe 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -27,6 +27,7 @@ #include "pgstat.h" #include "funcapi.h" +#include "access/sysattr.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -706,6 +707,8 @@ apply_handle_update(StringInfo s) bool has_oldtup; TupleTableSlot *localslot; TupleTableSlot *remoteslot; + RangeTblEntry *target_rte; + int i; bool found; MemoryContext oldctx; @@ -735,6 +738,21 @@ apply_handle_update(StringInfo s) RelationGetDescr(rel->localrel)); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + /* + * Populate updatedCols so that per-column triggers can fire. This could + * include more columns than were actually changed on the publisher + * because the logical replication protocol doesn't contain that + * information. But it would for example exclude columns that only exist + * on the subscriber, since we are not touching those. + */ + target_rte = list_nth(estate->es_range_table, 0); + for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) + { + if (newtup.changed[i]) + target_rte->updatedCols = bms_add_member(target_rte->updatedCols, + i + 1 - FirstLowInvalidHeapAttributeNumber); + } + PushActiveSnapshot(GetTransactionSnapshot()); ExecOpenIndices(estate->es_result_relation_info, false); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index a5b548ecee..62123792e6 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 4; +use Test::More tests => 6; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -82,6 +82,8 @@ BEGIN ELSE RETURN NULL; END IF; + ELSIF (TG_OP = 'UPDATE') THEN + RETURN NULL; ELSE RAISE WARNING 'Unknown action'; RETURN NULL; @@ -89,7 +91,7 @@ BEGIN END; \$\$ LANGUAGE plpgsql; CREATE TRIGGER filter_basic_dml_trg - BEFORE INSERT ON tab_fk_ref + BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; }); @@ -100,10 +102,32 @@ $node_publisher->safe_psql('postgres', $node_publisher->wait_for_catchup($appname); -# The row should be skipped on subscriber +# The trigger should cause the insert to be skipped on subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); -is($result, qq(2|1|2), 'check replica trigger applied on subscriber'); +is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber'); + +# Update data +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;"); + +$node_publisher->wait_for_catchup($appname); + +# The trigger should cause the update to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber'); + +# Update on a column not specified in the trigger, but it will trigger +# anyway because logical replication ships all columns in an update. +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;"); + +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check column trigger applied on even for other column'); $node_subscriber->stop('fast'); $node_publisher->stop('fast');