diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8e659620fa..db7d9930cb 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -694,6 +694,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) uint32 freespace; int curridx; XLogRecData *rdt; + XLogRecData *rdt_lastnormal; Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; @@ -708,6 +709,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) bool updrqst; bool doPageWrites; bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); + uint8 info_orig = info; /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) @@ -731,23 +733,18 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) } /* - * Here we scan the rdata chain, determine which buffers must be backed - * up, and compute the CRC values for the data. Note that the record - * header isn't added into the CRC initially since we don't know the final - * length or info bits quite yet. Thus, the CRC will represent the CRC of - * the whole record in the order "rdata, then backup blocks, then record - * header". + * Here we scan the rdata chain, to determine which buffers must be backed + * up. * * We may have to loop back to here if a race condition is detected below. * We could prevent the race by doing all this work while holding the * insert lock, but it seems better to avoid doing CRC calculations while - * holding the lock. This means we have to be careful about modifying the - * rdata chain until we know we aren't going to loop back again. The only - * change we allow ourselves to make earlier is to set rdt->data = NULL in - * chain items we have decided we will have to back up the whole buffer - * for. This is OK because we will certainly decide the same thing again - * for those items if we do it over; doing it here saves an extra pass - * over the chain later. + * holding the lock. + * + * We add entries for backup blocks to the chain, so that they don't + * need any special treatment in the critical section where the chunks are + * copied into the WAL buffers. Those entries have to be unlinked from the + * chain if we have to loop back here. */ begin:; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) @@ -764,7 +761,6 @@ begin:; */ doPageWrites = fullPageWrites || Insert->forcePageWrites; - INIT_CRC32(rdata_crc); len = 0; for (rdt = rdata;;) { @@ -772,7 +768,6 @@ begin:; { /* Simple data, just include it */ len += rdt->len; - COMP_CRC32(rdata_crc, rdt->data, rdt->len); } else { @@ -783,12 +778,12 @@ begin:; { /* Buffer already referenced by earlier chain item */ if (dtbuf_bkp[i]) - rdt->data = NULL; - else if (rdt->data) { - len += rdt->len; - COMP_CRC32(rdata_crc, rdt->data, rdt->len); + rdt->data = NULL; + rdt->len = 0; } + else if (rdt->data) + len += rdt->len; break; } if (dtbuf[i] == InvalidBuffer) @@ -800,12 +795,10 @@ begin:; { dtbuf_bkp[i] = true; rdt->data = NULL; + rdt->len = 0; } else if (rdt->data) - { len += rdt->len; - COMP_CRC32(rdata_crc, rdt->data, rdt->len); - } break; } } @@ -819,39 +812,6 @@ begin:; rdt = rdt->next; } - /* - * Now add the backup block headers and data into the CRC - */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (dtbuf_bkp[i]) - { - BkpBlock *bkpb = &(dtbuf_xlg[i]); - char *page; - - COMP_CRC32(rdata_crc, - (char *) bkpb, - sizeof(BkpBlock)); - page = (char *) BufferGetBlock(dtbuf[i]); - if (bkpb->hole_length == 0) - { - COMP_CRC32(rdata_crc, - page, - BLCKSZ); - } - else - { - /* must skip the hole */ - COMP_CRC32(rdata_crc, - page, - bkpb->hole_offset); - COMP_CRC32(rdata_crc, - page + (bkpb->hole_offset + bkpb->hole_length), - BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); - } - } - } - /* * NOTE: We disallow len == 0 because it provides a useful bit of extra * error checking in ReadRecord. This means that all callers of @@ -862,70 +822,20 @@ begin:; if (len == 0 && !isLogSwitch) elog(PANIC, "invalid xlog record length %u", len); - START_CRIT_SECTION(); - - /* Now wait to get insert lock */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); - - /* - * Check to see if my RedoRecPtr is out of date. If so, may have to go - * back and recompute everything. This can only happen just after a - * checkpoint, so it's better to be slow in this case and fast otherwise. - * - * If we aren't doing full-page writes then RedoRecPtr doesn't actually - * affect the contents of the XLOG record, so we'll update our local copy - * but not force a recomputation. - */ - if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr)) - { - Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr)); - RedoRecPtr = Insert->RedoRecPtr; - - if (doPageWrites) - { - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (dtbuf[i] == InvalidBuffer) - continue; - if (dtbuf_bkp[i] == false && - XLByteLE(dtbuf_lsn[i], RedoRecPtr)) - { - /* - * Oops, this buffer now needs to be backed up, but we - * didn't think so above. Start over. - */ - LWLockRelease(WALInsertLock); - END_CRIT_SECTION(); - goto begin; - } - } - } - } - - /* - * Also check to see if forcePageWrites was just turned on; if we weren't - * already doing full-page writes then go back and recompute. (If it was - * just turned off, we could recompute the record without full pages, but - * we choose not to bother.) - */ - if (Insert->forcePageWrites && !doPageWrites) - { - /* Oops, must redo it with full-page data */ - LWLockRelease(WALInsertLock); - END_CRIT_SECTION(); - goto begin; - } - /* * Make additional rdata chain entries for the backup blocks, so that we - * don't need to special-case them in the write loop. Note that we have - * now irrevocably changed the input rdata chain. At the exit of this - * loop, write_len includes the backup block data. + * don't need to special-case them in the write loop. This modifies the + * original rdata chain, but we keep a pointer to the last regular entry, + * rdt_lastnormal, so that we can undo this if we have to loop back to the + * beginning. + * + * At the exit of this loop, write_len includes the backup block data. * * Also set the appropriate info bits to show which buffers were backed * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct * buffer value (ignoring InvalidBuffer) appearing in the rdata chain. */ + rdt_lastnormal = rdt; write_len = len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { @@ -974,6 +884,76 @@ begin:; } } + /* + * Calculate CRC of the data, including all the backup blocks + * + * Note that the record header isn't added into the CRC initially since + * we don't know the prev-link yet. Thus, the CRC will represent the CRC + * of the whole record in the order: rdata, then backup blocks, then + * record header. + */ + INIT_CRC32(rdata_crc); + for (rdt = rdata; rdt != NULL; rdt = rdt->next) + COMP_CRC32(rdata_crc, rdt->data, rdt->len); + + START_CRIT_SECTION(); + + /* Now wait to get insert lock */ + LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + + /* + * Check to see if my RedoRecPtr is out of date. If so, may have to go + * back and recompute everything. This can only happen just after a + * checkpoint, so it's better to be slow in this case and fast otherwise. + * + * If we aren't doing full-page writes then RedoRecPtr doesn't actually + * affect the contents of the XLOG record, so we'll update our local copy + * but not force a recomputation. + */ + if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr)) + { + Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr)); + RedoRecPtr = Insert->RedoRecPtr; + + if (doPageWrites) + { + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + if (dtbuf[i] == InvalidBuffer) + continue; + if (dtbuf_bkp[i] == false && + XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + { + /* + * Oops, this buffer now needs to be backed up, but we + * didn't think so above. Start over. + */ + LWLockRelease(WALInsertLock); + END_CRIT_SECTION(); + rdt_lastnormal->next = NULL; + info = info_orig; + goto begin; + } + } + } + } + + /* + * Also check to see if forcePageWrites was just turned on; if we weren't + * already doing full-page writes then go back and recompute. (If it was + * just turned off, we could recompute the record without full pages, but + * we choose not to bother.) + */ + if (Insert->forcePageWrites && !doPageWrites) + { + /* Oops, must redo it with full-page data. */ + LWLockRelease(WALInsertLock); + END_CRIT_SECTION(); + rdt_lastnormal->next = NULL; + info = info_orig; + goto begin; + } + /* * If there isn't enough space on the current XLOG page for a record * header, advance to the next page (leaving the unused space as zeroes).