diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml index c23bbfb4e7..66d9ad7cb2 100644 --- a/doc/src/sgml/ref/begin.sgml +++ b/doc/src/sgml/ref/begin.sgml @@ -21,7 +21,7 @@ PostgreSQL documentation -BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ] +BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ] [ WAIT FOR LSN lsn_value [TIMEOUT number_of_milliseconds ] ] where transaction_mode is one of: @@ -63,6 +63,17 @@ BEGIN [ WORK | TRANSACTION ] [ transaction_mode was executed. + + + The WAIT FOR clause allows to wait for the target log + sequence number (LSN) to be replayed on standby before + starting the transaction in PostgreSQL databases + with master-standby asynchronous replication. Wait time can be limited by + specifying a timeout, which is measured in milliseconds and must be a positive + integer. If LSN was not reached before timeout, transaction + doesn't begin. Waiting can be interrupted by cancelling + BEGIN command. + @@ -146,6 +157,10 @@ BEGIN; different purpose in embedded SQL. You are advised to be careful about the transaction semantics when porting database applications. + + + There is no WAIT FOR clause in the SQL standard. + diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml index d6cd1d4177..b94ab00b40 100644 --- a/doc/src/sgml/ref/start_transaction.sgml +++ b/doc/src/sgml/ref/start_transaction.sgml @@ -21,7 +21,7 @@ PostgreSQL documentation -START TRANSACTION [ transaction_mode [, ...] ] +START TRANSACTION [ transaction_mode [, ...] ] [ WAIT FOR LSN lsn_value [TIMEOUT number_of_milliseconds ] ] where transaction_mode is one of: @@ -40,6 +40,17 @@ START TRANSACTION [ transaction_mode was executed. This is the same as the command. + + + The WAIT FOR clause allows to wait for the target log + sequence number (LSN) to be replayed on standby before + starting the transaction in PostgreSQL databases + with master-standby asynchronous replication. Wait time can be limited by + specifying a timeout, which is measured in milliseconds and must be a positive + integer. If LSN was not reached before timeout, transaction + doesn't begin. Waiting can be interrupted by cancelling + START TRANSACTION command. + @@ -78,6 +89,10 @@ START TRANSACTION [ transaction_mode + + There is no WAIT FOR clause in the SQL standard. + + See also the compatibility section of . diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ec55d68d27..1651e15e89 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -42,6 +42,7 @@ #include "catalog/pg_database.h" #include "commands/progress.h" #include "commands/tablespace.h" +#include "commands/wait.h" #include "common/controldata_utils.h" #include "executor/instrument.h" #include "miscadmin.h" @@ -7154,6 +7155,7 @@ StartupXLOG(void) do { bool switchedTLI = false; + XLogRecPtr minWaitedLSN; #ifdef WAL_DEBUG if (XLOG_DEBUG || @@ -7357,6 +7359,17 @@ StartupXLOG(void) break; } + /* + * If we replayed an LSN that someone was waiting for, set + * latches in shared memory array to notify the waiter. + */ + minWaitedLSN = WaitLSNGetMin(); + if (!XLogRecPtrIsInvalid(minWaitedLSN) && + minWaitedLSN <= XLogCtl->lastReplayedEndRecPtr) + { + WaitLSNSetLatch(XLogCtl->lastReplayedEndRecPtr); + } + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, LOG, false); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index d4815d3ce6..9b310926c1 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -57,6 +57,7 @@ OBJS = \ user.o \ vacuum.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 0000000000..b7aee5b794 --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,295 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR clause for BEGIN and START TRANSACTION commands. + * This clause allows waiting for given LSN to be replayed on standby. + * + * Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "commands/wait.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/backendid.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/sinvaladt.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" +#include "utils/timestamp.h" + +/* + * Shared memory structure representing information about LSNs, which backends + * are waiting for replay. + */ +typedef struct +{ + slock_t mutex; /* mutex protecting the fields below */ + int max_backend_id; /* max backend_id present in lsns[] */ + pg_atomic_uint64 min_lsn; /* minimal waited LSN */ + /* per-backend array of waited LSNs */ + XLogRecPtr lsns[FLEXIBLE_ARRAY_MEMBER]; +} WaitLSNState; + +static WaitLSNState * state; + +/* + * Add the wait event of the current backend to shared memory array + */ +static void +WaitLSNAdd(XLogRecPtr lsn_to_wait) +{ + SpinLockAcquire(&state->mutex); + if (state->max_backend_id < MyBackendId) + state->max_backend_id = MyBackendId; + + state->lsns[MyBackendId] = lsn_to_wait; + + if (lsn_to_wait < state->min_lsn.value) + state->min_lsn.value = lsn_to_wait; + SpinLockRelease(&state->mutex); +} + +/* + * Delete wait event of the current backend from the shared memory array. + */ +void +WaitLSNDelete(void) +{ + int i; + XLogRecPtr deleted_lsn; + + SpinLockAcquire(&state->mutex); + + deleted_lsn = state->lsns[MyBackendId]; + state->lsns[MyBackendId] = InvalidXLogRecPtr; + + /* If we are deleting the minimal LSN, then choose the next min_lsn */ + if (!XLogRecPtrIsInvalid(deleted_lsn) && + deleted_lsn == state->min_lsn.value) + { + state->min_lsn.value = InvalidXLogRecPtr; + for (i = 2; i <= state->max_backend_id; i++) + { + if (!XLogRecPtrIsInvalid(state->lsns[i]) && + (state->lsns[i] < state->min_lsn.value || + XLogRecPtrIsInvalid(state->min_lsn.value))) + { + state->min_lsn.value = state->lsns[i]; + } + } + } + + /* If deleting from the end of the array, shorten the array's used part */ + if (state->max_backend_id == MyBackendId) + { + for (i = (MyBackendId); i >= 2; i--) + if (!XLogRecPtrIsInvalid(state->lsns[i])) + { + state->max_backend_id = i; + break; + } + } + + SpinLockRelease(&state->mutex); +} + +/* + * Report amount of shared memory space needed for WaitLSNState + */ +Size +WaitLSNShmemSize(void) +{ + Size size; + + size = offsetof(WaitLSNState, lsns); + size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr))); + return size; +} + +/* + * Initialize an shared memory structure for waiting for LSN + */ +void +WaitLSNShmemInit(void) +{ + bool found; + uint32 i; + + state = (WaitLSNState *) ShmemInitStruct("pg_wait_lsn", + WaitLSNShmemSize(), + &found); + if (!found) + { + SpinLockInit(&state->mutex); + + for (i = 0; i < (MaxBackends + 1); i++) + state->lsns[i] = InvalidXLogRecPtr; + + state->max_backend_id = 0; + pg_atomic_init_u64(&state->min_lsn, InvalidXLogRecPtr); + } +} + +/* + * Set latches in shared memory to signal that new LSN has been replayed + */ +void +WaitLSNSetLatch(XLogRecPtr cur_lsn) +{ + uint32 i; + int max_backend_id; + PGPROC *backend; + + SpinLockAcquire(&state->mutex); + max_backend_id = state->max_backend_id; + + for (i = 2; i <= max_backend_id; i++) + { + backend = BackendIdGetProc(i); + + if (backend && state->lsns[i] != 0 && + state->lsns[i] <= cur_lsn) + { + SetLatch(&backend->procLatch); + } + } + SpinLockRelease(&state->mutex); +} + +/* + * Get minimal LSN that some backend is waiting for + */ +XLogRecPtr +WaitLSNGetMin(void) +{ + return state->min_lsn.value; +} + +/* + * On WAIT use a latch to wait till LSN is replayed, postmaster dies or timeout + * happens. Timeout is specified in milliseconds. Returns true if LSN was + * reached and false otherwise. + */ +bool +WaitLSNUtility(XLogRecPtr target_lsn, const int timeout_ms) +{ + XLogRecPtr cur_lsn; + int latch_events; + float8 endtime; + bool res = false; + bool wait_forever = (timeout_ms <= 0); + + endtime = GetNowFloat() + timeout_ms / 1000.0; + + latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + /* Check if we already reached the needed LSN */ + cur_lsn = GetXLogReplayRecPtr(NULL); + if (cur_lsn >= target_lsn) + return true; + + WaitLSNAdd(target_lsn); + ResetLatch(MyLatch); + + /* Recheck if LSN was reached while WaitLSNAdd() and ResetLatch() */ + cur_lsn = GetXLogReplayRecPtr(NULL); + if (cur_lsn >= target_lsn) + return true; + + for (;;) + { + int rc; + float8 time_left = 0; + long time_left_ms = 0; + + time_left = endtime - GetNowFloat(); + + /* Use 1 second as the default timeout to check for interrupts */ + if (wait_forever || time_left < 0 || time_left > 1.0) + time_left_ms = 1000; + else + time_left_ms = (long) ceil(time_left * 1000.0); + + /* If interrupt, LockErrorCleanup() will do WaitLSNDelete() for us */ + CHECK_FOR_INTERRUPTS(); + + /* If postmaster dies, finish immediately */ + if (!PostmasterIsAlive()) + break; + + rc = WaitLatch(MyLatch, latch_events, time_left_ms, + WAIT_EVENT_CLIENT_READ); + + ResetLatch(MyLatch); + + if (rc & WL_LATCH_SET) + cur_lsn = GetXLogReplayRecPtr(NULL); + + if (rc & WL_TIMEOUT) + { + time_left = endtime - GetNowFloat(); + /* If the time specified by user has passed, stop waiting */ + if (!wait_forever && time_left <= 0.0) + break; + cur_lsn = GetXLogReplayRecPtr(NULL); + } + + /* If LSN has been replayed */ + if (target_lsn <= cur_lsn) + break; + } + + WaitLSNDelete(); + + if (cur_lsn < target_lsn) + ereport(WARNING, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("didn't start transaction because LSN was not reached"), + errhint("Try to increase wait timeout."))); + else + res = true; + + return res; +} + +/* + * Implementation of WAIT FOR clause for BEGIN and START TRANSACTION commands + */ +int +WaitLSNMain(WaitClause *stmt, DestReceiver *dest) +{ + TupleDesc tupdesc; + TupOutputState *tstate; + XLogRecPtr target_lsn; + bool res = false; + + target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(stmt->lsn))); + res = WaitLSNUtility(target_lsn, stmt->timeout); + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0); + + /* Prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple); + + /* Send the result */ + do_text_output_oneline(tstate, res ? "t" : "f"); + end_tup_output(tstate); + return res; +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 1525c0de72..db179becab 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3748,10 +3748,22 @@ _copyTransactionStmt(const TransactionStmt *from) COPY_STRING_FIELD(savepoint_name); COPY_STRING_FIELD(gid); COPY_SCALAR_FIELD(chain); + COPY_NODE_FIELD(wait); return newnode; } +static WaitClause * +_copyWaitClause(const WaitClause *from) +{ + WaitClause *newnode = makeNode(WaitClause); + + COPY_STRING_FIELD(lsn); + COPY_SCALAR_FIELD(timeout); + + return newnode; +}; + static CompositeTypeStmt * _copyCompositeTypeStmt(const CompositeTypeStmt *from) { @@ -5339,6 +5351,9 @@ copyObjectImpl(const void *from) case T_TransactionStmt: retval = _copyTransactionStmt(from); break; + case T_WaitClause: + retval = _copyWaitClause(from); + break; case T_CompositeTypeStmt: retval = _copyCompositeTypeStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 4f34189ab5..854d484f60 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1541,6 +1541,16 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b) COMPARE_STRING_FIELD(savepoint_name); COMPARE_STRING_FIELD(gid); COMPARE_SCALAR_FIELD(chain); + COMPARE_NODE_FIELD(wait); + + return true; +} + +static bool +_equalWaitClause(const WaitClause *a, const WaitClause *b) +{ + COMPARE_STRING_FIELD(lsn); + COMPARE_SCALAR_FIELD(timeout); return true; } @@ -3391,6 +3401,9 @@ equal(const void *a, const void *b) case T_TransactionStmt: retval = _equalTransactionStmt(a, b); break; + case T_WaitClause: + retval = _equalWaitClause(a, b); + break; case T_CompositeTypeStmt: retval = _equalCompositeTypeStmt(a, b); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 5b826509eb..47753b42c6 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -2784,6 +2784,28 @@ _outDefElem(StringInfo str, const DefElem *node) WRITE_LOCATION_FIELD(location); } +static void +_outTransactionStmt(StringInfo str, const TransactionStmt *node) +{ + WRITE_NODE_TYPE("TRANSACTIONSTMT"); + + WRITE_STRING_FIELD(savepoint_name); + WRITE_STRING_FIELD(gid); + WRITE_NODE_FIELD(options); + WRITE_BOOL_FIELD(chain); + WRITE_ENUM_FIELD(kind, TransactionStmtKind); + WRITE_NODE_FIELD(wait); +} + +static void +_outWaitClause(StringInfo str, const WaitClause *node) +{ + WRITE_NODE_TYPE("WAITCLAUSE"); + + WRITE_STRING_FIELD(lsn); + WRITE_UINT_FIELD(timeout); +} + static void _outTableLikeClause(StringInfo str, const TableLikeClause *node) { @@ -4334,6 +4356,12 @@ outNode(StringInfo str, const void *obj) case T_PartitionRangeDatum: _outPartitionRangeDatum(str, obj); break; + case T_TransactionStmt: + _outTransactionStmt(str, obj); + break; + case T_WaitClause: + _outWaitClause(str, obj); + break; default: diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 1219ac8c26..ea1084fa3c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -601,6 +601,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type PartitionBoundSpec %type hash_partbound %type hash_partbound_elem +%type wait_time +%type wait_for /* * Non-keyword token types. These are hard-wired into the "flex" lexer. @@ -670,7 +672,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -701,7 +703,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN - TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM + TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P TRUNCATE TRUSTED TYPE_P TYPES_P @@ -711,7 +713,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAIT WHEN WHERE WHITESPACE_P WINDOW + WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE @@ -9955,18 +9958,20 @@ TransactionStmt: n->chain = $3; $$ = (Node *)n; } - | BEGIN_P opt_transaction transaction_mode_list_or_empty + | BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for { TransactionStmt *n = makeNode(TransactionStmt); n->kind = TRANS_STMT_BEGIN; n->options = $3; + n->wait = $4; $$ = (Node *)n; } - | START TRANSACTION transaction_mode_list_or_empty + | START TRANSACTION transaction_mode_list_or_empty wait_for { TransactionStmt *n = makeNode(TransactionStmt); n->kind = TRANS_STMT_START; n->options = $3; + n->wait = $4; $$ = (Node *)n; } | COMMIT opt_transaction opt_transaction_chain @@ -14240,6 +14245,25 @@ xml_passing_mech: | BY VALUE_P ; +/* + * WAIT FOR clause of BEGIN and START TRANSACTION statements + */ +wait_for: + WAIT FOR LSN Sconst wait_time + { + WaitClause *n = makeNode(WaitClause); + n->lsn = $4; + n->timeout = $5; + $$ = (Node *)n; + } + | /* EMPTY */ { $$ = NULL; } + ; + +wait_time: + TIMEOUT Iconst { $$ = $2; } + | /* EMPTY */ { $$ = 0; } + ; + /* * Aggregate decoration clauses @@ -15391,6 +15415,7 @@ unreserved_keyword: | LOCK_P | LOCKED | LOGGED + | LSN | MAPPING | MATCH | MATERIALIZED @@ -15518,6 +15543,7 @@ unreserved_keyword: | TEMPORARY | TEXT_P | TIES + | TIMEOUT | TRANSACTION | TRANSFORM | TRIGGER @@ -15544,6 +15570,7 @@ unreserved_keyword: | VIEW | VIEWS | VOLATILE + | WAIT | WHITESPACE_P | WITHIN | WITHOUT diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 427b0d59cd..417840a8f1 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/subtrans.h" #include "access/twophase.h" #include "commands/async.h" +#include "commands/wait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, WaitLSNShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); + /* + * Init array of Latches in shared memory for WAIT + */ + WaitLSNShmemInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 9938cddb57..baecb39787 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -38,6 +38,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xact.h" +#include "commands/wait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -717,6 +718,9 @@ LockErrorCleanup(void) AbortStrongLockAcquire(); + /* If BEGIN WAIT FOR LSN was interrupted, then stop waiting for that LSN */ + WaitLSNDelete(); + /* Nothing to do if we weren't waiting for a lock */ if (lockAwaited == NULL) { diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index b1f7f6e2d0..f516bd22ea 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -57,6 +57,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/wait.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -591,6 +592,18 @@ standard_ProcessUtility(PlannedStmt *pstmt, case TRANS_STMT_START: { ListCell *lc; + WaitClause *waitstmt = (WaitClause *) stmt->wait; + + /* WAIT FOR cannot be used on master */ + if (stmt->wait && !RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("WAIT FOR can only be " + "used on standby"))); + + /* If needed to WAIT FOR something but failed */ + if (stmt->wait && WaitLSNMain(waitstmt, dest) == 0) + break; BeginTransactionBlock(); foreach(lc, stmt->options) diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c index ee340fb0f0..03f997cba7 100644 --- a/src/backend/utils/adt/misc.c +++ b/src/backend/utils/adt/misc.c @@ -372,8 +372,6 @@ pg_sleep(PG_FUNCTION_ARGS) * less than the specified time when WaitLatch is terminated early by a * non-query-canceling signal such as SIGHUP. */ -#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0) - endtime = GetNowFloat() + secs; for (;;) diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h new file mode 100644 index 0000000000..2a95c95d2b --- /dev/null +++ b/src/include/commands/wait.h @@ -0,0 +1,26 @@ +/*------------------------------------------------------------------------- + * + * wait.h + * prototypes for commands/wait.c + * + * Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/include/commands/wait.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAIT_H +#define WAIT_H + +#include "tcop/dest.h" +#include "nodes/parsenodes.h" + +extern bool WaitLSNUtility(XLogRecPtr lsn, const int timeout_ms); +extern Size WaitLSNShmemSize(void); +extern void WaitLSNShmemInit(void); +extern void WaitLSNSetLatch(XLogRecPtr cur_lsn); +extern XLogRecPtr WaitLSNGetMin(void); +extern int WaitLSNMain(WaitClause *stmt, DestReceiver *dest); +extern void WaitLSNDelete(void); + +#endif /* WAIT_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 381d84b4e4..822827aa32 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -492,6 +492,7 @@ typedef enum NodeTag T_StartReplicationCmd, T_TimeLineHistoryCmd, T_SQLCmd, + T_WaitClause, /* * TAGS FOR RANDOM OTHER STUFF diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 518abe42c1..7ad3ddbf57 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1431,6 +1431,17 @@ typedef struct OnConflictClause int location; /* token location, or -1 if unknown */ } OnConflictClause; +/* + * WaitClause - + * representation of WAIT FOR clause for BEGIN and START TRANSACTION. + */ +typedef struct WaitClause +{ + NodeTag type; + char *lsn; /* LSN to wait for */ + int timeout; /* Number of milliseconds to limit wait time */ +} WaitClause; + /* * CommonTableExpr - * representation of WITH list element @@ -3060,6 +3071,7 @@ typedef struct TransactionStmt char *savepoint_name; /* for savepoint commands */ char *gid; /* for two-phase-commit related commands */ bool chain; /* AND CHAIN option */ + Node *wait; /* WAIT FOR clause */ } TransactionStmt; /* ---------------------- diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 08f22ce211..6e1848fe4c 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD) PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD) PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD) +PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD) @@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD) PG_KEYWORD("then", THEN, RESERVED_KEYWORD) PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD) PG_KEYWORD("time", TIME, COL_NAME_KEYWORD) +PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD) PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD) PG_KEYWORD("to", TO, RESERVED_KEYWORD) PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD) @@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD) PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD) PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD) PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD) +PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD) PG_KEYWORD("when", WHEN, RESERVED_KEYWORD) PG_KEYWORD("where", WHERE, RESERVED_KEYWORD) PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD) diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 03a1de569f..eaeeb79c41 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -109,4 +109,6 @@ extern int date2isoyearday(int year, int mon, int mday); extern bool TimestampTimestampTzRequiresRewrite(void); +#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0) + #endif /* TIMESTAMP_H */ diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl new file mode 100644 index 0000000000..3db25bd661 --- /dev/null +++ b/src/test/recovery/t/020_begin_wait.pl @@ -0,0 +1,85 @@ +# Checks for BEGIN WAIT FOR LSN +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 8; + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1); +$node_master->start; + +# And some content and take a backup +$node_master->safe_psql('postgres', + "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); +my $backup_name = 'my_backup'; +$node_master->backup($backup_name); + +# Using the backup, create a streaming standby with a 1 second delay +my $node_standby = get_new_node('standby'); +my $delay = 1; +$node_standby->init_from_backup($node_master, $backup_name, + has_streaming => 1); +$node_standby->append_conf('postgresql.conf', qq[ + recovery_min_apply_delay = '${delay}s' +]); +$node_standby->start; + + +# Check that timeouts make us wait for the specified time (1s here) +my $current_time = $node_standby->safe_psql('postgres', "SELECT now()"); +my $two_seconds = 2000; # in milliseconds +my $start_time = time(); +$node_standby->safe_psql('postgres', + "BEGIN WAIT FOR LSN '0/FFFFFFFF' TIMEOUT $two_seconds"); +my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds +ok($time_waited >= $two_seconds, "WAIT FOR TIMEOUT waits for enough time"); + + +# Check that timeouts let us stop waiting right away, before reaching target LSN +$node_master->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(11, 20))"); +my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()"); +my ($ret, $out, $err) = $node_standby->psql('postgres', + "BEGIN WAIT FOR LSN '$lsn1' TIMEOUT 1"); + +ok($ret == 0, "zero return value when failed to WAIT FOR LSN on standby"); +ok($err =~ /WARNING: didn't start transaction because LSN was not reached/, + "correct error message when failed to WAIT FOR LSN on standby"); +ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN"); + + +# Check that WAIT FOR works fine and reaches target LSN if given no timeout + +# Add data on master, memorize master's last LSN +$node_master->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(21, 30))"); +my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()"); + +# Wait for it to appear on replica, memorize replica's last LSN +$node_standby->safe_psql('postgres', + "BEGIN WAIT FOR LSN '$lsn2'"); +my $reached_lsn = $node_standby->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); + +# Make sure that master's and replica's LSNs are the same after WAIT +my $compare_lsns = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)"); +ok($compare_lsns eq 0, + "standby reached the same LSN as master before starting transaction"); + + +# Make sure that it's not allowed to use WAIT FOR on master +($ret, $out, $err) = $node_master->psql('postgres', + "BEGIN WAIT FOR LSN '0/FFFFFFFF'"); + +ok($ret != 0, "non-zero return value when trying to WAIT FOR LSN on master"); +ok($err =~ /ERROR: WAIT FOR can only be used on standby/, + "correct error message when trying to WAIT FOR LSN on master"); +ok($out eq '', "empty output when trying to WAIT FOR LSN on master"); + + +$node_standby->stop; +$node_master->stop; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 525d58e7f0..020f75c5e2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2621,6 +2621,7 @@ WSABUF WSADATA WSANETWORKEVENTS WSAPROTOCOL_INFO +WaitClause WaitEvent WaitEventActivity WaitEventClient @@ -2628,6 +2629,7 @@ WaitEventIO WaitEventIPC WaitEventSet WaitEventTimeout +WaitLSNState WaitPMResult WalCloseMethod WalLevel