diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8e1c821cc5..2514ffb644 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -234,7 +234,7 @@ typedef struct QueueBackendStatus
  * When holding AsyncQueueLock in EXCLUSIVE mode, backends can inspect the
  * entries of other backends and also change the head pointer. When holding
  * both AsyncQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends can
- * change the tail pointer.
+ * change the tail pointers.
  *
  * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
@@ -249,6 +249,8 @@ typedef struct AsyncQueueControl
 	QueuePosition head;			/* head points to the next free location */
 	QueuePosition tail;			/* the global tail is equivalent to the pos of
 								 * the "slowest" backend */
+	int			stopPage;		/* oldest unrecycled page; must be <=
+								 * tail.page */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -258,6 +260,7 @@ static AsyncQueueControl *asyncQueueControl;
 
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
+#define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
@@ -467,6 +470,7 @@ AsyncShmemInit(void)
 
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+		QUEUE_STOP_PAGE = 0;
 		asyncQueueControl->lastQueueFillWarn = 0;
 		/* zero'th entry won't be used, but let's initialize it anyway */
 		for (i = 0; i <= MaxBackends; i++)
@@ -1239,7 +1243,7 @@ asyncQueueIsFull(void)
 	nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
 	if (nexthead > QUEUE_MAX_PAGE)
 		nexthead = 0;			/* wrap around */
-	boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
+	boundary = QUEUE_STOP_PAGE;
 	boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
 	return asyncQueuePagePrecedes(nexthead, boundary);
 }
@@ -1430,6 +1434,11 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
  * Return the fraction of the queue that is currently occupied.
  *
  * The caller must hold AsyncQueueLock in (at least) shared mode.
+ *
+ * Note: we measure the distance to the logical tail page, not the physical
+ * tail page. In some sense that's wrong, but the relative position of the
+ * physical tail is affected by details such as SLRU segment boundaries,
+ * so that a result based on that is unpleasantly unstable.
  */
 static double
 asyncQueueUsage(void)
@@ -2018,7 +2027,23 @@ asyncQueueAdvanceTail(void)
 	/* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
 	LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
 
-	/* Compute the new tail. */
+	/*
+	 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
+	 * (ie, exactly match at least one backend's queue position), so it must
+	 * be updated atomically with the actual computation. Since v13, we could
+	 * get away with not doing it like that, but it seems prudent to keep it
+	 * so.
+	 *
+	 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
+	 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
+	 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
+	 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
+	 * there are pages we can truncate but haven't yet finished doing so.
+	 *
+	 * For concurrency's sake, we don't want to hold AsyncQueueLock while
+	 * performing SimpleLruTruncate. This is OK because no backend will try
+	 * to access the pages we are in the midst of truncating.
+	 */
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
 	min = QUEUE_HEAD;
 	for (i = 1; i <= MaxBackends; i++)
@@ -2026,7 +2051,8 @@ asyncQueueAdvanceTail(void)
 		if (QUEUE_BACKEND_PID(i) != InvalidPid)
 			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
 	}
-	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
+	QUEUE_TAIL = min;
+	oldtailpage = QUEUE_STOP_PAGE;
 	LWLockRelease(AsyncQueueLock);
 
 	/*
@@ -2045,16 +2071,16 @@ asyncQueueAdvanceTail(void)
 		 * the lock again.
 		 */
 		SimpleLruTruncate(AsyncCtl, newtailpage);
-	}
 
-	/*
-	 * Advertise the new tail. This changes asyncQueueIsFull()'s verdict for
-	 * the segment immediately prior to the new tail, allowing fresh data into
-	 * that segment.
-	 */
-	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
-	QUEUE_TAIL = min;
-	LWLockRelease(AsyncQueueLock);
+		/*
+		 * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
+		 * for the segment immediately prior to the old tail, allowing fresh
+		 * data into that segment.
+		 */
+		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+		QUEUE_STOP_PAGE = newtailpage;
+		LWLockRelease(AsyncQueueLock);
+	}
 
 	LWLockRelease(NotifyQueueTailLock);
 }
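
To illustrate the asyncQueueIsFull() change above, where the full-queue test now measures against the physical stop page (rounded down to an SLRU segment boundary) rather than the logical tail, here is a small standalone sketch. It is not part of the patch and not PostgreSQL code: the toy_* names, TOY_MAX_PAGE, and TOY_PAGES_PER_SEGMENT are invented for demonstration, but the wrap-around comparison and the boundary arithmetic mirror the shape of the hunk above.

/*
 * Standalone illustration (not from the patch): a toy model of the
 * wrap-around "queue is full" test.  The constants are made up;
 * PostgreSQL's real page space is far larger.
 */
#include <stdbool.h>
#include <stdio.h>

#define TOY_MAX_PAGE			15	/* pages run 0..15, then wrap to 0 */
#define TOY_PAGES_PER_SEGMENT	4

/* true if page p logically precedes page q, modulo wrap-around */
static bool
toy_page_precedes(int p, int q)
{
	int		diff = p - q;

	if (diff >= (TOY_MAX_PAGE + 1) / 2)
		diff -= TOY_MAX_PAGE + 1;
	else if (diff < -((TOY_MAX_PAGE + 1) / 2))
		diff += TOY_MAX_PAGE + 1;
	return diff < 0;
}

/*
 * The queue is "full" when creating the next head page would wrap around
 * into the segment that still contains the oldest unrecycled page
 * (stop_page), i.e. a segment that has not been truncated yet.
 */
static bool
toy_queue_is_full(int head_page, int stop_page)
{
	int		nexthead = head_page + 1;
	int		boundary = stop_page - (stop_page % TOY_PAGES_PER_SEGMENT);

	if (nexthead > TOY_MAX_PAGE)
		nexthead = 0;			/* wrap around */
	return toy_page_precedes(nexthead, boundary);
}

int
main(void)
{
	/*
	 * With 16 toy pages the modular comparison works over half the range,
	 * so the queue reports full once the next head page would land 8 or
	 * more pages past the stop segment's start.
	 */
	printf("head=6, stop=1 -> full=%d\n", toy_queue_is_full(6, 1));	/* 0 */
	printf("head=7, stop=1 -> full=%d\n", toy_queue_is_full(7, 1));	/* 1 */

	/*
	 * Advancing the stop page, which the patch does only after
	 * SimpleLruTruncate() has finished, immediately frees space again.
	 */
	printf("head=7, stop=5 -> full=%d\n", toy_queue_is_full(7, 5));	/* 0 */
	return 0;
}

The last call is the point of keeping two tails: fresh data is admitted into a segment only once QUEUE_STOP_PAGE has moved past it, which happens strictly after the truncation has completed, while QUEUE_TAIL can advance earlier for the benefit of incoming listeners and pg_notification_queue_usage().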