mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-08-30 12:17:19 +02:00
pgoutput: Fix memory leak due to RelationSyncEntry.map.
Release memory allocated when creating the tuple-conversion map and its component TupleDescs when its owning sync entry is invalidated. TupleDescs must also be freed when no map is deemed necessary, to begin with. Reported-by: Andres Freund Author: Amit Langote Reviewed-by: Takamichi Osumi, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/MEYP282MB166933B1AB02B4FE56E82453B64D9@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM
This commit is contained in:
parent
e2f21ff606
commit
d250568121
@ -57,6 +57,10 @@ static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *c
|
|||||||
/*
|
/*
|
||||||
* Entry in the map used to remember which relation schemas we sent.
|
* Entry in the map used to remember which relation schemas we sent.
|
||||||
*
|
*
|
||||||
|
* The schema_sent flag determines if the current schema record for the
|
||||||
|
* relation (and for its ancestor if publish_as_relid is set) was already sent
|
||||||
|
* to the subscriber (in which case we don't need to send it again).
|
||||||
|
*
|
||||||
* For partitions, 'pubactions' considers not only the table's own
|
* For partitions, 'pubactions' considers not only the table's own
|
||||||
* publications, but also those of all of its ancestors.
|
* publications, but also those of all of its ancestors.
|
||||||
*/
|
*/
|
||||||
@ -64,10 +68,6 @@ typedef struct RelationSyncEntry
|
|||||||
{
|
{
|
||||||
Oid relid; /* relation oid */
|
Oid relid; /* relation oid */
|
||||||
|
|
||||||
/*
|
|
||||||
* Did we send the schema? If ancestor relid is set, its schema must also
|
|
||||||
* have been sent for this to be true.
|
|
||||||
*/
|
|
||||||
bool schema_sent;
|
bool schema_sent;
|
||||||
|
|
||||||
bool replicate_valid;
|
bool replicate_valid;
|
||||||
@ -292,10 +292,17 @@ static void
|
|||||||
maybe_send_schema(LogicalDecodingContext *ctx,
|
maybe_send_schema(LogicalDecodingContext *ctx,
|
||||||
Relation relation, RelationSyncEntry *relentry)
|
Relation relation, RelationSyncEntry *relentry)
|
||||||
{
|
{
|
||||||
|
/* Nothing to do if we already sent the schema. */
|
||||||
if (relentry->schema_sent)
|
if (relentry->schema_sent)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* If needed, send the ancestor's schema first. */
|
/*
|
||||||
|
* Nope, so send the schema. If the changes will be published using an
|
||||||
|
* ancestor's schema, not the relation's own, send that ancestor's schema
|
||||||
|
* before sending relation's own (XXX - maybe sending only the former
|
||||||
|
* suffices?). This is also a good place to set the map that will be used
|
||||||
|
* to convert the relation's tuples into the ancestor's format, if needed.
|
||||||
|
*/
|
||||||
if (relentry->publish_as_relid != RelationGetRelid(relation))
|
if (relentry->publish_as_relid != RelationGetRelid(relation))
|
||||||
{
|
{
|
||||||
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
|
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
|
||||||
@ -305,8 +312,21 @@ maybe_send_schema(LogicalDecodingContext *ctx,
|
|||||||
|
|
||||||
/* Map must live as long as the session does. */
|
/* Map must live as long as the session does. */
|
||||||
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
|
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
|
||||||
relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
|
|
||||||
CreateTupleDescCopy(outdesc));
|
/*
|
||||||
|
* Make copies of the TupleDescs that will live as long as the map
|
||||||
|
* does before putting into the map.
|
||||||
|
*/
|
||||||
|
indesc = CreateTupleDescCopy(indesc);
|
||||||
|
outdesc = CreateTupleDescCopy(outdesc);
|
||||||
|
relentry->map = convert_tuples_by_name(indesc, outdesc);
|
||||||
|
if (relentry->map == NULL)
|
||||||
|
{
|
||||||
|
/* Map not necessary, so free the TupleDescs too. */
|
||||||
|
FreeTupleDesc(indesc);
|
||||||
|
FreeTupleDesc(outdesc);
|
||||||
|
}
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldctx);
|
MemoryContextSwitchTo(oldctx);
|
||||||
send_relation_and_attrs(ancestor, ctx);
|
send_relation_and_attrs(ancestor, ctx);
|
||||||
RelationClose(ancestor);
|
RelationClose(ancestor);
|
||||||
@ -759,6 +779,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
|
|||||||
list_free(pubids);
|
list_free(pubids);
|
||||||
|
|
||||||
entry->publish_as_relid = publish_as_relid;
|
entry->publish_as_relid = publish_as_relid;
|
||||||
|
entry->map = NULL; /* will be set by maybe_send_schema() if needed */
|
||||||
entry->replicate_valid = true;
|
entry->replicate_valid = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -801,9 +822,23 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Reset schema sent status as the relation definition may have changed.
|
* Reset schema sent status as the relation definition may have changed.
|
||||||
|
* Also, free any objects that depended on the earlier definition.
|
||||||
*/
|
*/
|
||||||
if (entry != NULL)
|
if (entry != NULL)
|
||||||
|
{
|
||||||
entry->schema_sent = false;
|
entry->schema_sent = false;
|
||||||
|
if (entry->map)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Must free the TupleDescs contained in the map explicitly,
|
||||||
|
* because free_conversion_map() doesn't.
|
||||||
|
*/
|
||||||
|
FreeTupleDesc(entry->map->indesc);
|
||||||
|
FreeTupleDesc(entry->map->outdesc);
|
||||||
|
free_conversion_map(entry->map);
|
||||||
|
}
|
||||||
|
entry->map = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -3,7 +3,7 @@ use strict;
|
|||||||
use warnings;
|
use warnings;
|
||||||
use PostgresNode;
|
use PostgresNode;
|
||||||
use TestLib;
|
use TestLib;
|
||||||
use Test::More tests => 54;
|
use Test::More tests => 56;
|
||||||
|
|
||||||
# setup
|
# setup
|
||||||
|
|
||||||
@ -621,3 +621,29 @@ is($result, qq(), 'truncate of tab3 replicated');
|
|||||||
|
|
||||||
$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1");
|
$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1");
|
||||||
is($result, qq(), 'truncate of tab3_1 replicated');
|
is($result, qq(), 'truncate of tab3_1 replicated');
|
||||||
|
|
||||||
|
# check that the map to convert tuples from leaf partition to the root
|
||||||
|
# table is correctly rebuilt when a new column is added
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')");
|
||||||
|
$node_publisher->safe_psql('postgres',
|
||||||
|
"INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('sub_viaroot');
|
||||||
|
$node_publisher->wait_for_catchup('sub2');
|
||||||
|
|
||||||
|
$result = $node_subscriber1->safe_psql('postgres',
|
||||||
|
"SELECT c, a, b FROM tab2 ORDER BY 1, 2");
|
||||||
|
is( $result, qq(pub_tab2|1|xxx
|
||||||
|
pub_tab2|3|yyy
|
||||||
|
pub_tab2|5|zzz
|
||||||
|
xxx_c|6|aaa), 'inserts into tab2 replicated');
|
||||||
|
|
||||||
|
$result = $node_subscriber2->safe_psql('postgres',
|
||||||
|
"SELECT c, a, b FROM tab2 ORDER BY 1, 2");
|
||||||
|
is( $result, qq(pub_tab2|1|xxx
|
||||||
|
pub_tab2|3|yyy
|
||||||
|
pub_tab2|5|zzz
|
||||||
|
xxx_c|6|aaa), 'inserts into tab2 replicated');
|
||||||
|
Loading…
Reference in New Issue
Block a user