From f3ff7bf83bce11709add9a6d31d4bebe95e086e3 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Sun, 7 Apr 2024 14:06:30 +0200 Subject: [PATCH] Add XLogCtl->logInsertResult This tracks the position of WAL that's been fully copied into WAL buffers by all processes emitting WAL. (For some reason we call that "WAL insertion"). This is updated using atomic monotonic advance during WaitXLogInsertionsToFinish, which is not when the insertions actually occur, but it's the only place where we know where have all the insertions have completed. This value is useful in WALReadFromBuffers, which can verify that callers don't try to read past what has been inserted. (However, more infrastructure is needed in order to actually use WAL after the flush point, since it could be lost.) The value is also useful in WaitXLogInsertionsToFinish() itself, since we can now exit quickly when all WAL has been already inserted, without even having to take any locks. --- src/backend/access/transam/xlog.c | 40 ++++++++++++++++++++++++++++++- src/include/port/atomics.h | 36 ++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b4499cda7b..e3fb26f5ab 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -469,6 +469,7 @@ typedef struct XLogCtlData XLogRecPtr lastSegSwitchLSN; /* These are accessed using atomics -- info_lck not needed */ + pg_atomic_uint64 logInsertResult; /* last byte + 1 inserted to buffers */ pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */ pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */ @@ -1499,6 +1500,7 @@ static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto) { uint64 bytepos; + XLogRecPtr inserted; XLogRecPtr reservedUpto; XLogRecPtr finishedUpto; XLogCtlInsert *Insert = &XLogCtl->Insert; @@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) if (MyProc == NULL) elog(PANIC, "cannot wait without a PGPROC structure"); + /* + * Check if there's any work to do. Use a barrier to ensure we get the + * freshest value. + */ + inserted = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult); + if (upto <= inserted) + return inserted; + /* Read the current insert position */ SpinLockAcquire(&Insert->insertpos_lck); bytepos = Insert->CurrBytePos; @@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto) finishedUpto = insertingat; } + + /* + * Advance the limit we know to have been inserted and return the freshest + * value we know of, which might be beyond what we requested if somebody + * is concurrently doing this with an 'upto' pointer ahead of us. + */ + finishedUpto = pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult, + finishedUpto); + return finishedUpto; } @@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, { char *pdst = dstbuf; XLogRecPtr recptr = startptr; + XLogRecPtr inserted; Size nbytes = count; if (RecoveryInProgress() || tli != GetWALInsertionTimeLine()) return 0; Assert(!XLogRecPtrIsInvalid(startptr)); - Assert(startptr + count <= LogwrtResult.Write); + + /* + * Caller should ensure that the requested data has been inserted into WAL + * buffers before we try to read it. + */ + inserted = pg_atomic_read_u64(&XLogCtl->logInsertResult); + if (startptr + count > inserted) + ereport(ERROR, + errmsg("cannot read past end of generated WAL: requested %X/%X, current position %X/%X", + LSN_FORMAT_ARGS(startptr + count), + LSN_FORMAT_ARGS(inserted))); /* * Loop through the buffers without a lock. For each buffer, atomically @@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) { XLogRecPtr Flush; XLogRecPtr Write; + XLogRecPtr Insert; Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); pg_read_barrier(); Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); + pg_read_barrier(); + Insert = pg_atomic_read_u64(&XLogCtl->logInsertResult); /* WAL written to disk is always ahead of WAL flushed */ Assert(Write >= Flush); + + /* WAL inserted to buffers is always ahead of WAL written */ + Assert(Insert >= Write); } #endif } @@ -4951,6 +4987,7 @@ XLOGShmemInit(void) SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); + pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr); @@ -5979,6 +6016,7 @@ StartupXLOG(void) * because no other process can be reading or writing WAL yet. */ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; + pg_atomic_write_u64(&XLogCtl->logInsertResult, EndOfLog); pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog); pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h index ff47782cdb..78987f3154 100644 --- a/src/include/port/atomics.h +++ b/src/include/port/atomics.h @@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_sub_fetch_u64_impl(ptr, sub_); } +/* + * Monotonically advance the given variable using only atomic operations until + * it's at least the target value. Returns the latest value observed, which + * may or may not be the target value. + * + * Full barrier semantics (even when value is unchanged). + */ +static inline uint64 +pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_) +{ + uint64 currval; + +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(ptr, 8); +#endif + + currval = pg_atomic_read_u64_impl(ptr); + if (currval >= target_) + { + pg_memory_barrier(); + return currval; + } + +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(&currval, 8); +#endif + + while (currval < target_) + { + if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_)) + break; + } + + return Max(target_, currval); +} + #undef INSIDE_ATOMICS_H #endif /* ATOMICS_H */