From eb89cb43a0d0e401e71b8e2345b5f5bc8b2755a1 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 1 Jun 2021 14:14:02 +0530 Subject: [PATCH] pgoutput: Fix memory leak due to RelationSyncEntry.map. Release memory allocated when creating the tuple-conversion map and its component TupleDescs when its owning sync entry is invalidated. TupleDescs must also be freed when no map is deemed necessary, to begin with. Reported-by: Andres Freund Author: Amit Langote Reviewed-by: Takamichi Osumi, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/MEYP282MB166933B1AB02B4FE56E82453B64D9@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM --- src/backend/replication/pgoutput/pgoutput.c | 46 +++++++++++++++++---- src/test/subscription/t/013_partition.pl | 28 ++++++++++++- 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f68348dcf4..fe12d08a94 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -74,7 +74,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, /* * Entry in the map used to remember which relation schemas we sent. * - * The schema_sent flag determines if the current schema record was already + * The schema_sent flag determines if the current schema record for the + * relation (and for its ancestor if publish_as_relid is set) was already * sent to the subscriber (in which case we don't need to send it again). * * The schema cache on downstream is however updated only at commit time, @@ -92,10 +93,6 @@ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - /* - * Did we send the schema? If ancestor relid is set, its schema must also - * have been sent for this to be true. - */ bool schema_sent; List *streamed_txns; /* streamed toplevel transactions with this * schema */ @@ -437,10 +434,17 @@ maybe_send_schema(LogicalDecodingContext *ctx, else schema_sent = relentry->schema_sent; + /* Nothing to do if we already sent the schema. */ if (schema_sent) return; - /* If needed, send the ancestor's schema first. */ + /* + * Nope, so send the schema. If the changes will be published using an + * ancestor's schema, not the relation's own, send that ancestor's schema + * before sending relation's own (XXX - maybe sending only the former + * suffices?). This is also a good place to set the map that will be used + * to convert the relation's tuples into the ancestor's format, if needed. + */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); @@ -450,8 +454,21 @@ maybe_send_schema(LogicalDecodingContext *ctx, /* Map must live as long as the session does. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); - relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc), - CreateTupleDescCopy(outdesc)); + + /* + * Make copies of the TupleDescs that will live as long as the map + * does before putting into the map. + */ + indesc = CreateTupleDescCopy(indesc); + outdesc = CreateTupleDescCopy(outdesc); + relentry->map = convert_tuples_by_name(indesc, outdesc); + if (relentry->map == NULL) + { + /* Map not necessary, so free the TupleDescs too. */ + FreeTupleDesc(indesc); + FreeTupleDesc(outdesc); + } + MemoryContextSwitchTo(oldctx); send_relation_and_attrs(ancestor, xid, ctx); RelationClose(ancestor); @@ -1011,6 +1028,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; + entry->map = NULL; /* will be set by maybe_send_schema() if needed */ } /* Validate the entry */ @@ -1191,12 +1209,24 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) /* * Reset schema sent status as the relation definition may have changed. + * Also free any objects that depended on the earlier definition. */ if (entry != NULL) { entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + if (entry->map) + { + /* + * Must free the TupleDescs contained in the map explicitly, + * because free_conversion_map() doesn't. + */ + FreeTupleDesc(entry->map->indesc); + FreeTupleDesc(entry->map->outdesc); + free_conversion_map(entry->map); + } + entry->map = NULL; } } diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 6daf8daa3c..9de01017be 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 54; +use Test::More tests => 56; # setup @@ -624,3 +624,29 @@ is($result, qq(), 'truncate of tab3 replicated'); $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1"); is($result, qq(), 'truncate of tab3_1 replicated'); + +# check that the map to convert tuples from leaf partition to the root +# table is correctly rebuilt when a new column is added +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a, b FROM tab2 ORDER BY 1, 2"); +is( $result, qq(pub_tab2|1|xxx +pub_tab2|3|yyy +pub_tab2|5|zzz +xxx_c|6|aaa), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a, b FROM tab2 ORDER BY 1, 2"); +is( $result, qq(pub_tab2|1|xxx +pub_tab2|3|yyy +pub_tab2|5|zzz +xxx_c|6|aaa), 'inserts into tab2 replicated');