Improve LISTEN startup time when there are many unread notifications.

If some existing listener is far behind, incoming new listener sessions
would start from that session's read pointer and then need to advance over
many already-committed notification messages, which they have no interest
in.  This was expensive in itself and also thrashed the pg_notify SLRU
buffers a lot more than necessary.  We can improve matters considerably
in typical scenarios, without much added cost, by starting from the
furthest-ahead read pointer, not the furthest-behind one.  We do have to
consider only sessions in our own database when doing this, which requires
an extra field in the data structure, but that's a pretty small cost.

Back-patch to 9.0 where the current LISTEN/NOTIFY logic was introduced.

Matt Newell, slightly adjusted by me
This commit is contained in:
Tom Lane 2015-09-30 23:32:23 -04:00
parent 47ae6bc5a9
commit 2d4336cf85
1 changed files with 45 additions and 5 deletions

View File

@ -198,12 +198,19 @@ typedef struct QueuePosition
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
/* choose logically larger QueuePosition */
#define QUEUE_POS_MAX(x,y) \
(asyncQueuePagePrecedesLogically((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
/*
* Struct describing a listening backend's status
*/
typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
Oid dboid; /* backend's database OID, or InvalidOid */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
@ -222,6 +229,7 @@ typedef struct QueueBackendStatus
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
* of other backends and also change the head and tail pointers.
*
* AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
*
@ -232,8 +240,8 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* the global tail is equivalent to the tail
* of the "slowest" backend */
QueuePosition tail; /* the global tail is equivalent to the pos of
* the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@ -244,6 +252,7 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
/*
@ -477,6 +486,7 @@ AsyncShmemInit(void)
for (i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@ -929,6 +939,10 @@ AtCommit_Notify(void)
static void
Exec_ListenPreCommit(void)
{
QueuePosition head;
QueuePosition max;
int i;
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
@ -956,10 +970,34 @@ Exec_ListenPreCommit(void)
* over already-committed notifications. This ensures we cannot miss any
* not-yet-committed notifications. We might get a few more but that
* doesn't hurt.
*
* In some scenarios there might be a lot of committed notifications that
* have not yet been pruned away (because some backend is being lazy about
* reading them). To reduce our startup time, we can look at other
* backends and adopt the maximum "pos" pointer of any backend that's in
* our database; any notifications it's already advanced over are surely
* committed and need not be re-examined by us. (We must consider only
* backends connected to our DB, because others will not have bothered to
* check committed-ness of notifications in our DB.) But we only bother
* with that if there's more than a page worth of notifications
* outstanding, otherwise scanning all the other backends isn't worth it.
*
* We need exclusive lock here so we can look at other backends' entries.
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
head = QUEUE_HEAD;
max = QUEUE_TAIL;
if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
{
for (i = 1; i <= MaxBackends; i++)
{
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
}
}
QUEUE_BACKEND_POS(MyBackendId) = max;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
LWLockRelease(AsyncQueueLock);
/* Now we are listed in the global array, so remember we're listening */
@ -975,7 +1013,8 @@ Exec_ListenPreCommit(void)
*
* This will also advance the global tail pointer if possible.
*/
asyncQueueReadAllNotifications();
if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications();
}
/*
@ -1178,6 +1217,7 @@ asyncQueueUnregister(void)
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */