diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6901e71f9d..ac533968a0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3657,6 +3657,70 @@ include_dir 'conf.d'
+
+
+ Recovery
+
+
+ configuration
+ of recovery
+ general settings
+
+
+
+ This section describes the settings that apply to recovery in general,
+ affecting crash recovery, streaming replication and archive-based
+ replication.
+
+
+
+
+
+ recovery_prefetch (enum)
+
+ recovery_prefetch configuration parameter
+
+
+
+
+ Whether to try to prefetch blocks that are referenced in the WAL that
+ are not yet in the buffer pool, during recovery. Valid values are
+ off, on and
+ try (the default). The setting try enables
+ prefetching only if the operating system provides the
+ posix_fadvise function, which is currently used
+ to implement prefetching. Note that some operating systems provide the
+ function, but it doesn't do anything.
+
+
+ Prefetching blocks that will soon be needed can reduce I/O wait times
+ during recovery with some workloads.
+ See also the wal_decode_buffer_size and
+ maintenance_io_concurrency settings, which limit
+ prefetching activity.
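+
+ For example, prefetching can be forced on in
+ postgresql.conf (a sketch; the value on
+ is rejected on platforms that lack posix_fadvise):
+
+recovery_prefetch = 'on'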
+
+
+
+
+
+ wal_decode_buffer_size (integer)
+
+ wal_decode_buffer_size configuration parameter
+
+
+
+
+ A limit on how far ahead the server can look in the WAL, to find
+ blocks to prefetch. If this value is specified without units, it is
+ taken as bytes.
+ The default is 512kB.
+
+
+
+
+
+
+
Archive Recovery
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 24924647b5..76766d28dd 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -328,6 +328,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
+
+ pg_stat_recovery_prefetchpg_stat_recovery_prefetch
+ Only one row, showing statistics about blocks prefetched during recovery.
+ See pg_stat_recovery_prefetch for details.
+
+
+
pg_stat_subscriptionpg_stat_subscription
At least one row per subscription, showing information about
@@ -2979,6 +2986,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
copy of the subscribed tables.
+
+ pg_stat_recovery_prefetch View
+
+
+
+ Column
+ Type
+ Description
+
+
+
+
+
+ prefetch
+ bigint
+ Number of blocks prefetched because they were not in the buffer pool
+
+
+ hit
+ bigint
+ Number of blocks not prefetched because they were already in the buffer pool
+
+
+ skip_init
+ bigint
+ Number of blocks not prefetched because they would be zero-initialized
+
+
+ skip_new
+ bigint
+ Number of blocks not prefetched because they didn't exist yet
+
+
+ skip_fpw
+ bigint
+ Number of blocks not prefetched because a full page image was included in the WAL
+
+
+ skip_rep
+ bigint
+ Number of blocks not prefetched because they were already recently prefetched
+
+
+ wal_distance
+ integer
+ How many bytes ahead the prefetcher is looking
+
+
+ block_distance
+ integer
+ How many blocks ahead the prefetcher is looking
+
+
+ io_depth
+ integer
+ How many prefetches have been initiated but are not yet known to have completed
+
+
+
+
+
+
+ The pg_stat_recovery_prefetch view will contain
+ only one row. It is filled with nulls if recovery has not run or
+ is not enabled. The
+ columns wal_distance,
+ block_distance
+ and io_depth show current values, and the
+ other columns show cumulative counters that can be reset
+ with the pg_stat_reset_shared function.
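+
+ For example, prefetching activity can be sampled while a standby
+ replays WAL (a minimal illustration):
+
+SELECT prefetch, hit, skip_fpw, wal_distance, block_distance, io_depth
+ FROM pg_stat_recovery_prefetch;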
+
+
pg_stat_subscription View
@@ -5199,8 +5278,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
all the counters shown in
the pg_stat_bgwriter
view, archiver to reset all the counters shown in
- the pg_stat_archiver view or wal
- to reset all the counters shown in the pg_stat_wal view.
+ the pg_stat_archiver view,
+ wal to reset all the counters shown in the
+ pg_stat_wal view or
+ recovery_prefetch to reset all the counters shown
+ in the pg_stat_recovery_prefetch view.
This function is restricted to superusers by default, but other users
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index 2bb27a8468..6b3406b7de 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -803,6 +803,18 @@
counted as wal_write and wal_sync
in pg_stat_wal, respectively.
+
+
+ The recovery_prefetch parameter can be used to reduce
+ I/O wait times during recovery by instructing the kernel to initiate reads
+ of disk blocks that will soon be needed but are not currently in
+ PostgreSQL's buffer pool.
+ The maintenance_io_concurrency and
+ wal_decode_buffer_size settings limit prefetching
+ concurrency and distance, respectively. By default, it is set to
+ try, which enables the feature on systems where
+ posix_fadvise is available.
+
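+
+ For example, a sketch of the related postgresql.conf
+ settings (the values shown are the defaults, given for illustration only):
+
+recovery_prefetch = 'try'
+wal_decode_buffer_size = '512kB'
+maintenance_io_concurrency = 10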
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 79314c69ab..8c17c88dfc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -31,6 +31,7 @@ OBJS = \
xlogarchive.o \
xlogfuncs.o \
xloginsert.o \
+ xlogprefetcher.o \
xlogreader.o \
xlogrecovery.o \
xlogutils.o
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c076e48445..6770c3ddba 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -59,6 +59,7 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xloginsert.h"
+#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
@@ -133,6 +134,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000;
int max_slot_wal_keep_size_mb = -1;
+int wal_decode_buffer_size = 512 * 1024;
bool track_wal_io_timing = false;
#ifdef WAL_DEBUG
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
new file mode 100644
index 0000000000..f3428888d2
--- /dev/null
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -0,0 +1,1082 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.c
+ * Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogprefetcher.c
+ *
+ * This module provides a drop-in replacement for an XLogReader that tries to
+ * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
+ * accessed in the near future are not already in the buffer pool, it initiates
+ * I/Os that might complete before the caller eventually needs the data. When
+ * referenced blocks are found in the buffer pool already, the buffer is
+ * recorded in the decoded record so that XLogReadBufferForRedo() can try to
+ * avoid a second buffer mapping table lookup.
+ *
+ * Currently, only the main fork is considered for prefetching, and
+ * prefetching is only effective on systems where PrefetchSharedBuffer()
+ * does something useful (mainly Linux).
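+ *
+ * Typical usage, as a sketch (see the wrapper functions below):
+ *
+ *		prefetcher = XLogPrefetcherAllocate(xlogreader);
+ *		XLogPrefetcherBeginRead(prefetcher, start_lsn);
+ *		while ((record = XLogPrefetcherReadRecord(prefetcher, &errmsg)) != NULL)
+ *			... replay the record ...
+ *		XLogPrefetcherFree(prefetcher);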
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "catalog/storage_xlog.h"
+#include "commands/dbcommands_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Every time we process this much WAL, we'll update the values in
+ * pg_stat_recovery_prefetch.
+ */
+#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
+
+/*
+ * To detect repeated access to the same block and skip useless extra system
+ * calls, we remember a small window of recently prefetched blocks.
+ */
+#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
+
+/*
+ * When maintenance_io_concurrency is not saturated, we're prepared to look
+ * ahead up to N times that number of block references.
+ */
+#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
+
+/* Define to log internal debugging messages. */
+/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
+
+/* GUCs */
+int recovery_prefetch = RECOVERY_PREFETCH_TRY;
+
+#ifdef USE_PREFETCH
+#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
+#else
+#define RecoveryPrefetchEnabled() false
+#endif
+
+static int XLogPrefetchReconfigureCount = 0;
+
+/*
+ * Enum used to report whether an IO should be started.
+ */
+typedef enum
+{
+ LRQ_NEXT_NO_IO,
+ LRQ_NEXT_IO,
+ LRQ_NEXT_AGAIN
+} LsnReadQueueNextStatus;
+
+/*
+ * Type of callback that can decide which block to prefetch next. For now
+ * there is only one.
+ */
+typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
+ XLogRecPtr *lsn);
+
+/*
+ * A simple circular queue of LSNs, used to control the number of
+ * (potentially) inflight IOs. This stands in for a later more general IO
+ * control mechanism, which is why it has the apparently unnecessary
+ * indirection through a function pointer.
+ */
+typedef struct LsnReadQueue
+{
+ LsnReadQueueNextFun next;
+ uintptr_t lrq_private;
+ uint32 max_inflight;
+ uint32 inflight;
+ uint32 completed;
+ uint32 head;
+ uint32 tail;
+ uint32 size;
+ struct
+ {
+ bool io;
+ XLogRecPtr lsn;
+ } queue[FLEXIBLE_ARRAY_MEMBER];
+} LsnReadQueue;
+
+/*
+ * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
+ * blocks that will soon be referenced, to try to avoid IO stalls.
+ */
+struct XLogPrefetcher
+{
+ /* WAL reader and current reading state. */
+ XLogReaderState *reader;
+ DecodedXLogRecord *record;
+ int next_block_id;
+
+ /* When to publish stats. */
+ XLogRecPtr next_stats_shm_lsn;
+
+ /* Book-keeping to avoid accessing blocks that don't exist yet. */
+ HTAB *filter_table;
+ dlist_head filter_queue;
+
+ /* Book-keeping to avoid repeat prefetches. */
+ RelFileNode recent_rnode[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
+ BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
+ int recent_idx;
+
+ /* Book-keeping to disable prefetching temporarily. */
+ XLogRecPtr no_readahead_until;
+
+ /* IO depth manager. */
+ LsnReadQueue *streaming_read;
+
+ XLogRecPtr begin_ptr;
+
+ int reconfigure_count;
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that (we assume) have already been dropped, or will be created by bulk WAL
+ * operators.
+ */
+typedef struct XLogPrefetcherFilter
+{
+ RelFileNode rnode;
+ XLogRecPtr filter_until_replayed;
+ BlockNumber filter_from_block;
+ dlist_node link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_recovery_prefetch.
+ */
+typedef struct XLogPrefetchStats
+{
+ pg_atomic_uint64 reset_time; /* Time of last reset. */
+ pg_atomic_uint64 prefetch; /* Prefetches initiated. */
+ pg_atomic_uint64 hit; /* Blocks already in cache. */
+ pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
+ pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
+ pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
+ pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
+
+ /* Dynamic values */
+ int wal_distance; /* Number of WAL bytes ahead. */
+ int block_distance; /* Number of block references ahead. */
+ int io_depth; /* Number of I/Os in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno,
+ XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
+ XLogRecPtr *lsn);
+
+static XLogPrefetchStats *SharedStats;
+
+static inline LsnReadQueue *
+lrq_alloc(uint32 max_distance,
+ uint32 max_inflight,
+ uintptr_t lrq_private,
+ LsnReadQueueNextFun next)
+{
+ LsnReadQueue *lrq;
+ uint32 size;
+
+ Assert(max_distance >= max_inflight);
+
+ size = max_distance + 1; /* full ring buffer has a gap */
+ lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
+ lrq->lrq_private = lrq_private;
+ lrq->max_inflight = max_inflight;
+ lrq->size = size;
+ lrq->next = next;
+ lrq->head = 0;
+ lrq->tail = 0;
+ lrq->inflight = 0;
+ lrq->completed = 0;
+
+ return lrq;
+}
+
+static inline void
+lrq_free(LsnReadQueue *lrq)
+{
+ pfree(lrq);
+}
+
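+/* Number of IOs initiated but not yet known to have completed. */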
+static inline uint32
+lrq_inflight(LsnReadQueue *lrq)
+{
+ return lrq->inflight;
+}
+
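+/* Number of queued block references that did not require an IO. */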
+static inline uint32
+lrq_completed(LsnReadQueue *lrq)
+{
+ return lrq->completed;
+}
+
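+/*
+ * Fill the queue with block references, starting IOs as permitted, until the
+ * callback reports that there is nothing further to do for now.
+ */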
+static inline void
+lrq_prefetch(LsnReadQueue *lrq)
+{
+ /* Try to start as many IOs as we can within our limits. */
+ while (lrq->inflight < lrq->max_inflight &&
+ lrq->inflight + lrq->completed < lrq->size - 1)
+ {
+ Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
+ switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
+ {
+ case LRQ_NEXT_AGAIN:
+ return;
+ case LRQ_NEXT_IO:
+ lrq->queue[lrq->head].io = true;
+ lrq->inflight++;
+ break;
+ case LRQ_NEXT_NO_IO:
+ lrq->queue[lrq->head].io = false;
+ lrq->completed++;
+ break;
+ }
+ lrq->head++;
+ if (lrq->head == lrq->size)
+ lrq->head = 0;
+ }
+}
+
+static inline void
+lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
+{
+ /*
+ * We know that LSNs before 'lsn' have been replayed, so we can now assume
+ * that any IOs that were started before then have finished.
+ */
+ while (lrq->tail != lrq->head &&
+ lrq->queue[lrq->tail].lsn < lsn)
+ {
+ if (lrq->queue[lrq->tail].io)
+ lrq->inflight--;
+ else
+ lrq->completed--;
+ lrq->tail++;
+ if (lrq->tail == lrq->size)
+ lrq->tail = 0;
+ }
+ if (RecoveryPrefetchEnabled())
+ lrq_prefetch(lrq);
+}
+
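+/*
+ * Report the amount of shared memory needed for recovery prefetch statistics.
+ */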
+size_t
+XLogPrefetchShmemSize(void)
+{
+ return sizeof(XLogPrefetchStats);
+}
+
+/*
+ * Reset all counters to zero.
+ */
+void
+XLogPrefetchResetStats(void)
+{
+ pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_write_u64(&SharedStats->prefetch, 0);
+ pg_atomic_write_u64(&SharedStats->hit, 0);
+ pg_atomic_write_u64(&SharedStats->skip_init, 0);
+ pg_atomic_write_u64(&SharedStats->skip_new, 0);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_write_u64(&SharedStats->skip_rep, 0);
+}
+
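+/*
+ * Initialize the shared memory area that backs pg_stat_recovery_prefetch.
+ */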
+void
+XLogPrefetchShmemInit(void)
+{
+ bool found;
+
+ SharedStats = (XLogPrefetchStats *)
+ ShmemInitStruct("XLogPrefetchStats",
+ sizeof(XLogPrefetchStats),
+ &found);
+
+ if (!found)
+ {
+ pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_init_u64(&SharedStats->prefetch, 0);
+ pg_atomic_init_u64(&SharedStats->hit, 0);
+ pg_atomic_init_u64(&SharedStats->skip_init, 0);
+ pg_atomic_init_u64(&SharedStats->skip_new, 0);
+ pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_init_u64(&SharedStats->skip_rep, 0);
+ }
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+ XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Increment a counter in shared memory. This is equivalent to (*counter)++
+ * on a plain uint64 without any memory barrier or locking, except on platforms
+ * where readers can't read uint64 without possibly observing a torn value.
+ */
+static inline void
+XLogPrefetchIncrement(pg_atomic_uint64 *counter)
+{
+ Assert(AmStartupProcess() || !IsUnderPostmaster);
+ pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL records.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogReaderState *reader)
+{
+ XLogPrefetcher *prefetcher;
+ static HASHCTL hash_table_ctl = {
+ .keysize = sizeof(RelFileNode),
+ .entrysize = sizeof(XLogPrefetcherFilter)
+ };
+
+ prefetcher = palloc0(sizeof(XLogPrefetcher));
+
+ prefetcher->reader = reader;
+ prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+ &hash_table_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ dlist_init(&prefetcher->filter_queue);
+
+ SharedStats->wal_distance = 0;
+ SharedStats->block_distance = 0;
+ SharedStats->io_depth = 0;
+
+ /* First usage will cause streaming_read to be allocated. */
+ prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+
+ return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+ lrq_free(prefetcher->streaming_read);
+ hash_destroy(prefetcher->filter_table);
+ pfree(prefetcher);
+}
+
+/*
+ * Provide access to the reader.
+ */
+XLogReaderState *
+XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
+{
+ return prefetcher->reader;
+}
+
+/*
+ * Update the statistics visible in the pg_stat_recovery_prefetch view.
+ */
+void
+XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
+{
+ uint32 io_depth;
+ uint32 completed;
+ int64 wal_distance;
+
+ /* How far ahead of replay are we now? */
+ if (prefetcher->reader->decode_queue_tail)
+ {
+ wal_distance =
+ prefetcher->reader->decode_queue_tail->lsn -
+ prefetcher->reader->decode_queue_head->lsn;
+ }
+ else
+ {
+ wal_distance = 0;
+ }
+
+ /* How many IOs are currently in flight and completed? */
+ io_depth = lrq_inflight(prefetcher->streaming_read);
+ completed = lrq_completed(prefetcher->streaming_read);
+
+ /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
+ SharedStats->io_depth = io_depth;
+ SharedStats->block_distance = io_depth + completed;
+ SharedStats->wal_distance = wal_distance;
+
+ prefetcher->next_stats_shm_lsn =
+ prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
+}
+
+/*
+ * A callback that examines the next block reference in the WAL, and possibly
+ * starts an IO so that a later read will be fast.
+ *
+ * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
+ *
+ * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
+ * that isn't in the buffer pool, and the kernel has been asked to start
+ * reading it to make a future read system call faster. An LSN is written to
+ * *lsn, and the I/O will be considered to have completed once that LSN is
+ * replayed.
+ *
+ * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found that it
+ * was already in the buffer pool, or we decided for various reasons not to
+ * prefetch.
+ */
+static LsnReadQueueNextStatus
+XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
+{
+ XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
+ XLogReaderState *reader = prefetcher->reader;
+ XLogRecPtr replaying_lsn = reader->ReadRecPtr;
+
+ /*
+ * We keep track of the record and block we're up to between calls with
+ * prefetcher->record and prefetcher->next_block_id.
+ */
+ for (;;)
+ {
+ DecodedXLogRecord *record;
+
+ /* Try to read a new future record, if we don't already have one. */
+ if (prefetcher->record == NULL)
+ {
+ bool nonblocking;
+
+ /*
+ * If there are already records or an error queued up that could
+ * be replayed, we don't want to block here. Otherwise, it's OK
+ * to block waiting for more data: presumably the caller has
+ * nothing else to do.
+ */
+ nonblocking = XLogReaderHasQueuedRecordOrError(reader);
+
+ /* Certain records act as barriers for all readahead. */
+ if (nonblocking && replaying_lsn < prefetcher->no_readahead_until)
+ return LRQ_NEXT_AGAIN;
+
+ record = XLogReadAhead(prefetcher->reader, nonblocking);
+ if (record == NULL)
+ {
+ /*
+ * We can't read any more, due to an error or lack of data in
+ * nonblocking mode.
+ */
+ return LRQ_NEXT_AGAIN;
+ }
+
+ /*
+ * If prefetching is disabled, we don't need to analyze the record
+ * or issue any prefetches. We just need to cause one record to
+ * be decoded.
+ */
+ if (!RecoveryPrefetchEnabled())
+ {
+ *lsn = InvalidXLogRecPtr;
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /* We have a new record to process. */
+ prefetcher->record = record;
+ prefetcher->next_block_id = 0;
+ }
+ else
+ {
+ /* Continue to process from last call, or last loop. */
+ record = prefetcher->record;
+ }
+
+ /*
+ * Check for operations that require us to filter out block ranges, or
+ * pause readahead completely.
+ */
+ if (replaying_lsn < record->lsn)
+ {
+ uint8 rmid = record->header.xl_rmid;
+ uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
+
+ if (rmid == RM_XLOG_ID)
+ {
+ if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
+ record_type == XLOG_END_OF_RECOVERY)
+ {
+ /*
+ * These records might change the TLI. Avoid potential
+ * bugs if we were to allow "read TLI" and "replay TLI" to
+ * differ without more analysis.
+ */
+ prefetcher->no_readahead_until = record->lsn;
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "suppressing all readahead until %X/%X is replayed due to possible TLI change",
+ LSN_FORMAT_ARGS(record->lsn));
+#endif
+
+ /* Fall through so we move past this record. */
+ }
+ }
+ else if (rmid == RM_DBASE_ID)
+ {
+ /*
+ * When databases are created with the file-copy strategy,
+ * there are no WAL records to tell us about the creation of
+ * individual relations.
+ */
+ if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
+ {
+ xl_dbase_create_file_copy_rec *xlrec =
+ (xl_dbase_create_file_copy_rec *) record->main_data;
+ RelFileNode rnode = {InvalidOid, xlrec->db_id, InvalidOid};
+
+ /*
+ * Don't try to prefetch anything in this database until
+ * it has been created, or we might confuse the blocks of
+ * different generations, if a database OID or relfilenode
+ * is reused. It's also more efficient than discovering
+ * that relations don't exist on disk yet with ENOENT
+ * errors.
+ */
+ XLogPrefetcherAddFilter(prefetcher, rnode, 0, record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
+ rnode.dbNode,
+ LSN_FORMAT_ARGS(record->lsn));
+#endif
+ }
+ }
+ else if (rmid == RM_SMGR_ID)
+ {
+ if (record_type == XLOG_SMGR_CREATE)
+ {
+ xl_smgr_create *xlrec = (xl_smgr_create *)
+ record->main_data;
+
+ if (xlrec->forkNum == MAIN_FORKNUM)
+ {
+ /*
+ * Don't prefetch anything for this whole relation
+ * until it has been created. Otherwise we might
+ * confuse the blocks of different generations, if a
+ * relfilenode is reused. This also avoids the need
+ * to discover the problem via extra syscalls that
+ * report ENOENT.
+ */
+ XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0,
+ record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
+ xlrec->rnode.spcNode,
+ xlrec->rnode.dbNode,
+ xlrec->rnode.relNode,
+ LSN_FORMAT_ARGS(record->lsn));
+#endif
+ }
+ }
+ else if (record_type == XLOG_SMGR_TRUNCATE)
+ {
+ xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
+ record->main_data;
+
+ /*
+ * Don't consider prefetching anything in the truncated
+ * range until the truncation has been performed.
+ */
+ XLogPrefetcherAddFilter(prefetcher, xlrec->rnode,
+ xlrec->blkno,
+ record->lsn);
+
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
+ xlrec->rnode.spcNode,
+ xlrec->rnode.dbNode,
+ xlrec->rnode.relNode,
+ xlrec->blkno,
+ LSN_FORMAT_ARGS(record->lsn));
+#endif
+ }
+ }
+ }
+
+ /* Scan the block references, starting where we left off last time. */
+ while (prefetcher->next_block_id <= record->max_block_id)
+ {
+ int block_id = prefetcher->next_block_id++;
+ DecodedBkpBlock *block = &record->blocks[block_id];
+ SMgrRelation reln;
+ PrefetchBufferResult result;
+
+ if (!block->in_use)
+ continue;
+
+ Assert(!BufferIsValid(block->prefetch_buffer));
+
+ /*
+ * Record the LSN of this record. When it's replayed,
+ * LsnReadQueue will consider any IOs submitted for earlier LSNs
+ * to be finished.
+ */
+ *lsn = record->lsn;
+
+ /* We don't try to prefetch anything but the main fork for now. */
+ if (block->forknum != MAIN_FORKNUM)
+ {
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /*
+ * If there is a full page image attached, we won't be reading the
+ * page, so don't bother trying to prefetch.
+ */
+ if (block->has_image)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_fpw);
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /* There is no point in reading a page that will be zeroed. */
+ if (block->flags & BKPBLOCK_WILL_INIT)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_init);
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /* Should we skip prefetching this block due to a filter? */
+ if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /* There is no point in repeatedly prefetching the same block. */
+ for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
+ {
+ if (block->blkno == prefetcher->recent_block[i] &&
+ RelFileNodeEquals(block->rnode, prefetcher->recent_rnode[i]))
+ {
+ /*
+ * XXX If we also remembered where it was, we could set
+ * recent_buffer so that recovery could skip smgropen()
+ * and a buffer table lookup.
+ */
+ XLogPrefetchIncrement(&SharedStats->skip_rep);
+ return LRQ_NEXT_NO_IO;
+ }
+ }
+ prefetcher->recent_rnode[prefetcher->recent_idx] = block->rnode;
+ prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
+ prefetcher->recent_idx =
+ (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
+
+ /*
+ * We could try to have a fast path for repeated references to the
+ * same relation (with some scheme to handle invalidations
+ * safely), but for now we'll call smgropen() every time.
+ */
+ reln = smgropen(block->rnode, InvalidBackendId);
+
+ /*
+ * If the relation file doesn't exist on disk, for example because
+ * we're replaying after a crash and the file will be created and
+ * then unlinked by WAL that hasn't been replayed yet, suppress
+ * further prefetching in the relation until this record is
+ * replayed.
+ */
+ if (!smgrexists(reln, MAIN_FORKNUM))
+ {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
+ reln->smgr_rnode.node.spcNode,
+ reln->smgr_rnode.node.dbNode,
+ reln->smgr_rnode.node.relNode,
+ LSN_FORMAT_ARGS(record->lsn));
+#endif
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /*
+ * If the relation isn't big enough to contain the referenced
+ * block yet, suppress prefetching of this block and higher until
+ * this record is replayed.
+ */
+ if (block->blkno >= smgrnblocks(reln, block->forknum))
+ {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
+ reln->smgr_rnode.node.spcNode,
+ reln->smgr_rnode.node.dbNode,
+ reln->smgr_rnode.node.relNode,
+ block->blkno,
+ LSN_FORMAT_ARGS(record->lsn));
+#endif
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ return LRQ_NEXT_NO_IO;
+ }
+
+ /* Try to initiate prefetching. */
+ result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+ if (BufferIsValid(result.recent_buffer))
+ {
+ /* Cache hit, nothing to do. */
+ XLogPrefetchIncrement(&SharedStats->hit);
+ block->prefetch_buffer = result.recent_buffer;
+ return LRQ_NEXT_NO_IO;
+ }
+ else if (result.initiated_io)
+ {
+ /* Cache miss, I/O (presumably) started. */
+ XLogPrefetchIncrement(&SharedStats->prefetch);
+ block->prefetch_buffer = InvalidBuffer;
+ return LRQ_NEXT_IO;
+ }
+ else
+ {
+ /*
+ * This shouldn't be possible, because we already determined
+ * that the relation exists on disk and is big enough.
+ * Something is wrong with the cache invalidation for
+ * smgrexists(), smgrnblocks(), or the file was unlinked or
+ * truncated beneath our feet?
+ */
+ elog(ERROR,
+ "could not prefetch relation %u/%u/%u block %u",
+ reln->smgr_rnode.node.spcNode,
+ reln->smgr_rnode.node.dbNode,
+ reln->smgr_rnode.node.relNode,
+ block->blkno);
+ }
+ }
+
+ /*
+ * Several callsites need to be able to read exactly one record
+ * without any internal readahead. Examples: xlog.c reading
+ * checkpoint records with emode set to PANIC, which might otherwise
+ * cause XLogPageRead() to panic on some future page, and xlog.c
+ * determining where to start writing WAL next, which depends on the
+ * contents of the reader's internal buffer after reading one record.
+ * Therefore, don't even think about prefetching until the first
+ * record after XLogPrefetcherBeginRead() has been consumed.
+ */
+ if (prefetcher->reader->decode_queue_tail &&
+ prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
+ return LRQ_NEXT_AGAIN;
+
+ /* Advance to the next record. */
+ prefetcher->record = NULL;
+ }
+ pg_unreachable();
+}
+
+/*
+ * Expose statistics about recovery prefetching.
+ */
+Datum
+pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
+ bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
+
+ SetSingleFuncCall(fcinfo, 0);
+
+ for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
+ nulls[i] = false;
+
+ values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
+ values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
+ values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
+ values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
+ values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
+ values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
+ values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
+ values[7] = Int32GetDatum(SharedStats->wal_distance);
+ values[8] = Int32GetDatum(SharedStats->block_distance);
+ values[9] = Int32GetDatum(SharedStats->io_depth);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+ return (Datum) 0;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno, XLogRecPtr lsn)
+{
+ XLogPrefetcherFilter *filter;
+ bool found;
+
+ filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+ if (!found)
+ {
+ /*
+ * Don't allow any prefetching of this block or higher until replayed.
+ */
+ filter->filter_until_replayed = lsn;
+ filter->filter_from_block = blockno;
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+ else
+ {
+ /*
+ * We were already filtering this rnode. Extend the filter's lifetime
+ * to cover this WAL record, but leave the lower of the block numbers
+ * there because we don't want to have to track individual blocks.
+ */
+ filter->filter_until_replayed = lsn;
+ dlist_delete(&filter->link);
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ filter->filter_from_block = Min(filter->filter_from_block, blockno);
+ }
+}
+
+/*
+ * Have we replayed any records that caused us to begin filtering a block
+ * range? That means that relations should have been created, extended or
+ * dropped as required, so we can stop filtering out accesses to a given
+ * relfilenode.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+ link,
+ &prefetcher->filter_queue);
+
+ if (filter->filter_until_replayed >= replaying_lsn)
+ break;
+
+ dlist_delete(&filter->link);
+ hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+ }
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno)
+{
+ /*
+ * Test for empty queue first, because we expect it to be empty most of
+ * the time and we can avoid the hash table lookup in that case.
+ */
+ if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter;
+
+ /* See if the block range is filtered. */
+ filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL);
+ if (filter && filter->filter_from_block <= blockno)
+ {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
+ rnode.spcNode, rnode.dbNode, rnode.relNode, blockno,
+ LSN_FORMAT_ARGS(filter->filter_until_replayed),
+ filter->filter_from_block);
+#endif
+ return true;
+ }
+
+ /* See if the whole database is filtered. */
+ rnode.relNode = InvalidOid;
+ rnode.spcNode = InvalidOid;
+ filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL);
+ if (filter)
+ {
+#ifdef XLOGPREFETCHER_DEBUG_LEVEL
+ elog(XLOGPREFETCHER_DEBUG_LEVEL,
+ "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
+ rnode.spcNode, rnode.dbNode, rnode.relNode, blockno,
+ LSN_FORMAT_ARGS(filter->filter_until_replayed));
+#endif
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/*
+ * A wrapper for XLogBeginRead() that also resets the prefetcher.
+ */
+void
+XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
+{
+ /* This will forget about any in-flight IO. */
+ prefetcher->reconfigure_count--;
+
+ /* Book-keeping to avoid readahead on first read. */
+ prefetcher->begin_ptr = recPtr;
+
+ prefetcher->no_readahead_until = 0;
+
+ /* This will forget about any queued up records in the decoder. */
+ XLogBeginRead(prefetcher->reader, recPtr);
+}
+
+/*
+ * A wrapper for XLogReadRecord() that provides the same interface, but also
+ * tries to initiate I/O for blocks referenced in future WAL records.
+ */
+XLogRecord *
+XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
+{
+ DecodedXLogRecord *record;
+
+ /*
+ * See if it's time to reset the prefetching machinery, because a relevant
+ * GUC was changed.
+ */
+ if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
+ {
+ uint32 max_distance;
+ uint32 max_inflight;
+
+ if (prefetcher->streaming_read)
+ lrq_free(prefetcher->streaming_read);
+
+ if (RecoveryPrefetchEnabled())
+ {
+ max_inflight = Max(maintenance_io_concurrency, 2);
+ max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
+ }
+ else
+ {
+ max_inflight = 1;
+ max_distance = 1;
+ }
+
+ prefetcher->streaming_read = lrq_alloc(max_distance,
+ max_inflight,
+ (uintptr_t) prefetcher,
+ XLogPrefetcherNextBlock);
+
+ prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
+ }
+
+ /*
+ * Release last returned record, if there is one. We need to do this so
+ * that we can check for empty decode queue accurately.
+ */
+ XLogReleasePreviousRecord(prefetcher->reader);
+
+ /* If there's nothing queued yet, then start prefetching. */
+ if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
+ lrq_prefetch(prefetcher->streaming_read);
+
+ /* Read the next record. */
+ record = XLogNextRecord(prefetcher->reader, errmsg);
+ if (!record)
+ return NULL;
+
+ /*
+ * The record we just got is the "current" one, for the benefit of the
+ * XLogRecXXX() macros.
+ */
+ Assert(record == prefetcher->reader->record);
+
+ /*
+ * Can we drop any prefetch filters yet, given the record we're about to
+ * return? This assumes that any records with earlier LSNs have been
+ * replayed, so if we were waiting for a relation to be created or
+ * extended, it is now OK to access blocks in the covered range.
+ */
+ XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
+
+ /*
+ * See if it's time to compute some statistics, because enough WAL has
+ * been processed.
+ */
+ if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
+ XLogPrefetcherComputeStats(prefetcher);
+
+ /*
+ * The caller is about to replay this record, so we can now report that
+ * all IO initiated because of early WAL must be finished. This may
+ * trigger more readahead.
+ */
+ lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
+
+ Assert(record == prefetcher->reader->record);
+
+ return &record->header;
+}
+
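+/*
+ * GUC check hook: reject "on" where prefetching cannot work because
+ * posix_fadvise() is not available.
+ */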
+bool
+check_recovery_prefetch(int *new_value, void **extra, GucSource source)
+{
+#ifndef USE_PREFETCH
+ if (*new_value == RECOVERY_PREFETCH_ON)
+ {
+ GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
+ return false;
+ }
+#endif
+
+ return true;
+}
+
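+/*
+ * GUC assign hook: adopt the new value, and tell the prefetching machinery
+ * to reconfigure itself.
+ */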
+void
+assign_recovery_prefetch(int new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e612aa933a..5862d9dc75 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1727,6 +1727,8 @@ DecodeXLogRecord(XLogReaderState *state,
blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
+ blk->prefetch_buffer = InvalidBuffer;
+
COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
if (blk->has_data && blk->data_len == 0)
@@ -1925,14 +1927,29 @@ err:
/*
* Returns information about the block that a block reference refers to.
- *
- * If the WAL record contains a block reference with the given ID, *rnode,
- * *forknum, and *blknum are filled in (if not NULL), and returns true.
- * Otherwise returns false.
+ * See XLogRecGetBlockTagExtended().
*/
bool
XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
+{
+ return XLogRecGetBlockTagExtended(record, block_id, rnode, forknum, blknum,
+ NULL);
+}
+
+/*
+ * Returns information about the block that a block reference refers to,
+ * optionally including the buffer that the block may already be in.
+ *
+ * If the WAL record contains a block reference with the given ID, *rnode,
+ * *forknum, *blknum and *prefetch_buffer are filled in (if not NULL), and
+ * returns true. Otherwise returns false.
+ */
+bool
+XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum,
+ BlockNumber *blknum,
+ Buffer *prefetch_buffer)
{
DecodedBkpBlock *bkpb;
@@ -1947,6 +1964,8 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
*forknum = bkpb->forknum;
if (blknum)
*blknum = bkpb->blkno;
+ if (prefetch_buffer)
+ *prefetch_buffer = bkpb->prefetch_buffer;
return true;
}
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 79d38a837c..54fd10475a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -36,6 +36,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
+#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
@@ -183,6 +184,9 @@ static bool doRequestWalReceiverReply;
/* XLogReader object used to parse the WAL records */
static XLogReaderState *xlogreader = NULL;
+/* XLogPrefetcher object used to consume WAL records with read-ahead */
+static XLogPrefetcher *xlogprefetcher = NULL;
+
/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
typedef struct XLogPageReadPrivate
{
@@ -404,18 +408,21 @@ static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
-static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
- int emode, bool fetching_ckpt, TimeLineID replayTLI);
+static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
+ int emode, bool fetching_ckpt,
+ TimeLineID replayTLI);
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
-static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt,
- XLogRecPtr tliRecPtr,
- TimeLineID replayTLI,
- XLogRecPtr replayLSN);
+static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
+ bool randAccess,
+ bool fetching_ckpt,
+ XLogRecPtr tliRecPtr,
+ TimeLineID replayTLI,
+ XLogRecPtr replayLSN,
+ bool nonblocking);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
-static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+static XLogRecord *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr,
int whichChkpt, bool report, TimeLineID replayTLI);
static bool rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
@@ -561,6 +568,15 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
errdetail("Failed while allocating a WAL reading processor.")));
xlogreader->system_identifier = ControlFile->system_identifier;
+ /*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /* Create a WAL prefetcher. */
+ xlogprefetcher = XLogPrefetcherAllocate(xlogreader);
+
/*
* Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons:
@@ -589,7 +605,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
* When a backup_label file is present, we want to roll forward from
* the checkpoint it identifies, rather than using pg_control.
*/
- record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 0, true, CheckPointTLI);
+ record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 0, true,
+ CheckPointTLI);
if (record != NULL)
{
memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
@@ -607,8 +624,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
*/
if (checkPoint.redo < CheckPointLoc)
{
- XLogBeginRead(xlogreader, checkPoint.redo);
- if (!ReadRecord(xlogreader, LOG, false,
+ XLogPrefetcherBeginRead(xlogprefetcher, checkPoint.redo);
+ if (!ReadRecord(xlogprefetcher, LOG, false,
checkPoint.ThisTimeLineID))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
@@ -727,7 +744,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
CheckPointTLI = ControlFile->checkPointCopy.ThisTimeLineID;
RedoStartLSN = ControlFile->checkPointCopy.redo;
RedoStartTLI = ControlFile->checkPointCopy.ThisTimeLineID;
- record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 1, true,
+ record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 1, true,
CheckPointTLI);
if (record != NULL)
{
@@ -1413,8 +1430,8 @@ FinishWalRecovery(void)
lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
}
- XLogBeginRead(xlogreader, lastRec);
- (void) ReadRecord(xlogreader, PANIC, false, lastRecTLI);
+ XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
+ (void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
endOfLog = xlogreader->EndRecPtr;
/*
@@ -1503,6 +1520,9 @@ ShutdownWalRecovery(void)
{
char recoveryPath[MAXPGPATH];
+ /* Final update of pg_stat_recovery_prefetch. */
+ XLogPrefetcherComputeStats(xlogprefetcher);
+
/* Shut down xlogreader */
if (readFile >= 0)
{
@@ -1510,6 +1530,7 @@ ShutdownWalRecovery(void)
readFile = -1;
}
XLogReaderFree(xlogreader);
+ XLogPrefetcherFree(xlogprefetcher);
if (ArchiveRecoveryRequested)
{
@@ -1593,15 +1614,15 @@ PerformWalRecovery(void)
{
/* back up to find the record */
replayTLI = RedoStartTLI;
- XLogBeginRead(xlogreader, RedoStartLSN);
- record = ReadRecord(xlogreader, PANIC, false, replayTLI);
+ XLogPrefetcherBeginRead(xlogprefetcher, RedoStartLSN);
+ record = ReadRecord(xlogprefetcher, PANIC, false, replayTLI);
}
else
{
/* just have to read next record after CheckPoint */
Assert(xlogreader->ReadRecPtr == CheckPointLoc);
replayTLI = CheckPointTLI;
- record = ReadRecord(xlogreader, LOG, false, replayTLI);
+ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
}
if (record != NULL)
@@ -1710,7 +1731,7 @@ PerformWalRecovery(void)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogreader, LOG, false, replayTLI);
+ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
/*
@@ -1921,6 +1942,9 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
*/
if (AllowCascadeReplication())
WalSndWakeup();
+
+ /* Reset the prefetcher. */
+ XLogPrefetchReconfigure();
}
}
@@ -2305,7 +2329,8 @@ verifyBackupPageConsistency(XLogReaderState *record)
* temporary page.
*/
buf = XLogReadBufferExtended(rnode, forknum, blkno,
- RBM_NORMAL_NO_LOG);
+ RBM_NORMAL_NO_LOG,
+ InvalidBuffer);
if (!BufferIsValid(buf))
continue;
@@ -2917,17 +2942,18 @@ ConfirmRecoveryPaused(void)
* Attempt to read the next XLOG record.
*
* Before first call, the reader needs to be positioned to the first record
- * by calling XLogBeginRead().
+ * by calling XLogPrefetcherBeginRead().
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, int emode,
+ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
bool fetching_ckpt, TimeLineID replayTLI)
{
XLogRecord *record;
+ XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
/* Pass through parameters to XLogPageRead */
@@ -2943,7 +2969,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
{
char *errormsg;
- record = XLogReadRecord(xlogreader, &errormsg);
+ record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
if (record == NULL)
{
/*
@@ -3056,9 +3082,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
/*
* Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns number of bytes read, if the page is read successfully, or -1
- * in case of errors. When errors occur, they are ereport'ed, but only
- * if they have not been previously reported.
+ * Returns number of bytes read, if the page is read successfully, or
+ * XLREAD_FAIL in case of errors. When errors occur, they are ereport'ed, but
+ * only if they have not been previously reported.
+ *
+ * While prefetching, xlogreader->nonblocking may be set. In that case,
+ * returns XLREAD_WOULDBLOCK if we'd otherwise have to wait for more WAL.
*
* This is responsible for restoring files from archive as needed, as well
* as for waiting for the requested WAL record to arrive in standby mode.
@@ -3066,7 +3095,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
* 'emode' specifies the log level used for reporting "file not found" or
* "end of WAL" situations in archive recovery, or in standby mode when a
* trigger file is found. If set to WARNING or below, XLogPageRead() returns
- * false in those situations, on higher log levels the ereport() won't
+ * XLREAD_FAIL in those situations, on higher log levels the ereport() won't
* return.
*
* In standby mode, if after a successful return of XLogPageRead() the
@@ -3125,20 +3154,31 @@ retry:
(readSource == XLOG_FROM_STREAM &&
flushedUpto < targetPagePtr + reqLen))
{
- if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
- private->randAccess,
- private->fetching_ckpt,
- targetRecPtr,
- private->replayTLI,
- xlogreader->EndRecPtr))
- {
- if (readFile >= 0)
- close(readFile);
- readFile = -1;
- readLen = 0;
- readSource = XLOG_FROM_ANY;
+ if (readFile >= 0 &&
+ xlogreader->nonblocking &&
+ readSource == XLOG_FROM_STREAM &&
+ flushedUpto < targetPagePtr + reqLen)
+ return XLREAD_WOULDBLOCK;
- return -1;
+ switch (WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+ private->randAccess,
+ private->fetching_ckpt,
+ targetRecPtr,
+ private->replayTLI,
+ xlogreader->EndRecPtr,
+ xlogreader->nonblocking))
+ {
+ case XLREAD_WOULDBLOCK:
+ return XLREAD_WOULDBLOCK;
+ case XLREAD_FAIL:
+ if (readFile >= 0)
+ close(readFile);
+ readFile = -1;
+ readLen = 0;
+ readSource = XLOG_FROM_ANY;
+ return XLREAD_FAIL;
+ case XLREAD_SUCCESS:
+ break;
}
}
@@ -3263,7 +3303,7 @@ next_record_is_invalid:
if (StandbyMode)
goto retry;
else
- return -1;
+ return XLREAD_FAIL;
}
/*
@@ -3292,14 +3332,18 @@ next_record_is_invalid:
* available.
*
* When the requested record becomes available, the function opens the file
- * containing it (if not open already), and returns true. When end of standby
- * mode is triggered by the user, and there is no more WAL available, returns
- * false.
+ * containing it (if not open already), and returns XLREAD_SUCCESS. When end
+ * of standby mode is triggered by the user, and there is no more WAL
+ * available, returns XLREAD_FAIL.
+ *
+ * If nonblocking is true, then give up immediately if we can't satisfy the
+ * request, returning XLREAD_WOULDBLOCK instead of waiting.
*/
-static bool
+static XLogPageReadResult
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr,
- TimeLineID replayTLI, XLogRecPtr replayLSN)
+ TimeLineID replayTLI, XLogRecPtr replayLSN,
+ bool nonblocking)
{
static TimestampTz last_fail_time = 0;
TimestampTz now;
@@ -3353,6 +3397,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
if (lastSourceFailed)
{
+ /*
+ * Don't allow any retry loops to occur during nonblocking
+ * readahead. Let the caller process everything that has been
+ * decoded already first.
+ */
+ if (nonblocking)
+ return XLREAD_WOULDBLOCK;
+
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
@@ -3367,7 +3419,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (StandbyMode && CheckForStandbyTrigger())
{
XLogShutdownWalRcv();
- return false;
+ return XLREAD_FAIL;
}
/*
@@ -3375,7 +3427,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* and pg_wal.
*/
if (!StandbyMode)
- return false;
+ return XLREAD_FAIL;
/*
* Move to XLOG_FROM_STREAM state, and set to start a
@@ -3519,7 +3571,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
currentSource);
if (readFile >= 0)
- return true; /* success! */
+ return XLREAD_SUCCESS; /* success! */
/*
* Nope, not found in archive or pg_wal.
@@ -3674,11 +3726,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM;
XLogReceiptSource = XLOG_FROM_STREAM;
- return true;
+ return XLREAD_SUCCESS;
}
break;
}
+ /* In nonblocking mode, return rather than sleeping. */
+ if (nonblocking)
+ return XLREAD_WOULDBLOCK;
+
/*
* Data not here yet. Check for trigger, then wait for
* walreceiver to wake us up when new WAL arrives.
@@ -3686,13 +3742,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (CheckForStandbyTrigger())
{
/*
- * Note that we don't "return false" immediately here.
- * After being triggered, we still want to replay all
- * the WAL that was already streamed. It's in pg_wal
- * now, so we just treat this as a failure, and the
- * state machine will move on to replay the streamed
- * WAL from pg_wal, and then recheck the trigger and
- * exit replay.
+ * Note that we don't return XLREAD_FAIL immediately
+ * here. After being triggered, we still want to
+ * replay all the WAL that was already streamed. It's
+ * in pg_wal now, so we just treat this as a failure,
+ * and the state machine will move on to replay the
+ * streamed WAL from pg_wal, and then recheck the
+ * trigger and exit replay.
*/
lastSourceFailed = true;
break;
@@ -3711,6 +3767,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
streaming_reply_sent = true;
}
+ /* Update pg_stat_recovery_prefetch before sleeping. */
+ XLogPrefetcherComputeStats(xlogprefetcher);
+
/*
* Wait for more WAL to arrive. Time out after 5 seconds
* to react to a trigger file promptly and to check if the
@@ -3743,7 +3802,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
HandleStartupProcInterrupts();
}
- return false; /* not reached */
+ return XLREAD_FAIL; /* not reached */
}
@@ -3788,7 +3847,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
* 1 for "primary", 0 for "other" (backup_label)
*/
static XLogRecord *
-ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr,
int whichChkpt, bool report, TimeLineID replayTLI)
{
XLogRecord *record;
@@ -3815,8 +3874,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
return NULL;
}
- XLogBeginRead(xlogreader, RecPtr);
- record = ReadRecord(xlogreader, LOG, true, replayTLI);
+ XLogPrefetcherBeginRead(xlogprefetcher, RecPtr);
+ record = ReadRecord(xlogprefetcher, LOG, true, replayTLI);
if (record == NULL)
{
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index a4dedc58b7..bb2d3ec991 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -22,6 +22,7 @@
#include "access/timeline.h"
#include "access/xlogrecovery.h"
#include "access/xlog_internal.h"
+#include "access/xlogprefetcher.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -355,11 +356,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
+ Buffer prefetch_buffer;
Page page;
bool zeromode;
bool willinit;
- if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+ if (!XLogRecGetBlockTagExtended(record, block_id, &rnode, &forknum, &blkno,
+ &prefetch_buffer))
{
/* Caller specified a bogus block_id */
elog(PANIC, "failed to locate backup block with ID %d", block_id);
@@ -381,7 +384,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
{
Assert(XLogRecHasBlockImage(record, block_id));
*buf = XLogReadBufferExtended(rnode, forknum, blkno,
- get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
+ get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
+ prefetch_buffer);
page = BufferGetPage(*buf);
if (!RestoreBlockImage(record, block_id, page))
elog(ERROR, "failed to restore block image");
@@ -410,7 +414,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
}
else
{
- *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
+ *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode, prefetch_buffer);
if (BufferIsValid(*buf))
{
if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
@@ -450,6 +454,10 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
* exist, and we don't check for all-zeroes. Thus, no log entry is made
* to imply that the page should be dropped or truncated later.
*
+ * Optionally, recent_buffer can be used to provide a hint about the location
+ * of the page in the buffer pool; it does not have to be correct, but avoids
+ * a buffer mapping table probe if it is.
+ *
* NB: A redo function should normally not call this directly. To get a page
* to modify, use XLogReadBufferForRedoExtended instead. It is important that
* all pages modified by a WAL record are registered in the WAL records, or
@@ -457,7 +465,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
*/
Buffer
XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
- BlockNumber blkno, ReadBufferMode mode)
+ BlockNumber blkno, ReadBufferMode mode,
+ Buffer recent_buffer)
{
BlockNumber lastblock;
Buffer buffer;
@@ -465,6 +474,15 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
Assert(blkno != P_NEW);
+ /* Do we have a clue where the buffer might be already? */
+ if (BufferIsValid(recent_buffer) &&
+ mode == RBM_NORMAL &&
+ ReadRecentBuffer(rnode, forknum, blkno, recent_buffer))
+ {
+ buffer = recent_buffer;
+ goto recent_buffer_fast_path;
+ }
+
/* Open the relation at smgr level */
smgr = smgropen(rnode, InvalidBackendId);
@@ -523,6 +541,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
}
}
+recent_buffer_fast_path:
if (mode == RBM_NORMAL)
{
/* check that page has been initialized */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index e701d1c676..b1a6df16ad 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -930,6 +930,20 @@ CREATE VIEW pg_stat_wal_receiver AS
FROM pg_stat_get_wal_receiver() s
WHERE s.pid IS NOT NULL;
+CREATE VIEW pg_stat_recovery_prefetch AS
+ SELECT
+ s.stats_reset,
+ s.prefetch,
+ s.hit,
+ s.skip_init,
+ s.skip_new,
+ s.skip_fpw,
+ s.skip_rep,
+ s.wal_distance,
+ s.block_distance,
+ s.io_depth
+ FROM pg_stat_get_recovery_prefetch() s;
+
CREATE VIEW pg_stat_subscription AS
SELECT
su.oid AS subid,
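
Because the view is plain SQL over pg_stat_get_recovery_prefetch(), any client can poll it on a standby. Below is a minimal monitoring sketch using libpq (the conninfo string and column choice are placeholders); the cumulative counters can be cleared with SELECT pg_stat_reset_shared('recovery_prefetch'), wired up in the pgstatfuncs.c hunk further below.

#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int
main(void)
{
    /* Placeholder conninfo; point this at a standby running recovery. */
    PGconn     *conn = PQconnectdb("dbname=postgres");
    PGresult   *res;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return EXIT_FAILURE;
    }

    res = PQexec(conn, "SELECT prefetch, hit, io_depth "
                 "FROM pg_stat_recovery_prefetch");
    if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
        printf("prefetch=%s hit=%s io_depth=%s\n",
               PQgetvalue(res, 0, 0),
               PQgetvalue(res, 0, 1),
               PQgetvalue(res, 0, 2));

    PQclear(res);
    PQfinish(conn);
    return EXIT_SUCCESS;
}
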
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f80f90ac3c..93c1ea2d9f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -649,6 +649,8 @@ ReadRecentBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum,
pg_atomic_write_u32(&bufHdr->state,
buf_state + BUF_USAGECOUNT_ONE);
+ pgBufferUsage.local_blks_hit++;
+
return true;
}
}
@@ -680,6 +682,8 @@ ReadRecentBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum,
else
PinBuffer_Locked(bufHdr); /* pin for first time */
+ pgBufferUsage.shared_blks_hit++;
+
return true;
}
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 78c073b7c9..d41ae37090 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -211,7 +211,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
blkno = fsm_logical_to_physical(addr);
/* If the page doesn't exist already, extend */
- buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
+ buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR,
+ InvalidBuffer);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 88ff59c568..75e456360b 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/syncscan.h"
#include "access/twophase.h"
+#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
#include "miscadmin.h"
@@ -119,6 +120,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, LockShmemSize());
size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize());
+ size = add_size(size, XLogPrefetchShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, XLogRecoveryShmemSize());
size = add_size(size, CLOGShmemSize());
@@ -244,6 +246,7 @@ CreateSharedMemoryAndSemaphores(void)
* Set up xlog, clog, and buffers
*/
XLOGShmemInit();
+ XLogPrefetchShmemInit();
XLogRecoveryShmemInit();
CLOGShmemInit();
CommitTsShmemInit();
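
These two hunks follow the backend's usual two-phase shared-memory protocol: each module's size is summed in CalculateShmemSize(), then its state is created or attached in CreateSharedMemoryAndSemaphores(). The prefetcher's own XLogPrefetchShmemSize()/XLogPrefetchShmemInit() live in xlogprefetcher.c, which is not part of this excerpt; the sketch below, with hypothetical Demo* names, shows the shape such a pair conventionally has.

#include "postgres.h"

#include "port/atomics.h"
#include "storage/shmem.h"

typedef struct DemoShmemState
{
    pg_atomic_uint64 counter;   /* example shared counter */
} DemoShmemState;

static DemoShmemState *DemoShmem;

Size
DemoShmemSize(void)
{
    return sizeof(DemoShmemState);
}

void
DemoShmemInit(void)
{
    bool        found;

    /* Attach to (or create) the named chunk sized by DemoShmemSize(). */
    DemoShmem = ShmemInitStruct("Demo Shmem", DemoShmemSize(), &found);
    if (!found)
        pg_atomic_init_u64(&DemoShmem->counter, 0);
}
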
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 879f647dbc..286dd3f755 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -162,9 +162,11 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
{
/*
* Close it first, to ensure that we notice if the fork has been unlinked
- * since we opened it.
+ * since we opened it. As an optimization, we can skip that in recovery,
+ * which already closes relations when dropping them.
*/
- mdclose(reln, forkNum);
+ if (!InRecovery)
+ mdclose(reln, forkNum);
return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 2bf8ab8f98..d3ad795a6e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -16,6 +16,7 @@
#include "access/htup_details.h"
#include "access/xlog.h"
+#include "access/xlogprefetcher.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
@@ -2103,13 +2104,15 @@ pg_stat_reset_shared(PG_FUNCTION_ARGS)
pgstat_reset_of_kind(PGSTAT_KIND_BGWRITER);
pgstat_reset_of_kind(PGSTAT_KIND_CHECKPOINTER);
}
+ else if (strcmp(target, "recovery_prefetch") == 0)
+ XLogPrefetchResetStats();
else if (strcmp(target, "wal") == 0)
pgstat_reset_of_kind(PGSTAT_KIND_WAL);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized reset target: \"%s\"", target),
- errhint("Target must be \"archiver\", \"bgwriter\", or \"wal\".")));
+ errhint("Target must be \"archiver\", \"bgwriter\", \"recovery_prefetch\", or \"wal\".")));
PG_RETURN_VOID();
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 89f8259bac..22b5571a70 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -41,6 +41,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
@@ -217,6 +218,7 @@ static bool check_effective_io_concurrency(int *newval, void **extra, GucSource
static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
static bool check_huge_page_size(int *newval, void **extra, GucSource source);
static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source);
+static void assign_maintenance_io_concurrency(int newval, void *extra);
static bool check_application_name(char **newval, void **extra, GucSource source);
static void assign_application_name(const char *newval, void *extra);
static bool check_cluster_name(char **newval, void **extra, GucSource source);
@@ -495,6 +497,19 @@ static const struct config_enum_entry huge_pages_options[] = {
{NULL, 0, false}
};
+static const struct config_enum_entry recovery_prefetch_options[] = {
+ {"off", RECOVERY_PREFETCH_OFF, false},
+ {"on", RECOVERY_PREFETCH_ON, false},
+ {"try", RECOVERY_PREFETCH_TRY, false},
+ {"true", RECOVERY_PREFETCH_ON, true},
+ {"false", RECOVERY_PREFETCH_OFF, true},
+ {"yes", RECOVERY_PREFETCH_ON, true},
+ {"no", RECOVERY_PREFETCH_OFF, true},
+ {"1", RECOVERY_PREFETCH_ON, true},
+ {"0", RECOVERY_PREFETCH_OFF, true},
+ {NULL, 0, false}
+};
+
static const struct config_enum_entry force_parallel_mode_options[] = {
{"off", FORCE_PARALLEL_OFF, false},
{"on", FORCE_PARALLEL_ON, false},
@@ -785,6 +800,8 @@ const char *const config_group_names[] =
gettext_noop("Write-Ahead Log / Checkpoints"),
/* WAL_ARCHIVING */
gettext_noop("Write-Ahead Log / Archiving"),
+ /* WAL_RECOVERY */
+ gettext_noop("Write-Ahead Log / Recovery"),
/* WAL_ARCHIVE_RECOVERY */
gettext_noop("Write-Ahead Log / Archive Recovery"),
/* WAL_RECOVERY_TARGET */
@@ -2818,6 +2835,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"wal_decode_buffer_size", PGC_POSTMASTER, WAL_RECOVERY,
+ gettext_noop("Maximum buffer size for reading ahead in the WAL during recovery."),
+ gettext_noop("This controls the maximum distance we can read ahead in the WAL to prefetch referenced blocks."),
+ GUC_UNIT_BYTE
+ },
+ &wal_decode_buffer_size,
+ 512 * 1024, 64 * 1024, MaxAllocSize,
+ NULL, NULL, NULL
+ },
+
{
{"wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the size of WAL files held for standby servers."),
@@ -3141,7 +3169,8 @@ static struct config_int ConfigureNamesInt[] =
0,
#endif
0, MAX_IO_CONCURRENCY,
- check_maintenance_io_concurrency, NULL, NULL
+ check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
+ NULL
},
{
@@ -5013,6 +5042,16 @@ static struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},
+ {
+ {"recovery_prefetch", PGC_SIGHUP, WAL_RECOVERY,
+ gettext_noop("Prefetch referenced blocks during recovery"),
+ gettext_noop("Look ahead in the WAL to find references to uncached data.")
+ },
+ &recovery_prefetch,
+ RECOVERY_PREFETCH_TRY, recovery_prefetch_options,
+ check_recovery_prefetch, assign_recovery_prefetch, NULL
+ },
+
{
{"force_parallel_mode", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Forces use of parallel query facilities."),
@@ -12422,6 +12461,20 @@ check_client_connection_check_interval(int *newval, void **extra, GucSource sour
return true;
}
+static void
+assign_maintenance_io_concurrency(int newval, void *extra)
+{
+#ifdef USE_PREFETCH
+ /*
+ * Reconfigure recovery prefetching, because a setting it depends on
+ * has changed.
+ */
+ maintenance_io_concurrency = newval;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+#endif
+}
+
static bool
check_application_name(char **newval, void **extra, GucSource source)
{
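
The check_recovery_prefetch and assign_recovery_prefetch hooks wired into the enum table are defined in xlogprefetcher.c, which is not shown in this excerpt. A plausible shape, mirroring assign_maintenance_io_concurrency above (a sketch only, not the patch's actual definition): reject "on" where posix_fadvise is missing, and let "try" degrade silently to no prefetching.

bool
check_recovery_prefetch(int *new_value, void **extra, GucSource source)
{
#ifndef USE_PREFETCH
    if (*new_value == RECOVERY_PREFETCH_ON)
    {
        GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack posix_fadvise().");
        return false;
    }
#endif
    return true;
}

void
assign_recovery_prefetch(int new_value, void *extra)
{
    /* Reconfigure prefetching, because a setting it depends on changed. */
    recovery_prefetch = new_value;
    if (AmStartupProcess())
        XLogPrefetchReconfigure();
}
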
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e75b7d63ea..94270eb0ec 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,12 @@
#max_wal_size = 1GB
#min_wal_size = 80MB
+# - Prefetching during recovery -
+
+#recovery_prefetch = try # prefetch pages referenced in the WAL?
+#wal_decode_buffer_size = 512kB # lookahead window used for prefetching
+ # (change requires restart)
+
# - Archiving -
#archive_mode = off # enables archiving; off, on, or always
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index b81917f243..e302bd102c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -50,6 +50,7 @@ extern bool *wal_consistency_checking;
extern char *wal_consistency_checking_string;
extern bool log_checkpoints;
extern bool track_wal_io_timing;
+extern int wal_decode_buffer_size;
extern int CheckPointSegments;
diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
new file mode 100644
index 0000000000..c30b09b727
--- /dev/null
+++ b/src/include/access/xlogprefetcher.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.h
+ * Declarations for the recovery prefetching module.
+ *
+ * Portions Copyright (c) 2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/access/xlogprefetcher.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCHER_H
+#define XLOGPREFETCHER_H
+
+#include "access/xlogdefs.h"
+
+/* GUCs */
+extern int recovery_prefetch;
+
+/* Possible values for recovery_prefetch */
+typedef enum
+{
+ RECOVERY_PREFETCH_OFF,
+ RECOVERY_PREFETCH_ON,
+ RECOVERY_PREFETCH_TRY
+} RecoveryPrefetchValue;
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+extern void XLogPrefetchReconfigure(void);
+
+extern size_t XLogPrefetchShmemSize(void);
+extern void XLogPrefetchShmemInit(void);
+
+extern void XLogPrefetchResetStats(void);
+
+extern XLogPrefetcher *XLogPrefetcherAllocate(XLogReaderState *reader);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+
+extern XLogReaderState *XLogPrefetcherGetReader(XLogPrefetcher *prefetcher);
+
+extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
+ XLogRecPtr recPtr);
+
+extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
+ char **errmsg);
+
+extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
+
+#endif /* XLOGPREFETCHER_H */
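
Taken together with the xlog.c hunks earlier in the patch (XLogPrefetcherBeginRead replacing XLogBeginRead, and records being read through the prefetcher), the intended call sequence looks roughly like this sketch; the loop body and error handling are elided.

XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(xlogreader);

XLogPrefetcherBeginRead(prefetcher, startPtr);
for (;;)
{
    char       *errormsg;
    XLogRecord *record = XLogPrefetcherReadRecord(prefetcher, &errormsg);

    if (record == NULL)
        break;              /* end of WAL, or an error reported in errormsg */
    /* ... apply the record; XLogPrefetcherGetReader() exposes the reader ... */
}
XLogPrefetcherFree(prefetcher);
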
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f4388cc9be..d8eb857611 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -39,6 +39,7 @@
#endif
#include "access/xlogrecord.h"
+#include "storage/buf.h"
/* WALOpenSegment represents a WAL segment being read. */
typedef struct WALOpenSegment
@@ -125,6 +126,9 @@ typedef struct
ForkNumber forknum;
BlockNumber blkno;
+ /* Prefetching workspace. */
+ Buffer prefetch_buffer;
+
/* copy of the fork_flags field from the XLogRecordBlockHeader */
uint8 flags;
@@ -430,5 +434,9 @@ extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *
extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum,
BlockNumber *blknum);
+extern bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum,
+ BlockNumber *blknum,
+ Buffer *prefetch_buffer);
#endif /* XLOGREADER_H */
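
The extended variant only adds the prefetch_buffer out-parameter, so a natural reading (a sketch, since the xlogreader.c hunk is not shown in this excerpt) is that the unextended function remains a thin wrapper passing NULL, leaving existing callers untouched.

bool
XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
                   RelFileNode *rnode, ForkNumber *forknum,
                   BlockNumber *blknum)
{
    /* Callers that have no use for the hint simply pass NULL. */
    return XLogRecGetBlockTagExtended(record, block_id, rnode, forknum,
                                      blknum, NULL);
}
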
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 64708949db..ff40f96e42 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -84,7 +84,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record,
Buffer *buf);
extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
- BlockNumber blkno, ReadBufferMode mode);
+ BlockNumber blkno, ReadBufferMode mode,
+ Buffer recent_buffer);
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index e133113543..67f3d8526c 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202204073
+#define CATALOG_VERSION_NO 202204074
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 776e31f3b5..61876c4e80 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5654,6 +5654,13 @@
proargmodes => '{o,o,o,o,o,o,o,o,o}',
proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}',
prosrc => 'pg_stat_get_wal' },
+{ oid => '9085', descr => 'statistics: information about WAL prefetching',
+ proname => 'pg_stat_get_recovery_prefetch', prorows => '1', provolatile => 'v',
+ proretset => 't', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int8,int4,int4,int4}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{stats_reset,prefetch,hit,skip_init,skip_new,skip_fpw,skip_rep,wal_distance,block_distance,io_depth}',
+ prosrc => 'pg_stat_get_recovery_prefetch' },
{ oid => '2306', descr => 'statistics: information about SLRU caches',
proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 74018ea27b..1189e1a226 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -453,4 +453,8 @@ extern void assign_search_path(const char *newval, void *extra);
extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
extern void assign_xlog_sync_method(int new_sync_method, void *extra);
+/* in access/transam/xlogprefetcher.c */
+extern bool check_recovery_prefetch(int *new_value, void **extra, GucSource source);
+extern void assign_recovery_prefetch(int new_value, void *extra);
+
#endif /* GUC_H */
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index 1c5b3930a9..63b56f18e0 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -67,6 +67,7 @@ enum config_group
WAL_SETTINGS,
WAL_CHECKPOINTS,
WAL_ARCHIVING,
+ WAL_RECOVERY,
WAL_ARCHIVE_RECOVERY,
WAL_RECOVERY_TARGET,
REPLICATION_SENDING,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 423b9b99fb..db652ea8d8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2019,6 +2019,17 @@ pg_stat_progress_vacuum| SELECT s.pid,
s.param7 AS num_dead_tuples
FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_recovery_prefetch| SELECT s.stats_reset,
+ s.prefetch,
+ s.hit,
+ s.skip_init,
+ s.skip_new,
+ s.skip_fpw,
+ s.skip_rep,
+ s.wal_distance,
+ s.block_distance,
+ s.io_depth
+ FROM pg_stat_get_recovery_prefetch() s(stats_reset, prefetch, hit, skip_init, skip_new, skip_fpw, skip_rep, wal_distance, block_distance, io_depth);
pg_stat_replication| SELECT s.pid,
s.usesysid,
u.rolname AS usename,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 566ecbf091..be3fafadf8 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1421,6 +1421,9 @@ LogicalRepWorker
LogicalRewriteMappingData
LogicalTape
LogicalTapeSet
+LsnReadQueue
+LsnReadQueueNextFun
+LsnReadQueueNextStatus
LtreeGistOptions
LtreeSignature
MAGIC
@@ -2949,6 +2952,9 @@ XLogPageHeaderData
XLogPageReadCB
XLogPageReadPrivate
XLogPageReadResult
+XLogPrefetcher
+XLogPrefetcherFilter
+XLogPrefetchStats
XLogReaderRoutine
XLogReaderState
XLogRecData