Fix assertion during streaming of multi-insert toast changes.

While decoding the multi-insert WAL we can't clean the toast untill we get
the last insert of that WAL record. Now if we stream the changes before we
get the last change, the memory for toast chunks won't be released and we
expect the txn to have streamed all changes after streaming.  This
restriction is mainly to ensure the correctness of streamed transactions
and it doesn't seem worth uplifting such a restriction just to allow this
case because anyway we will stream the transaction once such an insert is
complete.

Previously we were using two different flags (one for toast tuples and
another for speculative inserts) to indicate partial changes. Now instead
we replaced both of them with a single flag to indicate partial changes.

Reported-by: Pavan Deolasee
Author: Dilip Kumar
Reviewed-by: Pavan Deolasee, Amit Kapila
Discussion: https://postgr.es/m/CABOikdN-_858zojYN-2tNcHiVTw-nhxPwoQS4quExeweQfG1Ug@mail.gmail.com
This commit is contained in:
Amit Kapila 2021-05-27 07:59:43 +05:30
parent 190fa5a00a
commit 6f4bdf8152
4 changed files with 73 additions and 42 deletions

View File

@ -82,6 +82,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
committing streamed transaction
(13 rows)
-- streaming test for toast with multi-insert
\COPY stream_test FROM STDIN
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
------------------------------------------
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
opening a streamed block for transaction
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
(17 rows)
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot

File diff suppressed because one or more lines are too long

View File

@ -705,31 +705,35 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
toptxn = txn;
/*
* Set the toast insert bit whenever we get toast insert to indicate a
* partial change and clear it when we get the insert or update on main
* table (Both update and insert will do the insert in the toast table).
* Indicate a partial change for toast inserts. The change will be
* considered as complete once we get the insert or update on the main
* table and we are sure that the pending toast chunks are not required
* anymore.
*
* If we allow streaming when there are pending toast chunks then such
* chunks won't be released till the insert (multi_insert) is complete and
* we expect the txn to have streamed all changes after streaming. This
* restriction is mainly to ensure the correctness of streamed
* transactions and it doesn't seem worth uplifting such a restriction
* just to allow this case because anyway we will stream the transaction
* once such an insert is complete.
*/
if (toast_insert)
toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
else if (rbtxn_has_toast_insert(toptxn) &&
IsInsertOrUpdate(change->action))
toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
else if (rbtxn_has_partial_change(toptxn) &&
IsInsertOrUpdate(change->action) &&
change->data.tp.clear_toast_afterwards)
toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
/*
* Set the spec insert bit whenever we get the speculative insert to
* indicate the partial change and clear the same on speculative confirm.
* Indicate a partial change for speculative inserts. The change will be
* considered as complete once we get the speculative confirm token.
*/
if (IsSpecInsert(change->action))
toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
else if (IsSpecConfirm(change->action))
{
/*
* Speculative confirm change must be preceded by speculative
* insertion.
*/
Assert(rbtxn_has_spec_insert(toptxn));
toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
}
toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
else if (rbtxn_has_partial_change(toptxn) &&
IsSpecConfirm(change->action))
toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
/*
* Stream the transaction if it is serialized before and the changes are
@ -741,7 +745,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
* changes. Delaying such transactions would increase apply lag for them.
*/
if (ReorderBufferCanStartStreaming(rb) &&
!(rbtxn_has_incomplete_tuple(toptxn)) &&
!(rbtxn_has_partial_change(toptxn)) &&
rbtxn_is_serialized(txn))
ReorderBufferStreamTXN(rb, toptxn);
}
@ -3399,7 +3403,7 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
Assert(txn->base_snapshot != NULL);
if ((largest == NULL || txn->total_size > largest_size) &&
(txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
{
largest = txn;
largest_size = txn->total_size;

View File

@ -172,10 +172,9 @@ typedef struct ReorderBufferChange
#define RBTXN_IS_SERIALIZED 0x0004
#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
#define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_TOAST_INSERT 0x0020
#define RBTXN_HAS_SPEC_INSERT 0x0040
#define RBTXN_PREPARE 0x0080
#define RBTXN_SKIPPED_PREPARE 0x0100
#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
#define RBTXN_PREPARE 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@ -201,24 +200,10 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
/* This transaction's changes has toast insert, without main table insert. */
#define rbtxn_has_toast_insert(txn) \
/* Has this transaction contains partial changes? */
#define rbtxn_has_partial_change(txn) \
( \
((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
)
/*
* This transaction's changes has speculative insert, without speculative
* confirm.
*/
#define rbtxn_has_spec_insert(txn) \
( \
((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
)
/* Check whether this transaction has an incomplete change. */
#define rbtxn_has_incomplete_tuple(txn) \
( \
rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
/*