|
|
|
@ -103,12 +103,11 @@
|
|
|
|
|
* until we reach either a notification from an uncommitted transaction or
|
|
|
|
|
* the head pointer's position.
|
|
|
|
|
*
|
|
|
|
|
* 6. To avoid SLRU wraparound and limit disk space consumption, the tail
|
|
|
|
|
* pointer needs to be advanced so that old pages can be truncated.
|
|
|
|
|
* This is relatively expensive (notably, it requires an exclusive lock),
|
|
|
|
|
* so we don't want to do it often. We make sending backends do this work
|
|
|
|
|
* if they advanced the queue head into a new page, but only once every
|
|
|
|
|
* QUEUE_CLEANUP_DELAY pages.
|
|
|
|
|
* 6. To limit disk space consumption, the tail pointer needs to be advanced
|
|
|
|
|
* so that old pages can be truncated. This is relatively expensive
|
|
|
|
|
* (notably, it requires an exclusive lock), so we don't want to do it
|
|
|
|
|
* often. We make sending backends do this work if they advanced the queue
|
|
|
|
|
* head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
|
|
|
|
|
*
|
|
|
|
|
* An application that listens on the same channel it notifies will get
|
|
|
|
|
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
|
|
|
|
@ -120,7 +119,7 @@
|
|
|
|
|
* The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
|
|
|
|
|
* can be varied without affecting anything but performance. The maximum
|
|
|
|
|
* amount of notification data that can be queued at one time is determined
|
|
|
|
|
* by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
|
|
|
|
|
* by max_notify_queue_pages GUC.
|
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
@ -312,23 +311,8 @@ static SlruCtlData NotifyCtlData;
|
|
|
|
|
|
|
|
|
|
#define NotifyCtl (&NotifyCtlData)
|
|
|
|
|
#define QUEUE_PAGESIZE BLCKSZ
|
|
|
|
|
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages
|
|
|
|
|
* which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
|
|
|
|
|
* We could use as many segments as SlruScanDirectory() allows, but this gives
|
|
|
|
|
* us so much space already that it doesn't seem worth the trouble.
|
|
|
|
|
*
|
|
|
|
|
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
|
|
|
|
|
* pages, because more than that would confuse slru.c into thinking there
|
|
|
|
|
* was a wraparound condition. With the default BLCKSZ this means there
|
|
|
|
|
* can be up to 8GB of queued-and-not-read data.
|
|
|
|
|
*
|
|
|
|
|
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
|
|
|
|
|
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
|
|
|
|
|
*/
|
|
|
|
|
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
|
|
|
|
|
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* listenChannels identifies the channels we are actually listening to
|
|
|
|
@ -439,12 +423,15 @@ static bool amRegisteredListener = false;
|
|
|
|
|
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
|
|
|
|
|
static bool tryAdvanceTail = false;
|
|
|
|
|
|
|
|
|
|
/* GUC parameter */
|
|
|
|
|
/* GUC parameters */
|
|
|
|
|
bool Trace_notify = false;
|
|
|
|
|
|
|
|
|
|
/* For 8 KB pages this gives 8 GB of disk space */
|
|
|
|
|
int max_notify_queue_pages = 1048576;
|
|
|
|
|
|
|
|
|
|
/* local function prototypes */
|
|
|
|
|
static int64 asyncQueuePageDiff(int64 p, int64 q);
|
|
|
|
|
static bool asyncQueuePagePrecedes(int64 p, int64 q);
|
|
|
|
|
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
|
|
|
|
|
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
|
|
|
|
|
static void queue_listen(ListenActionKind action, const char *channel);
|
|
|
|
|
static void Async_UnlistenOnExit(int code, Datum arg);
|
|
|
|
|
static void Exec_ListenPreCommit(void);
|
|
|
|
@ -474,39 +461,23 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
|
|
|
|
|
static void ClearPendingActionsAndNotifies(void);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Compute the difference between two queue page numbers (i.e., p - q),
|
|
|
|
|
* accounting for wraparound.
|
|
|
|
|
* Compute the difference between two queue page numbers.
|
|
|
|
|
* Previously this function accounted for a wraparound.
|
|
|
|
|
*/
|
|
|
|
|
static int64
|
|
|
|
|
static inline int64
|
|
|
|
|
asyncQueuePageDiff(int64 p, int64 q)
|
|
|
|
|
{
|
|
|
|
|
int64 diff;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
|
|
|
|
|
* in the range 0..QUEUE_MAX_PAGE.
|
|
|
|
|
*/
|
|
|
|
|
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
|
|
|
|
|
Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
|
|
|
|
|
|
|
|
|
|
diff = p - q;
|
|
|
|
|
if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
|
|
|
|
|
diff -= QUEUE_MAX_PAGE + 1;
|
|
|
|
|
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
|
|
|
|
|
diff += QUEUE_MAX_PAGE + 1;
|
|
|
|
|
return diff;
|
|
|
|
|
return p - q;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Is p < q, accounting for wraparound?
|
|
|
|
|
*
|
|
|
|
|
* Since asyncQueueIsFull() blocks creation of a page that could precede any
|
|
|
|
|
* extant page, we need not assess entries within a page.
|
|
|
|
|
* Determines whether p precedes q.
|
|
|
|
|
* Previously this function accounted for a wraparound.
|
|
|
|
|
*/
|
|
|
|
|
static bool
|
|
|
|
|
static inline bool
|
|
|
|
|
asyncQueuePagePrecedes(int64 p, int64 q)
|
|
|
|
|
{
|
|
|
|
|
return asyncQueuePageDiff(p, q) < 0;
|
|
|
|
|
return p < q;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -566,12 +537,13 @@ AsyncShmemInit(void)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set up SLRU management of the pg_notify data.
|
|
|
|
|
* Set up SLRU management of the pg_notify data. Note that long segment
|
|
|
|
|
* names are used in order to avoid wraparound.
|
|
|
|
|
*/
|
|
|
|
|
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
|
|
|
|
|
SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
|
|
|
|
|
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
|
|
|
|
|
SYNC_HANDLER_NONE, false);
|
|
|
|
|
SYNC_HANDLER_NONE, true);
|
|
|
|
|
|
|
|
|
|
if (!found)
|
|
|
|
|
{
|
|
|
|
@ -1305,27 +1277,11 @@ asyncQueueUnregister(void)
|
|
|
|
|
static bool
|
|
|
|
|
asyncQueueIsFull(void)
|
|
|
|
|
{
|
|
|
|
|
int nexthead;
|
|
|
|
|
int boundary;
|
|
|
|
|
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
|
|
|
|
|
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
|
|
|
|
|
int occupied = headPage - tailPage;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* The queue is full if creating a new head page would create a page that
|
|
|
|
|
* logically precedes the current global tail pointer, ie, the head
|
|
|
|
|
* pointer would wrap around compared to the tail. We cannot create such
|
|
|
|
|
* a head page for fear of confusing slru.c. For safety we round the tail
|
|
|
|
|
* pointer back to a segment boundary (truncation logic in
|
|
|
|
|
* asyncQueueAdvanceTail does not do this, so doing it here is optional).
|
|
|
|
|
*
|
|
|
|
|
* Note that this test is *not* dependent on how much space there is on
|
|
|
|
|
* the current head page. This is necessary because asyncQueueAddEntries
|
|
|
|
|
* might try to create the next head page in any case.
|
|
|
|
|
*/
|
|
|
|
|
nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
|
|
|
|
|
if (nexthead > QUEUE_MAX_PAGE)
|
|
|
|
|
nexthead = 0; /* wrap around */
|
|
|
|
|
boundary = QUEUE_STOP_PAGE;
|
|
|
|
|
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
|
|
|
|
|
return asyncQueuePagePrecedes(nexthead, boundary);
|
|
|
|
|
return occupied >= max_notify_queue_pages;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -1355,8 +1311,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
|
|
|
|
|
if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
|
|
|
|
|
{
|
|
|
|
|
pageno++;
|
|
|
|
|
if (pageno > QUEUE_MAX_PAGE)
|
|
|
|
|
pageno = 0; /* wrap around */
|
|
|
|
|
offset = 0;
|
|
|
|
|
pageJump = true;
|
|
|
|
|
}
|
|
|
|
@ -1433,9 +1387,6 @@ asyncQueueAddEntries(ListCell *nextNotify)
|
|
|
|
|
* If this is the first write since the postmaster started, we need to
|
|
|
|
|
* initialize the first page of the async SLRU. Otherwise, the current
|
|
|
|
|
* page should be initialized already, so just fetch it.
|
|
|
|
|
*
|
|
|
|
|
* (We could also take the first path when the SLRU position has just
|
|
|
|
|
* wrapped around, but re-zeroing the page is harmless in that case.)
|
|
|
|
|
*/
|
|
|
|
|
pageno = QUEUE_POS_PAGE(queue_head);
|
|
|
|
|
if (QUEUE_POS_IS_ZERO(queue_head))
|
|
|
|
@ -1548,20 +1499,12 @@ asyncQueueUsage(void)
|
|
|
|
|
{
|
|
|
|
|
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
|
|
|
|
|
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
|
|
|
|
|
int occupied;
|
|
|
|
|
|
|
|
|
|
occupied = headPage - tailPage;
|
|
|
|
|
int occupied = headPage - tailPage;
|
|
|
|
|
|
|
|
|
|
if (occupied == 0)
|
|
|
|
|
return (double) 0; /* fast exit for common case */
|
|
|
|
|
|
|
|
|
|
if (occupied < 0)
|
|
|
|
|
{
|
|
|
|
|
/* head has wrapped around, tail not yet */
|
|
|
|
|
occupied += QUEUE_MAX_PAGE + 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
|
|
|
|
|
return (double) occupied / (double) max_notify_queue_pages;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -2209,11 +2152,6 @@ asyncQueueAdvanceTail(void)
|
|
|
|
|
*/
|
|
|
|
|
SimpleLruTruncate(NotifyCtl, newtailpage);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
|
|
|
|
|
* for the segment immediately prior to the old tail, allowing fresh
|
|
|
|
|
* data into that segment.
|
|
|
|
|
*/
|
|
|
|
|
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
|
|
|
|
|
QUEUE_STOP_PAGE = newtailpage;
|
|
|
|
|
LWLockRelease(NotifyQueueLock);
|
|
|
|
|