diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index de7522ea01..d9ac42c394 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -957,8 +957,6 @@ hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record) OffsetNumber hoffnum; TransactionId latestRemovedXid = InvalidTransactionId; int i; - char *ptr; - Size len; xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record); @@ -976,13 +974,21 @@ hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record) if (CountDBBackends(InvalidOid) == 0) return latestRemovedXid; + /* + * Check if WAL replay has reached a consistent database state. If not, + * we must PANIC. See the definition of btree_xlog_delete_get_latestRemovedXid + * for more details. + */ + if (!reachedConsistency) + elog(PANIC, "hash_xlog_vacuum_get_latestRemovedXid: cannot operate with inconsistent data"); + /* * Get index page. If the DB is consistent, this should not fail, nor * should any of the heap page fetches below. If one does, we return * InvalidTransactionId to cancel all HS transactions. That's probably * overkill, but it's safe, and certainly better than panicking here. */ - XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno); + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL); if (!BufferIsValid(ibuffer)) @@ -994,9 +1000,7 @@ hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record) * Loop through the deleted index items to obtain the TransactionId from * the heap items they point to. */ - ptr = XLogRecGetBlockData(record, 1, &len); - - unused = (OffsetNumber *) ptr; + unused = (OffsetNumber *) ((char *) xlrec + SizeOfHashVacuumOnePage); for (i = 0; i < xlrec->ntuples; i++) { @@ -1121,23 +1125,15 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) if (action == BLK_NEEDS_REDO) { - char *ptr; - Size len; - - ptr = XLogRecGetBlockData(record, 0, &len); - page = (Page) BufferGetPage(buffer); - if (len > 0) + if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage) { OffsetNumber *unused; - OffsetNumber *unend; - unused = (OffsetNumber *) ptr; - unend = (OffsetNumber *) ((char *) ptr + len); + unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage); - if ((unend - unused) > 0) - PageIndexMultiDelete(page, unused, unend - unused); + PageIndexMultiDelete(page, unused, xldata->ntuples); } /* diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c index 8640e85a5c..8699b5bc30 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -344,7 +344,6 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, Page page = BufferGetPage(buf); HashPageOpaque pageopaque; HashMetaPage metap; - double tuples_removed = 0; /* Scan each tuple in page to see if it is marked as LP_DEAD */ maxoff = PageGetMaxOffsetNumber(page); @@ -355,10 +354,7 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, ItemId itemId = PageGetItemId(page, offnum); if (ItemIdIsDead(itemId)) - { deletable[ndeletable++] = offnum; - tuples_removed += 1; - } } if (ndeletable > 0) @@ -386,7 +382,7 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES; metap = HashPageGetMeta(BufferGetPage(metabuf)); - metap->hashm_ntuples -= tuples_removed; + metap->hashm_ntuples -= ndeletable; MarkBufferDirty(buf); MarkBufferDirty(metabuf); @@ -398,13 +394,18 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, XLogRecPtr recptr; xlrec.hnode = hnode; - xlrec.ntuples = tuples_removed; + xlrec.ntuples = ndeletable; XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage); - XLogRegisterBuffer(0, buf, REGBUF_STANDARD); - XLogRegisterBufData(0, (char *) deletable, + /* + * We need the target-offsets array whether or not we store the whole + * buffer, to allow us to find the latestRemovedXid on a standby + * server. + */ + XLogRegisterData((char *) deletable, ndeletable * sizeof(OffsetNumber)); XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD); diff --git a/src/backend/access/rmgrdesc/hashdesc.c b/src/backend/access/rmgrdesc/hashdesc.c index 5f5f4a0255..35d86dc893 100644 --- a/src/backend/access/rmgrdesc/hashdesc.c +++ b/src/backend/access/rmgrdesc/hashdesc.c @@ -113,7 +113,7 @@ hash_desc(StringInfo buf, XLogReaderState *record) { xl_hash_vacuum_one_page *xlrec = (xl_hash_vacuum_one_page *) rec; - appendStringInfo(buf, "ntuples %g", + appendStringInfo(buf, "ntuples %d", xlrec->ntuples); break; } diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h index 2e64cfa3ea..644da2eaf2 100644 --- a/src/include/access/hash_xlog.h +++ b/src/include/access/hash_xlog.h @@ -265,11 +265,13 @@ typedef struct xl_hash_init_bitmap_page typedef struct xl_hash_vacuum_one_page { RelFileNode hnode; - double ntuples; + int ntuples; + + /* TARGET OFFSET NUMBERS FOLLOW AT THE END */ } xl_hash_vacuum_one_page; #define SizeOfHashVacuumOnePage \ - (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(double)) + (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(int)) extern void hash_redo(XLogReaderState *record); extern void hash_desc(StringInfo buf, XLogReaderState *record);