diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 44cada2b40..ee98585027 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4150,6 +4150,67 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + WAL Summarization + + + + + + summarize_wal (boolean) + + summarize_wal configuration parameter + + + + + Enables the WAL summarizer process. Note that WAL summarization can + be enabled either on a primary or on a standby. WAL summarization + cannot be enabled when wal_level is set to + minimal. This parameter can only be set in the + postgresql.conf file or on the server command line. + The default is off. + + + + + + wal_summary_keep_time (integer) + + wal_summary_keep_time configuration parameter + + + + + Configures the amount of time after which the WAL summarizer + automatically removes old WAL summaries. The file timestamp is used to + determine which files are old enough to remove. Typically, you should set + this comfortably higher than the time that could pass between a backup + and a later incremental backup that depends on it. WAL summaries must + be available for the entire range of WAL records between the preceding + backup and the new one being taken; if not, the incremental backup will + fail. If this parameter is set to zero, WAL summaries will not be + automatically deleted, but it is safe to manually remove files that you + know will not be required for future incremental backups. + This parameter can only be set in the + postgresql.conf file or on the server command line. + The default is 10 days. If summarize_wal = off, + existing WAL summaries will not be removed regardless of the value of + this parameter, because the WAL summarizer will not run. 
+ + + + + + + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 56e4d6fb02..1e9019156a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -77,6 +77,7 @@ #include "port/pg_iovec.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logical.h" #include "replication/origin.h" @@ -3592,6 +3593,43 @@ XLogGetLastRemovedSegno(void) return lastRemovedSegNo; } +/* + * Return the oldest WAL segment on the given TLI that still exists in + * XLOGDIR, or 0 if none. + */ +XLogSegNo +XLogGetOldestSegno(TimeLineID tli) +{ + DIR *xldir; + struct dirent *xlde; + XLogSegNo oldest_segno = 0; + + xldir = AllocateDir(XLOGDIR); + while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) + { + TimeLineID file_tli; + XLogSegNo file_segno; + + /* Ignore files that are not XLOG segments. */ + if (!IsXLogFileName(xlde->d_name)) + continue; + + /* Parse filename to get TLI and segno. */ + XLogFromFileName(xlde->d_name, &file_tli, &file_segno, + wal_segment_size); + + /* Ignore anything that's not from the TLI of interest. */ + if (tli != file_tli) + continue; + + /* If it's the oldest so far, update oldest_segno. */ + if (oldest_segno == 0 || file_segno < oldest_segno) + oldest_segno = file_segno; + } + + FreeDir(xldir); + return oldest_segno; +} /* * Update the last removed segno pointer in shared memory, to reflect that the @@ -3872,8 +3910,8 @@ RemoveXlogFile(const struct dirent *segment_de, } /* - * Verify whether pg_wal and pg_wal/archive_status exist. - * If the latter does not exist, recreate it. + * Verify whether pg_wal, pg_wal/archive_status, and pg_wal/summaries exist. + * If the latter do not exist, recreate them. 
* * It is not the goal of this function to verify the contents of these * directories, but to help in cases where someone has performed a cluster @@ -3916,6 +3954,26 @@ ValidateXLOGDirectoryStructure(void) (errmsg("could not create missing directory \"%s\": %m", path))); } + + /* Check for summaries */ + snprintf(path, MAXPGPATH, XLOGDIR "/summaries"); + if (stat(path, &stat_buf) == 0) + { + /* Check for weird cases where it exists but isn't a directory */ + if (!S_ISDIR(stat_buf.st_mode)) + ereport(FATAL, + (errmsg("required WAL directory \"%s\" does not exist", + path))); + } + else + { + ereport(LOG, + (errmsg("creating missing WAL directory \"%s\"", path))); + if (MakePGDirectory(path) < 0) + ereport(FATAL, + (errmsg("could not create missing directory \"%s\": %m", + path))); + } } /* @@ -5243,9 +5301,9 @@ StartupXLOG(void) #endif /* - * Verify that pg_wal and pg_wal/archive_status exist. In cases where - * someone has performed a copy for PITR, these directories may have been - * excluded and need to be re-created. + * Verify that pg_wal, pg_wal/archive_status, and pg_wal/summaries exist. + * In cases where someone has performed a copy for PITR, these directories + * may have been excluded and need to be re-created. */ ValidateXLOGDirectoryStructure(); @@ -6962,6 +7020,25 @@ CreateCheckPoint(int flags) */ END_CRIT_SECTION(); + /* + * WAL summaries end when the next XLOG_CHECKPOINT_REDO or + * XLOG_CHECKPOINT_SHUTDOWN record is reached. This is the first point + * where (a) we're not inside of a critical section and (b) we can be + * certain that the relevant record has been flushed to disk, which must + * happen before it can be summarized. + * + * If this is a shutdown checkpoint, then this happens reasonably + * promptly: we've only just inserted and flushed the + * XLOG_CHECKPOINT_SHUTDOWN record. 
If this is not a shutdown checkpoint, + * then this might not be very prompt at all: the XLOG_CHECKPOINT_REDO + * record was written before we began flushing data to disk, and that + * could be many minutes ago at this point. However, we don't XLogFlush() + * after inserting that record, so we're not guaranteed that it's on disk + * until after the above call that flushes the XLOG_CHECKPOINT_ONLINE + * record. + */ + SetWalSummarizerLatch(); + /* * Let smgr do post-checkpoint cleanup (eg, deleting old files). */ @@ -7636,6 +7713,20 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) } } + /* + * If WAL summarization is in use, don't remove WAL that has yet to be + * summarized. + */ + keep = GetOldestUnsummarizedLSN(NULL, NULL, false); + if (keep != InvalidXLogRecPtr) + { + XLogSegNo unsummarized_segno; + + XLByteToSeg(keep, unsummarized_segno, wal_segment_size); + if (unsummarized_segno < segno) + segno = unsummarized_segno; + } + /* but, keep at least wal_keep_size if that's set */ if (wal_keep_size_mb > 0) { diff --git a/src/backend/backup/Makefile b/src/backend/backup/Makefile index b21bd8ff43..a67b3c58d4 100644 --- a/src/backend/backup/Makefile +++ b/src/backend/backup/Makefile @@ -25,6 +25,8 @@ OBJS = \ basebackup_server.o \ basebackup_sink.o \ basebackup_target.o \ - basebackup_throttle.o + basebackup_throttle.o \ + walsummary.o \ + walsummaryfuncs.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/backup/meson.build b/src/backend/backup/meson.build index 11a79bbf80..5d4ebe3ebe 100644 --- a/src/backend/backup/meson.build +++ b/src/backend/backup/meson.build @@ -12,4 +12,6 @@ backend_sources += files( 'basebackup_target.c', 'basebackup_throttle.c', 'basebackup_zstd.c', + 'walsummary.c', + 'walsummaryfuncs.c', ) diff --git a/src/backend/backup/walsummary.c b/src/backend/backup/walsummary.c new file mode 100644 index 0000000000..271d199874 --- /dev/null +++ b/src/backend/backup/walsummary.c @@ -0,0 +1,356 @@ 
+/*------------------------------------------------------------------------- + * + * walsummary.c + * Functions for accessing and managing WAL summary data. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/backend/backup/walsummary.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <sys/stat.h> +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "backup/walsummary.h" +#include "utils/wait_event.h" + +static bool IsWalSummaryFilename(char *filename); +static int ListComparatorForWalSummaryFiles(const ListCell *a, + const ListCell *b); + +/* + * Get a list of WAL summaries. + * + * If tli != 0, only WAL summaries with the indicated TLI will be included. + * + * If start_lsn != InvalidXLogRecPtr, only summaries that end after the + * indicated LSN will be included. + * + * If end_lsn != InvalidXLogRecPtr, only summaries that start before the + * indicated LSN will be included. + * + * The intent is that you can call GetWalSummaries(tli, start_lsn, end_lsn) + * to get all WAL summaries on the indicated timeline that overlap the + * specified LSN range. + */ +List * +GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + DIR *sdir; + struct dirent *dent; + List *result = NIL; + + sdir = AllocateDir(XLOGDIR "/summaries"); + while ((dent = ReadDir(sdir, XLOGDIR "/summaries")) != NULL) + { + WalSummaryFile *ws; + uint32 tmp[5]; + TimeLineID file_tli; + XLogRecPtr file_start_lsn; + XLogRecPtr file_end_lsn; + + /* Decode filename, or skip if it's not in the expected format. */ + if (!IsWalSummaryFilename(dent->d_name)) + continue; + sscanf(dent->d_name, "%08X%08X%08X%08X%08X", + &tmp[0], &tmp[1], &tmp[2], &tmp[3], &tmp[4]); + file_tli = tmp[0]; + file_start_lsn = ((uint64) tmp[1]) << 32 | tmp[2]; + file_end_lsn = ((uint64) tmp[3]) << 32 | tmp[4]; + + /* Skip if it doesn't match the filter criteria. 
*/ + if (tli != 0 && tli != file_tli) + continue; + if (!XLogRecPtrIsInvalid(start_lsn) && start_lsn >= file_end_lsn) + continue; + if (!XLogRecPtrIsInvalid(end_lsn) && end_lsn <= file_start_lsn) + continue; + + /* Add it to the list. */ + ws = palloc(sizeof(WalSummaryFile)); + ws->tli = file_tli; + ws->start_lsn = file_start_lsn; + ws->end_lsn = file_end_lsn; + result = lappend(result, ws); + } + FreeDir(sdir); + + return result; +} + +/* + * Build a new list of WAL summaries based on an existing list, but filtering + * out summaries that don't match the search parameters. + * + * If tli != 0, only WAL summaries with the indicated TLI will be included. + * + * If start_lsn != InvalidXLogRecPtr, only summaries that end after the + * indicated LSN will be included. + * + * If end_lsn != InvalidXLogRecPtr, only summaries that start before the + * indicated LSN will be included. + */ +List * +FilterWalSummaries(List *wslist, TimeLineID tli, + XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + List *result = NIL; + ListCell *lc; + + /* Loop over input. */ + foreach(lc, wslist) + { + WalSummaryFile *ws = lfirst(lc); + + /* Skip if it doesn't match the filter criteria. */ + if (tli != 0 && tli != ws->tli) + continue; + if (!XLogRecPtrIsInvalid(start_lsn) && start_lsn >= ws->end_lsn) + continue; + if (!XLogRecPtrIsInvalid(end_lsn) && end_lsn <= ws->start_lsn) + continue; + + /* Add it to the result list. */ + result = lappend(result, ws); + } + + return result; +} + +/* + * Check whether the supplied list of WalSummaryFile objects covers the + * whole range of LSNs from start_lsn to end_lsn. This function ignores + * timelines, so the caller should probably filter using the appropriate + * timeline before calling this. + * + * If the whole range of LSNs is covered, returns true, otherwise false. 
+ * If false is returned, *missing_lsn is set either to InvalidXLogRecPtr + * if there are no WAL summary files in the input list, or to the first LSN + * in the range that is not covered by a WAL summary file in the input list. + */ +bool +WalSummariesAreComplete(List *wslist, XLogRecPtr start_lsn, + XLogRecPtr end_lsn, XLogRecPtr *missing_lsn) +{ + XLogRecPtr current_lsn = start_lsn; + ListCell *lc; + + /* Special case for empty list. */ + if (wslist == NIL) + { + *missing_lsn = InvalidXLogRecPtr; + return false; + } + + /* Make a private copy of the list and sort it by start LSN. */ + wslist = list_copy(wslist); + list_sort(wslist, ListComparatorForWalSummaryFiles); + + /* + * Consider summary files in order of increasing start_lsn, advancing the + * known-summarized range from start_lsn toward end_lsn. + * + * Normally, the summary files should cover non-overlapping WAL ranges, + * but this algorithm is intended to be correct even in case of overlap. + */ + foreach(lc, wslist) + { + WalSummaryFile *ws = lfirst(lc); + + if (ws->start_lsn > current_lsn) + { + /* We found a gap. */ + break; + } + if (ws->end_lsn > current_lsn) + { + /* + * Next summary extends beyond end of previous summary, so extend + * the end of the range known to be summarized. + */ + current_lsn = ws->end_lsn; + + /* + * If the range we know to be summarized has reached the required + * end LSN, we have proved completeness. + */ + if (current_lsn >= end_lsn) + return true; + } + } + + /* + * We either ran out of summary files without reaching the end LSN, or we + * hit a gap in the sequence that resulted in us bailing out of the loop + * above. + */ + *missing_lsn = current_lsn; + return false; +} + +/* + * Open a WAL summary file. + * + * This will throw an error in case of trouble. As an exception, if + * missing_ok = true and the trouble is specifically that the file does + * not exist, it will not throw an error and will return a value less than 0. 
+ */ +File +OpenWalSummaryFile(WalSummaryFile *ws, bool missing_ok) +{ + char path[MAXPGPATH]; + File file; + + snprintf(path, MAXPGPATH, + XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary", + ws->tli, + LSN_FORMAT_ARGS(ws->start_lsn), + LSN_FORMAT_ARGS(ws->end_lsn)); + + file = PathNameOpenFile(path, O_RDONLY); + if (file < 0 && (errno != ENOENT || !missing_ok)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + return file; +} + +/* + * Remove a WAL summary file if the last modification time precedes the + * cutoff time. + */ +void +RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time) +{ + char path[MAXPGPATH]; + struct stat statbuf; + + snprintf(path, MAXPGPATH, + XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary", + ws->tli, + LSN_FORMAT_ARGS(ws->start_lsn), + LSN_FORMAT_ARGS(ws->end_lsn)); + + if (lstat(path, &statbuf) != 0) + { + if (errno == ENOENT) + return; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + } + if (statbuf.st_mtime >= cutoff_time) + return; + if (unlink(path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + ereport(DEBUG2, + (errmsg_internal("removing file \"%s\"", path))); +} + +/* + * Test whether a filename looks like a WAL summary file. + */ +static bool +IsWalSummaryFilename(char *filename) +{ + return strspn(filename, "0123456789ABCDEF") == 40 && + strcmp(filename + 40, ".summary") == 0; +} + +/* + * Data read callback for use with CreateBlockRefTableReader. 
+ */ +int +ReadWalSummary(void *wal_summary_io, void *data, int length) +{ + WalSummaryIO *io = wal_summary_io; + int nbytes; + + nbytes = FileRead(io->file, data, length, io->filepos, + WAIT_EVENT_WAL_SUMMARY_READ); + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(io->file)))); + + io->filepos += nbytes; + return nbytes; +} + +/* + * Data write callback for use with WriteBlockRefTable. + */ +int +WriteWalSummary(void *wal_summary_io, void *data, int length) +{ + WalSummaryIO *io = wal_summary_io; + int nbytes; + + nbytes = FileWrite(io->file, data, length, io->filepos, + WAIT_EVENT_WAL_SUMMARY_WRITE); + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + FilePathName(io->file)))); + if (nbytes != length) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u", + FilePathName(io->file), nbytes, + length, (unsigned) io->filepos), + errhint("Check free disk space."))); + + io->filepos += nbytes; + return nbytes; +} + +/* + * Error-reporting callback for use with CreateBlockRefTableReader. + */ +void +ReportWalSummaryError(void *callback_arg, char *fmt,...) +{ + StringInfoData buf; + va_list ap; + int needed; + + initStringInfo(&buf); + for (;;) + { + va_start(ap, fmt); + needed = appendStringInfoVA(&buf, fmt, ap); + va_end(ap); + if (needed == 0) + break; + enlargeStringInfo(&buf, needed); + } + ereport(ERROR, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("%s", buf.data)); +} + +/* + * Comparator to sort a List of WalSummaryFile objects by start_lsn. 
+ */ +static int +ListComparatorForWalSummaryFiles(const ListCell *a, const ListCell *b) +{ + WalSummaryFile *ws1 = lfirst(a); + WalSummaryFile *ws2 = lfirst(b); + + if (ws1->start_lsn < ws2->start_lsn) + return -1; + if (ws1->start_lsn > ws2->start_lsn) + return 1; + return 0; +} diff --git a/src/backend/backup/walsummaryfuncs.c b/src/backend/backup/walsummaryfuncs.c new file mode 100644 index 0000000000..a1f69ad4ba --- /dev/null +++ b/src/backend/backup/walsummaryfuncs.c @@ -0,0 +1,169 @@ +/*------------------------------------------------------------------------- + * + * walsummaryfuncs.c + * SQL-callable functions for accessing WAL summary data. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/backend/backup/walsummaryfuncs.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "backup/walsummary.h" +#include "common/blkreftable.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" + +#define NUM_WS_ATTS 3 +#define NUM_SUMMARY_ATTS 6 +#define MAX_BLOCKS_PER_CALL 256 + +/* + * List the WAL summary files available in pg_wal/summaries. 
+ */ +Datum +pg_available_wal_summaries(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsi; + List *wslist; + ListCell *lc; + Datum values[NUM_WS_ATTS]; + bool nulls[NUM_WS_ATTS]; + + InitMaterializedSRF(fcinfo, 0); + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + + memset(nulls, 0, sizeof(nulls)); + + wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr); + foreach(lc, wslist) + { + WalSummaryFile *ws = (WalSummaryFile *) lfirst(lc); + HeapTuple tuple; + + CHECK_FOR_INTERRUPTS(); + + values[0] = Int64GetDatum((int64) ws->tli); + values[1] = LSNGetDatum(ws->start_lsn); + values[2] = LSNGetDatum(ws->end_lsn); + + tuple = heap_form_tuple(rsi->setDesc, values, nulls); + tuplestore_puttuple(rsi->setResult, tuple); + } + + return (Datum) 0; +} + +/* + * List the contents of a WAL summary file identified by TLI, start LSN, + * and end LSN. + */ +Datum +pg_wal_summary_contents(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsi; + Datum values[NUM_SUMMARY_ATTS]; + bool nulls[NUM_SUMMARY_ATTS]; + WalSummaryFile ws; + WalSummaryIO io; + BlockRefTableReader *reader; + int64 raw_tli; + RelFileLocator rlocator; + ForkNumber forknum; + BlockNumber limit_block; + + InitMaterializedSRF(fcinfo, 0); + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + memset(nulls, 0, sizeof(nulls)); + + /* + * Since the timeline could at least in theory be more than 2^31, and + * since we don't have unsigned types at the SQL level, it is passed as a + * 64-bit integer. Test whether it's out of range. + */ + raw_tli = PG_GETARG_INT64(0); + if (raw_tli < 1 || raw_tli > PG_INT32_MAX) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid timeline %lld", (long long) raw_tli)); + + /* Prepare to read the specified WAL summry file. 
*/ + ws.tli = (TimeLineID) raw_tli; + ws.start_lsn = PG_GETARG_LSN(1); + ws.end_lsn = PG_GETARG_LSN(2); + io.filepos = 0; + io.file = OpenWalSummaryFile(&ws, false); + reader = CreateBlockRefTableReader(ReadWalSummary, &io, + FilePathName(io.file), + ReportWalSummaryError, NULL); + + /* Loop over relation forks. */ + while (BlockRefTableReaderNextRelation(reader, &rlocator, &forknum, + &limit_block)) + { + BlockNumber blocks[MAX_BLOCKS_PER_CALL]; + HeapTuple tuple; + + CHECK_FOR_INTERRUPTS(); + + values[0] = ObjectIdGetDatum(rlocator.relNumber); + values[1] = ObjectIdGetDatum(rlocator.spcOid); + values[2] = ObjectIdGetDatum(rlocator.dbOid); + values[3] = Int16GetDatum((int16) forknum); + + /* Loop over blocks within the current relation fork. */ + while (1) + { + unsigned nblocks; + unsigned i; + + CHECK_FOR_INTERRUPTS(); + + nblocks = BlockRefTableReaderGetBlocks(reader, blocks, + MAX_BLOCKS_PER_CALL); + if (nblocks == 0) + break; + + /* + * For each block that we specifically know to have been modified, + * emit a row with that block number and limit_block = false. + */ + values[5] = BoolGetDatum(false); + for (i = 0; i < nblocks; ++i) + { + values[4] = Int64GetDatum((int64) blocks[i]); + + tuple = heap_form_tuple(rsi->setDesc, values, nulls); + tuplestore_puttuple(rsi->setResult, tuple); + } + + /* + * If the limit block is not InvalidBlockNumber, emit an extra row + * with that block number and limit_block = true. + * + * There is no point in doing this when the limit_block is + * InvalidBlockNumber, because no block with that number or any + * higher number can ever exist. 
+ */ + if (BlockNumberIsValid(limit_block)) + { + values[4] = Int64GetDatum((int64) limit_block); + values[5] = BoolGetDatum(true); + + tuple = heap_form_tuple(rsi->setDesc, values, nulls); + tuplestore_puttuple(rsi->setResult, tuple); + } + } + } + + /* Cleanup */ + DestroyBlockRefTableReader(reader); + FileClose(io.file); + + return (Datum) 0; +} diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 047448b34e..367a46c617 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -24,6 +24,7 @@ OBJS = \ postmaster.o \ startup.o \ syslogger.o \ + walsummarizer.o \ walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/auxprocess.c b/src/backend/postmaster/auxprocess.c index bae6f68c40..5f244216a6 100644 --- a/src/backend/postmaster/auxprocess.c +++ b/src/backend/postmaster/auxprocess.c @@ -21,6 +21,7 @@ #include "postmaster/auxprocess.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/walreceiver.h" #include "storage/bufmgr.h" @@ -80,6 +81,9 @@ AuxiliaryProcessMain(AuxProcType auxtype) case WalReceiverProcess: MyBackendType = B_WAL_RECEIVER; break; + case WalSummarizerProcess: + MyBackendType = B_WAL_SUMMARIZER; + break; default: elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType); MyBackendType = B_INVALID; @@ -158,6 +162,10 @@ AuxiliaryProcessMain(AuxProcType auxtype) WalReceiverMain(); proc_exit(1); + case WalSummarizerProcess: + WalSummarizerMain(); + proc_exit(1); + default: elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType); proc_exit(1); diff --git a/src/backend/postmaster/meson.build b/src/backend/postmaster/meson.build index cda921fd10..a30eb6692f 100644 --- a/src/backend/postmaster/meson.build +++ b/src/backend/postmaster/meson.build @@ -12,5 +12,6 @@ backend_sources += files( 'postmaster.c', 'startup.c', 'syslogger.c', + 
'walsummarizer.c', 'walwriter.c', ) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 651b85ea74..b163e89cbb 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -113,6 +113,7 @@ #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" +#include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -250,6 +251,7 @@ static pid_t StartupPID = 0, CheckpointerPID = 0, WalWriterPID = 0, WalReceiverPID = 0, + WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, SysLoggerPID = 0; @@ -441,6 +443,7 @@ static bool CreateOptsFile(int argc, char *argv[], char *fullprogname); static pid_t StartChildProcess(AuxProcType type); static void StartAutovacuumWorker(void); static void MaybeStartWalReceiver(void); +static void MaybeStartWalSummarizer(void); static void InitPostmasterDeathWatchHandle(void); /* @@ -564,6 +567,7 @@ static void ShmemBackendArrayRemove(Backend *bn); #define StartCheckpointer() StartChildProcess(CheckpointerProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess) #define StartWalReceiver() StartChildProcess(WalReceiverProcess) +#define StartWalSummarizer() StartChildProcess(WalSummarizerProcess) /* Macros to check exit status of a child process */ #define EXIT_STATUS_0(st) ((st) == 0) @@ -933,6 +937,9 @@ PostmasterMain(int argc, char *argv[]) if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"replica\" or \"logical\""))); + if (summarize_wal && wal_level == WAL_LEVEL_MINIMAL) + ereport(ERROR, + (errmsg("WAL cannot be summarized when wal_level is \"minimal\""))); /* * Other one-time internal sanity checks can go here, if they are fast. 
@@ -1835,6 +1842,9 @@ ServerLoop(void) if (WalReceiverRequested) MaybeStartWalReceiver(); + /* If we need to start a WAL summarizer, try to do that now */ + MaybeStartWalSummarizer(); + /* Get other worker processes running, if needed */ if (StartWorkerNeeded || HaveCrashedWorker) maybe_start_bgworkers(); @@ -2659,6 +2669,8 @@ process_pm_reload_request(void) signal_child(WalWriterPID, SIGHUP); if (WalReceiverPID != 0) signal_child(WalReceiverPID, SIGHUP); + if (WalSummarizerPID != 0) + signal_child(WalSummarizerPID, SIGHUP); if (AutoVacPID != 0) signal_child(AutoVacPID, SIGHUP); if (PgArchPID != 0) @@ -3012,6 +3024,7 @@ process_pm_child_exit(void) BgWriterPID = StartBackgroundWriter(); if (WalWriterPID == 0) WalWriterPID = StartWalWriter(); + MaybeStartWalSummarizer(); /* * Likewise, start other special children as needed. In a restart @@ -3130,6 +3143,20 @@ process_pm_child_exit(void) continue; } + /* + * Was it the wal summarizer? Normal exit can be ignored; we'll start + * a new one at the next iteration of the postmaster's main loop, if + * necessary. Any other exit condition is treated as a crash. + */ + if (pid == WalSummarizerPID) + { + WalSummarizerPID = 0; + if (!EXIT_STATUS_0(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("WAL summarizer process")); + continue; + } + /* * Was it the autovacuum launcher? 
Normal exit can be ignored; we'll * start a new one at the next iteration of the postmaster's main @@ -3525,6 +3552,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (WalReceiverPID != 0 && take_action) sigquit_child(WalReceiverPID); + /* Take care of the walsummarizer too */ + if (pid == WalSummarizerPID) + WalSummarizerPID = 0; + else if (WalSummarizerPID != 0 && take_action) + sigquit_child(WalSummarizerPID); + /* Take care of the autovacuum launcher too */ if (pid == AutoVacPID) AutoVacPID = 0; @@ -3675,6 +3708,8 @@ PostmasterStateMachine(void) signal_child(StartupPID, SIGTERM); if (WalReceiverPID != 0) signal_child(WalReceiverPID, SIGTERM); + if (WalSummarizerPID != 0) + signal_child(WalSummarizerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3701,6 +3736,7 @@ PostmasterStateMachine(void) if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && WalReceiverPID == 0 && + WalSummarizerPID == 0 && BgWriterPID == 0 && (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && @@ -3798,6 +3834,7 @@ PostmasterStateMachine(void) /* These other guys should be dead already */ Assert(StartupPID == 0); Assert(WalReceiverPID == 0); + Assert(WalSummarizerPID == 0); Assert(BgWriterPID == 0); Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); @@ -4019,6 +4056,8 @@ TerminateChildren(int signal) signal_child(WalWriterPID, signal); if (WalReceiverPID != 0) signal_child(WalReceiverPID, signal); + if (WalSummarizerPID != 0) + signal_child(WalSummarizerPID, signal); if (AutoVacPID != 0) signal_child(AutoVacPID, signal); if (PgArchPID != 0) @@ -5326,6 +5365,10 @@ StartChildProcess(AuxProcType type) ereport(LOG, (errmsg("could not fork WAL receiver process: %m"))); break; + case WalSummarizerProcess: + ereport(LOG, + (errmsg("could not fork WAL summarizer process: %m"))); + break; default: 
ereport(LOG, (errmsg("could not fork process: %m"))); @@ -5462,6 +5505,19 @@ MaybeStartWalReceiver(void) } } +/* + * MaybeStartWalSummarizer + * Start the WAL summarizer process, if not running and our state allows. + */ +static void +MaybeStartWalSummarizer(void) +{ + if (summarize_wal && WalSummarizerPID == 0 && + (pmState == PM_RUN || pmState == PM_HOT_STANDBY) && + Shutdown <= SmartShutdown) + WalSummarizerPID = StartWalSummarizer(); +} + /* * Create the opts file diff --git a/src/backend/postmaster/walsummarizer.c b/src/backend/postmaster/walsummarizer.c new file mode 100644 index 0000000000..9fa155349e --- /dev/null +++ b/src/backend/postmaster/walsummarizer.c @@ -0,0 +1,1398 @@ +/*------------------------------------------------------------------------- + * + * walsummarizer.c + * + * Background process to perform WAL summarization, if it is enabled. + * It continuously scans the write-ahead log and periodically emits a + * summary file which indicates which blocks in which relation forks + * were modified by WAL records in the LSN range covered by the summary + * file. See walsummary.c and blkreftable.c for more details on the + * naming and contents of WAL summary files. + * + * If configured to do, this background process will also remove WAL + * summary files when the file timestamp is older than a configurable + * threshold (but only if the WAL has been removed first). 
+ * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/postmaster/walsummarizer.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/timeline.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "backup/walsummary.h" +#include "catalog/storage_xlog.h" +#include "common/blkreftable.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "postmaster/bgwriter.h" +#include "postmaster/interrupt.h" +#include "postmaster/walsummarizer.h" +#include "replication/walreceiver.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/wait_event.h" + +/* + * Data in shared memory related to WAL summarization. + */ +typedef struct +{ + /* + * These fields are protected by WALSummarizerLock. + * + * Until we've discovered what summary files already exist on disk and + * stored that information in shared memory, initialized is false and the + * other fields here contain no meaningful information. After that has + * been done, initialized is true. + * + * summarized_tli and summarized_lsn indicate the last LSN and TLI at + * which the next summary file will start. Normally, these are the LSN and + * TLI at which the last file ended; in such case, lsn_is_exact is true. + * If, however, the LSN is just an approximation, then lsn_is_exact is + * false. This can happen if, for example, there are no existing WAL + * summary files at startup. 
In that case, we have to derive the position + * at which to start summarizing from the WAL files that exist on disk, + * and so the LSN might point to the start of the next file even though + * that might happen to be in the middle of a WAL record. + * + * summarizer_pgprocno is the pgprocno value for the summarizer process, + * if one is running, or else INVALID_PGPROCNO. + * + * pending_lsn is used by the summarizer to advertise the ending LSN of a + * record it has recently read. It shouldn't ever be less than + * summarized_lsn, but might be greater, because the summarizer buffers + * data for a range of LSNs in memory before writing out a new file. + */ + bool initialized; + TimeLineID summarized_tli; + XLogRecPtr summarized_lsn; + bool lsn_is_exact; + int summarizer_pgprocno; + XLogRecPtr pending_lsn; + + /* + * This field handles its own synchronizaton. + */ + ConditionVariable summary_file_cv; +} WalSummarizerData; + +/* + * Private data for our xlogreader's page read callback. + */ +typedef struct +{ + TimeLineID tli; + bool historic; + XLogRecPtr read_upto; + bool end_of_wal; +} SummarizerReadLocalXLogPrivate; + +/* Pointer to shared memory state. */ +static WalSummarizerData *WalSummarizerCtl; + +/* + * When we reach end of WAL and need to read more, we sleep for a number of + * milliseconds that is a integer multiple of MS_PER_SLEEP_QUANTUM. This is + * the multiplier. It should vary between 1 and MAX_SLEEP_QUANTA, depending + * on system activity. See summarizer_wait_for_wal() for how we adjust this. + */ +static long sleep_quanta = 1; + +/* + * The sleep time will always be a multiple of 200ms and will not exceed + * thirty seconds (150 * 200 = 30 * 1000). Note that the timeout here needs + * to be substntially less than the maximum amount of time for which an + * incremental backup will wait for this process to catch up. Otherwise, an + * incremental backup might time out on an idle system just because we sleep + * for too long. 
+ */ +#define MAX_SLEEP_QUANTA 150 +#define MS_PER_SLEEP_QUANTUM 200 + +/* + * This is a count of the number of pages of WAL that we've read since the + * last time we waited for more WAL to appear. + */ +static long pages_read_since_last_sleep = 0; + +/* + * Most recent RedoRecPtr value observed by MaybeRemoveOldWalSummaries. + */ +static XLogRecPtr redo_pointer_at_last_summary_removal = InvalidXLogRecPtr; + +/* + * GUC parameters + */ +bool summarize_wal = false; +int wal_summary_keep_time = 10 * 24 * 60; + +static XLogRecPtr GetLatestLSN(TimeLineID *tli); +static void HandleWalSummarizerInterrupts(void); +static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, + bool exact, XLogRecPtr switch_lsn, + XLogRecPtr maximum_lsn); +static void SummarizeSmgrRecord(XLogReaderState *xlogreader, + BlockRefTable *brtab); +static void SummarizeXactRecord(XLogReaderState *xlogreader, + BlockRefTable *brtab); +static bool SummarizeXlogRecord(XLogReaderState *xlogreader); +static int summarizer_read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, + int reqLen, + XLogRecPtr targetRecPtr, + char *cur_page); +static void summarizer_wait_for_wal(void); +static void MaybeRemoveOldWalSummaries(void); + +/* + * Amount of shared memory required for this module. + */ +Size +WalSummarizerShmemSize(void) +{ + return sizeof(WalSummarizerData); +} + +/* + * Create or attach to shared memory segment for this module. + */ +void +WalSummarizerShmemInit(void) +{ + bool found; + + WalSummarizerCtl = (WalSummarizerData *) + ShmemInitStruct("Wal Summarizer Ctl", WalSummarizerShmemSize(), + &found); + + if (!found) + { + /* + * First time through, so initialize. + * + * We're just filling in dummy values here -- the real initialization + * will happen when GetOldestUnsummarizedLSN() is called for the first + * time. 
+ */
+ WalSummarizerCtl->initialized = false;
+ WalSummarizerCtl->summarized_tli = 0;
+ WalSummarizerCtl->summarized_lsn = InvalidXLogRecPtr;
+ WalSummarizerCtl->lsn_is_exact = false;
+ WalSummarizerCtl->summarizer_pgprocno = INVALID_PGPROCNO;
+ WalSummarizerCtl->pending_lsn = InvalidXLogRecPtr;
+ ConditionVariableInit(&WalSummarizerCtl->summary_file_cv);
+ }
+}
+
+/*
+ * Entry point for walsummarizer process.
+ */
+void
+WalSummarizerMain(void)
+{
+ sigjmp_buf local_sigjmp_buf;
+ MemoryContext context;
+
+ /*
+ * Within this function, 'current_lsn' and 'current_tli' refer to the
+ * point from which the next WAL summary file should start. 'exact' is
+ * true if 'current_lsn' is known to be the start of a WAL record or WAL
+ * segment, and false if it might be in the middle of a record someplace.
+ *
+ * 'switch_lsn' and 'switch_tli', if set, are the LSN at which we need to
+ * switch to a new timeline and the timeline to which we need to switch.
+ * If not set, we either haven't figured out the answers yet or we're
+ * already on the latest timeline.
+ */
+ XLogRecPtr current_lsn;
+ TimeLineID current_tli;
+ bool exact;
+ XLogRecPtr switch_lsn = InvalidXLogRecPtr;
+ TimeLineID switch_tli = 0;
+
+ ereport(DEBUG1,
+ (errmsg_internal("WAL summarizer started")));
+
+ /*
+ * Properly accept or ignore signals the postmaster might send us
+ *
+ * We have no particular use for SIGINT at the moment, but seems
+ * reasonable to treat like SIGTERM.
+ */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+ /* SIGQUIT handler was already set up by InitPostmasterChild */
+ pqsignal(SIGALRM, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN); /* not used */
+
+ /* Advertise ourselves.
*/ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + WalSummarizerCtl->summarizer_pgprocno = MyProc->pgprocno; + LWLockRelease(WALSummarizerLock); + + /* Create and switch to a memory context that we can reset on error. */ + context = AllocSetContextCreate(TopMemoryContext, + "Wal Summarizer", + ALLOCSET_DEFAULT_SIZES); + MemoryContextSwitchTo(context); + + /* + * Reset some signals that are accepted by postmaster but not here + */ + pqsignal(SIGCHLD, SIG_DFL); + + /* + * If an exception is encountered, processing resumes here. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* Release resources we might have acquired. */ + LWLockReleaseAll(); + ConditionVariableCancelSleep(); + pgstat_report_wait_end(); + ReleaseAuxProcessResources(false); + AtEOXact_Files(false); + AtEOXact_HashTables(false); + + /* + * Now return to normal top-level context and clear ErrorContext for + * next time. + */ + MemoryContextSwitchTo(context); + FlushErrorState(); + + /* Flush any leaked data in the top-level context */ + MemoryContextReset(context); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* + * Sleep for 10 seconds before attempting to resume operations in + * order to avoid excessing logging. + * + * Many of the likely error conditions are things that will repeat + * every time. For example, if the WAL can't be read or the summary + * can't be written, only administrator action will cure the problem. + * So a really fast retry time doesn't seem to be especially + * beneficial, and it will clutter the logs. 
+ */
+ (void) WaitLatch(MyLatch,
+ WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10000,
+ WAIT_EVENT_WAL_SUMMARIZER_ERROR);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+ /*
+ * Fetch information about previous progress from shared memory, and ask
+ * GetOldestUnsummarizedLSN to reset pending_lsn to summarized_lsn. We
+ * might be recovering from an error, and if so, pending_lsn might have
+ * advanced past summarized_lsn, but any WAL we read previously has been
+ * lost and will need to be reread.
+ *
+ * If we discover that WAL summarization is not enabled, just exit.
+ */
+ current_lsn = GetOldestUnsummarizedLSN(&current_tli, &exact, true);
+ if (XLogRecPtrIsInvalid(current_lsn))
+ proc_exit(0);
+
+ /*
+ * Loop forever
+ */
+ for (;;)
+ {
+ XLogRecPtr latest_lsn;
+ TimeLineID latest_tli;
+ XLogRecPtr end_of_summary_lsn;
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextReset(context);
+
+ /* Process any signals received recently. */
+ HandleWalSummarizerInterrupts();
+
+ /* If it's time to remove any old WAL summaries, do that now. */
+ MaybeRemoveOldWalSummaries();
+
+ /* Find the LSN and TLI up to which we can safely summarize. */
+ latest_lsn = GetLatestLSN(&latest_tli);
+
+ /*
+ * If we're summarizing a historic timeline and we haven't yet
+ * computed the point at which to switch to the next timeline, do that
+ * now.
+ *
+ * Note that if this is a standby, what was previously the current
+ * timeline could become historic at any time.
+ *
+ * We could try to make this more efficient by caching the results of
+ * readTimeLineHistory when latest_tli has not changed, but since we
+ * only have to do this once per timeline switch, we probably wouldn't
+ * save any significant amount of work in practice.
+ */
+ if (current_tli != latest_tli && XLogRecPtrIsInvalid(switch_lsn))
+ {
+ List *tles = readTimeLineHistory(latest_tli);
+
+ switch_lsn = tliSwitchPoint(current_tli, tles, &switch_tli);
+ ereport(DEBUG1,
+ errmsg("switch point from TLI %u to TLI %u is at %X/%X",
+ current_tli, switch_tli, LSN_FORMAT_ARGS(switch_lsn)));
+ }
+
+ /*
+ * If we've reached the switch LSN, we can't summarize anything else
+ * on this timeline. Switch to the next timeline and go around again.
+ */
+ if (!XLogRecPtrIsInvalid(switch_lsn) && current_lsn >= switch_lsn)
+ {
+ current_tli = switch_tli;
+ switch_lsn = InvalidXLogRecPtr;
+ switch_tli = 0;
+ continue;
+ }
+
+ /* Summarize WAL. */
+ end_of_summary_lsn = SummarizeWAL(current_tli,
+ current_lsn, exact,
+ switch_lsn, latest_lsn);
+ Assert(!XLogRecPtrIsInvalid(end_of_summary_lsn));
+ Assert(end_of_summary_lsn >= current_lsn);
+
+ /*
+ * Update state for next loop iteration.
+ *
+ * Next summary file should start from exactly where this one ended.
+ */
+ current_lsn = end_of_summary_lsn;
+ exact = true;
+
+ /* Update state in shared memory. */
+ LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
+ Assert(WalSummarizerCtl->pending_lsn <= end_of_summary_lsn);
+ WalSummarizerCtl->summarized_lsn = end_of_summary_lsn;
+ WalSummarizerCtl->summarized_tli = current_tli;
+ WalSummarizerCtl->lsn_is_exact = true;
+ WalSummarizerCtl->pending_lsn = end_of_summary_lsn;
+ LWLockRelease(WALSummarizerLock);
+
+ /* Wake up anyone waiting for more summary files to be written. */
+ ConditionVariableBroadcast(&WalSummarizerCtl->summary_file_cv);
+ }
+}
+
+/*
+ * Get the oldest LSN in this server's timeline history that has not yet been
+ * summarized.
+ *
+ * If tli != NULL, *tli will be set to the TLI for the LSN that is returned.
+ *
+ * If lsn_is_exact != NULL, *lsn_is_exact will be set to true if the returned
+ * LSN is necessarily the start of a WAL record and false if it's just the
+ * beginning of a WAL segment.
+ * + * If reset_pending_lsn is true, resets the pending_lsn in shared memory to + * be equal to the summarized_lsn. + */ +XLogRecPtr +GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact, + bool reset_pending_lsn) +{ + TimeLineID latest_tli; + LWLockMode mode = reset_pending_lsn ? LW_EXCLUSIVE : LW_SHARED; + int n; + List *tles; + XLogRecPtr unsummarized_lsn; + TimeLineID unsummarized_tli = 0; + bool should_make_exact = false; + List *existing_summaries; + ListCell *lc; + + /* If not summarizing WAL, do nothing. */ + if (!summarize_wal) + return InvalidXLogRecPtr; + + /* + * Unless we need to reset the pending_lsn, we initally acquire the lock + * in shared mode and try to fetch the required information. If we acquire + * in shared mode and find that the data structure hasn't been + * initialized, we reacquire the lock in exclusive mode so that we can + * initialize it. However, if someone else does that first before we get + * the lock, then we can just return the requested information after all. + */ + while (1) + { + LWLockAcquire(WALSummarizerLock, mode); + + if (WalSummarizerCtl->initialized) + { + unsummarized_lsn = WalSummarizerCtl->summarized_lsn; + if (tli != NULL) + *tli = WalSummarizerCtl->summarized_tli; + if (lsn_is_exact != NULL) + *lsn_is_exact = WalSummarizerCtl->lsn_is_exact; + if (reset_pending_lsn) + WalSummarizerCtl->pending_lsn = + WalSummarizerCtl->summarized_lsn; + LWLockRelease(WALSummarizerLock); + return unsummarized_lsn; + } + + if (mode == LW_EXCLUSIVE) + break; + + LWLockRelease(WALSummarizerLock); + mode = LW_EXCLUSIVE; + } + + /* + * The data structure needs to be initialized, and we are the first to + * obtain the lock in exclusive mode, so it's our job to do that + * initialization. + * + * So, find the oldest timeline on which WAL still exists, and the + * earliest segment for which it exists. 
+ */
+ (void) GetLatestLSN(&latest_tli);
+ tles = readTimeLineHistory(latest_tli);
+ for (n = list_length(tles) - 1; n >= 0; --n)
+ {
+ TimeLineHistoryEntry *tle = list_nth(tles, n);
+ XLogSegNo oldest_segno;
+
+ oldest_segno = XLogGetOldestSegno(tle->tli);
+ if (oldest_segno != 0)
+ {
+ /* Compute oldest LSN that still exists on disk. */
+ XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size,
+ unsummarized_lsn);
+
+ unsummarized_tli = tle->tli;
+ break;
+ }
+ }
+
+ /* It really should not be possible for us to find no WAL. */
+ if (unsummarized_tli == 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg_internal("no WAL found on timeline %d", latest_tli));
+
+ /*
+ * Don't try to summarize anything older than the end LSN of the newest
+ * summary file that exists for this timeline.
+ */
+ existing_summaries =
+ GetWalSummaries(unsummarized_tli,
+ InvalidXLogRecPtr, InvalidXLogRecPtr);
+ foreach(lc, existing_summaries)
+ {
+ WalSummaryFile *ws = lfirst(lc);
+
+ if (ws->end_lsn > unsummarized_lsn)
+ {
+ unsummarized_lsn = ws->end_lsn;
+ should_make_exact = true;
+ }
+ }
+
+ /* Update shared memory with the discovered values. */
+ WalSummarizerCtl->initialized = true;
+ WalSummarizerCtl->summarized_lsn = unsummarized_lsn;
+ WalSummarizerCtl->summarized_tli = unsummarized_tli;
+ WalSummarizerCtl->lsn_is_exact = should_make_exact;
+ WalSummarizerCtl->pending_lsn = unsummarized_lsn;
+
+ /* Also return the discovered values to the caller as required. */
+ if (tli != NULL)
+ *tli = WalSummarizerCtl->summarized_tli;
+ if (lsn_is_exact != NULL)
+ *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
+ LWLockRelease(WALSummarizerLock);
+
+ return unsummarized_lsn;
+}
+
+/*
+ * Attempt to set the WAL summarizer's latch.
+ *
+ * This might not work, because there's no guarantee that the WAL summarizer
+ * process was successfully started, and it also might have started but
+ * subsequently terminated.
So, under normal circumstances, this will get the + * latch set, but there's no guarantee. + */ +void +SetWalSummarizerLatch(void) +{ + int pgprocno; + + if (WalSummarizerCtl == NULL) + return; + + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + pgprocno = WalSummarizerCtl->summarizer_pgprocno; + LWLockRelease(WALSummarizerLock); + + if (pgprocno != INVALID_PGPROCNO) + SetLatch(&ProcGlobal->allProcs[pgprocno].procLatch); +} + +/* + * Wait until WAL summarization reaches the given LSN, but not longer than + * the given timeout. + * + * The return value is the first still-unsummarized LSN. If it's greater than + * or equal to the passed LSN, then that LSN was reached. If not, we timed out. + * + * Either way, *pending_lsn is set to the value taken from WalSummarizerCtl. + */ +XLogRecPtr +WaitForWalSummarization(XLogRecPtr lsn, long timeout, XLogRecPtr *pending_lsn) +{ + TimestampTz start_time = GetCurrentTimestamp(); + TimestampTz deadline = TimestampTzPlusMilliseconds(start_time, timeout); + XLogRecPtr summarized_lsn; + + Assert(!XLogRecPtrIsInvalid(lsn)); + Assert(timeout > 0); + + while (1) + { + TimestampTz now; + long remaining_timeout; + + /* + * If the LSN summarized on disk has reached the target value, stop. + */ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + summarized_lsn = WalSummarizerCtl->summarized_lsn; + *pending_lsn = WalSummarizerCtl->pending_lsn; + LWLockRelease(WALSummarizerLock); + if (summarized_lsn >= lsn) + break; + + /* Timeout reached? If yes, stop. */ + now = GetCurrentTimestamp(); + remaining_timeout = TimestampDifferenceMilliseconds(now, deadline); + if (remaining_timeout <= 0) + break; + + /* Wait and see. */ + ConditionVariableTimedSleep(&WalSummarizerCtl->summary_file_cv, + remaining_timeout, + WAIT_EVENT_WAL_SUMMARY_READY); + } + + return summarized_lsn; +} + +/* + * Get the latest LSN that is eligible to be summarized, and set *tli to the + * corresponding timeline. 
+ */ +static XLogRecPtr +GetLatestLSN(TimeLineID *tli) +{ + if (!RecoveryInProgress()) + { + /* Don't summarize WAL before it's flushed. */ + return GetFlushRecPtr(tli); + } + else + { + XLogRecPtr flush_lsn; + TimeLineID flush_tli; + XLogRecPtr replay_lsn; + TimeLineID replay_tli; + + /* + * What we really want to know is how much WAL has been flushed to + * disk, but the only flush position available is the one provided by + * the walreceiver, which may not be running, because this could be + * crash recovery or recovery via restore_command. So use either the + * WAL receiver's flush position or the replay position, whichever is + * further ahead, on the theory that if the WAL has been replayed then + * it must also have been flushed to disk. + */ + flush_lsn = GetWalRcvFlushRecPtr(NULL, &flush_tli); + replay_lsn = GetXLogReplayRecPtr(&replay_tli); + if (flush_lsn > replay_lsn) + { + *tli = flush_tli; + return flush_lsn; + } + else + { + *tli = replay_tli; + return replay_lsn; + } + } +} + +/* + * Interrupt handler for main loop of WAL summarizer process. + */ +static void +HandleWalSummarizerInterrupts(void) +{ + if (ProcSignalBarrierPending) + ProcessProcSignalBarrier(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (ShutdownRequestPending || !summarize_wal) + { + ereport(DEBUG1, + errmsg_internal("WAL summarizer shutting down")); + proc_exit(0); + } + + /* Perform logging of memory contexts of this process */ + if (LogMemoryContextPending) + ProcessLogMemoryContextInterrupt(); +} + +/* + * Summarize a range of WAL records on a single timeline. + * + * 'tli' is the timeline to be summarized. + * + * 'start_lsn' is the point at which we should start summarizing. 
If this + * value comes from the end LSN of the previous record as returned by the + * xlograder machinery, 'exact' should be true; otherwise, 'exact' should + * be false, and this function will search forward for the start of a valid + * WAL record. + * + * 'switch_lsn' is the point at which we should switch to a later timeline, + * if we're summarizing a historic timeline. + * + * 'maximum_lsn' identifies the point beyond which we can't count on being + * able to read any more WAL. It should be the switch point when reading a + * historic timeline, or the most-recently-measured end of WAL when reading + * the current timeline. + * + * The return value is the LSN at which the WAL summary actually ends. Most + * often, a summary file ends because we notice that a checkpoint has + * occurred and reach the redo pointer of that checkpoint, but sometimes + * we stop for other reasons, such as a timeline switch. + */ +static XLogRecPtr +SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, + XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn) +{ + SummarizerReadLocalXLogPrivate *private_data; + XLogReaderState *xlogreader; + XLogRecPtr summary_start_lsn; + XLogRecPtr summary_end_lsn = switch_lsn; + char temp_path[MAXPGPATH]; + char final_path[MAXPGPATH]; + WalSummaryIO io; + BlockRefTable *brtab = CreateEmptyBlockRefTable(); + + /* Initialize private data for xlogreader. */ + private_data = (SummarizerReadLocalXLogPrivate *) + palloc0(sizeof(SummarizerReadLocalXLogPrivate)); + private_data->tli = tli; + private_data->historic = !XLogRecPtrIsInvalid(switch_lsn); + private_data->read_upto = maximum_lsn; + + /* Create xlogreader. 
*/ + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &summarizer_read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + private_data); + if (xlogreader == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + + /* + * When exact = false, we're starting from an arbitrary point in the WAL + * and must search forward for the start of the next record. + * + * When exact = true, start_lsn should be either the LSN where a record + * begins, or the LSN of a page where the page header is immediately + * followed by the start of a new record. XLogBeginRead should tolerate + * either case. + * + * We need to allow for both cases because the behavior of xlogreader + * varies. When a record spans two or more xlog pages, the ending LSN + * reported by xlogreader will be the starting LSN of the following + * record, but when an xlog page boundary falls between two records, the + * end LSN for the first will be reported as the first byte of the + * following page. We can't know until we read that page how large the + * header will be, but we'll have to skip over it to find the next record. + */ + if (exact) + { + /* + * Even if start_lsn is the beginning of a page rather than the + * beginning of the first record on that page, we should still use it + * as the start LSN for the summary file. That's because we detect + * missing summary files by looking for cases where the end LSN of one + * file is less than the start LSN of the next file. When only a page + * header is skipped, nothing has been missed. 
+ */ + XLogBeginRead(xlogreader, start_lsn); + summary_start_lsn = start_lsn; + } + else + { + summary_start_lsn = XLogFindNextRecord(xlogreader, start_lsn); + if (XLogRecPtrIsInvalid(summary_start_lsn)) + { + /* + * If we hit end-of-WAL while trying to find the next valid + * record, we must be on a historic timeline that has no valid + * records that begin after start_lsn and before end of WAL. + */ + if (private_data->end_of_wal) + { + ereport(DEBUG1, + errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X", + tli, + LSN_FORMAT_ARGS(start_lsn), + LSN_FORMAT_ARGS(private_data->read_upto))); + + /* + * The timeline ends at or after start_lsn, without containing + * any records. Thus, we must make sure the main loop does not + * iterate. If start_lsn is the end of the timeline, then we + * won't actually emit an empty summary file, but otherwise, + * we must, to capture the fact that the LSN range in question + * contains no interesting WAL records. + */ + summary_start_lsn = start_lsn; + summary_end_lsn = private_data->read_upto; + switch_lsn = xlogreader->EndRecPtr; + } + else + ereport(ERROR, + (errmsg("could not find a valid record after %X/%X", + LSN_FORMAT_ARGS(start_lsn)))); + } + + /* We shouldn't go backward. */ + Assert(summary_start_lsn >= start_lsn); + } + + /* + * Main loop: read xlog records one by one. + */ + while (1) + { + int block_id; + char *errormsg; + XLogRecord *record; + bool stop_requested = false; + + HandleWalSummarizerInterrupts(); + + /* We shouldn't go backward. */ + Assert(summary_start_lsn <= xlogreader->EndRecPtr); + + /* Now read the next record. */ + record = XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + { + if (private_data->end_of_wal) + { + /* + * This timeline must be historic and must end before we were + * able to read a complete record. 
+ */ + ereport(DEBUG1, + errmsg_internal("could not read WAL from timeline %d at %X/%X: end of WAL at %X/%X", + tli, + LSN_FORMAT_ARGS(xlogreader->EndRecPtr), + LSN_FORMAT_ARGS(private_data->read_upto))); + /* Summary ends at end of WAL. */ + summary_end_lsn = private_data->read_upto; + break; + } + if (errormsg) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read WAL from timeline %u at %X/%X: %s", + tli, LSN_FORMAT_ARGS(xlogreader->EndRecPtr), + errormsg))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read WAL from timeline %u at %X/%X", + tli, LSN_FORMAT_ARGS(xlogreader->EndRecPtr)))); + } + + /* We shouldn't go backward. */ + Assert(summary_start_lsn <= xlogreader->EndRecPtr); + + if (!XLogRecPtrIsInvalid(switch_lsn) && + xlogreader->ReadRecPtr >= switch_lsn) + { + /* + * Woops! We've read a record that *starts* after the switch LSN, + * contrary to our goal of reading only until we hit the first + * record that ends at or after the switch LSN. Pretend we didn't + * read it after all by bailing out of this loop right here, + * before we do anything with this record. + * + * This can happen because the last record before the switch LSN + * might be continued across multiple pages, and then we might + * come to a page with XLP_FIRST_IS_OVERWRITE_CONTRECORD set. In + * that case, the record that was continued across multiple pages + * is incomplete and will be disregarded, and the read will + * restart from the beginning of the page that is flagged + * XLP_FIRST_IS_OVERWRITE_CONTRECORD. + * + * If this case occurs, we can fairly say that the current summary + * file ends at the switch LSN exactly. The first record on the + * page marked XLP_FIRST_IS_OVERWRITE_CONTRECORD will be + * discovered when generating the next summary file. + */ + summary_end_lsn = switch_lsn; + break; + } + + /* Special handling for particular types of WAL records. 
*/ + switch (XLogRecGetRmid(xlogreader)) + { + case RM_SMGR_ID: + SummarizeSmgrRecord(xlogreader, brtab); + break; + case RM_XACT_ID: + SummarizeXactRecord(xlogreader, brtab); + break; + case RM_XLOG_ID: + stop_requested = SummarizeXlogRecord(xlogreader); + break; + default: + break; + } + + /* + * If we've been told that it's time to end this WAL summary file, do + * so. As an exception, if there's nothing included in this WAL + * summary file yet, then stopping doesn't make any sense, and we + * should wait until the next stop point instead. + */ + if (stop_requested && xlogreader->ReadRecPtr > summary_start_lsn) + { + summary_end_lsn = xlogreader->ReadRecPtr; + break; + } + + /* Feed block references from xlog record to block reference table. */ + for (block_id = 0; block_id <= XLogRecMaxBlockId(xlogreader); + block_id++) + { + RelFileLocator rlocator; + ForkNumber forknum; + BlockNumber blocknum; + + if (!XLogRecGetBlockTagExtended(xlogreader, block_id, &rlocator, + &forknum, &blocknum, NULL)) + continue; + + /* + * As we do elsewhere, ignore the FSM fork, because it's not fully + * WAL-logged. + */ + if (forknum != FSM_FORKNUM) + BlockRefTableMarkBlockModified(brtab, &rlocator, forknum, + blocknum); + } + + /* Update our notion of where this summary file ends. */ + summary_end_lsn = xlogreader->EndRecPtr; + + /* Also update shared memory. */ + LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE); + Assert(summary_end_lsn >= WalSummarizerCtl->pending_lsn); + Assert(summary_end_lsn >= WalSummarizerCtl->summarized_lsn); + WalSummarizerCtl->pending_lsn = summary_end_lsn; + LWLockRelease(WALSummarizerLock); + + /* + * If we have a switch LSN and have reached it, stop before reading + * the next record. + */ + if (!XLogRecPtrIsInvalid(switch_lsn) && + xlogreader->EndRecPtr >= switch_lsn) + break; + } + + /* Destroy xlogreader. 
*/ + pfree(xlogreader->private_data); + XLogReaderFree(xlogreader); + + /* + * If a timeline switch occurs, we may fail to make any progress at all + * before exiting the loop above. If that happens, we don't write a WAL + * summary file at all. + */ + if (summary_end_lsn > summary_start_lsn) + { + /* Generate temporary and final path name. */ + snprintf(temp_path, MAXPGPATH, + XLOGDIR "/summaries/temp.summary"); + snprintf(final_path, MAXPGPATH, + XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary", + tli, + LSN_FORMAT_ARGS(summary_start_lsn), + LSN_FORMAT_ARGS(summary_end_lsn)); + + /* Open the temporary file for writing. */ + io.filepos = 0; + io.file = PathNameOpenFile(temp_path, O_WRONLY | O_CREAT | O_TRUNC); + if (io.file < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", temp_path))); + + /* Write the data. */ + WriteBlockRefTable(brtab, WriteWalSummary, &io); + + /* Close temporary file and shut down xlogreader. */ + FileClose(io.file); + + /* Tell the user what we did. */ + ereport(DEBUG1, + errmsg("summarized WAL on TLI %d from %X/%X to %X/%X", + tli, + LSN_FORMAT_ARGS(summary_start_lsn), + LSN_FORMAT_ARGS(summary_end_lsn))); + + /* Durably rename the new summary into place. */ + durable_rename(temp_path, final_path, ERROR); + } + + return summary_end_lsn; +} + +/* + * Special handling for WAL records with RM_SMGR_ID. + */ +static void +SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab) +{ + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + + if (info == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec; + + /* + * If a new relation fork is created on disk, there is no point + * tracking anything about which blocks have been modified, because + * the whole thing will be new. Hence, set the limit block for this + * fork to 0. + * + * Ignore the FSM fork, which is not fully WAL-logged. 
+ */ + xlrec = (xl_smgr_create *) XLogRecGetData(xlogreader); + + if (xlrec->forkNum != FSM_FORKNUM) + BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator, + xlrec->forkNum, 0); + } + else if (info == XLOG_SMGR_TRUNCATE) + { + xl_smgr_truncate *xlrec; + + xlrec = (xl_smgr_truncate *) XLogRecGetData(xlogreader); + + /* + * If a relation fork is truncated on disk, there is no point in + * tracking anything about block modifications beyond the truncation + * point. + * + * We ignore SMGR_TRUNCATE_FSM here because the FSM isn't fully + * WAL-logged and thus we can't track modified blocks for it anyway. + */ + if ((xlrec->flags & SMGR_TRUNCATE_HEAP) != 0) + BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator, + MAIN_FORKNUM, xlrec->blkno); + if ((xlrec->flags & SMGR_TRUNCATE_VM) != 0) + BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator, + VISIBILITYMAP_FORKNUM, xlrec->blkno); + } +} + +/* + * Special handling for WAL recods with RM_XACT_ID. + */ +static void +SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab) +{ + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + uint8 xact_info = info & XLOG_XACT_OPMASK; + + if (xact_info == XLOG_XACT_COMMIT || + xact_info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(xlogreader); + xl_xact_parsed_commit parsed; + int i; + + /* + * Don't track modified blocks for any relations that were removed on + * commit. 
+ */ + ParseCommitRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed); + for (i = 0; i < parsed.nrels; ++i) + { + ForkNumber forknum; + + for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum) + if (forknum != FSM_FORKNUM) + BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i], + forknum, 0); + } + } + else if (xact_info == XLOG_XACT_ABORT || + xact_info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(xlogreader); + xl_xact_parsed_abort parsed; + int i; + + /* + * Don't track modified blocks for any relations that were removed on + * abort. + */ + ParseAbortRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed); + for (i = 0; i < parsed.nrels; ++i) + { + ForkNumber forknum; + + for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum) + if (forknum != FSM_FORKNUM) + BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i], + forknum, 0); + } + } +} + +/* + * Special handling for WAL recods with RM_XLOG_ID. + */ +static bool +SummarizeXlogRecord(XLogReaderState *xlogreader) +{ + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + + if (info == XLOG_CHECKPOINT_REDO || info == XLOG_CHECKPOINT_SHUTDOWN) + { + /* + * This is an LSN at which redo might begin, so we'd like + * summarization to stop just before this WAL record. + */ + return true; + } + + return false; +} + +/* + * Similar to read_local_xlog_page, but limited to read from one particular + * timeline. If the end of WAL is reached, it will wait for more if reading + * from the current timeline, or give up if reading from a historic timeline. + * In the latter case, it will also set private_data->end_of_wal = true. + * + * Caller must set private_data->tli to the TLI of interest, + * private_data->read_upto to the lowest LSN that is not known to be safe + * to read on that timeline, and private_data->historic to true if and only + * if the timeline is not the current timeline. 
This function will update + * private_data->read_upto and private_data->historic if more WAL appears + * on the current timeline or if the current timeline becomes historic. + */ +static int +summarizer_read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *cur_page) +{ + int count; + WALReadError errinfo; + SummarizerReadLocalXLogPrivate *private_data; + + HandleWalSummarizerInterrupts(); + + private_data = (SummarizerReadLocalXLogPrivate *) + state->private_data; + + while (1) + { + if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto) + { + /* + * more than one block available; read only that block, have + * caller come back if they need more. + */ + count = XLOG_BLCKSZ; + break; + } + else if (targetPagePtr + reqLen > private_data->read_upto) + { + /* We don't seem to have enough data. */ + if (private_data->historic) + { + /* + * This is a historic timeline, so there will never be any + * more data than we have currently. + */ + private_data->end_of_wal = true; + return -1; + } + else + { + XLogRecPtr latest_lsn; + TimeLineID latest_tli; + + /* + * This is - or at least was up until very recently - the + * current timeline, so more data might show up. Delay here + * so we don't tight-loop. + */ + HandleWalSummarizerInterrupts(); + summarizer_wait_for_wal(); + + /* Recheck end-of-WAL. */ + latest_lsn = GetLatestLSN(&latest_tli); + if (private_data->tli == latest_tli) + { + /* Still the current timeline, update max LSN. */ + Assert(latest_lsn >= private_data->read_upto); + private_data->read_upto = latest_lsn; + } + else + { + List *tles = readTimeLineHistory(latest_tli); + XLogRecPtr switchpoint; + + /* + * The timeline we're scanning is no longer the latest + * one. Figure out when it ended. + */ + private_data->historic = true; + switchpoint = tliSwitchPoint(private_data->tli, tles, + NULL); + + /* + * Allow reads up to exactly the switch point. 
+ * + * It's possible that this will cause read_upto to move + * backwards, because walreceiver might have read a + * partial record and flushed it to disk, and we'd view + * that data as safe to read. However, the + * XLOG_END_OF_RECOVERY record will be written at the end + * of the last complete WAL record, not at the end of the + * WAL that we've flushed to disk. + * + * So switchpoint < private->read_upto is possible here, + * but switchpoint < state->EndRecPtr should not be. + */ + Assert(switchpoint >= state->EndRecPtr); + private_data->read_upto = switchpoint; + + /* Debugging output. */ + ereport(DEBUG1, + errmsg("timeline %u became historic, can read up to %X/%X", + private_data->tli, LSN_FORMAT_ARGS(private_data->read_upto))); + } + + /* Go around and try again. */ + } + } + else + { + /* enough bytes available to satisfy the request */ + count = private_data->read_upto - targetPagePtr; + break; + } + } + + /* + * Even though we just determined how much of the page can be validly read + * as 'count', read the whole page anyway. It's guaranteed to be + * zero-padded up to the page boundary if it's incomplete. + */ + if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, + private_data->tli, &errinfo)) + WALReadRaiseError(&errinfo); + + /* Track that we read a page, for sleep time calculation. */ + ++pages_read_since_last_sleep; + + /* number of valid bytes in the buffer */ + return count; +} + +/* + * Sleep for long enough that we believe it's likely that more WAL will + * be available afterwards. + */ +static void +summarizer_wait_for_wal(void) +{ + if (pages_read_since_last_sleep == 0) + { + /* + * No pages were read since the last sleep, so double the sleep time, + * but not beyond the maximum allowable value. + */ + sleep_quanta = Min(sleep_quanta * 2, MAX_SLEEP_QUANTA); + } + else if (pages_read_since_last_sleep > 1) + { + /* + * Multiple pages were read since the last sleep, so reduce the sleep + * time. 
+ *
+ * A large burst of activity should be able to quickly reduce the
+ * sleep time to the minimum, but we don't want a handful of extra WAL
+ * records to provoke a strong reaction. We choose to reduce the sleep
+ * time by 1 quantum for each page read beyond the first, which is a
+ * fairly arbitrary way of trying to be reactive without
+ * overreacting.
+ */
+ if (pages_read_since_last_sleep > sleep_quanta - 1)
+ sleep_quanta = 1;
+ else
+ sleep_quanta -= pages_read_since_last_sleep;
+ }
+
+ /* OK, now sleep. */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_quanta * MS_PER_SLEEP_QUANTUM,
+ WAIT_EVENT_WAL_SUMMARIZER_WAL);
+ ResetLatch(MyLatch);
+
+ /* Reset count of pages read. */
+ pages_read_since_last_sleep = 0;
+}
+
+/*
+ * Remove old WAL summary files, if enabled and due; runs at most once per checkpoint cycle.
+ */
+static void
+MaybeRemoveOldWalSummaries(void)
+{
+ XLogRecPtr redo_pointer = GetRedoRecPtr();
+ List *wslist;
+ time_t cutoff_time;
+
+ /* If WAL summary removal is disabled, don't do anything. */
+ if (wal_summary_keep_time == 0)
+ return;
+
+ /*
+ * If the redo pointer has not advanced, don't do anything.
+ *
+ * This has the effect that we only try to remove old WAL summary files
+ * once per checkpoint cycle.
+ */
+ if (redo_pointer == redo_pointer_at_last_summary_removal)
+ return;
+ redo_pointer_at_last_summary_removal = redo_pointer;
+
+ /*
+ * Files should only be removed if the last modification time precedes the
+ * cutoff time we compute here.
+ */
+ cutoff_time = time(NULL) - 60 * wal_summary_keep_time;
+
+ /* Get all the summaries that currently exist. */
+ wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr);
+
+ /* Loop until all summaries have been considered for removal. 
*/ + while (wslist != NIL) + { + ListCell *lc; + XLogSegNo oldest_segno; + XLogRecPtr oldest_lsn = InvalidXLogRecPtr; + TimeLineID selected_tli; + + HandleWalSummarizerInterrupts(); + + /* + * Pick a timeline for which some summary files still exist on disk, + * and find the oldest LSN that still exists on disk for that + * timeline. + */ + selected_tli = ((WalSummaryFile *) linitial(wslist))->tli; + oldest_segno = XLogGetOldestSegno(selected_tli); + if (oldest_segno != 0) + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, + oldest_lsn); + + + /* Consider each WAL file on the selected timeline in turn. */ + foreach(lc, wslist) + { + WalSummaryFile *ws = lfirst(lc); + + HandleWalSummarizerInterrupts(); + + /* If it's not on this timeline, it's not time to consider it. */ + if (selected_tli != ws->tli) + continue; + + /* + * If the WAL doesn't exist any more, we can remove it if the file + * modification time is old enough. + */ + if (XLogRecPtrIsInvalid(oldest_lsn) || ws->end_lsn <= oldest_lsn) + RemoveWalSummaryIfOlderThan(ws, cutoff_time); + + /* + * Whether we removed the file or not, we need not consider it + * again. 
+ */ + wslist = foreach_delete_current(wslist, lc); + pfree(ws); + } + } +} diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906ce..d621f5507f 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,4 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 WaitEventExtensionLock 48 +WALSummarizerLock 49 diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index d99ecdd4d8..0dd9b98b3e 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -306,7 +306,8 @@ pgstat_io_snapshot_cb(void) * - Syslogger because it is not connected to shared memory * - Archiver because most relevant archiving IO is delegated to a * specialized command or module -* - WAL Receiver and WAL Writer IO is not tracked in pg_stat_io for now +* - WAL Receiver, WAL Writer, and WAL Summarizer IO are not tracked in +* pg_stat_io for now * * Function returns true if BackendType participates in the cumulative stats * subsystem for IO and false if it does not. @@ -328,6 +329,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_LOGGER: case B_WAL_RECEIVER: case B_WAL_WRITER: + case B_WAL_SUMMARIZER: return false; case B_AUTOVAC_LAUNCHER: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index d7995931bd..7e79163466 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -56,6 +56,7 @@ RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." +WAL_SUMMARIZER_WAL "Waiting in WAL summarizer for more WAL to be generated." 
WAL_WRITER_MAIN "Waiting in main loop of WAL writer process." @@ -142,6 +143,7 @@ SAFE_SNAPSHOT "Waiting to obtain a valid snapshot for a READ ONLY DEFER SYNC_REP "Waiting for confirmation from a remote server during synchronous replication." WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit." WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication." +WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated." XACT_GROUP_UPDATE "Waiting for the group leader to update transaction status at end of a parallel operation." @@ -162,6 +164,7 @@ REGISTER_SYNC_REQUEST "Waiting while sending synchronization requests to the che SPIN_DELAY "Waiting while acquiring a contended spinlock." VACUUM_DELAY "Waiting in a cost-based vacuum delay point." VACUUM_TRUNCATE "Waiting to acquire an exclusive lock to truncate off any empty pages at the end of a table vacuumed." +WAL_SUMMARIZER_ERROR "Waiting after a WAL summarizer error." # @@ -243,6 +246,8 @@ WAL_COPY_WRITE "Waiting for a write when creating a new WAL segment by copying a WAL_INIT_SYNC "Waiting for a newly initialized WAL file to reach durable storage." WAL_INIT_WRITE "Waiting for a write while initializing a new WAL file." WAL_READ "Waiting for a read from a WAL file." +WAL_SUMMARY_READ "Waiting for a read from a WAL summary file." +WAL_SUMMARY_WRITE "Waiting for a write to a WAL summary file." WAL_SYNC "Waiting for a WAL file to reach durable storage." WAL_SYNC_METHOD_ASSIGN "Waiting for data to reach durable storage while assigning a new WAL sync method." WAL_WRITE "Waiting for a write to a WAL file." 
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 819936ec02..5c9b6f991e 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -305,6 +305,9 @@ GetBackendTypeDesc(BackendType backendType) case B_WAL_SENDER: backendDesc = "walsender"; break; + case B_WAL_SUMMARIZER: + backendDesc = "walsummarizer"; + break; case B_WAL_WRITER: backendDesc = "walwriter"; break; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f7c9882f7c..9f59440526 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -63,6 +63,7 @@ #include "postmaster/postmaster.h" #include "postmaster/startup.h" #include "postmaster/syslogger.h" +#include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" #include "replication/slot.h" @@ -703,6 +704,8 @@ const char *const config_group_names[] = gettext_noop("Write-Ahead Log / Archive Recovery"), /* WAL_RECOVERY_TARGET */ gettext_noop("Write-Ahead Log / Recovery Target"), + /* WAL_SUMMARIZATION */ + gettext_noop("Write-Ahead Log / Summarization"), /* REPLICATION_SENDING */ gettext_noop("Replication / Sending Servers"), /* REPLICATION_PRIMARY */ @@ -1786,6 +1789,16 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"summarize_wal", PGC_SIGHUP, WAL_SUMMARIZATION, + gettext_noop("Starts the WAL summarizer process to enable incremental backup."), + NULL + }, + &summarize_wal, + false, + NULL, NULL, NULL + }, + { {"hot_standby", PGC_POSTMASTER, REPLICATION_STANDBY, gettext_noop("Allows connections and queries during recovery."), @@ -3200,6 +3213,19 @@ struct config_int ConfigureNamesInt[] = check_wal_segment_size, NULL, NULL }, + { + {"wal_summary_keep_time", PGC_SIGHUP, WAL_SUMMARIZATION, + gettext_noop("Time for which WAL summary files should be kept."), + NULL, + GUC_UNIT_MIN, + }, + &wal_summary_keep_time, + 10 * 24 * 60, /* 10 days */ + 
0, + INT_MAX, + NULL, NULL, NULL + }, + { {"autovacuum_naptime", PGC_SIGHUP, AUTOVACUUM, gettext_noop("Time to sleep between autovacuum runs."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index cf9f283cfe..b2809c711a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -302,6 +302,11 @@ #recovery_target_action = 'pause' # 'pause', 'promote', 'shutdown' # (change requires restart) +# - WAL Summarization - + +#summarize_wal = off # run WAL summarizer process? +#wal_summary_keep_time = '10d' # when to remove old summary files, 0 = never + #------------------------------------------------------------------------------ # REPLICATION diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 0c6f5ceb0a..e68b40d2b5 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -227,6 +227,7 @@ static char *extra_options = ""; static const char *const subdirs[] = { "global", "pg_wal/archive_status", + "pg_wal/summaries", "pg_commit_ts", "pg_dynshmem", "pg_notify", diff --git a/src/common/Makefile b/src/common/Makefile index 3f9067e0a2..2ba5069dca 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -47,6 +47,7 @@ OBJS_COMMON = \ archive.o \ base64.o \ binaryheap.o \ + blkreftable.o \ checksum_helper.o \ compression.o \ config_info.o \ diff --git a/src/common/blkreftable.c b/src/common/blkreftable.c new file mode 100644 index 0000000000..21ee6f5968 --- /dev/null +++ b/src/common/blkreftable.c @@ -0,0 +1,1308 @@ +/*------------------------------------------------------------------------- + * + * blkreftable.c + * Block reference tables. + * + * A block reference table is used to keep track of which blocks have + * been modified by WAL records within a certain LSN range. + * + * For each relation fork, we keep track of all blocks that have appeared + * in block reference in the WAL. 
We also keep track of the "limit block", + * which is the smallest relation length in blocks known to have occurred + * during that range of WAL records. This should be set to 0 if the relation + * fork is created or destroyed, and to the post-truncation length if + * truncated. + * + * Whenever we set the limit block, we also forget about any modified blocks + * beyond that point. Those blocks don't exist any more. Such blocks can + * later be marked as modified again; if that happens, it means the relation + * was re-extended. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/common/blkreftable.c + * + *------------------------------------------------------------------------- + */ + + +#ifndef FRONTEND +#include "postgres.h" +#else +#include "postgres_fe.h" +#endif + +#ifdef FRONTEND +#include "common/logging.h" +#endif + +#include "common/blkreftable.h" +#include "common/hashfn.h" +#include "port/pg_crc32c.h" + +/* + * A block reference table keeps track of the status of each relation + * fork individually. + */ +typedef struct BlockRefTableKey +{ + RelFileLocator rlocator; + ForkNumber forknum; +} BlockRefTableKey; + +/* + * We could need to store data either for a relation in which only a + * tiny fraction of the blocks have been modified or for a relation in + * which nearly every block has been modified, and we want a + * space-efficient representation in both cases. To accomplish this, + * we divide the relation into chunks of 2^16 blocks and choose between + * an array representation and a bitmap representation for each chunk. + * + * When the number of modified blocks in a given chunk is small, we + * essentially store an array of block numbers, but we need not store the + * entire block number: instead, we store each block number as a 2-byte + * offset from the start of the chunk. + * + * When the number of modified blocks in a given chunk is large, we switch + * to a bitmap representation. 
+ *
+ * These same basic representational choices are used both when a block
+ * reference table is stored in memory and when it is serialized to disk.
+ *
+ * In the in-memory representation, we initially allocate each chunk with
+ * space for a number of entries given by INITIAL_ENTRIES_PER_CHUNK and
+ * increase that as necessary until we reach MAX_ENTRIES_PER_CHUNK.
+ * Any chunk whose allocated size reaches MAX_ENTRIES_PER_CHUNK is converted
+ * to a bitmap, and thus never needs to grow further.
+ */
+#define BLOCKS_PER_CHUNK (1 << 16)
+#define BLOCKS_PER_ENTRY (BITS_PER_BYTE * sizeof(uint16))
+#define MAX_ENTRIES_PER_CHUNK (BLOCKS_PER_CHUNK / BLOCKS_PER_ENTRY)
+#define INITIAL_ENTRIES_PER_CHUNK 16
+typedef uint16 *BlockRefTableChunk;
+
+/*
+ * State for one relation fork.
+ *
+ * 'rlocator' and 'forknum' identify the relation fork to which this entry
+ * pertains.
+ *
+ * 'limit_block' is the shortest known length of the relation in blocks
+ * within the LSN range covered by a particular block reference table.
+ * It should be set to 0 if the relation fork is created or dropped. If the
+ * relation fork is truncated, it should be set to the number of blocks that
+ * remain after truncation.
+ *
+ * 'nchunks' is the allocated length of each of the three arrays that follow.
+ * We can only represent the status of block numbers less than nchunks *
+ * BLOCKS_PER_CHUNK.
+ *
+ * 'chunk_size' is an array storing the allocated size of each chunk.
+ *
+ * 'chunk_usage' is an array storing the number of elements used in each
+ * chunk. If that value is less than MAX_ENTRIES_PER_CHUNK, the corresponding
+ * chunk is used as an array; else the corresponding chunk is used as a bitmap.
+ * When used as a bitmap, the least significant bit of the first array element
+ * is the status of the lowest-numbered block covered by this chunk.
+ *
+ * 'chunk_data' is the array of chunks. 
+ */ +struct BlockRefTableEntry +{ + BlockRefTableKey key; + BlockNumber limit_block; + char status; + uint32 nchunks; + uint16 *chunk_size; + uint16 *chunk_usage; + BlockRefTableChunk *chunk_data; +}; + +/* Declare and define a hash table over type BlockRefTableEntry. */ +#define SH_PREFIX blockreftable +#define SH_ELEMENT_TYPE BlockRefTableEntry +#define SH_KEY_TYPE BlockRefTableKey +#define SH_KEY key +#define SH_HASH_KEY(tb, key) \ + hash_bytes((const unsigned char *) &key, sizeof(BlockRefTableKey)) +#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(BlockRefTableKey)) == 0) +#define SH_SCOPE static inline +#ifdef FRONTEND +#define SH_RAW_ALLOCATOR pg_malloc0 +#endif +#define SH_DEFINE +#define SH_DECLARE +#include "lib/simplehash.h" + +/* + * A block reference table is basically just the hash table, but we don't + * want to expose that to outside callers. + * + * We keep track of the memory context in use explicitly too, so that it's + * easy to place all of our allocations in the same context. + */ +struct BlockRefTable +{ + blockreftable_hash *hash; +#ifndef FRONTEND + MemoryContext mcxt; +#endif +}; + +/* + * On-disk serialization format for block reference table entries. + */ +typedef struct BlockRefTableSerializedEntry +{ + RelFileLocator rlocator; + ForkNumber forknum; + BlockNumber limit_block; + uint32 nchunks; +} BlockRefTableSerializedEntry; + +/* + * Buffer size, so that we avoid doing many small I/Os. + */ +#define BUFSIZE 65536 + +/* + * Ad-hoc buffer for file I/O. + */ +typedef struct BlockRefTableBuffer +{ + io_callback_fn io_callback; + void *io_callback_arg; + char data[BUFSIZE]; + int used; + int cursor; + pg_crc32c crc; +} BlockRefTableBuffer; + +/* + * State for keeping track of progress while incrementally reading a block + * table reference file from disk. 
+ *
+ * total_chunks means the number of chunks for the RelFileLocator/ForkNumber
+ * combination that is currently being read, and consumed_chunks is the number
+ * of those that have been read. (We always read all the information for
+ * a single chunk at one time, so we don't need to be able to represent the
+ * state where a chunk has been partially read.)
+ *
+ * chunk_size is the array of chunk sizes. The length is given by total_chunks.
+ *
+ * chunk_data holds the current chunk.
+ *
+ * chunk_position helps us figure out how much progress we've made in returning
+ * the block numbers for the current chunk to the caller. If the chunk is a
+ * bitmap, it's the number of bits we've scanned; otherwise, it's the number
+ * of chunk entries we've scanned.
+ */
+struct BlockRefTableReader
+{
+ BlockRefTableBuffer buffer;
+ char *error_filename;
+ report_error_fn error_callback;
+ void *error_callback_arg;
+ uint32 total_chunks;
+ uint32 consumed_chunks;
+ uint16 *chunk_size;
+ uint16 chunk_data[MAX_ENTRIES_PER_CHUNK];
+ uint32 chunk_position;
+};
+
+/*
+ * State for keeping track of progress while incrementally writing a block
+ * reference table file to disk.
+ */
+struct BlockRefTableWriter
+{
+ BlockRefTableBuffer buffer;
+};
+
+/* Function prototypes. */
+static int BlockRefTableComparator(const void *a, const void *b);
+static void BlockRefTableFlush(BlockRefTableBuffer *buffer);
+static void BlockRefTableRead(BlockRefTableReader *reader, void *data,
+ int length);
+static void BlockRefTableWrite(BlockRefTableBuffer *buffer, void *data,
+ int length);
+static void BlockRefTableFileTerminate(BlockRefTableBuffer *buffer);
+
+/*
+ * Create an empty block reference table. 
+ */ +BlockRefTable * +CreateEmptyBlockRefTable(void) +{ + BlockRefTable *brtab = palloc(sizeof(BlockRefTable)); + + /* + * Even completely empty database has a few hundred relation forks, so it + * seems best to size the hash on the assumption that we're going to have + * at least a few thousand entries. + */ +#ifdef FRONTEND + brtab->hash = blockreftable_create(4096, NULL); +#else + brtab->mcxt = CurrentMemoryContext; + brtab->hash = blockreftable_create(brtab->mcxt, 4096, NULL); +#endif + + return brtab; +} + +/* + * Set the "limit block" for a relation fork and forget any modified blocks + * with equal or higher block numbers. + * + * The "limit block" is the shortest known length of the relation within the + * range of WAL records covered by this block reference table. + */ +void +BlockRefTableSetLimitBlock(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber limit_block) +{ + BlockRefTableEntry *brtentry; + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + bool found; + + memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator)); + key.forknum = forknum; + brtentry = blockreftable_insert(brtab->hash, key, &found); + + if (!found) + { + /* + * We have no existing data about this relation fork, so just record + * the limit_block value supplied by the caller, and make sure other + * parts of the entry are properly initialized. + */ + brtentry->limit_block = limit_block; + brtentry->nchunks = 0; + brtentry->chunk_size = NULL; + brtentry->chunk_usage = NULL; + brtentry->chunk_data = NULL; + return; + } + + BlockRefTableEntrySetLimitBlock(brtentry, limit_block); +} + +/* + * Mark a block in a given relation fork as known to have been modified. 
+ */ +void +BlockRefTableMarkBlockModified(BlockRefTable *brtab, + const RelFileLocator *rlocator, + ForkNumber forknum, + BlockNumber blknum) +{ + BlockRefTableEntry *brtentry; + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + bool found; +#ifndef FRONTEND + MemoryContext oldcontext = MemoryContextSwitchTo(brtab->mcxt); +#endif + + memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator)); + key.forknum = forknum; + brtentry = blockreftable_insert(brtab->hash, key, &found); + + if (!found) + { + /* + * We want to set the initial limit block value to something higher + * than any legal block number. InvalidBlockNumber fits the bill. + */ + brtentry->limit_block = InvalidBlockNumber; + brtentry->nchunks = 0; + brtentry->chunk_size = NULL; + brtentry->chunk_usage = NULL; + brtentry->chunk_data = NULL; + } + + BlockRefTableEntryMarkBlockModified(brtentry, forknum, blknum); + +#ifndef FRONTEND + MemoryContextSwitchTo(oldcontext); +#endif +} + +/* + * Get an entry from a block reference table. + * + * If the entry does not exist, this function returns NULL. Otherwise, it + * returns the entry and sets *limit_block to the value from the entry. + */ +BlockRefTableEntry * +BlockRefTableGetEntry(BlockRefTable *brtab, const RelFileLocator *rlocator, + ForkNumber forknum, BlockNumber *limit_block) +{ + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + BlockRefTableEntry *entry; + + Assert(limit_block != NULL); + + memcpy(&key.rlocator, rlocator, sizeof(RelFileLocator)); + key.forknum = forknum; + entry = blockreftable_lookup(brtab->hash, key); + + if (entry != NULL) + *limit_block = entry->limit_block; + + return entry; +} + +/* + * Get block numbers from a table entry. + * + * 'blocks' must point to enough space to hold at least 'nblocks' block + * numbers, and any block numbers we manage to get will be written there. + * The return value is the number of block numbers actually written. 
+ * + * We do not return block numbers unless they are greater than or equal to + * start_blkno and strictly less than stop_blkno. + */ +int +BlockRefTableEntryGetBlocks(BlockRefTableEntry *entry, + BlockNumber start_blkno, + BlockNumber stop_blkno, + BlockNumber *blocks, + int nblocks) +{ + uint32 start_chunkno; + uint32 stop_chunkno; + uint32 chunkno; + int nresults = 0; + + Assert(entry != NULL); + + /* + * Figure out which chunks could potentially contain blocks of interest. + * + * We need to be careful about overflow here, because stop_blkno could be + * InvalidBlockNumber or something very close to it. + */ + start_chunkno = start_blkno / BLOCKS_PER_CHUNK; + stop_chunkno = stop_blkno / BLOCKS_PER_CHUNK; + if ((stop_blkno % BLOCKS_PER_CHUNK) != 0) + ++stop_chunkno; + if (stop_chunkno > entry->nchunks) + stop_chunkno = entry->nchunks; + + /* + * Loop over chunks. + */ + for (chunkno = start_chunkno; chunkno < stop_chunkno; ++chunkno) + { + uint16 chunk_usage = entry->chunk_usage[chunkno]; + BlockRefTableChunk chunk_data = entry->chunk_data[chunkno]; + unsigned start_offset = 0; + unsigned stop_offset = BLOCKS_PER_CHUNK; + + /* + * If the start and/or stop block number falls within this chunk, the + * whole chunk may not be of interest. Figure out which portion we + * care about, if it's not the whole thing. + */ + if (chunkno == start_chunkno) + start_offset = start_blkno % BLOCKS_PER_CHUNK; + if (chunkno == stop_chunkno - 1) + stop_offset = stop_blkno % BLOCKS_PER_CHUNK; + + /* + * Handling differs depending on whether this is an array of offsets + * or a bitmap. + */ + if (chunk_usage == MAX_ENTRIES_PER_CHUNK) + { + unsigned i; + + /* It's a bitmap, so test every relevant bit. 
*/ + for (i = start_offset; i < stop_offset; ++i) + { + uint16 w = chunk_data[i / BLOCKS_PER_ENTRY]; + + if ((w & (1 << (i % BLOCKS_PER_ENTRY))) != 0) + { + BlockNumber blkno = chunkno * BLOCKS_PER_CHUNK + i; + + blocks[nresults++] = blkno; + + /* Early exit if we run out of output space. */ + if (nresults == nblocks) + return nresults; + } + } + } + else + { + unsigned i; + + /* It's an array of offsets, so check each one. */ + for (i = 0; i < chunk_usage; ++i) + { + uint16 offset = chunk_data[i]; + + if (offset >= start_offset && offset < stop_offset) + { + BlockNumber blkno = chunkno * BLOCKS_PER_CHUNK + offset; + + blocks[nresults++] = blkno; + + /* Early exit if we run out of output space. */ + if (nresults == nblocks) + return nresults; + } + } + } + } + + return nresults; +} + +/* + * Serialize a block reference table to a file. + */ +void +WriteBlockRefTable(BlockRefTable *brtab, + io_callback_fn write_callback, + void *write_callback_arg) +{ + BlockRefTableSerializedEntry *sdata = NULL; + BlockRefTableBuffer buffer; + uint32 magic = BLOCKREFTABLE_MAGIC; + + /* Prepare buffer. */ + memset(&buffer, 0, sizeof(BlockRefTableBuffer)); + buffer.io_callback = write_callback; + buffer.io_callback_arg = write_callback_arg; + INIT_CRC32C(buffer.crc); + + /* Write magic number. */ + BlockRefTableWrite(&buffer, &magic, sizeof(uint32)); + + /* Write the entries, assuming there are some. */ + if (brtab->hash->members > 0) + { + unsigned i = 0; + blockreftable_iterator it; + BlockRefTableEntry *brtentry; + + /* Extract entries into serializable format and sort them. 
*/ + sdata = + palloc(brtab->hash->members * sizeof(BlockRefTableSerializedEntry)); + blockreftable_start_iterate(brtab->hash, &it); + while ((brtentry = blockreftable_iterate(brtab->hash, &it)) != NULL) + { + BlockRefTableSerializedEntry *sentry = &sdata[i++]; + + sentry->rlocator = brtentry->key.rlocator; + sentry->forknum = brtentry->key.forknum; + sentry->limit_block = brtentry->limit_block; + sentry->nchunks = brtentry->nchunks; + + /* trim trailing zero entries */ + while (sentry->nchunks > 0 && + brtentry->chunk_usage[sentry->nchunks - 1] == 0) + sentry->nchunks--; + } + Assert(i == brtab->hash->members); + qsort(sdata, i, sizeof(BlockRefTableSerializedEntry), + BlockRefTableComparator); + + /* Loop over entries in sorted order and serialize each one. */ + for (i = 0; i < brtab->hash->members; ++i) + { + BlockRefTableSerializedEntry *sentry = &sdata[i]; + BlockRefTableKey key = {0}; /* make sure any padding is zero */ + unsigned j; + + /* Write the serialized entry itself. */ + BlockRefTableWrite(&buffer, sentry, + sizeof(BlockRefTableSerializedEntry)); + + /* Look up the original entry so we can access the chunks. */ + memcpy(&key.rlocator, &sentry->rlocator, sizeof(RelFileLocator)); + key.forknum = sentry->forknum; + brtentry = blockreftable_lookup(brtab->hash, key); + Assert(brtentry != NULL); + + /* Write the untruncated portion of the chunk length array. */ + if (sentry->nchunks != 0) + BlockRefTableWrite(&buffer, brtentry->chunk_usage, + sentry->nchunks * sizeof(uint16)); + + /* Write the contents of each chunk. */ + for (j = 0; j < brtentry->nchunks; ++j) + { + if (brtentry->chunk_usage[j] == 0) + continue; + BlockRefTableWrite(&buffer, brtentry->chunk_data[j], + brtentry->chunk_usage[j] * sizeof(uint16)); + } + } + } + + /* Write out appropriate terminator and CRC and flush buffer. */ + BlockRefTableFileTerminate(&buffer); +} + +/* + * Prepare to incrementally read a block reference table file. 
+ *
+ * 'read_callback' is a function that can be called to read data from the
+ * underlying file (or other data source) into our internal buffer.
+ *
+ * 'read_callback_arg' is an opaque argument to be passed to read_callback.
+ *
+ * 'error_filename' is the filename that should be included in error messages
+ * if the file is found to be malformed. The value is not copied, so the
+ * caller should ensure that it remains valid until done with this
+ * BlockRefTableReader.
+ *
+ * 'error_callback' is a function to be called if the file is found to be
+ * malformed. This is not used for I/O errors, which must be handled internally
+ * by read_callback.
+ *
+ * 'error_callback_arg' is an opaque argument to be passed to error_callback.
+ */
+BlockRefTableReader *
+CreateBlockRefTableReader(io_callback_fn read_callback,
+ void *read_callback_arg,
+ char *error_filename,
+ report_error_fn error_callback,
+ void *error_callback_arg)
+{
+ BlockRefTableReader *reader;
+ uint32 magic;
+
+ /* Initialize data structure. */
+ reader = palloc0(sizeof(BlockRefTableReader));
+ reader->buffer.io_callback = read_callback;
+ reader->buffer.io_callback_arg = read_callback_arg;
+ reader->error_filename = error_filename;
+ reader->error_callback = error_callback;
+ reader->error_callback_arg = error_callback_arg;
+ INIT_CRC32C(reader->buffer.crc);
+
+ /* Verify magic number. */
+ BlockRefTableRead(reader, &magic, sizeof(uint32));
+ if (magic != BLOCKREFTABLE_MAGIC)
+ error_callback(error_callback_arg,
+ "file \"%s\" has wrong magic number: expected %u, found %u",
+ error_filename,
+ BLOCKREFTABLE_MAGIC, magic);
+
+ return reader;
+}
+
+/*
+ * Read next relation fork covered by this block reference table file.
+ *
+ * After calling this function, you must call BlockRefTableReaderGetBlocks
+ * until it returns 0 before calling it again. 
+ */ +bool +BlockRefTableReaderNextRelation(BlockRefTableReader *reader, + RelFileLocator *rlocator, + ForkNumber *forknum, + BlockNumber *limit_block) +{ + BlockRefTableSerializedEntry sentry; + BlockRefTableSerializedEntry zentry = {{0}}; + + /* + * Sanity check: caller must read all blocks from all chunks before moving + * on to the next relation. + */ + Assert(reader->total_chunks == reader->consumed_chunks); + + /* Read serialized entry. */ + BlockRefTableRead(reader, &sentry, + sizeof(BlockRefTableSerializedEntry)); + + /* + * If we just read the sentinel entry indicating that we've reached the + * end, read and check the CRC. + */ + if (memcmp(&sentry, &zentry, sizeof(BlockRefTableSerializedEntry)) == 0) + { + pg_crc32c expected_crc; + pg_crc32c actual_crc; + + /* + * We want to know the CRC of the file excluding the 4-byte CRC + * itself, so copy the current value of the CRC accumulator before + * reading those bytes, and use the copy to finalize the calculation. + */ + expected_crc = reader->buffer.crc; + FIN_CRC32C(expected_crc); + + /* Now we can read the actual value. */ + BlockRefTableRead(reader, &actual_crc, sizeof(pg_crc32c)); + + /* Throw an error if there is a mismatch. */ + if (!EQ_CRC32C(expected_crc, actual_crc)) + reader->error_callback(reader->error_callback_arg, + "file \"%s\" has wrong checksum: expected %08X, found %08X", + reader->error_filename, expected_crc, actual_crc); + + return false; + } + + /* Read chunk size array. */ + if (reader->chunk_size != NULL) + pfree(reader->chunk_size); + reader->chunk_size = palloc(sentry.nchunks * sizeof(uint16)); + BlockRefTableRead(reader, reader->chunk_size, + sentry.nchunks * sizeof(uint16)); + + /* Set up for chunk scan. */ + reader->total_chunks = sentry.nchunks; + reader->consumed_chunks = 0; + + /* Return data to caller. 
*/ + memcpy(rlocator, &sentry.rlocator, sizeof(RelFileLocator)); + *forknum = sentry.forknum; + *limit_block = sentry.limit_block; + return true; +} + +/* + * Get modified blocks associated with the relation fork returned by + * the most recent call to BlockRefTableReaderNextRelation. + * + * On return, block numbers will be written into the 'blocks' array, whose + * length should be passed via 'nblocks'. The return value is the number of + * entries actually written into the 'blocks' array, which may be less than + * 'nblocks' if we run out of modified blocks in the relation fork before + * we run out of room in the array. + */ +unsigned +BlockRefTableReaderGetBlocks(BlockRefTableReader *reader, + BlockNumber *blocks, + int nblocks) +{ + unsigned blocks_found = 0; + + /* Must provide space for at least one block number to be returned. */ + Assert(nblocks > 0); + + /* Loop collecting blocks to return to caller. */ + for (;;) + { + uint16 next_chunk_size; + + /* + * If we've read at least one chunk, maybe it contains some block + * numbers that could satisfy caller's request. + */ + if (reader->consumed_chunks > 0) + { + uint32 chunkno = reader->consumed_chunks - 1; + uint16 chunk_size = reader->chunk_size[chunkno]; + + if (chunk_size == MAX_ENTRIES_PER_CHUNK) + { + /* Bitmap format, so search for bits that are set. */ + while (reader->chunk_position < BLOCKS_PER_CHUNK && + blocks_found < nblocks) + { + uint16 chunkoffset = reader->chunk_position; + uint16 w; + + w = reader->chunk_data[chunkoffset / BLOCKS_PER_ENTRY]; + if ((w & (1u << (chunkoffset % BLOCKS_PER_ENTRY))) != 0) + blocks[blocks_found++] = + chunkno * BLOCKS_PER_CHUNK + chunkoffset; + ++reader->chunk_position; + } + } + else + { + /* Not in bitmap format, so each entry is a 2-byte offset. 
*/ + while (reader->chunk_position < chunk_size && + blocks_found < nblocks) + { + blocks[blocks_found++] = chunkno * BLOCKS_PER_CHUNK + + reader->chunk_data[reader->chunk_position]; + ++reader->chunk_position; + } + } + } + + /* We found enough blocks, so we're done. */ + if (blocks_found >= nblocks) + break; + + /* + * We didn't find enough blocks, so we must need the next chunk. If + * there are none left, though, then we're done anyway. + */ + if (reader->consumed_chunks == reader->total_chunks) + break; + + /* + * Read data for next chunk and reset scan position to beginning of + * chunk. Note that the next chunk might be empty, in which case we + * consume the chunk without actually consuming any bytes from the + * underlying file. + */ + next_chunk_size = reader->chunk_size[reader->consumed_chunks]; + if (next_chunk_size > 0) + BlockRefTableRead(reader, reader->chunk_data, + next_chunk_size * sizeof(uint16)); + ++reader->consumed_chunks; + reader->chunk_position = 0; + } + + return blocks_found; +} + +/* + * Release memory used while reading a block reference table from a file. + */ +void +DestroyBlockRefTableReader(BlockRefTableReader *reader) +{ + if (reader->chunk_size != NULL) + { + pfree(reader->chunk_size); + reader->chunk_size = NULL; + } + pfree(reader); +} + +/* + * Prepare to write a block reference table file incrementally. + * + * Caller must be able to supply BlockRefTableEntry objects sorted in the + * appropriate order. + */ +BlockRefTableWriter * +CreateBlockRefTableWriter(io_callback_fn write_callback, + void *write_callback_arg) +{ + BlockRefTableWriter *writer; + uint32 magic = BLOCKREFTABLE_MAGIC; + + /* Prepare buffer and CRC check and save callbacks. */ + writer = palloc0(sizeof(BlockRefTableWriter)); + writer->buffer.io_callback = write_callback; + writer->buffer.io_callback_arg = write_callback_arg; + INIT_CRC32C(writer->buffer.crc); + + /* Write magic number. 
*/ + BlockRefTableWrite(&writer->buffer, &magic, sizeof(uint32)); + + return writer; +} + +/* + * Append one entry to a block reference table file. + * + * Note that entries must be written in the proper order, that is, sorted by + * tablespace, then database, then relfilenumber, then fork number. Caller + * is responsible for supplying data in the correct order. If that seems hard, + * use an in-memory BlockRefTable instead. + */ +void +BlockRefTableWriteEntry(BlockRefTableWriter *writer, BlockRefTableEntry *entry) +{ + BlockRefTableSerializedEntry sentry; + unsigned j; + + /* Convert to serialized entry format. */ + sentry.rlocator = entry->key.rlocator; + sentry.forknum = entry->key.forknum; + sentry.limit_block = entry->limit_block; + sentry.nchunks = entry->nchunks; + + /* Trim trailing zero entries. */ + while (sentry.nchunks > 0 && entry->chunk_usage[sentry.nchunks - 1] == 0) + sentry.nchunks--; + + /* Write the serialized entry itself. */ + BlockRefTableWrite(&writer->buffer, &sentry, + sizeof(BlockRefTableSerializedEntry)); + + /* Write the untruncated portion of the chunk length array. */ + if (sentry.nchunks != 0) + BlockRefTableWrite(&writer->buffer, entry->chunk_usage, + sentry.nchunks * sizeof(uint16)); + + /* Write the contents of each chunk. */ + for (j = 0; j < entry->nchunks; ++j) + { + if (entry->chunk_usage[j] == 0) + continue; + BlockRefTableWrite(&writer->buffer, entry->chunk_data[j], + entry->chunk_usage[j] * sizeof(uint16)); + } +} + +/* + * Finalize an incremental write of a block reference table file. + */ +void +DestroyBlockRefTableWriter(BlockRefTableWriter *writer) +{ + BlockRefTableFileTerminate(&writer->buffer); + pfree(writer); +} + +/* + * Allocate a standalone BlockRefTableEntry. + * + * When we're manipulating a full in-memory BlockRefTable, the entries are + * part of the hash table and are allocated by simplehash. 
This routine is + * used by callers that want to write out a BlockRefTable to a file without + * needing to store the whole thing in memory at once. + * + * Entries allocated by this function can be manipulated using the functions + * BlockRefTableEntrySetLimitBlock and BlockRefTableEntryMarkBlockModified + * and then written using BlockRefTableWriteEntry and freed using + * BlockRefTableFreeEntry. + */ +BlockRefTableEntry * +CreateBlockRefTableEntry(RelFileLocator rlocator, ForkNumber forknum) +{ + BlockRefTableEntry *entry = palloc0(sizeof(BlockRefTableEntry)); + + memcpy(&entry->key.rlocator, &rlocator, sizeof(RelFileLocator)); + entry->key.forknum = forknum; + entry->limit_block = InvalidBlockNumber; + + return entry; +} + +/* + * Update a BlockRefTableEntry with a new value for the "limit block" and + * forget any equal-or-higher-numbered modified blocks. + * + * The "limit block" is the shortest known length of the relation within the + * range of WAL records covered by this block reference table. + */ +void +BlockRefTableEntrySetLimitBlock(BlockRefTableEntry *entry, + BlockNumber limit_block) +{ + unsigned chunkno; + unsigned limit_chunkno; + unsigned limit_chunkoffset; + BlockRefTableChunk limit_chunk; + + /* If we already have an equal or lower limit block, do nothing. */ + if (limit_block >= entry->limit_block) + return; + + /* Record the new limit block value. */ + entry->limit_block = limit_block; + + /* + * Figure out which chunk would store the state of the new limit block, + * and which offset within that chunk. + */ + limit_chunkno = limit_block / BLOCKS_PER_CHUNK; + limit_chunkoffset = limit_block % BLOCKS_PER_CHUNK; + + /* + * If the number of chunks is not large enough for any blocks with equal + * or higher block numbers to exist, then there is nothing further to do. + */ + if (limit_chunkno >= entry->nchunks) + return; + + /* Discard entire contents of any higher-numbered chunks. 
 */
+	for (chunkno = limit_chunkno + 1; chunkno < entry->nchunks; ++chunkno)
+		entry->chunk_usage[chunkno] = 0;
+
+	/*
+	 * Next, we need to discard any offsets within the chunk that would
+	 * contain the limit_block. We must handle this differently depending on
+	 * whether the chunk that would contain limit_block is a bitmap or an
+	 * array of offsets.
+	 */
+	limit_chunk = entry->chunk_data[limit_chunkno];
+	if (entry->chunk_usage[limit_chunkno] == MAX_ENTRIES_PER_CHUNK)
+	{
+		unsigned	chunkoffset;
+
+		/* It's a bitmap. Unset bits. */
+		for (chunkoffset = limit_chunkoffset; chunkoffset < BLOCKS_PER_CHUNK;
+			 ++chunkoffset)
+			limit_chunk[chunkoffset / BLOCKS_PER_ENTRY] &=
+				~(1 << (chunkoffset % BLOCKS_PER_ENTRY));
+	}
+	else
+	{
+		unsigned	i,
+					j = 0;
+
+		/* It's an offset array. Filter out large offsets. */
+		for (i = 0; i < entry->chunk_usage[limit_chunkno]; ++i)
+		{
+			Assert(j <= i);
+			if (limit_chunk[i] < limit_chunkoffset)
+				limit_chunk[j++] = limit_chunk[i];
+		}
+		Assert(j <= entry->chunk_usage[limit_chunkno]);
+		entry->chunk_usage[limit_chunkno] = j;
+	}
+}
+
+/*
+ * Mark a block in a given BlockRefTableEntry as known to have been modified.
+ */
+void
+BlockRefTableEntryMarkBlockModified(BlockRefTableEntry *entry,
+									ForkNumber forknum,
+									BlockNumber blknum)
+{
+	unsigned	chunkno;
+	unsigned	chunkoffset;
+	unsigned	i;
+
+	/*
+	 * Which chunk should store the state of this block? And what is the
+	 * offset of this block relative to the start of that chunk?
+	 */
+	chunkno = blknum / BLOCKS_PER_CHUNK;
+	chunkoffset = blknum % BLOCKS_PER_CHUNK;
+
+	/*
+	 * If 'nchunks' isn't big enough for us to be able to represent the state
+	 * of this block, we need to enlarge our arrays.
+	 */
+	if (chunkno >= entry->nchunks)
+	{
+		unsigned	max_chunks;
+		unsigned	extra_chunks;
+
+		/*
+		 * New array size is a power of 2, at least 16, big enough so that
+		 * chunkno will be a valid array index.
+		 */
+		max_chunks = Max(16, entry->nchunks);
+		while (max_chunks < chunkno + 1)
+			max_chunks *= 2;
+		Assert(max_chunks > chunkno);
+		extra_chunks = max_chunks - entry->nchunks;
+
+		if (entry->nchunks == 0)
+		{
+			entry->chunk_size = palloc0(sizeof(uint16) * max_chunks);
+			entry->chunk_usage = palloc0(sizeof(uint16) * max_chunks);
+			entry->chunk_data =
+				palloc0(sizeof(BlockRefTableChunk) * max_chunks);
+		}
+		else
+		{
+			entry->chunk_size = repalloc(entry->chunk_size,
+										 sizeof(uint16) * max_chunks);
+			memset(&entry->chunk_size[entry->nchunks], 0,
+				   extra_chunks * sizeof(uint16));
+			entry->chunk_usage = repalloc(entry->chunk_usage,
+										  sizeof(uint16) * max_chunks);
+			memset(&entry->chunk_usage[entry->nchunks], 0,
+				   extra_chunks * sizeof(uint16));
+			entry->chunk_data = repalloc(entry->chunk_data,
+										 sizeof(BlockRefTableChunk) * max_chunks);
+			memset(&entry->chunk_data[entry->nchunks], 0,
+				   extra_chunks * sizeof(BlockRefTableChunk));
+		}
+		entry->nchunks = max_chunks;
+	}
+
+	/*
+	 * If the chunk that covers this block number doesn't exist yet, create it
+	 * as an array and add the appropriate offset to it. We make it pretty
+	 * small initially, because there might only be 1 or a few block
+	 * references in this chunk and we don't want to use up too much memory.
+	 */
+	if (entry->chunk_size[chunkno] == 0)
+	{
+		entry->chunk_data[chunkno] =
+			palloc(sizeof(uint16) * INITIAL_ENTRIES_PER_CHUNK);
+		entry->chunk_size[chunkno] = INITIAL_ENTRIES_PER_CHUNK;
+		entry->chunk_data[chunkno][0] = chunkoffset;
+		entry->chunk_usage[chunkno] = 1;
+		return;
+	}
+
+	/*
+	 * If the number of entries in this chunk is already maximum, it must be a
+	 * bitmap. Just set the appropriate bit.
+	 */
+	if (entry->chunk_usage[chunkno] == MAX_ENTRIES_PER_CHUNK)
+	{
+		BlockRefTableChunk chunk = entry->chunk_data[chunkno];
+
+		chunk[chunkoffset / BLOCKS_PER_ENTRY] |=
+			1 << (chunkoffset % BLOCKS_PER_ENTRY);
+		return;
+	}
+
+	/*
+	 * There is an existing chunk and it's in array format.
Let's find out
+	 * whether it already has an entry for this block. If so, we do not need
+	 * to do anything.
+	 */
+	for (i = 0; i < entry->chunk_usage[chunkno]; ++i)
+	{
+		if (entry->chunk_data[chunkno][i] == chunkoffset)
+			return;
+	}
+
+	/*
+	 * If the number of entries currently used is one less than the maximum,
+	 * it's time to convert to bitmap format.
+	 */
+	if (entry->chunk_usage[chunkno] == MAX_ENTRIES_PER_CHUNK - 1)
+	{
+		BlockRefTableChunk newchunk;
+		unsigned	j;
+
+		/* Allocate a new chunk. */
+		newchunk = palloc0(MAX_ENTRIES_PER_CHUNK * sizeof(uint16));
+
+		/* Set the bit for each existing entry. */
+		for (j = 0; j < entry->chunk_usage[chunkno]; ++j)
+		{
+			unsigned	coff = entry->chunk_data[chunkno][j];
+
+			newchunk[coff / BLOCKS_PER_ENTRY] |=
+				1 << (coff % BLOCKS_PER_ENTRY);
+		}
+
+		/* Set the bit for the new entry. */
+		newchunk[chunkoffset / BLOCKS_PER_ENTRY] |=
+			1 << (chunkoffset % BLOCKS_PER_ENTRY);
+
+		/* Swap the new chunk into place and update metadata. */
+		pfree(entry->chunk_data[chunkno]);
+		entry->chunk_data[chunkno] = newchunk;
+		entry->chunk_size[chunkno] = MAX_ENTRIES_PER_CHUNK;
+		entry->chunk_usage[chunkno] = MAX_ENTRIES_PER_CHUNK;
+		return;
+	}
+
+	/*
+	 * OK, we currently have an array, and we don't need to convert to a
+	 * bitmap, but we do need to add a new element. If there's not enough
+	 * room, we'll have to expand the array.
+	 */
+	if (entry->chunk_usage[chunkno] == entry->chunk_size[chunkno])
+	{
+		unsigned	newsize = entry->chunk_size[chunkno] * 2;
+
+		Assert(newsize <= MAX_ENTRIES_PER_CHUNK);
+		entry->chunk_data[chunkno] = repalloc(entry->chunk_data[chunkno],
+											  newsize * sizeof(uint16));
+		entry->chunk_size[chunkno] = newsize;
+	}
+
+	/* Now we can add the new entry. */
+	entry->chunk_data[chunkno][entry->chunk_usage[chunkno]] =
+		chunkoffset;
+	entry->chunk_usage[chunkno]++;
+}
+
+/*
+ * Release memory for a BlockRefTableEntry that was created by
+ * CreateBlockRefTableEntry.
+ */ +void +BlockRefTableFreeEntry(BlockRefTableEntry *entry) +{ + if (entry->chunk_size != NULL) + { + pfree(entry->chunk_size); + entry->chunk_size = NULL; + } + + if (entry->chunk_usage != NULL) + { + pfree(entry->chunk_usage); + entry->chunk_usage = NULL; + } + + if (entry->chunk_data != NULL) + { + pfree(entry->chunk_data); + entry->chunk_data = NULL; + } + + pfree(entry); +} + +/* + * Comparator for BlockRefTableSerializedEntry objects. + * + * We make the tablespace OID the first column of the sort key to match + * the on-disk tree structure. + */ +static int +BlockRefTableComparator(const void *a, const void *b) +{ + const BlockRefTableSerializedEntry *sa = a; + const BlockRefTableSerializedEntry *sb = b; + + if (sa->rlocator.spcOid > sb->rlocator.spcOid) + return 1; + if (sa->rlocator.spcOid < sb->rlocator.spcOid) + return -1; + + if (sa->rlocator.dbOid > sb->rlocator.dbOid) + return 1; + if (sa->rlocator.dbOid < sb->rlocator.dbOid) + return -1; + + if (sa->rlocator.relNumber > sb->rlocator.relNumber) + return 1; + if (sa->rlocator.relNumber < sb->rlocator.relNumber) + return -1; + + if (sa->forknum > sb->forknum) + return 1; + if (sa->forknum < sb->forknum) + return -1; + + return 0; +} + +/* + * Flush any buffered data out of a BlockRefTableBuffer. + */ +static void +BlockRefTableFlush(BlockRefTableBuffer *buffer) +{ + buffer->io_callback(buffer->io_callback_arg, buffer->data, buffer->used); + buffer->used = 0; +} + +/* + * Read data from a BlockRefTableBuffer, and update the running CRC + * calculation for the returned data (but not any data that we may have + * buffered but not yet actually returned). + */ +static void +BlockRefTableRead(BlockRefTableReader *reader, void *data, int length) +{ + BlockRefTableBuffer *buffer = &reader->buffer; + + /* Loop until read is fully satisfied. */ + while (length > 0) + { + if (buffer->cursor < buffer->used) + { + /* + * If any buffered data is available, use that to satisfy as much + * of the request as possible. 
+ */ + int bytes_to_copy = Min(length, buffer->used - buffer->cursor); + + memcpy(data, &buffer->data[buffer->cursor], bytes_to_copy); + COMP_CRC32C(buffer->crc, &buffer->data[buffer->cursor], + bytes_to_copy); + buffer->cursor += bytes_to_copy; + data = ((char *) data) + bytes_to_copy; + length -= bytes_to_copy; + } + else if (length >= BUFSIZE) + { + /* + * If the request length is long, read directly into caller's + * buffer. + */ + int bytes_read; + + bytes_read = buffer->io_callback(buffer->io_callback_arg, + data, length); + COMP_CRC32C(buffer->crc, data, bytes_read); + data = ((char *) data) + bytes_read; + length -= bytes_read; + + /* If we didn't get anything, that's bad. */ + if (bytes_read == 0) + reader->error_callback(reader->error_callback_arg, + "file \"%s\" ends unexpectedly", + reader->error_filename); + } + else + { + /* + * Refill our buffer. + */ + buffer->used = buffer->io_callback(buffer->io_callback_arg, + buffer->data, BUFSIZE); + buffer->cursor = 0; + + /* If we didn't get anything, that's bad. */ + if (buffer->used == 0) + reader->error_callback(reader->error_callback_arg, + "file \"%s\" ends unexpectedly", + reader->error_filename); + } + } +} + +/* + * Supply data to a BlockRefTableBuffer for write to the underlying File, + * and update the running CRC calculation for that data. + */ +static void +BlockRefTableWrite(BlockRefTableBuffer *buffer, void *data, int length) +{ + /* Update running CRC calculation. */ + COMP_CRC32C(buffer->crc, data, length); + + /* If the new data can't fit into the buffer, flush the buffer. */ + if (buffer->used + length > BUFSIZE) + { + buffer->io_callback(buffer->io_callback_arg, buffer->data, + buffer->used); + buffer->used = 0; + } + + /* If the new data would fill the buffer, or more, write it directly. */ + if (length >= BUFSIZE) + { + buffer->io_callback(buffer->io_callback_arg, data, length); + return; + } + + /* Otherwise, copy the new data into the buffer. 
*/ + memcpy(&buffer->data[buffer->used], data, length); + buffer->used += length; + Assert(buffer->used <= BUFSIZE); +} + +/* + * Generate the sentinel and CRC required at the end of a block reference + * table file and flush them out of our internal buffer. + */ +static void +BlockRefTableFileTerminate(BlockRefTableBuffer *buffer) +{ + BlockRefTableSerializedEntry zentry = {{0}}; + pg_crc32c crc; + + /* Write a sentinel indicating that there are no more entries. */ + BlockRefTableWrite(buffer, &zentry, + sizeof(BlockRefTableSerializedEntry)); + + /* + * Writing the checksum will perturb the ongoing checksum calculation, so + * copy the state first and finalize the computation using the copy. + */ + crc = buffer->crc; + FIN_CRC32C(crc); + BlockRefTableWrite(buffer, &crc, sizeof(pg_crc32c)); + + /* Flush any leftover data out of our buffer. */ + BlockRefTableFlush(buffer); +} diff --git a/src/common/meson.build b/src/common/meson.build index 84eb100f04..12fd43e87f 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -4,6 +4,7 @@ common_sources = files( 'archive.c', 'base64.c', 'binaryheap.c', + 'blkreftable.c', 'checksum_helper.c', 'compression.c', 'controldata_utils.c', diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index a14126d164..da71580364 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -209,6 +209,7 @@ extern int XLogFileOpen(XLogSegNo segno, TimeLineID tli); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); extern XLogSegNo XLogGetLastRemovedSegno(void); +extern XLogSegNo XLogGetOldestSegno(TimeLineID tli); extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN); extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); diff --git a/src/include/backup/walsummary.h b/src/include/backup/walsummary.h new file mode 100644 index 0000000000..8e3dc7b837 --- /dev/null +++ b/src/include/backup/walsummary.h @@ -0,0 +1,49 @@ 
+/*-------------------------------------------------------------------------
+ *
+ * walsummary.h
+ *	  WAL summary management
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/backup/walsummary.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WALSUMMARY_H
+#define WALSUMMARY_H
+
+#include <time.h>
+
+#include "access/xlogdefs.h"
+#include "nodes/pg_list.h"
+#include "storage/fd.h"
+
+typedef struct WalSummaryIO
+{
+	File		file;
+	off_t		filepos;
+} WalSummaryIO;
+
+typedef struct WalSummaryFile
+{
+	XLogRecPtr	start_lsn;
+	XLogRecPtr	end_lsn;
+	TimeLineID	tli;
+} WalSummaryFile;
+
+extern List *GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn,
+							 XLogRecPtr end_lsn);
+extern List *FilterWalSummaries(List *wslist, TimeLineID tli,
+								XLogRecPtr start_lsn, XLogRecPtr end_lsn);
+extern bool WalSummariesAreComplete(List *wslist,
+									XLogRecPtr start_lsn, XLogRecPtr end_lsn,
+									XLogRecPtr *missing_lsn);
+extern File OpenWalSummaryFile(WalSummaryFile *ws, bool missing_ok);
+extern void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws,
+										time_t cutoff_time);
+
+extern int	ReadWalSummary(void *wal_summary_io, void *data, int length);
+extern int	WriteWalSummary(void *wal_summary_io, void *data, int length);
+extern void ReportWalSummaryError(void *callback_arg, char *fmt,...)
pg_attribute_printf(2, 3);
+
+#endif							/* WALSUMMARY_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 77e8b13764..916c8ec8d0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12099,4 +12099,23 @@
   proname => 'any_value_transfn', prorettype => 'anyelement',
   proargtypes => 'anyelement anyelement', prosrc => 'any_value_transfn' },

+{ oid => '8436',
+  descr => 'list of available WAL summary files',
+  proname => 'pg_available_wal_summaries', prorows => '100',
+  proretset => 't', provolatile => 'v', proparallel => 's',
+  prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,pg_lsn,pg_lsn}',
+  proargmodes => '{o,o,o}',
+  proargnames => '{tli,start_lsn,end_lsn}',
+  prosrc => 'pg_available_wal_summaries' },
+{ oid => '8437',
+  descr => 'contents of a WAL summary file',
+  proname => 'pg_wal_summary_contents', prorows => '100',
+  proretset => 't', provolatile => 'v', proparallel => 's',
+  prorettype => 'record', proargtypes => 'int8 pg_lsn pg_lsn',
+  proallargtypes => '{int8,pg_lsn,pg_lsn,oid,oid,oid,int2,int8,bool}',
+  proargmodes => '{i,i,i,o,o,o,o,o,o}',
+  proargnames => '{tli,start_lsn,end_lsn,relfilenode,reltablespace,reldatabase,relforknumber,relblocknumber,is_limit_block}',
+  prosrc => 'pg_wal_summary_contents' },
+
 ]
diff --git a/src/include/common/blkreftable.h b/src/include/common/blkreftable.h
new file mode 100644
index 0000000000..5141f3acd5
--- /dev/null
+++ b/src/include/common/blkreftable.h
@@ -0,0 +1,116 @@
+/*-------------------------------------------------------------------------
+ *
+ * blkreftable.h
+ *	  Block reference tables.
+ *
+ * A block reference table is used to keep track of which blocks have
+ * been modified by WAL records within a certain LSN range.
+ *
+ * For each relation fork, there is a "limit block number".
All existing
+ * blocks greater than or equal to the limit block number must be
+ * considered modified; for those less than the limit block number,
+ * we maintain a bitmap. When a relation fork is created or dropped,
+ * the limit block number should be set to 0. When it's truncated,
+ * the limit block number should be set to the length in blocks to
+ * which it was truncated.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/common/blkreftable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BLKREFTABLE_H
+#define BLKREFTABLE_H
+
+#include "storage/block.h"
+#include "storage/relfilelocator.h"
+
+/* Magic number for serialization file format. */
+#define BLOCKREFTABLE_MAGIC			0x652b137b
+
+typedef struct BlockRefTable BlockRefTable;
+typedef struct BlockRefTableEntry BlockRefTableEntry;
+typedef struct BlockRefTableReader BlockRefTableReader;
+typedef struct BlockRefTableWriter BlockRefTableWriter;
+
+/*
+ * The return value of io_callback_fn should be the number of bytes read
+ * or written. If an error occurs, the functions should report it and
+ * not return. When used as a write callback, short writes should be retried
+ * or treated as errors, so that if the callback returns, the return value
+ * is always the request length.
+ *
+ * report_error_fn should not return.
+ */
+typedef int (*io_callback_fn) (void *callback_arg, void *data, int length);
+typedef void (*report_error_fn) (void *callback_arg, char *msg,...) pg_attribute_printf(2, 3);
+
+
+/*
+ * Functions for manipulating an entire in-memory block reference table.
+ */
+extern BlockRefTable *CreateEmptyBlockRefTable(void);
+extern void BlockRefTableSetLimitBlock(BlockRefTable *brtab,
+									   const RelFileLocator *rlocator,
+									   ForkNumber forknum,
+									   BlockNumber limit_block);
+extern void BlockRefTableMarkBlockModified(BlockRefTable *brtab,
+										   const RelFileLocator *rlocator,
+										   ForkNumber forknum,
+										   BlockNumber blknum);
+extern void WriteBlockRefTable(BlockRefTable *brtab,
+							   io_callback_fn write_callback,
+							   void *write_callback_arg);
+
+extern BlockRefTableEntry *BlockRefTableGetEntry(BlockRefTable *brtab,
+												 const RelFileLocator *rlocator,
+												 ForkNumber forknum,
+												 BlockNumber *limit_block);
+extern int	BlockRefTableEntryGetBlocks(BlockRefTableEntry *entry,
+										BlockNumber start_blkno,
+										BlockNumber stop_blkno,
+										BlockNumber *blocks,
+										int nblocks);
+
+/*
+ * Functions for reading a block reference table incrementally from disk.
+ */
+extern BlockRefTableReader *CreateBlockRefTableReader(io_callback_fn read_callback,
+													  void *read_callback_arg,
+													  char *error_filename,
+													  report_error_fn error_callback,
+													  void *error_callback_arg);
+extern bool BlockRefTableReaderNextRelation(BlockRefTableReader *reader,
+											RelFileLocator *rlocator,
+											ForkNumber *forknum,
+											BlockNumber *limit_block);
+extern unsigned BlockRefTableReaderGetBlocks(BlockRefTableReader *reader,
+											 BlockNumber *blocks,
+											 int nblocks);
+extern void DestroyBlockRefTableReader(BlockRefTableReader *reader);
+
+/*
+ * Functions for writing a block reference table incrementally to disk.
+ *
+ * Note that entries must be written in the proper order, that is, sorted by
+ * tablespace, then database, then relfilenumber, then fork number. Caller
+ * is responsible for supplying data in the correct order. If that seems hard,
+ * use an in-memory BlockRefTable instead.
+ */ +extern BlockRefTableWriter *CreateBlockRefTableWriter(io_callback_fn write_callback, + void *write_callback_arg); +extern void BlockRefTableWriteEntry(BlockRefTableWriter *writer, + BlockRefTableEntry *entry); +extern void DestroyBlockRefTableWriter(BlockRefTableWriter *writer); + +extern BlockRefTableEntry *CreateBlockRefTableEntry(RelFileLocator rlocator, + ForkNumber forknum); +extern void BlockRefTableEntrySetLimitBlock(BlockRefTableEntry *entry, + BlockNumber limit_block); +extern void BlockRefTableEntryMarkBlockModified(BlockRefTableEntry *entry, + ForkNumber forknum, + BlockNumber blknum); +extern void BlockRefTableFreeEntry(BlockRefTableEntry *entry); + +#endif /* BLKREFTABLE_H */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 1043a4d782..74bc2f97cb 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -336,6 +336,7 @@ typedef enum BackendType B_STARTUP, B_WAL_RECEIVER, B_WAL_SENDER, + B_WAL_SUMMARIZER, B_WAL_WRITER, } BackendType; @@ -442,6 +443,7 @@ typedef enum CheckpointerProcess, WalWriterProcess, WalReceiverProcess, + WalSummarizerProcess, NUM_AUXPROCTYPES /* Must be last! */ } AuxProcType; @@ -454,6 +456,7 @@ extern PGDLLIMPORT AuxProcType MyAuxProcType; #define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess) #define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess) #define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess) +#define AmWalSummarizerProcess() (MyAuxProcType == WalSummarizerProcess) /***************************************************************************** diff --git a/src/include/postmaster/walsummarizer.h b/src/include/postmaster/walsummarizer.h new file mode 100644 index 0000000000..180d3f34b9 --- /dev/null +++ b/src/include/postmaster/walsummarizer.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * walsummarizer.h + * + * Header file for background WAL summarization process. 
+ * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/postmaster/walsummarizer.h + * + *------------------------------------------------------------------------- + */ +#ifndef WALSUMMARIZER_H +#define WALSUMMARIZER_H + +#include "access/xlogdefs.h" + +extern bool summarize_wal; +extern int wal_summary_keep_time; + +extern Size WalSummarizerShmemSize(void); +extern void WalSummarizerShmemInit(void); +extern void WalSummarizerMain(void) pg_attribute_noreturn(); + +extern XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, + bool *lsn_is_exact, + bool reset_pending_lsn); +extern void SetWalSummarizerLatch(void); +extern XLogRecPtr WaitForWalSummarization(XLogRecPtr lsn, long timeout, + XLogRecPtr *pending_lsn); + +#endif diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 4b25961249..e87fd25d64 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -417,11 +417,12 @@ extern PGDLLIMPORT PGPROC *PreparedXactProcs; * We set aside some extra PGPROC structures for auxiliary processes, * ie things that aren't full-fledged backends but need shmem access. * - * Background writer, checkpointer, WAL writer and archiver run during normal - * operation. Startup process and WAL receiver also consume 2 slots, but WAL - * writer is launched only after startup has exited, so we only need 5 slots. + * Background writer, checkpointer, WAL writer, WAL summarizer, and archiver + * run during normal operation. Startup process and WAL receiver also consume + * 2 slots, but WAL writer is launched only after startup has exited, so we + * only need 6 slots. 
*/ -#define NUM_AUXILIARY_PROCS 5 +#define NUM_AUXILIARY_PROCS 6 /* configurable options */ extern PGDLLIMPORT int DeadlockTimeout; diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 0c38255961..eaa8c46dda 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -72,6 +72,7 @@ enum config_group WAL_RECOVERY, WAL_ARCHIVE_RECOVERY, WAL_RECOVERY_TARGET, + WAL_SUMMARIZATION, REPLICATION_SENDING, REPLICATION_PRIMARY, REPLICATION_STANDBY, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ba41149b88..9390049314 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -4012,3 +4012,14 @@ yyscan_t z_stream z_streamp zic_t +BlockRefTable +BlockRefTableBuffer +BlockRefTableEntry +BlockRefTableKey +BlockRefTableReader +BlockRefTableSerializedEntry +BlockRefTableWriter +SummarizerReadLocalXLogPrivate +WalSummarizerData +WalSummaryFile +WalSummaryIO