Back-patch log_newpage_range().

Back-patch a subset of commit 9155580fd5
to v11, v10, 9.6, and 9.5.  Include the latest repairs to this function.
Use a new XLOG_FPI_MULTI value instead of reusing XLOG_FPI.  That way,
if an older server reads WAL from this function, that server will PANIC
instead of applying just one page of the record.  The next commit adds a
call to this function.

Discussion: https://postgr.es/m/20200304.162919.898938381201316571.horikyota.ntt@gmail.com
This commit is contained in:
Noah Misch 2020-03-21 09:38:33 -07:00
parent 4433c6e8c0
commit ae86e46c3b
6 changed files with 113 additions and 9 deletions

View File

@ -78,7 +78,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
appendStringInfoString(buf, xlrec->rp_name); appendStringInfoString(buf, xlrec->rp_name);
} }
else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT) else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
info == XLOG_FPI_MULTI)
{ {
/* no further information to print */ /* no further information to print */
} }
@ -182,6 +183,9 @@ xlog_identify(uint8 info)
case XLOG_FPI_FOR_HINT: case XLOG_FPI_FOR_HINT:
id = "FPI_FOR_HINT"; id = "FPI_FOR_HINT";
break; break;
case XLOG_FPI_MULTI:
id = "FPI_MULTI";
break;
} }
return id; return id;

View File

@ -9754,7 +9754,7 @@ xlog_redo(XLogReaderState *record)
/* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */ /* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */
Assert(info == XLOG_FPI || info == XLOG_FPI_FOR_HINT || Assert(info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
!XLogRecHasAnyBlockRefs(record)); info == XLOG_FPI_MULTI || !XLogRecHasAnyBlockRefs(record));
if (info == XLOG_NEXTOID) if (info == XLOG_NEXTOID)
{ {
@ -9957,14 +9957,16 @@ xlog_redo(XLogReaderState *record)
{ {
/* nothing to do here */ /* nothing to do here */
} }
else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT) else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
info == XLOG_FPI_MULTI)
{ {
Buffer buffer; uint8 block_id;
/* /*
* Full-page image (FPI) records contain nothing else but a backup * Full-page image (FPI) records contain nothing else but a backup
* block. The block reference must include a full-page image - * block (or multiple backup blocks). Every block reference must
* otherwise there would be no point in this record. * include a full-page image - otherwise there would be no point in
* this record.
* *
* No recovery conflicts are generated by these generic records - if a * No recovery conflicts are generated by these generic records - if a
* resource manager needs to generate conflicts, it has to define a * resource manager needs to generate conflicts, it has to define a
@ -9976,9 +9978,14 @@ xlog_redo(XLogReaderState *record)
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
* code just to distinguish them for statistics purposes. * code just to distinguish them for statistics purposes.
*/ */
if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED) for (block_id = 0; block_id <= record->max_block_id; block_id++)
elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block"); {
UnlockReleaseBuffer(buffer); Buffer buffer;
if (XLogReadBufferForRedo(record, block_id, &buffer) != BLK_RESTORED)
elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
UnlockReleaseBuffer(buffer);
}
} }
else if (info == XLOG_BACKUP_END) else if (info == XLOG_BACKUP_END)
{ {

View File

@ -1021,6 +1021,94 @@ log_newpage_buffer(Buffer buffer, bool page_std)
return log_newpage(&rnode, forkNum, blkno, page, page_std); return log_newpage(&rnode, forkNum, blkno, page, page_std);
} }
/*
* WAL-log a range of blocks in a relation.
*
* An image of all pages with block numbers 'startblk' <= X < 'endblk' is
* written to the WAL. If the range is large, this is done in multiple WAL
* records.
*
* If all page follows the standard page layout, with a PageHeader and unused
* space between pd_lower and pd_upper, set 'page_std' to true. That allows
* the unused space to be left out from the WAL records, making them smaller.
*
* NOTE: This function acquires exclusive-locks on the pages. Typically, this
* is used on a newly-built relation, and the caller is holding a
* AccessExclusiveLock on it, so no other backend can be accessing it at the
* same time. If that's not the case, you must ensure that this does not
* cause a deadlock through some other means.
*/
void
log_newpage_range(Relation rel, ForkNumber forkNum,
BlockNumber startblk, BlockNumber endblk,
bool page_std)
{
int flags;
BlockNumber blkno;
flags = REGBUF_FORCE_IMAGE;
if (page_std)
flags |= REGBUF_STANDARD;
/*
* Iterate over all the pages in the range. They are collected into
* batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
* for each batch.
*/
XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
blkno = startblk;
while (blkno < endblk)
{
Buffer bufpack[XLR_MAX_BLOCK_ID];
XLogRecPtr recptr;
int nbufs;
int i;
CHECK_FOR_INTERRUPTS();
/* Collect a batch of blocks. */
nbufs = 0;
while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
{
Buffer buf = ReadBufferExtended(rel, forkNum, blkno,
RBM_NORMAL, NULL);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
/*
* Completely empty pages are not WAL-logged. Writing a WAL record
* would change the LSN, and we don't want that. We want the page
* to stay empty.
*/
if (!PageIsNew(BufferGetPage(buf)))
bufpack[nbufs++] = buf;
else
UnlockReleaseBuffer(buf);
blkno++;
}
/* Write WAL record for this batch. */
XLogBeginInsert();
START_CRIT_SECTION();
for (i = 0; i < nbufs; i++)
{
XLogRegisterBuffer(i, bufpack[i], flags);
MarkBufferDirty(bufpack[i]);
}
recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_MULTI);
for (i = 0; i < nbufs; i++)
{
PageSetLSN(BufferGetPage(bufpack[i]), recptr);
UnlockReleaseBuffer(bufpack[i]);
}
END_CRIT_SECTION();
}
}
/* /*
* Allocate working buffers needed for WAL record construction. * Allocate working buffers needed for WAL record construction.
*/ */

View File

@ -199,6 +199,7 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_FPW_CHANGE: case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT: case XLOG_FPI_FOR_HINT:
case XLOG_FPI: case XLOG_FPI:
case XLOG_FPI_MULTI:
break; break;
default: default:
elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info); elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);

View File

@ -16,6 +16,7 @@
#include "storage/block.h" #include "storage/block.h"
#include "storage/buf.h" #include "storage/buf.h"
#include "storage/relfilenode.h" #include "storage/relfilenode.h"
#include "utils/relcache.h"
/* /*
* The minimum size of the WAL construction working area. If you need to * The minimum size of the WAL construction working area. If you need to
@ -54,6 +55,8 @@ extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
BlockNumber blk, char *page, bool page_std); BlockNumber blk, char *page, bool page_std);
extern XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std); extern XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std);
extern void log_newpage_range(Relation rel, ForkNumber forkNum,
BlockNumber startblk, BlockNumber endblk, bool page_std);
extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std); extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
extern void InitXLogInsert(void); extern void InitXLogInsert(void);

View File

@ -76,6 +76,7 @@ typedef struct CheckPoint
#define XLOG_END_OF_RECOVERY 0x90 #define XLOG_END_OF_RECOVERY 0x90
#define XLOG_FPI_FOR_HINT 0xA0 #define XLOG_FPI_FOR_HINT 0xA0
#define XLOG_FPI 0xB0 #define XLOG_FPI 0xB0
#define XLOG_FPI_MULTI 0xC0
/* /*