diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index 9aa56411d5..368aa73ce2 100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -72,7 +72,9 @@ int recovery_prefetch = RECOVERY_PREFETCH_TRY; #ifdef USE_PREFETCH -#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF) +#define RecoveryPrefetchEnabled() \ + (recovery_prefetch != RECOVERY_PREFETCH_OFF && \ + maintenance_io_concurrency > 0) #else #define RecoveryPrefetchEnabled() false #endif @@ -985,6 +987,7 @@ XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) { DecodedXLogRecord *record; + XLogRecPtr replayed_up_to; /* * See if it's time to reset the prefetching machinery, because a relevant @@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) if (RecoveryPrefetchEnabled()) { - max_inflight = Max(maintenance_io_concurrency, 2); + Assert(maintenance_io_concurrency > 0); + max_inflight = maintenance_io_concurrency; max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER; } else @@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) } /* - * Release last returned record, if there is one. We need to do this so - * that we can check for empty decode queue accurately. + * Release last returned record, if there is one, as it's now been + * replayed. */ - XLogReleasePreviousRecord(prefetcher->reader); + replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader); - /* If there's nothing queued yet, then start prefetching. */ + /* + * Can we drop any filters yet? If we were waiting for a relation to be + * created or extended, it is now OK to access blocks in the covered + * range. + */ + XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to); + + /* + * All IO initiated by earlier WAL is now completed. This might trigger + * further prefetching. + */ + lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to); + + /* + * If there's nothing queued yet, then start prefetching to cause at least + * one record to be queued. + */ if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader)) + { + Assert(lrq_inflight(prefetcher->streaming_read) == 0); + Assert(lrq_completed(prefetcher->streaming_read) == 0); lrq_prefetch(prefetcher->streaming_read); + } /* Read the next record. */ record = XLogNextRecord(prefetcher->reader, errmsg); @@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) Assert(record == prefetcher->reader->record); /* - * Can we drop any prefetch filters yet, given the record we're about to - * return? This assumes that any records with earlier LSNs have been - * replayed, so if we were waiting for a relation to be created or - * extended, it is now OK to access blocks in the covered range. + * If maintenance_io_concurrency is set very low, we might have started + * prefetching some but not all of the blocks referenced in the record + * we're about to return. Forget about the rest of the blocks in this + * record by dropping the prefetcher's reference to it. */ - XLogPrefetcherCompleteFilters(prefetcher, record->lsn); + if (record == prefetcher->record) + prefetcher->record = NULL; /* * See if it's time to compute some statistics, because enough WAL has @@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn)) XLogPrefetcherComputeStats(prefetcher); - /* - * The caller is about to replay this record, so we can now report that - * all IO initiated because of early WAL must be finished. This may - * trigger more readahead. - */ - lrq_complete_lsn(prefetcher->streaming_read, record->lsn); - Assert(record == prefetcher->reader->record); return &record->header; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index cdcacc7803..cd3dd8cc5c 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -275,22 +275,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) } /* - * See if we can release the last record that was returned by - * XLogNextRecord(), if any, to free up space. + * Release the last record that was returned by XLogNextRecord(), if any, to + * free up space. Returns the LSN past the end of the record. */ -void +XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state) { DecodedXLogRecord *record; + XLogRecPtr next_lsn; if (!state->record) - return; + return InvalidXLogRecPtr; /* * Remove it from the decoded record queue. It must be the oldest item * decoded, decode_queue_head. */ record = state->record; + next_lsn = record->next_lsn; Assert(record == state->decode_queue_head); state->record = NULL; state->decode_queue_head = record->next; @@ -336,6 +338,8 @@ XLogReleasePreviousRecord(XLogReaderState *state) state->decode_buffer_tail = state->decode_buffer; } } + + return next_lsn; } /* @@ -907,6 +911,17 @@ err: */ state->abortedRecPtr = RecPtr; state->missingContrecPtr = targetPagePtr; + + /* + * If we got here without reporting an error, report one now so that + * XLogPrefetcherReadRecord() doesn't bring us back a second time and + * clobber the above state. Otherwise, the existing error takes + * precedence. + */ + if (!state->errormsg_buf[0]) + report_invalid_record(state, + "missing contrecord at %X/%X", + LSN_FORMAT_ARGS(RecPtr)); } if (decoded && decoded->oversized) diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 97b6f00114..6afec33d41 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state, char **errormsg); /* Release the previously returned record, if necessary. */ -extern void XLogReleasePreviousRecord(XLogReaderState *state); +extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state); /* Try to read ahead, if there is data and space. */ extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,