diff --git a/contrib/test_decoding/expected/toast.out b/contrib/test_decoding/expected/toast.out index 735b14c978..389748ff8c 100644 --- a/contrib/test_decoding/expected/toast.out +++ b/contrib/test_decoding/expected/toast.out @@ -292,6 +292,64 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', COMMIT (235 rows) +-- test we can decode "old" tuples bigger than the max heap tuple size correctly +DROP TABLE IF EXISTS toasted_several; +NOTICE: table "toasted_several" does not exist, skipping +CREATE TABLE toasted_several ( + id serial unique not null, + toasted_key text primary key, + toasted_col1 text, + toasted_col2 text +); +ALTER TABLE toasted_several REPLICA IDENTITY FULL; +ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL; +INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000)); +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + regexp_replace +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null + COMMIT +(3 rows) + +-- test update of a toasted key without changing it +UPDATE toasted_several SET toasted_col1 = toasted_key; +UPDATE toasted_several SET toasted_col2 = toasted_col1; +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + regexp_replace +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null + COMMIT + BEGIN + table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..432109876543210987654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:null + COMMIT + BEGIN + table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210' + COMMIT +(9 rows) + +/* + * update with large tuplebuf, in a transaction large enough to force to spool to disk + */ +BEGIN; +INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234); +UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1; +DELETE FROM toasted_several WHERE id = 1; +COMMIT; +DROP TABLE toasted_several; +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') +WHERE data NOT LIKE '%INSERT: %'; + regexp_replace +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..7654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:unchanged-toast-datum + table public.toasted_several: DELETE: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210' + COMMIT +(4 rows) + SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- diff --git a/contrib/test_decoding/sql/toast.sql b/contrib/test_decoding/sql/toast.sql index 26d6b4fbdd..dcb74e38c7 100644 --- a/contrib/test_decoding/sql/toast.sql +++ b/contrib/test_decoding/sql/toast.sql @@ -265,4 +265,41 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL; 203 untoasted200 \. SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test we can decode "old" tuples bigger than the max heap tuple size correctly +DROP TABLE IF EXISTS toasted_several; +CREATE TABLE toasted_several ( + id serial unique not null, + toasted_key text primary key, + toasted_col1 text, + toasted_col2 text +); +ALTER TABLE toasted_several REPLICA IDENTITY FULL; +ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL; + +INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000)); + +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test update of a toasted key without changing it +UPDATE toasted_several SET toasted_col1 = toasted_key; +UPDATE toasted_several SET toasted_col2 = toasted_col1; + +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +/* + * update with large tuplebuf, in a transaction large enough to force to spool to disk + */ +BEGIN; +INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234); +UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1; +DELETE FROM toasted_several WHERE id = 1; +COMMIT; + +DROP TABLE toasted_several; + +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') +WHERE data NOT LIKE '%INSERT: %'; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 33e4343cc2..36e6d9a5c9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); } @@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { data = XLogRecGetBlockData(r, 0, &datalen); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } @@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) data = XLogRecGetData(r) + SizeOfHeapUpdate; datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } @@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { + Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, len); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - XLogRecGetDataLen(r) - SizeOfHeapDelete, - change->data.tp.oldtuple); + len, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; @@ -783,35 +788,39 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - - tuple = change->data.tp.newtuple; - - /* not a disk based tuple */ - ItemPointerSetInvalid(&tuple->tuple.t_self); + HeapTupleHeader header; xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); data = ((char *) xlhdr) + SizeOfMultiInsertTuple; datalen = xlhdr->datalen; + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); + + tuple = change->data.tp.newtuple; + header = tuple->tuple.t_data; + + /* not a disk based tuple */ + ItemPointerSetInvalid(&tuple->tuple.t_self); + /* * We can only figure this out after reassembling the * transactions. */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, (char *) data, datalen); data += datalen; - tuple->t_data.header.t_infomask = xlhdr->t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2; - tuple->t_data.header.t_hoff = xlhdr->t_hoff; + header->t_infomask = xlhdr->t_infomask; + header->t_infomask2 = xlhdr->t_infomask2; + header->t_hoff = xlhdr->t_hoff; } /* @@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) { xl_heap_header xlhdr; int datalen = len - SizeOfHeapHeader; + HeapTupleHeader header; Assert(datalen >= 0); - Assert(datalen <= MaxHeapTupleSize); tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); /* we can only figure this out after reassembling the transactions */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->t_data.header; /* data is not stored aligned, copy to aligned storage */ memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader); + memset(header, 0, SizeofHeapTupleHeader); - memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader, + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, data + SizeOfHeapHeader, datalen); - tuple->t_data.header.t_infomask = xlhdr.t_infomask; - tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2; - tuple->t_data.header.t_hoff = xlhdr.t_hoff; + header->t_infomask = xlhdr.t_infomask; + header->t_infomask2 = xlhdr.t_infomask2; + header->t_hoff = xlhdr.t_hoff; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e20c5114e2..570400ffb7 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -444,27 +444,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) /* - * Get an unused, possibly preallocated, ReorderBufferTupleBuf + * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at + * least a tuple of size tuple_len (excluding header overhead). */ ReorderBufferTupleBuf * -ReorderBufferGetTupleBuf(ReorderBuffer *rb) +ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) { ReorderBufferTupleBuf *tuple; + Size alloc_len; - /* check the slab cache */ - if (rb->nr_cached_tuplebufs) + alloc_len = tuple_len + SizeofHeapTupleHeader; + + /* + * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for + * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples + * tuples generated for oldtuples can be bigger, as they don't have + * out-of-line toast columns. + */ + if (alloc_len < MaxHeapTupleSize) + alloc_len = MaxHeapTupleSize; + + + /* if small enough, check the slab cache */ + if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs) { rb->nr_cached_tuplebufs--; tuple = slist_container(ReorderBufferTupleBuf, node, slist_pop_head_node(&rb->cached_tuplebufs)); #ifdef USE_ASSERT_CHECKING - memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf)); + memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData)); +#endif + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); +#ifdef USE_ASSERT_CHECKING + memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size); #endif } else { tuple = (ReorderBufferTupleBuf *) - MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf)); + MemoryContextAlloc(rb->context, + sizeof(ReorderBufferTupleBuf) + alloc_len); + tuple->alloc_tuple_size = alloc_len; + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); } return tuple; @@ -479,13 +500,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb) void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) { - /* check whether to put into the slab cache */ - if (rb->nr_cached_tuplebufs < max_cached_tuplebufs) + /* check whether to put into the slab cache, oversized tuples never are */ + if (tuple->alloc_tuple_size == MaxHeapTupleSize && + rb->nr_cached_tuplebufs < max_cached_tuplebufs) { rb->nr_cached_tuplebufs++; slist_push_head(&rb->cached_tuplebufs, &tuple->node); + VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size); VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf)); VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node)); + VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size)); } else { @@ -2092,15 +2116,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, newtup = change->data.tp.newtuple; if (oldtup) - oldlen = offsetof(ReorderBufferTupleBuf, t_data) + - oldtup->tuple.t_len; + { + sz += sizeof(HeapTupleData); + oldlen = oldtup->tuple.t_len; + sz += oldlen; + } if (newtup) - newlen = offsetof(ReorderBufferTupleBuf, t_data) + - newtup->tuple.t_len; - - sz += oldlen; - sz += newlen; + { + sz += sizeof(HeapTupleData); + newlen = newtup->tuple.t_len; + sz += newlen; + } /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); @@ -2111,14 +2138,20 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldlen) { - memcpy(data, oldtup, oldlen); + memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, oldtup->tuple.t_data, oldlen); data += oldlen; } if (newlen) { - memcpy(data, newtup, newlen); - data += newlen; + memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, newtup->tuple.t_data, newlen); + data += oldlen; } break; } @@ -2337,27 +2370,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: if (change->data.tp.oldtuple) { - Size len = offsetof(ReorderBufferTupleBuf, t_data) + - ((ReorderBufferTupleBuf *) data)->tuple.t_len; + Size tuplelen = ((HeapTuple) data)->t_len; - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.oldtuple, data, len); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.tp.oldtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.oldtuple->tuple.t_data = - &change->data.tp.oldtuple->t_data.header; - data += len; + ReorderBufferTupleBufData(change->data.tp.oldtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } if (change->data.tp.newtuple) { - Size len = offsetof(ReorderBufferTupleBuf, t_data) + - ((ReorderBufferTupleBuf *) data)->tuple.t_len; + Size tuplelen = ((HeapTuple) data)->t_len; - change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.newtuple, data, len); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.tp.newtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.newtuple->tuple.t_data = - &change->data.tp.newtuple->t_data.header; - data += len; + ReorderBufferTupleBufData(change->data.tp.newtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { @@ -2734,7 +2786,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, */ tmphtup = heap_form_tuple(desc, attrs, isnull); Assert(newtup->tuple.t_len <= MaxHeapTupleSize); - Assert(&newtup->t_data.header == newtup->tuple.t_data); + Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data); memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); newtup->tuple.t_len = tmphtup->t_len; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b25eae832a..b52d06af92 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -23,16 +23,19 @@ typedef struct ReorderBufferTupleBuf /* position in preallocated list */ slist_node node; - /* tuple, stored sequentially */ + /* tuple header, the interesting bit for users of logical decoding */ HeapTupleData tuple; - union - { - HeapTupleHeaderData header; - char data[MaxHeapTupleSize]; - double align_it; /* ensure t_data is MAXALIGN'd */ - } t_data; + + /* pre-allocated size of tuple buffer, different from tuple size */ + Size alloc_tuple_size; + + /* actual tuple data follows */ } ReorderBufferTupleBuf; +/* pointer to the data stored in a TupleBuf */ +#define ReorderBufferTupleBufData(p) \ + ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) + /* * Types of the change passed to a 'change' callback. * @@ -341,7 +344,7 @@ struct ReorderBuffer ReorderBuffer *ReorderBufferAllocate(void); void ReorderBufferFree(ReorderBuffer *); -ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); +ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);