diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3abf8..4985c2abe0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -800,19 +800,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. - */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1351b33011..b904697f18 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -397,6 +397,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -467,6 +470,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1677,8 +1681,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1720,6 +1724,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. + * + * Note that we get the spec abort change for each toast + * entry but we need to perform the cleanup only the first + * time we get it for the main table. + */ + if (specinsert != NULL) + { + /* + * We must clean the toast hash before processing a + * completely new tuple to avoid confusion about the + * previous tuple's toast chunks. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* We don't need this record anymore. */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; @@ -1827,16 +1857,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } } - /* - * There's a speculative insertion remaining, just clean in up, it - * can't have been successful, otherwise we'd gotten a confirmation - * record. - */ - if (specinsert) - { - ReorderBufferReturnChange(rb, specinsert); - specinsert = NULL; - } + /* speculative insertion record must be freed by now */ + Assert(!specinsert); /* clean up the iterator */ ReorderBufferIterTXNFinish(rb, iterstate); @@ -2640,6 +2662,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -2747,6 +2770,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -3032,6 +3056,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 019bd382de..676491d3bc 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -46,10 +46,10 @@ typedef struct ReorderBufferTupleBuf * changes. Users of the decoding facilities will never see changes with * *_INTERNAL_* actions. * - * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern - * "speculative insertions", and their confirmation respectively. They're - * used by INSERT .. ON CONFLICT .. UPDATE. Users of logical decoding don't - * have to care about these. + * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT + * changes concern "speculative insertions", their confirmation, and abort + * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of + * logical decoding don't have to care about these. */ enum ReorderBufferChangeType { @@ -62,6 +62,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE };