pgoutput: Fix memory leak due to RelationSyncEntry.map.

Release the memory allocated when creating the tuple-conversion map and
its component TupleDescs when the owning sync entry is invalidated. The
TupleDescs must also be freed in the case where no map turns out to be
necessary to begin with.

Reported-by: Andres Freund
Author: Amit Langote
Reviewed-by: Takamichi Osumi, Amit Kapila
Backpatch-through: 13, where it was introduced
Discussion: https://postgr.es/m/MEYP282MB166933B1AB02B4FE56E82453B64D9@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM
This commit is contained in:
Amit Kapila 2021-06-01 14:14:02 +05:30
parent a40646e30d
commit eb89cb43a0
2 changed files with 65 additions and 9 deletions

View File

@ -74,7 +74,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
/*
* Entry in the map used to remember which relation schemas we sent.
*
* The schema_sent flag determines if the current schema record was already
* The schema_sent flag determines if the current schema record for the
* relation (and for its ancestor if publish_as_relid is set) was already
* sent to the subscriber (in which case we don't need to send it again).
*
* The schema cache on downstream is however updated only at commit time,
@ -92,10 +93,6 @@ typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
/*
* Did we send the schema? If ancestor relid is set, its schema must also
* have been sent for this to be true.
*/
bool schema_sent;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
@ -437,10 +434,17 @@ maybe_send_schema(LogicalDecodingContext *ctx,
else
schema_sent = relentry->schema_sent;
/* Nothing to do if we already sent the schema. */
if (schema_sent)
return;
/* If needed, send the ancestor's schema first. */
/*
* Nope, so send the schema. If the changes will be published using an
* ancestor's schema, not the relation's own, send that ancestor's schema
* before sending relation's own (XXX - maybe sending only the former
* suffices?). This is also a good place to set the map that will be used
* to convert the relation's tuples into the ancestor's format, if needed.
*/
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
@ -450,8 +454,21 @@ maybe_send_schema(LogicalDecodingContext *ctx,
/* Map must live as long as the session does. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
CreateTupleDescCopy(outdesc));
/*
* Make copies of the TupleDescs that will live as long as the map
* does before putting into the map.
*/
indesc = CreateTupleDescCopy(indesc);
outdesc = CreateTupleDescCopy(outdesc);
relentry->map = convert_tuples_by_name(indesc, outdesc);
if (relentry->map == NULL)
{
/* Map not necessary, so free the TupleDescs too. */
FreeTupleDesc(indesc);
FreeTupleDesc(outdesc);
}
MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
@ -1011,6 +1028,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->publish_as_relid = InvalidOid;
entry->map = NULL; /* will be set by maybe_send_schema() if needed */
}
/* Validate the entry */
@ -1191,12 +1209,24 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
/*
* Reset schema sent status as the relation definition may have changed.
* Also free any objects that depended on the earlier definition.
*/
if (entry != NULL)
{
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
if (entry->map)
{
/*
* Must free the TupleDescs contained in the map explicitly,
* because free_conversion_map() doesn't.
*/
FreeTupleDesc(entry->map->indesc);
FreeTupleDesc(entry->map->outdesc);
free_conversion_map(entry->map);
}
entry->map = NULL;
}
}

View File

@ -6,7 +6,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 54;
use Test::More tests => 56;
# setup
@ -624,3 +624,29 @@ is($result, qq(), 'truncate of tab3 replicated');
$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1");
is($result, qq(), 'truncate of tab3_1 replicated');
# check that the map to convert tuples from leaf partition to the root
# table is correctly rebuilt when a new column is added
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')");
$node_publisher->wait_for_catchup('sub_viaroot');
$node_publisher->wait_for_catchup('sub2');
$result = $node_subscriber1->safe_psql('postgres',
"SELECT c, a, b FROM tab2 ORDER BY 1, 2");
is( $result, qq(pub_tab2|1|xxx
pub_tab2|3|yyy
pub_tab2|5|zzz
xxx_c|6|aaa), 'inserts into tab2 replicated');
$result = $node_subscriber2->safe_psql('postgres',
"SELECT c, a, b FROM tab2 ORDER BY 1, 2");
is( $result, qq(pub_tab2|1|xxx
pub_tab2|3|yyy
pub_tab2|5|zzz
xxx_c|6|aaa), 'inserts into tab2 replicated');