Refactor pgoutput_change().

Instead of mostly-duplicate code for different operation
(insert/update/delete) types, write a common code to compute old/new
tuples, and check the row filter.

Author: Hou Zhijie
Reviewed-by: Peter Smith, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB5716194A47FFA8D91133687D94BF9@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila 2023-03-30 11:10:38 +05:30
parent 902ecd3bd4
commit da324d6cd4
1 changed files with 83 additions and 160 deletions

View File

@ -1440,6 +1440,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_DELETE:
if (!relentry->pubactions.pubdelete)
return;
/*
* This is only possible if deletes are allowed even when replica
* identity is not defined for a table. Since the DELETE action
* can't be published, we simply return.
*/
if (!change->data.tp.oldtuple)
{
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
return;
}
break;
default:
Assert(false);
@ -1448,187 +1459,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
}
if (change->data.tp.oldtuple)
{
old_slot = relentry->old_slot;
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
/* Convert tuple if needed. */
if (relentry->attrmap)
{
TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
&TTSOpsVirtual);
old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
}
}
if (change->data.tp.newtuple)
{
new_slot = relentry->new_slot;
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
/* Convert tuple if needed. */
if (relentry->attrmap)
{
TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
&TTSOpsVirtual);
new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
}
}
/*
* Check row filter.
*
* Updates could be transformed to inserts or deletes based on the results
* of the row filter for old and new tuple.
*/
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
goto cleanup;
/*
* Send BEGIN if we haven't yet.
*
* We send the BEGIN message after ensuring that we will actually send the
* change. This avoids sending a pair of BEGIN/COMMIT messages for empty
* transactions.
*/
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
/*
* Schema should be sent using the original relation because it also sends
* the ancestor's relation.
*/
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
/* Send the data */
switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
new_slot = relentry->new_slot;
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
new_slot, false);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
/* Convert tuple if needed. */
if (relentry->attrmap)
{
TupleDesc tupdesc = RelationGetDescr(targetrel);
new_slot = execute_attr_map_slot(relentry->attrmap,
new_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
}
/* Check row filter */
if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
&action))
break;
/*
* Send BEGIN if we haven't yet.
*
* We send the BEGIN message after ensuring that we will actually
* send the change. This avoids sending a pair of BEGIN/COMMIT
* messages for empty transactions.
*/
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
/*
* Schema should be sent using the original relation because it
* also sends the ancestor's relation.
*/
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
data->binary, relentry->columns);
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
if (change->data.tp.oldtuple)
{
old_slot = relentry->old_slot;
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
old_slot, false);
}
new_slot = relentry->new_slot;
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
new_slot, false);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
/* Convert tuples if needed. */
if (relentry->attrmap)
{
TupleDesc tupdesc = RelationGetDescr(targetrel);
if (old_slot)
old_slot = execute_attr_map_slot(relentry->attrmap,
old_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
new_slot = execute_attr_map_slot(relentry->attrmap,
new_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
}
/* Check row filter */
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
relentry, &action))
break;
/* Send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
/*
* Updates could be transformed to inserts or deletes based on the
* results of the row filter for old and new tuple.
*/
switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel,
new_slot, data->binary,
relentry->columns);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel,
old_slot, new_slot, data->binary,
relentry->columns);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel,
old_slot, data->binary,
relentry->columns);
break;
default:
Assert(false);
}
OutputPluginWrite(ctx, true);
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
new_slot, data->binary, relentry->columns);
break;
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
old_slot = relentry->old_slot;
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
old_slot, false);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
targetrel = ancestor;
/* Convert tuple if needed. */
if (relentry->attrmap)
{
TupleDesc tupdesc = RelationGetDescr(targetrel);
old_slot = execute_attr_map_slot(relentry->attrmap,
old_slot,
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
}
/* Check row filter */
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
relentry, &action))
break;
/* Send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, xid, targetrel,
old_slot, data->binary,
relentry->columns);
OutputPluginWrite(ctx, true);
}
else
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
data->binary, relentry->columns);
break;
default:
Assert(false);
}
OutputPluginWrite(ctx, true);
cleanup:
if (RelationIsValid(ancestor))
{
RelationClose(ancestor);
ancestor = NULL;
}
/* Cleanup */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}