From da324d6cd45bbbcc1682cc2fcbc4f575281916af Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 30 Mar 2023 11:10:38 +0530 Subject: [PATCH] Refactor pgoutput_change(). Instead of mostly-duplicate code for different operation (insert/update/delete) types, write a common code to compute old/new tuples, and check the row filter. Author: Hou Zhijie Reviewed-by: Peter Smith, Amit Kapila Discussion: https://postgr.es/m/OS0PR01MB5716194A47FFA8D91133687D94BF9@OS0PR01MB5716.jpnprd01.prod.outlook.com --- src/backend/replication/pgoutput/pgoutput.c | 243 +++++++------------- 1 file changed, 83 insertions(+), 160 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3a2d2e357e..ebaf555d56 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1440,6 +1440,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) return; + + /* + * This is only possible if deletes are allowed even when replica + * identity is not defined for a table. Since the DELETE action + * can't be published, we simply return. + */ + if (!change->data.tp.oldtuple) + { + elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + return; + } break; default: Assert(false); @@ -1448,187 +1459,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + targetrel = ancestor; + } + + if (change->data.tp.oldtuple) + { + old_slot = relentry->old_slot; + ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false); + + /* Convert tuple if needed. */ + if (relentry->attrmap) + { + TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel), + &TTSOpsVirtual); + + old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot); + } + } + + if (change->data.tp.newtuple) + { + new_slot = relentry->new_slot; + ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false); + + /* Convert tuple if needed. */ + if (relentry->attrmap) + { + TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel), + &TTSOpsVirtual); + + new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot); + } + } + + /* + * Check row filter. + * + * Updates could be transformed to inserts or deletes based on the results + * of the row filter for old and new tuple. + */ + if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action)) + goto cleanup; + + /* + * Send BEGIN if we haven't yet. + * + * We send the BEGIN message after ensuring that we will actually send the + * change. This avoids sending a pair of BEGIN/COMMIT messages for empty + * transactions. + */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + + /* + * Schema should be sent using the original relation because it also sends + * the ancestor's relation. + */ + maybe_send_schema(ctx, change, relation, relentry); + + OutputPluginPrepareWrite(ctx, true); + /* Send the data */ switch (action) { case REORDER_BUFFER_CHANGE_INSERT: - new_slot = relentry->new_slot; - ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, - new_slot, false); - - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) - { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - targetrel = ancestor; - /* Convert tuple if needed. */ - if (relentry->attrmap) - { - TupleDesc tupdesc = RelationGetDescr(targetrel); - - new_slot = execute_attr_map_slot(relentry->attrmap, - new_slot, - MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); - } - } - - /* Check row filter */ - if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, - &action)) - break; - - /* - * Send BEGIN if we haven't yet. - * - * We send the BEGIN message after ensuring that we will actually - * send the change. This avoids sending a pair of BEGIN/COMMIT - * messages for empty transactions. - */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); - - /* - * Schema should be sent using the original relation because it - * also sends the ancestor's relation. - */ - maybe_send_schema(ctx, change, relation, relentry); - - OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, data->binary, relentry->columns); - OutputPluginWrite(ctx, true); break; case REORDER_BUFFER_CHANGE_UPDATE: - if (change->data.tp.oldtuple) - { - old_slot = relentry->old_slot; - ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, - old_slot, false); - } - - new_slot = relentry->new_slot; - ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, - new_slot, false); - - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) - { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - targetrel = ancestor; - /* Convert tuples if needed. */ - if (relentry->attrmap) - { - TupleDesc tupdesc = RelationGetDescr(targetrel); - - if (old_slot) - old_slot = execute_attr_map_slot(relentry->attrmap, - old_slot, - MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); - - new_slot = execute_attr_map_slot(relentry->attrmap, - new_slot, - MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); - } - } - - /* Check row filter */ - if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, - relentry, &action)) - break; - - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); - - maybe_send_schema(ctx, change, relation, relentry); - - OutputPluginPrepareWrite(ctx, true); - - /* - * Updates could be transformed to inserts or deletes based on the - * results of the row filter for old and new tuple. - */ - switch (action) - { - case REORDER_BUFFER_CHANGE_INSERT: - logicalrep_write_insert(ctx->out, xid, targetrel, - new_slot, data->binary, - relentry->columns); - break; - case REORDER_BUFFER_CHANGE_UPDATE: - logicalrep_write_update(ctx->out, xid, targetrel, - old_slot, new_slot, data->binary, - relentry->columns); - break; - case REORDER_BUFFER_CHANGE_DELETE: - logicalrep_write_delete(ctx->out, xid, targetrel, - old_slot, data->binary, - relentry->columns); - break; - default: - Assert(false); - } - - OutputPluginWrite(ctx, true); + logicalrep_write_update(ctx->out, xid, targetrel, old_slot, + new_slot, data->binary, relentry->columns); break; case REORDER_BUFFER_CHANGE_DELETE: - if (change->data.tp.oldtuple) - { - old_slot = relentry->old_slot; - - ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, - old_slot, false); - - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) - { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - targetrel = ancestor; - /* Convert tuple if needed. */ - if (relentry->attrmap) - { - TupleDesc tupdesc = RelationGetDescr(targetrel); - - old_slot = execute_attr_map_slot(relentry->attrmap, - old_slot, - MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); - } - } - - /* Check row filter */ - if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, - relentry, &action)) - break; - - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); - - maybe_send_schema(ctx, change, relation, relentry); - - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, xid, targetrel, - old_slot, data->binary, - relentry->columns); - OutputPluginWrite(ctx, true); - } - else - elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, + data->binary, relentry->columns); break; default: Assert(false); } + OutputPluginWrite(ctx, true); + +cleanup: if (RelationIsValid(ancestor)) { RelationClose(ancestor); ancestor = NULL; } - /* Cleanup */ MemoryContextSwitchTo(old); MemoryContextReset(data->context); }