diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 2826b7e43c..beef574076 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -202,12 +202,19 @@ typedef struct QueuePosition (x).page != (y).page ? (y) : \ (x).offset < (y).offset ? (x) : (y)) +/* choose logically larger QueuePosition */ +#define QUEUE_POS_MAX(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \ + (x).page != (y).page ? (x) : \ + (x).offset > (y).offset ? (x) : (y)) + /* * Struct describing a listening backend's status */ typedef struct QueueBackendStatus { int32 pid; /* either a PID or InvalidPid */ + Oid dboid; /* backend's database OID, or InvalidOid */ QueuePosition pos; /* backend has read queue up to here */ } QueueBackendStatus; @@ -224,6 +231,7 @@ typedef struct QueueBackendStatus * When holding the lock in EXCLUSIVE mode, backends can inspect the entries * of other backends and also change the head and tail pointers. * + * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers. * In order to avoid deadlocks, whenever we need both locks, we always first * get AsyncQueueLock and then AsyncCtlLock. * @@ -234,8 +242,8 @@ typedef struct QueueBackendStatus typedef struct AsyncQueueControl { QueuePosition head; /* head points to the next free location */ - QueuePosition tail; /* the global tail is equivalent to the tail - * of the "slowest" backend */ + QueuePosition tail; /* the global tail is equivalent to the pos of + * the "slowest" backend */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; /* backend[0] is not used; used entries are from [1] to [MaxBackends] */ @@ -246,6 +254,7 @@ static AsyncQueueControl *asyncQueueControl; #define QUEUE_HEAD (asyncQueueControl->head) #define QUEUE_TAIL (asyncQueueControl->tail) #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid) +#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) /* @@ -459,6 +468,7 @@ AsyncShmemInit(void) for (i = 0; i <= MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; + QUEUE_BACKEND_DBOID(i) = InvalidOid; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); } } @@ -905,6 +915,10 @@ AtCommit_Notify(void) static void Exec_ListenPreCommit(void) { + QueuePosition head; + QueuePosition max; + int i; + /* * Nothing to do if we are already listening to something, nor if we * already ran this routine in this transaction. @@ -932,10 +946,34 @@ Exec_ListenPreCommit(void) * over already-committed notifications. This ensures we cannot miss any * not-yet-committed notifications. We might get a few more but that * doesn't hurt. + * + * In some scenarios there might be a lot of committed notifications that + * have not yet been pruned away (because some backend is being lazy about + * reading them). To reduce our startup time, we can look at other + * backends and adopt the maximum "pos" pointer of any backend that's in + * our database; any notifications it's already advanced over are surely + * committed and need not be re-examined by us. (We must consider only + * backends connected to our DB, because others will not have bothered to + * check committed-ness of notifications in our DB.) But we only bother + * with that if there's more than a page worth of notifications + * outstanding, otherwise scanning all the other backends isn't worth it. + * + * We need exclusive lock here so we can look at other backends' entries. */ - LWLockAcquire(AsyncQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL; + LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); + head = QUEUE_HEAD; + max = QUEUE_TAIL; + if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head)) + { + for (i = 1; i <= MaxBackends; i++) + { + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); + } + } + QUEUE_BACKEND_POS(MyBackendId) = max; QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; + QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId; LWLockRelease(AsyncQueueLock); /* Now we are listed in the global array, so remember we're listening */ @@ -951,7 +989,8 @@ Exec_ListenPreCommit(void) * * This will also advance the global tail pointer if possible. */ - asyncQueueReadAllNotifications(); + if (!QUEUE_POS_EQUAL(max, head)) + asyncQueueReadAllNotifications(); } /* @@ -1154,6 +1193,7 @@ asyncQueueUnregister(void) QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); /* ... then mark it invalid */ QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; + QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; LWLockRelease(AsyncQueueLock); /* mark ourselves as no longer listed in the global array */