diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index b694b2883c..192959ebc1 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -28284,6 +28284,119 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset the pause, the rate of WAL generation and available disk space. + + There are also procedures to control the progress of recovery. + They are shown in . + These procedures may be executed only during recovery. + + + + Recovery Control Procedures + + + + + Procedure + + + Description + + + + + + + + + pg_wal_replay_wait + + pg_wal_replay_wait ( + target_lsn pg_lsn, + timeout bigint DEFAULT 0) + void + + + If timeout is not specified or zero, this + procedure returns once WAL is replayed upto + target_lsn. + If the timeout is specified (in + milliseconds) and greater than zero, the procedure waits until the + server actually replays the WAL upto target_lsn or + until the given time has passed. On timeout, an error is emitted. + + + + +
+ + + pg_wal_replay_wait waits till + target_lsn to be replayed on standby. + That is, after this function execution, the value returned by + pg_last_wal_replay_lsn should be greater or equal + to the target_lsn value. This is useful to achieve + read-your-writes-consistency, while using async replica for reads and + primary for writes. In that case lsn of the last + modification should be stored on the client application side or the + connection pooler side. + + + + You can use pg_wal_replay_wait to wait for + the pg_lsn value. For example, an application could update + the movie table and get the lsn after + changes just made. This example uses pg_current_wal_insert_lsn + on primary server to get the lsn given that + synchronous_commit could be set to + off. + + +postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; +UPDATE 100 +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) + + + Then an application could run pg_wal_replay_wait + with the lsn obtained from primary. After that the + changes made of primary should be guaranteed to be visible on replica. + + +postgres=# CALL pg_wal_replay_wait('0/306EE20'); +CALL +postgres=# SELECT * FROM movie WHERE genre = 'Drama'; + genre +------- +(0 rows) + + + It may also happen that target lsn is not achieved + within the timeout. In that case the error is thrown. + + +postgres=# CALL pg_wal_replay_wait('0/306EE20', 100); +ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60 + + + + + + pg_wal_replay_wait can't be used within + the transaction, another procedure or function. Any of them holds a + snapshot, which could prevent the replay of WAL records. + + +postgres=# BEGIN; +BEGIN +postgres=*# CALL pg_wal_replay_wait('0/306EE20'); +ERROR: pg_wal_replay_wait() must be only called in non-atomic context +DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function. + + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..1446639ea0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -66,6 +66,7 @@ #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "catalog/pg_database.h" +#include "commands/waitlsn.h" #include "common/controldata_utils.h" #include "common/file_utils.h" #include "executor/instrument.h" @@ -6040,6 +6041,12 @@ StartupXLOG(void) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * Wake up all waiters for replay LSN. They need to report an error that + * recovery was ended before achieving the target LSN. + */ + WaitLSNSetLatches(InvalidXLogRecPtr); + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 29c5bec084..24ab1b2b21 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -43,6 +43,7 @@ #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" +#include "commands/waitlsn.h" #include "common/file_utils.h" #include "miscadmin.h" #include "pgstat.h" @@ -1828,6 +1829,16 @@ PerformWalRecovery(void) break; } + /* + * If we replayed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSN && + (XLogRecoveryCtl->lastReplayedEndRecPtr >= + pg_atomic_read_u64(&waitLSN->minLSN))) + WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index fe2bb50f46..a79bb80c95 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false) RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE; +CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0) + LANGUAGE internal AS 'pg_wal_replay_wait'; + CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes( IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', OUT lsn pg_lsn, OUT xid xid, OUT data text) diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 48f7348f91..cede90c3b9 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -61,6 +61,7 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o + view.o \ + waitlsn.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index 6dd00a4abd..7549be5dc3 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -50,4 +50,5 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', + 'waitlsn.c', ) diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c new file mode 100644 index 0000000000..6679378156 --- /dev/null +++ b/src/backend/commands/waitlsn.c @@ -0,0 +1,348 @@ +/*------------------------------------------------------------------------- + * + * waitlsn.c + * Implements waiting for the given LSN, which is used in + * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/waitlsn.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include +#include + +#include "pgstat.h" +#include "fmgr.h" +#include "access/transam.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_type.h" +#include "commands/waitlsn.h" +#include "executor/spi.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/sinvaladt.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" +#include "utils/timestamp.h" +#include "utils/fmgrprotos.h" + +/* Add to / delete from shared memory array */ +static void addLSNWaiter(XLogRecPtr lsn); +static void deleteLSNWaiter(void); + +struct WaitLSNState *waitLSN = NULL; +static volatile sig_atomic_t haveShmemItem = false; + +/* + * Report the amount of shared memory space needed for WaitLSNState + */ +Size +WaitLSNShmemSize(void) +{ + Size size; + + size = offsetof(WaitLSNState, procInfos); + size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); + return size; +} + +/* Initialize the WaitLSNState in the shared memory */ +void +WaitLSNShmemInit(void) +{ + bool found; + + waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState", + WaitLSNShmemSize(), + &found); + if (!found) + { + SpinLockInit(&waitLSN->mutex); + waitLSN->numWaitedProcs = 0; + pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX); + } +} + +/* + * Add the information about the LSN waiter backend to the shared memory + * array. + */ +static void +addLSNWaiter(XLogRecPtr lsn) +{ + WaitLSNProcInfo cur; + int i; + + SpinLockAcquire(&waitLSN->mutex); + + cur.procnum = MyProcNumber; + cur.waitLSN = lsn; + + for (i = 0; i < waitLSN->numWaitedProcs; i++) + { + if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN) + { + WaitLSNProcInfo tmp; + + tmp = waitLSN->procInfos[i]; + waitLSN->procInfos[i] = cur; + cur = tmp; + } + } + waitLSN->procInfos[i] = cur; + waitLSN->numWaitedProcs++; + + pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); + SpinLockRelease(&waitLSN->mutex); +} + +/* + * Delete the information about the LSN waiter backend from the shared memory + * array. + */ +static void +deleteLSNWaiter(void) +{ + int i; + bool found = false; + + SpinLockAcquire(&waitLSN->mutex); + + for (i = 0; i < waitLSN->numWaitedProcs; i++) + { + if (waitLSN->procInfos[i].procnum == MyProcNumber) + found = true; + + if (found && i < waitLSN->numWaitedProcs - 1) + { + waitLSN->procInfos[i] = waitLSN->procInfos[i + 1]; + } + } + + if (!found) + { + SpinLockRelease(&waitLSN->mutex); + return; + } + waitLSN->numWaitedProcs--; + + if (waitLSN->numWaitedProcs != 0) + pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); + else + pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + + SpinLockRelease(&waitLSN->mutex); +} + +/* + * Set latches of LSN waiters whose LSN has been replayed. Set latches of all + * LSN waiters when InvalidXLogRecPtr is given. + */ +void +WaitLSNSetLatches(XLogRecPtr currentLSN) +{ + int i; + int *wakeUpProcNums; + int numWakeUpProcs; + + wakeUpProcNums = palloc(sizeof(int) * MaxBackends); + + SpinLockAcquire(&waitLSN->mutex); + + /* + * Remember processes, whose waited LSNs are already replayed. We should + * set their latches later after spinlock release. + */ + for (i = 0; i < waitLSN->numWaitedProcs; i++) + { + if (!XLogRecPtrIsInvalid(currentLSN) && + waitLSN->procInfos[i].waitLSN > currentLSN) + break; + + wakeUpProcNums[i] = waitLSN->procInfos[i].procnum; + } + + /* + * Immediately remove those processes from the shmem array. Otherwise, + * shmem array items will be here till corresponding processes wake up and + * delete themselves. + */ + numWakeUpProcs = i; + for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++) + waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs]; + waitLSN->numWaitedProcs -= numWakeUpProcs; + + if (waitLSN->numWaitedProcs != 0) + pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); + else + pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + + SpinLockRelease(&waitLSN->mutex); + + /* + * Set latches for processes, whose waited LSNs are already replayed. This + * involves spinlocks. So, we shouldn't do this under a spinlock. + */ + for (i = 0; i < numWakeUpProcs; i++) + { + PGPROC *backend; + + backend = GetPGProcByNumber(wakeUpProcNums[i]); + SetLatch(&backend->procLatch); + } + pfree(wakeUpProcNums); +} + +/* + * Delete our item from shmem array if any. + */ +void +WaitLSNCleanup(void) +{ + if (haveShmemItem) + deleteLSNWaiter(); +} + +/* + * Wait using MyLatch till the given LSN is replayed, the postmaster dies or + * timeout happens. + */ +void +WaitForLSN(XLogRecPtr targetLSN, int64 timeout) +{ + XLogRecPtr currentLSN; + TimestampTz endtime; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSN); + + /* Should be only called by a backend */ + Assert(MyBackendType == B_BACKEND); + + if (!RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for LSN can only be executed during recovery."))); + + /* If target LSN is already replayed, exit immediately */ + if (targetLSN <= GetXLogReplayRecPtr(NULL)) + return; + + if (timeout > 0) + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); + + addLSNWaiter(targetLSN); + haveShmemItem = true; + + for (;;) + { + int rc; + int latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; + long delay_ms = 0; + + /* Check if the waited LSN has been replayed */ + currentLSN = GetXLogReplayRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + + /* Recheck that recovery is still in-progress */ + if (!RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.", + LSN_FORMAT_ARGS(targetLSN), + LSN_FORMAT_ARGS(currentLSN)))); + + if (timeout > 0) + { + delay_ms = (endtime - GetCurrentTimestamp()) / 1000; + latch_events |= WL_TIMEOUT; + if (delay_ms <= 0) + break; + } + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, latch_events, delay_ms, + WAIT_EVENT_WAIT_FOR_WAL_REPLAY); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + if (targetLSN > currentLSN) + { + deleteLSNWaiter(); + haveShmemItem = false; + ereport(ERROR, + (errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", + LSN_FORMAT_ARGS(targetLSN), + LSN_FORMAT_ARGS(currentLSN)))); + } + else + { + haveShmemItem = false; + } +} + +Datum +pg_wal_replay_wait(PG_FUNCTION_ARGS) +{ + XLogRecPtr target_lsn = PG_GETARG_LSN(0); + int64 timeout = PG_GETARG_INT64(1); + CallContext *context = (CallContext *) fcinfo->context; + + if (timeout < 0) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("\"timeout\" must not be negative"))); + + /* + * We are going to wait for the LSN replay. We should first care that we + * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * Otherwise, our snapshot could prevent the replay of WAL records + * implying a kind of self-deadlock. This is the reason why + * pg_wal_replay_wait() is a procedure, not a function. + * + * At first, we check that pg_wal_replay_wait() is called in a non-atomic + * context. That is, a procedure call isn't wrapped into a transaction, + * another procedure call, or a function call. + * + * Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic + * context, CallStmt is processed with a snapshot. Thankfully, we can pop + * this snapshot, because PortalRunUtility() can tolerate this. + */ + if (context->atomic) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("pg_wal_replay_wait() must be only called in non-atomic context"), + errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function."))); + + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + Assert(!ActiveSnapshotSet()); + InvalidateCatalogSnapshot(); + Assert(MyProc->xmin == InvalidTransactionId); + + (void) WaitForLSN(target_lsn, timeout); + + PG_RETURN_VOID(); +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 521ed5418c..5aed90c935 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -25,6 +25,7 @@ #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "commands/async.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, WaitEventExtensionShmemSize()); size = add_size(size, InjectionPointShmemSize()); size = add_size(size, SlotSyncShmemSize()); + size = add_size(size, WaitLSNShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void) /* Initialize subsystems */ CreateOrAttachShmemStructs(); + /* + * Init array of Latches in shared memory for wait lsn + */ + WaitLSNShmemInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 162b1f919d..4b830dc3c8 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,6 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -862,6 +863,11 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Cancel any pending condition variable sleep, too */ ConditionVariableCancelSleep(); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index d39d8d7e87..0d288d6b3d 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -79,6 +79,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." +WAIT_FOR_WAL_REPLAY "Waiting for a replay of the particular WAL position on the physical standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 950f00bed4..f8fab27da9 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202404021 +#define CATALOG_VERSION_NO 202404022 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 134e3b22fd..153d816a05 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12153,6 +12153,11 @@ prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary', prosrc => 'brin_minmax_multi_summary_send' }, +{ oid => '16387', descr => 'wait for LSN with timeout', + proname => 'pg_wal_replay_wait', prokind => 'p', prorettype => 'void', + proargtypes => 'pg_lsn int8', proargnames => '{target_lsn,timeout}', + prosrc => 'pg_wal_replay_wait' }, + { oid => '6291', descr => 'arbitrary value from among input values', proname => 'any_value', prokind => 'a', proisstrict => 'f', prorettype => 'anyelement', proargtypes => 'anyelement', diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h new file mode 100644 index 0000000000..10ef63f0c0 --- /dev/null +++ b/src/include/commands/waitlsn.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * waitlsn.h + * Declarations for LSN waiting routines. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * src/include/commands/waitlsn.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAIT_LSN_H +#define WAIT_LSN_H + +#include "postgres.h" +#include "port/atomics.h" +#include "storage/spin.h" +#include "tcop/dest.h" + +/* Shared memory structures */ +typedef struct WaitLSNProcInfo +{ + int procnum; + XLogRecPtr waitLSN; +} WaitLSNProcInfo; + +typedef struct WaitLSNState +{ + pg_atomic_uint64 minLSN; + slock_t mutex; + int numWaitedProcs; + WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; +} WaitLSNState; + +extern PGDLLIMPORT struct WaitLSNState *waitLSN; + +extern void WaitForLSN(XLogRecPtr targetLSN, int64 timeout); +extern Size WaitLSNShmemSize(void); +extern void WaitLSNShmemInit(void); +extern void WaitLSNSetLatches(XLogRecPtr currentLSN); +extern void WaitLSNCleanup(void); + +#endif /* WAIT_LSN_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index b1eb77b1ec..712924c2fa 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -51,6 +51,7 @@ tests += { 't/040_standby_failover_slots_sync.pl', 't/041_checkpoint_at_promote.pl', 't/042_low_level_backup.pl', + 't/043_wal_replay_wait.pl', ], }, } diff --git a/src/test/recovery/t/043_wal_replay_wait.pl b/src/test/recovery/t/043_wal_replay_wait.pl new file mode 100644 index 0000000000..bbd64aa67b --- /dev/null +++ b/src/test/recovery/t/043_wal_replay_wait.pl @@ -0,0 +1,97 @@ +# Checks waiting for the lsn replay on standby using +# pg_wal_replay_wait() procedure. +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize primary node +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->start; + +# And some content and take a backup +$node_primary->safe_psql('postgres', + "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +# Create a streaming standby with a 1 second delay from the backup +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +my $delay = 1; +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->append_conf( + 'postgresql.conf', qq[ + recovery_min_apply_delay = '${delay}s' +]); +$node_standby->start; + + +# Make sure that pg_wal_replay_wait() works: add new content to +# primary and memorize primary's insert LSN, then wait for that LSN to be +# replayed on standby. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(11, 20))"); +my $lsn1 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +my $output = $node_standby->safe_psql( + 'postgres', qq[ + CALL pg_wal_replay_wait('${lsn1}', 1000000); + SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn); +]); + +# Make sure the current LSN on standby is at least as big as the LSN we +# observed on primary's before. +ok($output >= 0, + "standby reached the same LSN as primary after pg_wal_replay_wait()"); + +# Check that new data is visible after calling pg_wal_replay_wait() +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(21, 30))"); +my $lsn2 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + CALL pg_wal_replay_wait('${lsn2}'); + SELECT count(*) FROM wait_test; +]); + +# Make sure the current LSN on standby and is the same as primary's LSN +ok($output eq 30, "standby reached the same LSN as primary"); + +# Check that waiting for unreachable LSN triggers the timeout. The +# unreachable LSN must be well in advance. So WAL records issued by +# the concurrent autovacuum could not affect that. +my $lsn3 = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn() + 10000000000"); +my $stderr; +$node_standby->safe_psql('postgres', + "CALL pg_wal_replay_wait('${lsn2}', 10);"); +$node_standby->psql( + 'postgres', + "CALL pg_wal_replay_wait('${lsn3}', 1000);", + stderr => \$stderr); +ok( $stderr =~ /timed out while waiting for target LSN/, + "get timeout on waiting for unreachable LSN"); + +# Check that the standby promotion terminates the wait on LSN. Start +# waiting for unreachable LSN then promote. Check the log for the relevant +# error message. +my $psql_session = $node_standby->background_psql('postgres'); +$psql_session->query_until( + qr/start/, qq[ + \\echo start + CALL pg_wal_replay_wait('${lsn3}'); +]); + +my $log_offset = -s $node_standby->logfile; +$node_standby->promote; +$node_standby->wait_for_log('recovery is not in progress', $log_offset); + +$node_standby->stop; +$node_primary->stop; +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8bc8dd6f1c..d23e78b898 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3059,6 +3059,8 @@ WaitEventIO WaitEventIPC WaitEventSet WaitEventTimeout +WaitLSNProcInfo +WaitLSNState WaitPMResult WalCloseMethod WalCompression