Rename SLRU structures and associated LWLocks.

Originally, the names assigned to SLRUs had no purpose other than
being shmem lookup keys, so not a lot of thought went into them.
As of v13, though, we're exposing them in the pg_stat_slru view and
the pg_stat_reset_slru function, so it seems advisable to take a bit
more care.  Rename them to names based on the associated on-disk
storage directories (which fortunately we *did* think about, to some
extent; since those are also visible to DBAs, consistency seems like
a good thing).  Also rename the associated LWLocks, since those names
are likewise user-exposed now as wait event names.

For the most part I only touched symbols used in the respective modules'
SimpleLruInit() calls, not the names of other related objects.  This
renaming could have been taken further, and maybe someday we will do so.
But for now it seems undesirable to change the names of any globally
visible functions or structs, so some inconsistency is unavoidable.

(But I *did* terminate "oldserxid" with prejudice, as I found that
name both unreadable and not descriptive of the SLRU's contents.)

Table 27.12 needs re-alphabetization now, but I'll leave that till
after the other LWLock renamings I have in mind.

Discussion: https://postgr.es/m/28683.1589405363@sss.pgh.pa.us
Tom Lane 2020-05-15 14:28:19 -04:00
parent 756abe2bc7
commit 5da14938f7
20 changed files with 406 additions and 382 deletions

doc/src/sgml/monitoring.sgml

@ -1754,12 +1754,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to manage space allocation in shared memory.</entry>
</row>
<row>
<entry><literal>AsyncCtlLock</literal></entry>
<entry>Waiting to read or update shared notification state.</entry>
<entry><literal>NotifySLRULock</literal></entry>
<entry>Waiting to access the <command>NOTIFY</command> message SLRU
cache.</entry>
</row>
<row>
<entry><literal>AsyncQueueLock</literal></entry>
<entry>Waiting to read or update notification messages.</entry>
<entry><literal>NotifyQueueLock</literal></entry>
<entry>Waiting to read or update <command>NOTIFY</command> messages.</entry>
</row>
<row>
<entry><literal>AutoFileLock</literal></entry>
@ -1785,13 +1786,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
B-tree index.</entry>
</row>
<row>
<entry><literal>CLogControlLock</literal></entry>
<entry>Waiting to read or update transaction status.</entry>
<entry><literal>XactSLRULock</literal></entry>
<entry>Waiting to access the transaction status SLRU cache.</entry>
</row>
<row>
<entry><literal>CLogTruncationLock</literal></entry>
<entry><literal>XactTruncationLock</literal></entry>
<entry>Waiting to execute <function>pg_xact_status</function> or update
the oldest transaction id available to it.</entry>
the oldest transaction ID available to it.</entry>
</row>
<row>
<entry><literal>CheckpointLock</literal></entry>
@ -1802,8 +1803,8 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to manage fsync requests.</entry>
</row>
<row>
<entry><literal>CommitTsControlLock</literal></entry>
<entry>Waiting to read or update transaction commit timestamps.</entry>
<entry><literal>CommitTsSLRULock</literal></entry>
<entry>Waiting to access the commit timestamp SLRU cache.</entry>
</row>
<row>
<entry><literal>CommitTsLock</literal></entry>
@ -1828,12 +1829,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to read or update shared multixact state.</entry>
</row>
<row>
<entry><literal>MultiXactMemberControlLock</literal></entry>
<entry>Waiting to read or update multixact member mappings.</entry>
<entry><literal>MultiXactMemberSLRULock</literal></entry>
<entry>Waiting to access the multixact member SLRU cache.</entry>
</row>
<row>
<entry><literal>MultiXactOffsetControlLock</literal></entry>
<entry>Waiting to read or update multixact offset mappings.</entry>
<entry><literal>MultiXactOffsetSLRULock</literal></entry>
<entry>Waiting to access the multixact offset SLRU cache.</entry>
</row>
<row>
<entry><literal>MultiXactTruncationLock</literal></entry>
@ -1844,9 +1845,9 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to allocate or assign an OID.</entry>
</row>
<row>
<entry><literal>OldSerXidLock</literal></entry>
<entry>Waiting to read or record conflicting serializable
transactions.</entry>
<entry><literal>SerialSLRULock</literal></entry>
<entry>Waiting to access the serializable transaction conflict SLRU
cache.</entry>
</row>
<row>
<entry><literal>OldSnapshotTimeMapLock</literal></entry>
@ -1907,8 +1908,8 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to find or allocate space in shared memory.</entry>
</row>
<row>
<entry><literal>SubtransControlLock</literal></entry>
<entry>Waiting to read or update sub-transaction information.</entry>
<entry><literal>SubtransSLRULock</literal></entry>
<entry>Waiting to access the sub-transaction SLRU cache.</entry>
</row>
<row>
<entry><literal>SyncRepLock</literal></entry>
@ -1941,8 +1942,9 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting to allocate or assign a transaction id.</entry>
</row>
<row>
<entry><literal>async</literal></entry>
<entry>Waiting for I/O on an async (notify) buffer.</entry>
<entry><literal>NotifyBuffer</literal></entry>
<entry>Waiting for I/O on a <command>NOTIFY</command> message SLRU
buffer.</entry>
</row>
<row>
<entry><literal>buffer_content</literal></entry>
@ -1958,12 +1960,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
pool.</entry>
</row>
<row>
<entry><literal>clog</literal></entry>
<entry>Waiting for I/O on a clog (transaction status) buffer.</entry>
<entry><literal>XactBuffer</literal></entry>
<entry>Waiting for I/O on a transaction status SLRU buffer.</entry>
</row>
<row>
<entry><literal>commit_timestamp</literal></entry>
<entry>Waiting for I/O on commit timestamp buffer.</entry>
<entry><literal>CommitTsBuffer</literal></entry>
<entry>Waiting for I/O on a commit timestamp SLRU buffer.</entry>
</row>
<row>
<entry><literal>lock_manager</literal></entry>
@ -1971,16 +1973,17 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
join or exit a locking group (used by parallel query).</entry>
</row>
<row>
<entry><literal>multixact_member</literal></entry>
<entry>Waiting for I/O on a multixact_member buffer.</entry>
<entry><literal>MultiXactMember</literal></entry>
<entry>Waiting for I/O on a multixact member SLRU buffer.</entry>
</row>
<row>
<entry><literal>multixact_offset</literal></entry>
<entry>Waiting for I/O on a multixact offset buffer.</entry>
<entry><literal>MultiXactOffsetBuffer</literal></entry>
<entry>Waiting for I/O on a multixact offset SLRU buffer.</entry>
</row>
<row>
<entry><literal>oldserxid</literal></entry>
<entry>Waiting for I/O on an oldserxid buffer.</entry>
<entry><literal>SerialBuffer</literal></entry>
<entry>Waiting for I/O on a serializable transaction conflict SLRU
buffer.</entry>
</row>
<row>
<entry><literal>parallel_append</literal></entry>
@ -2018,8 +2021,8 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
in a parallel query.</entry>
</row>
<row>
<entry><literal>subtrans</literal></entry>
<entry>Waiting for I/O on a subtransaction buffer.</entry>
<entry><literal>SubtransBuffer</literal></entry>
<entry>Waiting for I/O on a sub-transaction SLRU buffer.</entry>
</row>
<row>
<entry><literal>tbm</literal></entry>
@ -4190,7 +4193,13 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</tgroup>
</table>
<indexterm>
<primary>SLRU</primary>
</indexterm>
<para>
<productname>PostgreSQL</productname> accesses certain on-disk information
via <firstterm>SLRU</firstterm> (simple least-recently-used) caches.
The <structname>pg_stat_slru</structname> view will contain
one row for each tracked SLRU cache, showing statistics about access
to cached pages.
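With these renames applied, the view exposes the new user-facing names directly; for example, a query along these lines (column list per the v13 view definition) lists per-SLRU activity:

```sql
-- List per-SLRU statistics under the new user-facing names
-- ("Xact", "Notify", "Serial", etc.).
SELECT name, blks_hit, blks_read, blks_written
FROM pg_stat_slru
ORDER BY name;
```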
@ -4484,11 +4493,15 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
Resets statistics to zero for a single SLRU cache, or for all SLRUs in
the cluster. If the argument is NULL, all counters shown in
the <structname>pg_stat_slru</structname> view for all SLRU caches are
reset. The argument can be one of <literal>async</literal>,
<literal>clog</literal>, <literal>commit_timestamp</literal>,
<literal>multixact_offset</literal>,
<literal>multixact_member</literal>, <literal>oldserxid</literal>, or
<literal>subtrans</literal> to reset the counters for only that entry.
reset. The argument can be one of
<literal>CommitTs</literal>,
<literal>MultiXactMember</literal>,
<literal>MultiXactOffset</literal>,
<literal>Notify</literal>,
<literal>Serial</literal>,
<literal>Subtrans</literal>, or
<literal>Xact</literal>
to reset the counters for only that entry.
If the argument is <literal>other</literal> (or indeed, any
unrecognized name), then the counters for all other SLRU caches, such
as extension-defined caches, are reset.
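As a concrete illustration of the argument handling described above (run against a server with these renames in place):

```sql
-- Reset counters for just the transaction-status SLRU, now named "Xact".
SELECT pg_stat_reset_slru('Xact');

-- Passing NULL resets counters for every SLRU cache.
SELECT pg_stat_reset_slru(NULL);
```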

src/backend/access/transam/clog.c

@ -83,9 +83,9 @@
/*
* Link to shared-memory data structures for CLOG control
*/
static SlruCtlData ClogCtlData;
static SlruCtlData XactCtlData;
#define ClogCtl (&ClogCtlData)
#define XactCtl (&XactCtlData)
static int ZeroCLOGPage(int pageno, bool writeXlog);
@ -280,10 +280,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
"group clog threshold less than PGPROC cached subxids");
/*
* When there is contention on CLogControlLock, we try to group multiple
* When there is contention on XactSLRULock, we try to group multiple
* updates; a single leader process will perform transaction status
* updates for multiple backends so that the number of times
* CLogControlLock needs to be acquired is reduced.
* updates for multiple backends so that the number of times XactSLRULock
* needs to be acquired is reduced.
*
* For this optimization to be safe, the XID in MyPgXact and the subxids
* in MyProc must be the same as the ones for which we're setting the
@ -300,17 +300,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
nsubxids * sizeof(TransactionId)) == 0)
{
/*
* If we can immediately acquire CLogControlLock, we update the status
* of our own XID and release the lock. If not, try use group XID
* If we can immediately acquire XactSLRULock, we update the status of
* our own XID and release the lock. If not, try use group XID
* update. If that doesn't work out, fall back to waiting for the
* lock to perform an update for this transaction only.
*/
if (LWLockConditionalAcquire(CLogControlLock, LW_EXCLUSIVE))
if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE))
{
/* Got the lock without waiting! Do the update. */
TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
lsn, pageno);
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
return;
}
else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
@ -323,10 +323,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
}
/* Group update not applicable, or couldn't accept this page number. */
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
lsn, pageno);
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
}
/*
@ -345,7 +345,7 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
Assert(status == TRANSACTION_STATUS_COMMITTED ||
status == TRANSACTION_STATUS_ABORTED ||
(status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
Assert(LWLockHeldByMeInMode(CLogControlLock, LW_EXCLUSIVE));
Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE));
/*
* If we're doing an async commit (ie, lsn is valid), then we must wait
@ -356,7 +356,7 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
* write-busy, since we don't care if the update reaches disk sooner than
* we think.
*/
slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
slotno = SimpleLruReadPage(XactCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
/*
* Set the main transaction id, if any.
@ -374,7 +374,7 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
{
for (i = 0; i < nsubxids; i++)
{
Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
TransactionIdSetStatusBit(subxids[i],
TRANSACTION_STATUS_SUB_COMMITTED,
lsn, slotno);
@ -388,20 +388,20 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
/* Set the subtransactions */
for (i = 0; i < nsubxids; i++)
{
Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
}
ClogCtl->shared->page_dirty[slotno] = true;
XactCtl->shared->page_dirty[slotno] = true;
}
/*
* When we cannot immediately acquire CLogControlLock in exclusive mode at
* When we cannot immediately acquire XactSLRULock in exclusive mode at
* commit time, add ourselves to a list of processes that need their XIDs
* status update. The first process to add itself to the list will acquire
* CLogControlLock in exclusive mode and set transaction status as required
* XactSLRULock in exclusive mode and set transaction status as required
* on behalf of all group members. This avoids a great deal of contention
* around CLogControlLock when many processes are trying to commit at once,
* around XactSLRULock when many processes are trying to commit at once,
* since the lock need not be repeatedly handed off from one committing
* process to the next.
*
@ -493,7 +493,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
}
/* We are the leader. Acquire the lock on behalf of everyone. */
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/*
* Now that we've got the lock, clear the list of processes waiting for
@ -530,7 +530,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
}
/* We're done with the lock now. */
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
/*
* Now that we've released the lock, go back and wake everybody up. We
@ -559,7 +559,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
/*
* Sets the commit status of a single transaction.
*
* Must be called with CLogControlLock held
* Must be called with XactSLRULock held
*/
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
@ -570,7 +570,7 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
char byteval;
char curval;
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
/*
@ -610,8 +610,8 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
{
int lsnindex = GetLSNIndex(slotno, xid);
if (ClogCtl->shared->group_lsn[lsnindex] < lsn)
ClogCtl->shared->group_lsn[lsnindex] = lsn;
if (XactCtl->shared->group_lsn[lsnindex] < lsn)
XactCtl->shared->group_lsn[lsnindex] = lsn;
}
}
@ -643,15 +643,15 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(ClogCtl, pageno, xid);
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, xid);
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
lsnindex = GetLSNIndex(slotno, xid);
*lsn = ClogCtl->shared->group_lsn[lsnindex];
*lsn = XactCtl->shared->group_lsn[lsnindex];
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
return status;
}
@ -690,9 +690,9 @@ CLOGShmemSize(void)
void
CLOGShmemInit(void)
{
ClogCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(ClogCtl, "clog", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
CLogControlLock, "pg_xact", LWTRANCHE_CLOG_BUFFERS);
XactCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER);
}
/*
@ -706,16 +706,16 @@ BootStrapCLOG(void)
{
int slotno;
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/* Create and zero the first page of the commit log */
slotno = ZeroCLOGPage(0, false);
/* Make sure it's written out */
SimpleLruWritePage(ClogCtl, slotno);
Assert(!ClogCtl->shared->page_dirty[slotno]);
SimpleLruWritePage(XactCtl, slotno);
Assert(!XactCtl->shared->page_dirty[slotno]);
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
}
/*
@ -732,7 +732,7 @@ ZeroCLOGPage(int pageno, bool writeXlog)
{
int slotno;
slotno = SimpleLruZeroPage(ClogCtl, pageno);
slotno = SimpleLruZeroPage(XactCtl, pageno);
if (writeXlog)
WriteZeroPageXlogRec(pageno);
@ -750,14 +750,14 @@ StartupCLOG(void)
TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
int pageno = TransactionIdToPage(xid);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/*
* Initialize our idea of the latest page number.
*/
ClogCtl->shared->latest_page_number = pageno;
XactCtl->shared->latest_page_number = pageno;
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
}
/*
@ -769,12 +769,12 @@ TrimCLOG(void)
TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
int pageno = TransactionIdToPage(xid);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/*
* Re-Initialize our idea of the latest page number.
*/
ClogCtl->shared->latest_page_number = pageno;
XactCtl->shared->latest_page_number = pageno;
/*
* Zero out the remainder of the current clog page. Under normal
@ -795,18 +795,18 @@ TrimCLOG(void)
int slotno;
char *byteptr;
slotno = SimpleLruReadPage(ClogCtl, pageno, false, xid);
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
slotno = SimpleLruReadPage(XactCtl, pageno, false, xid);
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
/* Zero so-far-unused positions in the current byte */
*byteptr &= (1 << bshift) - 1;
/* Zero the rest of the page */
MemSet(byteptr + 1, 0, BLCKSZ - byteno - 1);
ClogCtl->shared->page_dirty[slotno] = true;
XactCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
}
/*
@ -817,7 +817,7 @@ ShutdownCLOG(void)
{
/* Flush dirty CLOG pages to disk */
TRACE_POSTGRESQL_CLOG_CHECKPOINT_START(false);
SimpleLruFlush(ClogCtl, false);
SimpleLruFlush(XactCtl, false);
/*
* fsync pg_xact to ensure that any files flushed previously are durably
@ -836,7 +836,7 @@ CheckPointCLOG(void)
{
/* Flush dirty CLOG pages to disk */
TRACE_POSTGRESQL_CLOG_CHECKPOINT_START(true);
SimpleLruFlush(ClogCtl, true);
SimpleLruFlush(XactCtl, true);
/*
* fsync pg_xact to ensure that any files flushed previously are durably
@ -871,12 +871,12 @@ ExtendCLOG(TransactionId newestXact)
pageno = TransactionIdToPage(newestXact);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCLOGPage(pageno, true);
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
}
@ -907,7 +907,7 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
cutoffPage = TransactionIdToPage(oldestXact);
/* Check to see if there's any files that could be removed */
if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
if (!SlruScanDirectory(XactCtl, SlruScanDirCbReportPresence, &cutoffPage))
return; /* nothing to remove */
/*
@ -928,7 +928,7 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
WriteTruncateXlogRec(cutoffPage, oldestXact, oldestxid_datoid);
/* Now we can remove the old CLOG segment(s) */
SimpleLruTruncate(ClogCtl, cutoffPage);
SimpleLruTruncate(XactCtl, cutoffPage);
}
@ -1007,13 +1007,13 @@ clog_redo(XLogReaderState *record)
memcpy(&pageno, XLogRecGetData(record), sizeof(int));
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
slotno = ZeroCLOGPage(pageno, false);
SimpleLruWritePage(ClogCtl, slotno);
Assert(!ClogCtl->shared->page_dirty[slotno]);
SimpleLruWritePage(XactCtl, slotno);
Assert(!XactCtl->shared->page_dirty[slotno]);
LWLockRelease(CLogControlLock);
LWLockRelease(XactSLRULock);
}
else if (info == CLOG_TRUNCATE)
{
@ -1025,11 +1025,11 @@ clog_redo(XLogReaderState *record)
* During XLOG replay, latest_page_number isn't set up yet; insert a
* suitable value to bypass the sanity test in SimpleLruTruncate.
*/
ClogCtl->shared->latest_page_number = xlrec.pageno;
XactCtl->shared->latest_page_number = xlrec.pageno;
AdvanceOldestClogXid(xlrec.oldestXact);
SimpleLruTruncate(ClogCtl, xlrec.pageno);
SimpleLruTruncate(XactCtl, xlrec.pageno);
}
else
elog(PANIC, "clog_redo: unknown op code %u", info);

src/backend/access/transam/commit_ts.c

@ -235,7 +235,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
int slotno;
int i;
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
@ -245,13 +245,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
CommitTsCtl->shared->page_dirty[slotno] = true;
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
}
/*
* Sets the commit timestamp of a single transaction.
*
* Must be called with CommitTsControlLock held
* Must be called with CommitTsSLRULock held
*/
static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
@ -352,7 +352,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
if (nodeid)
*nodeid = entry.nodeid;
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
return *ts != 0;
}
@ -492,9 +492,9 @@ CommitTsShmemInit(void)
bool found;
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
CommitTsControlLock, "pg_commit_ts",
LWTRANCHE_COMMITTS_BUFFERS);
SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
CommitTsSLRULock, "pg_commit_ts",
LWTRANCHE_COMMITTS_BUFFER);
commitTsShared = ShmemInitStruct("CommitTs shared",
sizeof(CommitTimestampShared),
@ -649,9 +649,9 @@ ActivateCommitTs(void)
/*
* Re-Initialize our idea of the latest page number.
*/
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
CommitTsCtl->shared->latest_page_number = pageno;
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
/*
* If CommitTs is enabled, but it wasn't in the previous server run, we
@ -679,11 +679,11 @@ ActivateCommitTs(void)
{
int slotno;
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
slotno = ZeroCommitTsPage(pageno, false);
SimpleLruWritePage(CommitTsCtl, slotno);
Assert(!CommitTsCtl->shared->page_dirty[slotno]);
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
}
/* Change the activation status in shared memory. */
@ -732,9 +732,9 @@ DeactivateCommitTs(void)
* be overwritten anyway when we wrap around, but it seems better to be
* tidy.)
*/
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
}
/*
@ -804,12 +804,12 @@ ExtendCommitTs(TransactionId newestXact)
pageno = TransactionIdToCTsPage(newestXact);
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCommitTsPage(pageno, !InRecovery);
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
}
/*
@ -974,13 +974,13 @@ commit_ts_redo(XLogReaderState *record)
memcpy(&pageno, XLogRecGetData(record), sizeof(int));
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
slotno = ZeroCommitTsPage(pageno, false);
SimpleLruWritePage(CommitTsCtl, slotno);
Assert(!CommitTsCtl->shared->page_dirty[slotno]);
LWLockRelease(CommitTsControlLock);
LWLockRelease(CommitTsSLRULock);
}
else if (info == COMMIT_TS_TRUNCATE)
{

src/backend/access/transam/multixact.c

@ -192,8 +192,8 @@ static SlruCtlData MultiXactMemberCtlData;
/*
* MultiXact state shared across all backends. All this state is protected
* by MultiXactGenLock. (We also use MultiXactOffsetControlLock and
* MultiXactMemberControlLock to guard accesses to the two sets of SLRU
* by MultiXactGenLock. (We also use MultiXactOffsetSLRULock and
* MultiXactMemberSLRULock to guard accesses to the two sets of SLRU
* buffers. For concurrency's sake, we avoid holding more than one of these
* locks at a time.)
*/
@ -850,7 +850,7 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactOffset *offptr;
int i;
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
pageno = MultiXactIdToOffsetPage(multi);
entryno = MultiXactIdToOffsetEntry(multi);
@ -871,9 +871,9 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
/* Exchange our lock */
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
prev_pageno = -1;
@ -915,7 +915,7 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactMemberCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(MultiXactMemberControlLock);
LWLockRelease(MultiXactMemberSLRULock);
}
/*
@ -1321,7 +1321,7 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
* time on every multixact creation.
*/
retry:
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
pageno = MultiXactIdToOffsetPage(multi);
entryno = MultiXactIdToOffsetEntry(multi);
@ -1367,7 +1367,7 @@ retry:
if (nextMXOffset == 0)
{
/* Corner case 2: next multixact is still being filled in */
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
CHECK_FOR_INTERRUPTS();
pg_usleep(1000L);
goto retry;
@ -1376,13 +1376,13 @@ retry:
length = nextMXOffset - offset;
}
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember));
*members = ptr;
/* Now get the members themselves. */
LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
truelength = 0;
prev_pageno = -1;
@ -1422,7 +1422,7 @@ retry:
truelength++;
}
LWLockRelease(MultiXactMemberControlLock);
LWLockRelease(MultiXactMemberSLRULock);
/*
* Copy the result into the local cache.
@ -1812,8 +1812,8 @@ MultiXactShmemSize(void)
mul_size(sizeof(MultiXactId) * 2, MaxOldestSlot))
size = SHARED_MULTIXACT_STATE_SIZE;
size = add_size(size, SimpleLruShmemSize(NUM_MXACTOFFSET_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(NUM_MXACTMEMBER_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTOFFSET_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTMEMBER_BUFFERS, 0));
return size;
}
@ -1829,13 +1829,13 @@ MultiXactShmemInit(void)
MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
SimpleLruInit(MultiXactOffsetCtl,
"multixact_offset", NUM_MXACTOFFSET_BUFFERS, 0,
MultiXactOffsetControlLock, "pg_multixact/offsets",
LWTRANCHE_MXACTOFFSET_BUFFERS);
"MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
MultiXactOffsetSLRULock, "pg_multixact/offsets",
LWTRANCHE_MULTIXACTOFFSET_BUFFER);
SimpleLruInit(MultiXactMemberCtl,
"multixact_member", NUM_MXACTMEMBER_BUFFERS, 0,
MultiXactMemberControlLock, "pg_multixact/members",
LWTRANCHE_MXACTMEMBER_BUFFERS);
"MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0,
MultiXactMemberSLRULock, "pg_multixact/members",
LWTRANCHE_MULTIXACTMEMBER_BUFFER);
/* Initialize our shared state struct */
MultiXactState = ShmemInitStruct("Shared MultiXact State",
@ -1869,7 +1869,7 @@ BootStrapMultiXact(void)
{
int slotno;
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
/* Create and zero the first page of the offsets log */
slotno = ZeroMultiXactOffsetPage(0, false);
@ -1878,9 +1878,9 @@ BootStrapMultiXact(void)
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/* Create and zero the first page of the members log */
slotno = ZeroMultiXactMemberPage(0, false);
@ -1889,7 +1889,7 @@ BootStrapMultiXact(void)
SimpleLruWritePage(MultiXactMemberCtl, slotno);
Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactMemberControlLock);
LWLockRelease(MultiXactMemberSLRULock);
}
/*
@ -1952,7 +1952,7 @@ MaybeExtendOffsetSlru(void)
pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno))
{
@ -1967,7 +1967,7 @@ MaybeExtendOffsetSlru(void)
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
}
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
}
/*
@ -2020,7 +2020,7 @@ TrimMultiXact(void)
LWLockRelease(MultiXactGenLock);
/* Clean up offsets state */
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
/*
* (Re-)Initialize our idea of the latest page number for offsets.
@ -2051,10 +2051,10 @@ TrimMultiXact(void)
MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
/* And the same for members */
LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/*
* (Re-)Initialize our idea of the latest page number for members.
@ -2089,7 +2089,7 @@ TrimMultiXact(void)
MultiXactMemberCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(MultiXactMemberControlLock);
LWLockRelease(MultiXactMemberSLRULock);
/* signal that we're officially up */
LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
@ -2402,12 +2402,12 @@ ExtendMultiXactOffset(MultiXactId multi)
pageno = MultiXactIdToOffsetPage(multi);
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroMultiXactOffsetPage(pageno, true);
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
}
/*
@ -2443,12 +2443,12 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
pageno = MXOffsetToMemberPage(offset);
LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroMultiXactMemberPage(pageno, true);
LWLockRelease(MultiXactMemberControlLock);
LWLockRelease(MultiXactMemberSLRULock);
}
/*
@ -2749,7 +2749,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result)
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
*result = offset;
return true;
@ -3230,13 +3230,13 @@ multixact_redo(XLogReaderState *record)
memcpy(&pageno, XLogRecGetData(record), sizeof(int));
LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
slotno = ZeroMultiXactOffsetPage(pageno, false);
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactOffsetControlLock);
LWLockRelease(MultiXactOffsetSLRULock);
}
else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE)
{
@@ -3245,13 +3245,13 @@ multixact_redo(XLogReaderState *record)
memcpy(&pageno, XLogRecGetData(record), sizeof(int));
LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
slotno = ZeroMultiXactMemberPage(pageno, false);
SimpleLruWritePage(MultiXactMemberCtl, slotno);
Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactMemberControlLock);
LWLockRelease(MultiXactMemberSLRULock);
}
else if (info == XLOG_MULTIXACT_CREATE_ID)
{


@@ -160,6 +160,17 @@ SimpleLruShmemSize(int nslots, int nlsns)
return BUFFERALIGN(sz) + BLCKSZ * nslots;
}
/*
* Initialize, or attach to, a simple LRU cache in shared memory.
*
* ctl: address of local (unshared) control structure.
* name: name of SLRU. (This is user-visible, pick with care!)
* nslots: number of page slots to use.
* nlsns: number of LSN groups per page (set to zero if not relevant).
* ctllock: LWLock to use to control access to the shared control structure.
* subdir: PGDATA-relative subdirectory that will contain the files.
* tranche_id: LWLock tranche ID to use for the SLRU's per-buffer LWLocks.
*/
void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLock *ctllock, const char *subdir, int tranche_id)


@@ -81,7 +81,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
Assert(TransactionIdIsValid(parent));
Assert(TransactionIdFollows(xid, parent));
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
@@ -99,7 +99,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
SubTransCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(SubtransControlLock);
LWLockRelease(SubtransSLRULock);
}
/*
@@ -129,7 +129,7 @@ SubTransGetParent(TransactionId xid)
parent = *ptr;
LWLockRelease(SubtransControlLock);
LWLockRelease(SubtransSLRULock);
return parent;
}
@@ -191,9 +191,9 @@ void
SUBTRANSShmemInit(void)
{
SubTransCtl->PagePrecedes = SubTransPagePrecedes;
SimpleLruInit(SubTransCtl, "subtrans", NUM_SUBTRANS_BUFFERS, 0,
SubtransControlLock, "pg_subtrans",
LWTRANCHE_SUBTRANS_BUFFERS);
SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0,
SubtransSLRULock, "pg_subtrans",
LWTRANCHE_SUBTRANS_BUFFER);
/* Override default assumption that writes should be fsync'd */
SubTransCtl->do_fsync = false;
}
@@ -213,7 +213,7 @@ BootStrapSUBTRANS(void)
{
int slotno;
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
/* Create and zero the first page of the subtrans log */
slotno = ZeroSUBTRANSPage(0);
@@ -222,7 +222,7 @@ BootStrapSUBTRANS(void)
SimpleLruWritePage(SubTransCtl, slotno);
Assert(!SubTransCtl->shared->page_dirty[slotno]);
LWLockRelease(SubtransControlLock);
LWLockRelease(SubtransSLRULock);
}
/*
@@ -259,7 +259,7 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
* Whenever we advance into a new page, ExtendSUBTRANS will likewise zero
* the new page without regard to whatever was previously on disk.
*/
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
startPage = TransactionIdToPage(oldestActiveXID);
nextFullXid = ShmemVariableCache->nextFullXid;
@@ -275,7 +275,7 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
}
(void) ZeroSUBTRANSPage(startPage);
LWLockRelease(SubtransControlLock);
LWLockRelease(SubtransSLRULock);
}
/*
@@ -337,12 +337,12 @@ ExtendSUBTRANS(TransactionId newestXact)
pageno = TransactionIdToPage(newestXact);
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
/* Zero the page */
ZeroSUBTRANSPage(pageno);
LWLockRelease(SubtransControlLock);
LWLockRelease(SubtransSLRULock);
}


@@ -303,22 +303,22 @@ AdvanceNextFullTransactionIdPastXid(TransactionId xid)
/*
* Advance the cluster-wide value for the oldest valid clog entry.
*
* We must acquire CLogTruncationLock to advance the oldestClogXid. It's not
* We must acquire XactTruncationLock to advance the oldestClogXid. It's not
* necessary to hold the lock during the actual clog truncation, only when we
* advance the limit, as code looking up arbitrary xids is required to hold
* CLogTruncationLock from when it tests oldestClogXid through to when it
* XactTruncationLock from when it tests oldestClogXid through to when it
* completes the clog lookup.
*/
void
AdvanceOldestClogXid(TransactionId oldest_datfrozenxid)
{
LWLockAcquire(CLogTruncationLock, LW_EXCLUSIVE);
LWLockAcquire(XactTruncationLock, LW_EXCLUSIVE);
if (TransactionIdPrecedes(ShmemVariableCache->oldestClogXid,
oldest_datfrozenxid))
{
ShmemVariableCache->oldestClogXid = oldest_datfrozenxid;
}
LWLockRelease(CLogTruncationLock);
LWLockRelease(XactTruncationLock);
}
/*


@@ -107,7 +107,7 @@
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies.
*
* The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
* The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
* by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
@@ -225,7 +225,7 @@ typedef struct QueuePosition
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
* should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
* should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
* catch up before the pages they'll need to read fall out of SLRU cache.
*/
#define QUEUE_CLEANUP_DELAY 4
@@ -244,7 +244,7 @@ typedef struct QueueBackendStatus
/*
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
*
* The AsyncQueueControl structure is protected by the AsyncQueueLock.
* The AsyncQueueControl structure is protected by the NotifyQueueLock.
*
* When holding the lock in SHARED mode, backends may only inspect their own
* entries as well as the head and tail pointers. Consequently we can allow a
@@ -254,9 +254,9 @@ typedef struct QueueBackendStatus
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
* of other backends and also change the head and tail pointers.
*
* AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
* NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
* get NotifyQueueLock and then NotifySLRULock.
*
* Each backend uses the backend[] array entry with index equal to its
* BackendId (which can range from 1 to MaxBackends). We rely on this to make
@@ -292,9 +292,9 @@ static AsyncQueueControl *asyncQueueControl;
/*
* The SLRU buffer area through which we access the notification queue
*/
static SlruCtlData AsyncCtlData;
static SlruCtlData NotifyCtlData;
#define AsyncCtl (&AsyncCtlData)
#define NotifyCtl (&NotifyCtlData)
#define QUEUE_PAGESIZE BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
@@ -506,7 +506,7 @@ AsyncShmemSize(void)
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
return size;
}
@@ -552,18 +552,18 @@ AsyncShmemInit(void)
/*
* Set up SLRU management of the pg_notify data.
*/
AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER);
/* Override default assumption that writes should be fsync'd */
AsyncCtl->do_fsync = false;
NotifyCtl->do_fsync = false;
if (!found)
{
/*
* During start or reboot, clean out the pg_notify directory.
*/
(void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
}
}
@@ -918,7 +918,7 @@ PreCommit_Notify(void)
* Make sure that we have an XID assigned to the current transaction.
* GetCurrentTransactionId is cheap if we already have an XID, but not
* so cheap if we don't, and we'd prefer not to do that work while
* holding AsyncQueueLock.
* holding NotifyQueueLock.
*/
(void) GetCurrentTransactionId();
@@ -949,7 +949,7 @@ PreCommit_Notify(void)
{
/*
* Add the pending notifications to the queue. We acquire and
* release AsyncQueueLock once per page, which might be overkill
* release NotifyQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
* A full queue is very uncommon and should really not happen,
@@ -959,14 +959,14 @@ PreCommit_Notify(void)
* transaction, but we have not yet committed to clog, so at this
* point in time we can still roll the transaction back.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
}
}
}
@@ -1075,7 +1075,7 @@ Exec_ListenPreCommit(void)
* We need exclusive lock here so we can look at other backends' entries
* and manipulate the list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
head = QUEUE_HEAD;
max = QUEUE_TAIL;
prevListener = InvalidBackendId;
@@ -1101,7 +1101,7 @@ Exec_ListenPreCommit(void)
QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
QUEUE_FIRST_LISTENER = MyBackendId;
}
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
/* Now we are listed in the global array, so remember we're listening */
amRegisteredListener = true;
@@ -1308,7 +1308,7 @@ asyncQueueUnregister(void)
/*
* Need exclusive lock here to manipulate list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
/* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
@@ -1327,7 +1327,7 @@ asyncQueueUnregister(void)
}
}
QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
@@ -1336,7 +1336,7 @@ asyncQueueUnregister(void)
/*
* Test whether there is room to insert more notification messages.
*
* Caller must hold at least shared AsyncQueueLock.
* Caller must hold at least shared NotifyQueueLock.
*/
static bool
asyncQueueIsFull(void)
@@ -1437,8 +1437,8 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* notification to write and return the first still-unwritten cell back.
* Eventually we will return NULL indicating all is done.
*
* We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
* locally in this function.
* We are holding NotifyQueueLock already from the caller and grab
* NotifySLRULock locally in this function.
*/
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
@@ -1449,8 +1449,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
int offset;
int slotno;
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
/*
* We work with a local copy of QUEUE_HEAD, which we write back to shared
@@ -1475,13 +1475,13 @@ asyncQueueAddEntries(ListCell *nextNotify)
*/
pageno = QUEUE_POS_PAGE(queue_head);
if (QUEUE_POS_IS_ZERO(queue_head))
slotno = SimpleLruZeroPage(AsyncCtl, pageno);
slotno = SimpleLruZeroPage(NotifyCtl, pageno);
else
slotno = SimpleLruReadPage(AsyncCtl, pageno, true,
slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
InvalidTransactionId);
/* Note we mark the page dirty before writing in it */
AsyncCtl->shared->page_dirty[slotno] = true;
NotifyCtl->shared->page_dirty[slotno] = true;
while (nextNotify != NULL)
{
@@ -1512,7 +1512,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
}
/* Now copy qe into the shared buffer page */
memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
&qe,
qe.length);
@@ -1527,7 +1527,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
* asyncQueueIsFull() ensured that there is room to create this
* page without overrunning the queue.
*/
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
/*
* If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
@@ -1545,7 +1545,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* Success, so update the global QUEUE_HEAD */
QUEUE_HEAD = queue_head;
LWLockRelease(AsyncCtlLock);
LWLockRelease(NotifySLRULock);
return nextNotify;
}
@@ -1562,9 +1562,9 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
/* Advance the queue tail so we don't report a too-large result */
asyncQueueAdvanceTail();
LWLockAcquire(AsyncQueueLock, LW_SHARED);
LWLockAcquire(NotifyQueueLock, LW_SHARED);
usage = asyncQueueUsage();
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
PG_RETURN_FLOAT8(usage);
}
@@ -1572,7 +1572,7 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
/*
* Return the fraction of the queue that is currently occupied.
*
* The caller must hold AsyncQueueLock in (at least) shared mode.
* The caller must hold NotifyQueueLock in (at least) shared mode.
*/
static double
asyncQueueUsage(void)
@@ -1601,7 +1601,7 @@ asyncQueueUsage(void)
* This is unlikely given the size of the queue, but possible.
* The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
*
* Caller must hold exclusive AsyncQueueLock.
* Caller must hold exclusive NotifyQueueLock.
*/
static void
asyncQueueFillWarning(void)
@@ -1665,7 +1665,7 @@ SignalBackends(void)
/*
* Identify backends that we need to signal. We don't want to send
* signals while holding the AsyncQueueLock, so this loop just builds a
* signals while holding the NotifyQueueLock, so this loop just builds a
* list of target PIDs.
*
* XXX in principle these pallocs could fail, which would be bad. Maybe
@@ -1676,7 +1676,7 @@ SignalBackends(void)
ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
count = 0;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
int32 pid = QUEUE_BACKEND_PID(i);
@@ -1710,7 +1710,7 @@ SignalBackends(void)
ids[count] = i;
count++;
}
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
/* Now send signals */
for (int i = 0; i < count; i++)
@@ -1720,7 +1720,7 @@ SignalBackends(void)
/*
* Note: assuming things aren't broken, a signal failure here could
* only occur if the target backend exited since we released
* AsyncQueueLock; which is unlikely but certainly possible. So we
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
@@ -1930,12 +1930,12 @@ asyncQueueReadAllNotifications(void)
} page_buffer;
/* Fetch current state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
{
@@ -1990,7 +1990,7 @@ asyncQueueReadAllNotifications(void)
* that happens it is critical that we not try to send the same message
* over and over again. Therefore, we place a PG_TRY block here that will
* forcibly advance our queue position before we lose control to an error.
* (We could alternatively retake AsyncQueueLock and move the position
* (We could alternatively retake NotifyQueueLock and move the position
* before handling each individual message, but that seems like too much
* lock traffic.)
*/
@@ -2007,11 +2007,11 @@ asyncQueueReadAllNotifications(void)
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the AsyncCtlLock while we are examining the entries and
* possibly transmitting them to our frontend. Copy only the part
* of the page we will actually inspect.
* holding the NotifySLRULock while we are examining the entries
* and possibly transmitting them to our frontend. Copy only the
* part of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
InvalidTransactionId);
if (curpage == QUEUE_POS_PAGE(head))
{
@@ -2026,10 +2026,10 @@ asyncQueueReadAllNotifications(void)
copysize = QUEUE_PAGESIZE - curoffset;
}
memcpy(page_buffer.buf + curoffset,
AsyncCtl->shared->page_buffer[slotno] + curoffset,
NotifyCtl->shared->page_buffer[slotno] + curoffset,
copysize);
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
LWLockRelease(AsyncCtlLock);
LWLockRelease(NotifySLRULock);
/*
* Process messages up to the stop position, end of page, or an
@@ -2040,7 +2040,7 @@ asyncQueueReadAllNotifications(void)
* But if it has, we will receive (or have already received and
* queued) another signal and come here again.
*
* We are not holding AsyncQueueLock here! The queue can only
* We are not holding NotifyQueueLock here! The queue can only
* extend beyond the head pointer (see above) and we leave our
* backend's pointer where it is so nobody will truncate or
* rewrite pages under us. Especially we don't want to hold a lock
@@ -2054,9 +2054,9 @@ asyncQueueReadAllNotifications(void)
PG_FINALLY();
{
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
LWLockAcquire(NotifyQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
}
PG_END_TRY();
@@ -2070,7 +2070,7 @@ asyncQueueReadAllNotifications(void)
*
* The current page must have been fetched into page_buffer from shared
* memory. (We could access the page right in shared memory, but that
* would imply holding the AsyncCtlLock throughout this routine.)
* would imply holding the NotifySLRULock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* uncommitted transaction, or reach the end of the page.
@@ -2177,7 +2177,7 @@ asyncQueueAdvanceTail(void)
int newtailpage;
int boundary;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
@@ -2186,7 +2186,7 @@ asyncQueueAdvanceTail(void)
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;
LWLockRelease(AsyncQueueLock);
LWLockRelease(NotifyQueueLock);
/*
* We can truncate something if the global tail advanced across an SLRU
@@ -2200,10 +2200,10 @@ asyncQueueAdvanceTail(void)
if (asyncQueuePagePrecedes(oldtailpage, boundary))
{
/*
* SimpleLruTruncate() will ask for AsyncCtlLock but will also release
* the lock again.
* SimpleLruTruncate() will ask for NotifySLRULock but will also
* release the lock again.
*/
SimpleLruTruncate(AsyncCtl, newtailpage);
SimpleLruTruncate(NotifyCtl, newtailpage);
}
}


@@ -147,13 +147,13 @@ PgStat_MsgBgWriter BgWriterStats;
* all SLRUs without an explicit entry (e.g. SLRUs in extensions).
*/
static const char *const slru_names[] = {
"async",
"clog",
"commit_timestamp",
"multixact_offset",
"multixact_member",
"oldserxid",
"subtrans",
"CommitTs",
"MultiXactMember",
"MultiXactOffset",
"Notify",
"Serial",
"Subtrans",
"Xact",
"other" /* has to be last */
};


@@ -182,7 +182,7 @@ static const char *const excludeDirContents[] =
/*
* Old contents are loaded for possible debugging but are not required for
* normal operation, see OldSerXidInit().
* normal operation, see SerialInit().
*/
"pg_serial",


@@ -124,20 +124,20 @@ extern slock_t *ShmemLock;
*/
static const char *const BuiltinTrancheNames[] = {
/* LWTRANCHE_CLOG_BUFFERS: */
"clog",
/* LWTRANCHE_COMMITTS_BUFFERS: */
"commit_timestamp",
/* LWTRANCHE_SUBTRANS_BUFFERS: */
"subtrans",
/* LWTRANCHE_MXACTOFFSET_BUFFERS: */
"multixact_offset",
/* LWTRANCHE_MXACTMEMBER_BUFFERS: */
"multixact_member",
/* LWTRANCHE_ASYNC_BUFFERS: */
"async",
/* LWTRANCHE_OLDSERXID_BUFFERS: */
"oldserxid",
/* LWTRANCHE_XACT_BUFFER: */
"XactBuffer",
/* LWTRANCHE_COMMITTS_BUFFER: */
"CommitTSBuffer",
/* LWTRANCHE_SUBTRANS_BUFFER: */
"SubtransBuffer",
/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
"MultiXactOffsetBuffer",
/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
"MultiXactMemberBuffer",
/* LWTRANCHE_NOTIFY_BUFFER: */
"NotifyBuffer",
/* LWTRANCHE_SERIAL_BUFFER: */
"SerialBuffer",
/* LWTRANCHE_WAL_INSERT: */
"wal_insert",
/* LWTRANCHE_BUFFER_CONTENT: */


@@ -15,11 +15,11 @@ WALBufMappingLock 7
WALWriteLock 8
ControlFileLock 9
CheckpointLock 10
CLogControlLock 11
SubtransControlLock 12
XactSLRULock 11
SubtransSLRULock 12
MultiXactGenLock 13
MultiXactOffsetControlLock 14
MultiXactMemberControlLock 15
MultiXactOffsetSLRULock 14
MultiXactMemberSLRULock 15
RelCacheInitLock 16
CheckpointerCommLock 17
TwoPhaseStateLock 18
@@ -30,22 +30,22 @@ AutovacuumLock 22
AutovacuumScheduleLock 23
SyncScanLock 24
RelationMappingLock 25
AsyncCtlLock 26
AsyncQueueLock 27
NotifySLRULock 26
NotifyQueueLock 27
SerializableXactHashLock 28
SerializableFinishedListLock 29
SerializablePredicateLockListLock 30
OldSerXidLock 31
SerialSLRULock 31
SyncRepLock 32
BackgroundWorkerLock 33
DynamicSharedMemoryControlLock 34
AutoFileLock 35
ReplicationSlotAllocationLock 36
ReplicationSlotControlLock 37
CommitTsControlLock 38
CommitTsSLRULock 38
CommitTsLock 39
ReplicationOriginLock 40
MultiXactTruncationLock 41
OldSnapshotTimeMapLock 42
LogicalRepWorkerLock 43
CLogTruncationLock 44
XactTruncationLock 44


@@ -211,7 +211,7 @@
#include "utils/snapmgr.h"
/* Uncomment the next line to test the graceful degradation code. */
/* #define TEST_OLDSERXID */
/* #define TEST_SUMMARIZE_SERIAL */
/*
* Test the most selective fields first, for performance.
@@ -316,37 +316,37 @@
/*
* The SLRU buffer area through which we access the old xids.
*/
static SlruCtlData OldSerXidSlruCtlData;
static SlruCtlData SerialSlruCtlData;
#define OldSerXidSlruCtl (&OldSerXidSlruCtlData)
#define SerialSlruCtl (&SerialSlruCtlData)
#define OLDSERXID_PAGESIZE BLCKSZ
#define OLDSERXID_ENTRYSIZE sizeof(SerCommitSeqNo)
#define OLDSERXID_ENTRIESPERPAGE (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
#define SERIAL_PAGESIZE BLCKSZ
#define SERIAL_ENTRYSIZE sizeof(SerCommitSeqNo)
#define SERIAL_ENTRIESPERPAGE (SERIAL_PAGESIZE / SERIAL_ENTRYSIZE)
/*
* Set maximum pages based on the number needed to track all transactions.
*/
#define OLDSERXID_MAX_PAGE (MaxTransactionId / OLDSERXID_ENTRIESPERPAGE)
#define SERIAL_MAX_PAGE (MaxTransactionId / SERIAL_ENTRIESPERPAGE)
#define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
#define SerialNextPage(page) (((page) >= SERIAL_MAX_PAGE) ? 0 : (page) + 1)
#define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
(OldSerXidSlruCtl->shared->page_buffer[slotno] + \
((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
#define SerialValue(slotno, xid) (*((SerCommitSeqNo *) \
(SerialSlruCtl->shared->page_buffer[slotno] + \
((((uint32) (xid)) % SERIAL_ENTRIESPERPAGE) * SERIAL_ENTRYSIZE))))
#define OldSerXidPage(xid) (((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE)
#define SerialPage(xid) (((uint32) (xid)) / SERIAL_ENTRIESPERPAGE)
typedef struct OldSerXidControlData
typedef struct SerialControlData
{
int headPage; /* newest initialized page */
TransactionId headXid; /* newest valid Xid in the SLRU */
TransactionId tailXid; /* oldest xmin we might be interested in */
} OldSerXidControlData;
} SerialControlData;
typedef struct OldSerXidControlData *OldSerXidControl;
typedef struct SerialControlData *SerialControl;
static OldSerXidControl oldSerXidControl;
static SerialControl serialControl;
/*
* When the oldest committed transaction on the "finished" list is moved to
@@ -438,11 +438,11 @@ static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT
static void ReleaseRWConflict(RWConflict conflict);
static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
static bool OldSerXidPagePrecedesLogically(int p, int q);
static void OldSerXidInit(void);
static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
static void OldSerXidSetActiveSerXmin(TransactionId xid);
static bool SerialPagePrecedesLogically(int p, int q);
static void SerialInit(void);
static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid);
static void SerialSetActiveSerXmin(TransactionId xid);
static uint32 predicatelock_hash(const void *key, Size keysize);
static void SummarizeOldestCommittedSxact(void);
@@ -784,26 +784,26 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
/*------------------------------------------------------------------------*/
/*
* We will work on the page range of 0..OLDSERXID_MAX_PAGE.
* We will work on the page range of 0..SERIAL_MAX_PAGE.
* Compares using wraparound logic, as is required by slru.c.
*/
static bool
OldSerXidPagePrecedesLogically(int p, int q)
SerialPagePrecedesLogically(int p, int q)
{
int diff;
/*
* We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2. Both inputs should
* be in the range 0..OLDSERXID_MAX_PAGE.
* We have to compare modulo (SERIAL_MAX_PAGE+1)/2. Both inputs should be
* in the range 0..SERIAL_MAX_PAGE.
*/
Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
Assert(p >= 0 && p <= SERIAL_MAX_PAGE);
Assert(q >= 0 && q <= SERIAL_MAX_PAGE);
diff = p - q;
if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
diff -= OLDSERXID_MAX_PAGE + 1;
else if (diff < -((int) (OLDSERXID_MAX_PAGE + 1) / 2))
diff += OLDSERXID_MAX_PAGE + 1;
if (diff >= ((SERIAL_MAX_PAGE + 1) / 2))
diff -= SERIAL_MAX_PAGE + 1;
else if (diff < -((int) (SERIAL_MAX_PAGE + 1) / 2))
diff += SERIAL_MAX_PAGE + 1;
return diff < 0;
}
@@ -811,25 +811,25 @@ OldSerXidPagePrecedesLogically(int p, int q)
* Initialize for the tracking of old serializable committed xids.
*/
static void
OldSerXidInit(void)
SerialInit(void)
{
bool found;
/*
* Set up SLRU management of the pg_serial data.
*/
OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
SimpleLruInit(OldSerXidSlruCtl, "oldserxid",
NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial",
LWTRANCHE_OLDSERXID_BUFFERS);
SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
SimpleLruInit(SerialSlruCtl, "Serial",
NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial",
LWTRANCHE_SERIAL_BUFFER);
/* Override default assumption that writes should be fsync'd */
OldSerXidSlruCtl->do_fsync = false;
SerialSlruCtl->do_fsync = false;
/*
* Create or attach to the OldSerXidControl structure.
* Create or attach to the SerialControl structure.
*/
oldSerXidControl = (OldSerXidControl)
ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
serialControl = (SerialControl)
ShmemInitStruct("SerialControlData", sizeof(SerialControlData), &found);
Assert(found == IsUnderPostmaster);
if (!found)
@@ -837,9 +837,9 @@ OldSerXidInit(void)
/*
* Set control information to reflect empty SLRU.
*/
oldSerXidControl->headPage = -1;
oldSerXidControl->headXid = InvalidTransactionId;
oldSerXidControl->tailXid = InvalidTransactionId;
serialControl->headPage = -1;
serialControl->headXid = InvalidTransactionId;
serialControl->tailXid = InvalidTransactionId;
}
}
@@ -849,7 +849,7 @@ OldSerXidInit(void)
* An invalid commitSeqNo means that there were no conflicts out from xid.
*/
static void
OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
{
TransactionId tailXid;
int targetPage;
@@ -859,16 +859,16 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
Assert(TransactionIdIsValid(xid));
targetPage = OldSerXidPage(xid);
targetPage = SerialPage(xid);
LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
/*
* If no serializable transactions are active, there shouldn't be anything
* to push out to the SLRU. Hitting this assert would mean there's
* something wrong with the earlier cleanup logic.
*/
tailXid = oldSerXidControl->tailXid;
tailXid = serialControl->tailXid;
Assert(TransactionIdIsValid(tailXid));
/*
@@ -877,41 +877,41 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
* any new pages that enter the tailXid-headXid range as we advance
* headXid.
*/
if (oldSerXidControl->headPage < 0)
if (serialControl->headPage < 0)
{
firstZeroPage = OldSerXidPage(tailXid);
firstZeroPage = SerialPage(tailXid);
isNewPage = true;
}
else
{
firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
targetPage);
firstZeroPage = SerialNextPage(serialControl->headPage);
isNewPage = SerialPagePrecedesLogically(serialControl->headPage,
targetPage);
}
if (!TransactionIdIsValid(oldSerXidControl->headXid)
|| TransactionIdFollows(xid, oldSerXidControl->headXid))
oldSerXidControl->headXid = xid;
if (!TransactionIdIsValid(serialControl->headXid)
|| TransactionIdFollows(xid, serialControl->headXid))
serialControl->headXid = xid;
if (isNewPage)
oldSerXidControl->headPage = targetPage;
serialControl->headPage = targetPage;
if (isNewPage)
{
/* Initialize intervening pages. */
while (firstZeroPage != targetPage)
{
(void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
firstZeroPage = OldSerXidNextPage(firstZeroPage);
(void) SimpleLruZeroPage(SerialSlruCtl, firstZeroPage);
firstZeroPage = SerialNextPage(firstZeroPage);
}
slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
slotno = SimpleLruZeroPage(SerialSlruCtl, targetPage);
}
else
slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
slotno = SimpleLruReadPage(SerialSlruCtl, targetPage, true, xid);
OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
OldSerXidSlruCtl->shared->page_dirty[slotno] = true;
SerialValue(slotno, xid) = minConflictCommitSeqNo;
SerialSlruCtl->shared->page_dirty[slotno] = true;
LWLockRelease(OldSerXidLock);
LWLockRelease(SerialSLRULock);
}
/*
@@ -920,7 +920,7 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
* will be returned.
*/
static SerCommitSeqNo
OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
SerialGetMinConflictCommitSeqNo(TransactionId xid)
{
TransactionId headXid;
TransactionId tailXid;
@@ -929,10 +929,10 @@ OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
Assert(TransactionIdIsValid(xid));
LWLockAcquire(OldSerXidLock, LW_SHARED);
headXid = oldSerXidControl->headXid;
tailXid = oldSerXidControl->tailXid;
LWLockRelease(OldSerXidLock);
LWLockAcquire(SerialSLRULock, LW_SHARED);
headXid = serialControl->headXid;
tailXid = serialControl->tailXid;
LWLockRelease(SerialSLRULock);
if (!TransactionIdIsValid(headXid))
return 0;
@@ -944,13 +944,13 @@ OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
return 0;
/*
* The following function must be called without holding OldSerXidLock,
* The following function must be called without holding SerialSLRULock,
* but will return with that lock held, which must then be released.
*/
slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
OldSerXidPage(xid), xid);
val = OldSerXidValue(slotno, xid);
LWLockRelease(OldSerXidLock);
slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl,
SerialPage(xid), xid);
val = SerialValue(slotno, xid);
LWLockRelease(SerialSLRULock);
return val;
}
@@ -961,9 +961,9 @@ OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
* the SLRU can be discarded.
*/
static void
OldSerXidSetActiveSerXmin(TransactionId xid)
SerialSetActiveSerXmin(TransactionId xid)
{
LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
/*
* When no sxacts are active, nothing overlaps, set the xid values to
@@ -973,9 +973,9 @@ OldSerXidSetActiveSerXmin(TransactionId xid)
*/
if (!TransactionIdIsValid(xid))
{
oldSerXidControl->tailXid = InvalidTransactionId;
oldSerXidControl->headXid = InvalidTransactionId;
LWLockRelease(OldSerXidLock);
serialControl->tailXid = InvalidTransactionId;
serialControl->headXid = InvalidTransactionId;
LWLockRelease(SerialSLRULock);
return;
}
@@ -987,22 +987,22 @@ OldSerXidSetActiveSerXmin(TransactionId xid)
*/
if (RecoveryInProgress())
{
Assert(oldSerXidControl->headPage < 0);
if (!TransactionIdIsValid(oldSerXidControl->tailXid)
|| TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
Assert(serialControl->headPage < 0);
if (!TransactionIdIsValid(serialControl->tailXid)
|| TransactionIdPrecedes(xid, serialControl->tailXid))
{
oldSerXidControl->tailXid = xid;
serialControl->tailXid = xid;
}
LWLockRelease(OldSerXidLock);
LWLockRelease(SerialSLRULock);
return;
}
Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
|| TransactionIdFollows(xid, oldSerXidControl->tailXid));
Assert(!TransactionIdIsValid(serialControl->tailXid)
|| TransactionIdFollows(xid, serialControl->tailXid));
oldSerXidControl->tailXid = xid;
serialControl->tailXid = xid;
LWLockRelease(OldSerXidLock);
LWLockRelease(SerialSLRULock);
}
/*
@@ -1016,19 +1016,19 @@ CheckPointPredicate(void)
{
int tailPage;
LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
/* Exit quickly if the SLRU is currently not in use. */
if (oldSerXidControl->headPage < 0)
if (serialControl->headPage < 0)
{
LWLockRelease(OldSerXidLock);
LWLockRelease(SerialSLRULock);
return;
}
if (TransactionIdIsValid(oldSerXidControl->tailXid))
if (TransactionIdIsValid(serialControl->tailXid))
{
/* We can truncate the SLRU up to the page containing tailXid */
tailPage = OldSerXidPage(oldSerXidControl->tailXid);
tailPage = SerialPage(serialControl->tailXid);
}
else
{
@@ -1042,14 +1042,14 @@ CheckPointPredicate(void)
* won't be removed until XID horizon advances enough to make it
* current again.
*/
tailPage = oldSerXidControl->headPage;
oldSerXidControl->headPage = -1;
tailPage = serialControl->headPage;
serialControl->headPage = -1;
}
LWLockRelease(OldSerXidLock);
LWLockRelease(SerialSLRULock);
/* Truncate away pages that are no longer required */
SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
SimpleLruTruncate(SerialSlruCtl, tailPage);
/*
* Flush dirty SLRU pages to disk
@@ -1061,7 +1061,7 @@ CheckPointPredicate(void)
* before deleting the file in which they sit, which would be completely
* pointless.
*/
SimpleLruFlush(OldSerXidSlruCtl, true);
SimpleLruFlush(SerialSlruCtl, true);
}
/*------------------------------------------------------------------------*/
@@ -1275,7 +1275,7 @@ InitPredicateLocks(void)
* Initialize the SLRU storage for old committed serializable
* transactions.
*/
OldSerXidInit();
SerialInit();
}
/*
@@ -1324,8 +1324,8 @@ PredicateLockShmemSize(void)
size = add_size(size, sizeof(SHM_QUEUE));
/* Shared memory structures for SLRU tracking of old committed xids. */
size = add_size(size, sizeof(OldSerXidControlData));
size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
size = add_size(size, sizeof(SerialControlData));
size = add_size(size, SimpleLruShmemSize(NUM_SERIAL_BUFFERS, 0));
return size;
}
@@ -1462,8 +1462,8 @@ SummarizeOldestCommittedSxact(void)
/* Add to SLRU summary information. */
if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
SerialAdd(sxact->topXid, SxactHasConflictOut(sxact)
? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
/* Summarize and release the detail. */
ReleaseOneSerializableXact(sxact, false, true);
@@ -1727,7 +1727,7 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
* (in particular, an elog(ERROR) in procarray.c would cause us to leak
* the sxact). Consider refactoring to avoid this.
*/
#ifdef TEST_OLDSERXID
#ifdef TEST_SUMMARIZE_SERIAL
SummarizeOldestCommittedSxact();
#endif
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
@@ -1782,7 +1782,7 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
Assert(PredXact->SxactGlobalXminCount == 0);
PredXact->SxactGlobalXmin = snapshot->xmin;
PredXact->SxactGlobalXminCount = 1;
OldSerXidSetActiveSerXmin(snapshot->xmin);
SerialSetActiveSerXmin(snapshot->xmin);
}
else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
{
@@ -3231,7 +3231,7 @@ SetNewSxactGlobalXmin(void)
}
}
OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
SerialSetActiveSerXmin(PredXact->SxactGlobalXmin);
}
/*
@@ -4084,7 +4084,7 @@ CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot s
*/
SerCommitSeqNo conflictCommitSeqNo;
conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
conflictCommitSeqNo = SerialGetMinConflictCommitSeqNo(xid);
if (conflictCommitSeqNo != 0)
{
if (conflictCommitSeqNo != InvalidSerCommitSeqNo
@@ -5069,7 +5069,7 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
{
PredXact->SxactGlobalXmin = sxact->xmin;
PredXact->SxactGlobalXminCount = 1;
OldSerXidSetActiveSerXmin(sxact->xmin);
SerialSetActiveSerXmin(sxact->xmin);
}
else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
{


@@ -82,7 +82,7 @@ typedef struct
* to the low 32 bits of the transaction ID (i.e. the actual XID, without the
* epoch).
*
* The caller must hold CLogTruncationLock since it's dealing with arbitrary
* The caller must hold XactTruncationLock since it's dealing with arbitrary
* XIDs, and must continue to hold it until it's done with any clog lookups
* relating to those XIDs.
*/
@@ -118,13 +118,13 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
U64FromFullTransactionId(fxid)))));
/*
* ShmemVariableCache->oldestClogXid is protected by CLogTruncationLock,
* ShmemVariableCache->oldestClogXid is protected by XactTruncationLock,
* but we don't acquire that lock here. Instead, we require the caller to
* acquire it, because the caller is presumably going to look up the
* returned XID. If we took and released the lock within this function, a
* CLOG truncation could occur before the caller finished with the XID.
*/
Assert(LWLockHeldByMe(CLogTruncationLock));
Assert(LWLockHeldByMe(XactTruncationLock));
/*
* If the transaction ID has wrapped around, it's definitely too old to
@@ -672,7 +672,7 @@ pg_xact_status(PG_FUNCTION_ARGS)
* We must protect against concurrent truncation of clog entries to avoid
* an I/O error on SLRU lookup.
*/
LWLockAcquire(CLogTruncationLock, LW_SHARED);
LWLockAcquire(XactTruncationLock, LW_SHARED);
if (TransactionIdInRecentPast(fxid, &xid))
{
Assert(TransactionIdIsValid(xid));
@@ -706,7 +706,7 @@ pg_xact_status(PG_FUNCTION_ARGS)
{
status = NULL;
}
LWLockRelease(CLogTruncationLock);
LWLockRelease(XactTruncationLock);
if (status == NULL)
PG_RETURN_NULL();


@@ -75,7 +75,7 @@ static const char *excludeDirContents[] =
/*
* Old contents are loaded for possible debugging but are not required for
* normal operation, see OldSerXidInit().
* normal operation, see SerialInit().
*/
"pg_serial",


@@ -29,8 +29,8 @@
#define MaxMultiXactOffset ((MultiXactOffset) 0xFFFFFFFF)
/* Number of SLRU buffers to use for multixact */
#define NUM_MXACTOFFSET_BUFFERS 8
#define NUM_MXACTMEMBER_BUFFERS 16
#define NUM_MULTIXACTOFFSET_BUFFERS 8
#define NUM_MULTIXACTMEMBER_BUFFERS 16
/*
* Possible multixact lock modes ("status"). The first four modes are for


@@ -197,7 +197,7 @@ typedef struct VariableCacheData
* aborted */
/*
* These fields are protected by CLogTruncationLock
* These fields are protected by XactTruncationLock
*/
TransactionId oldestClogXid; /* oldest it's safe to look up in clog */


@@ -18,7 +18,7 @@
/*
* The number of SLRU page buffers we use for the notification queue.
*/
#define NUM_ASYNC_BUFFERS 8
#define NUM_NOTIFY_BUFFERS 8
extern bool Trace_notify;
extern volatile sig_atomic_t notifyInterruptPending;


@@ -195,13 +195,13 @@ extern void LWLockInitialize(LWLock *lock, int tranche_id);
*/
typedef enum BuiltinTrancheIds
{
LWTRANCHE_CLOG_BUFFERS = NUM_INDIVIDUAL_LWLOCKS,
LWTRANCHE_COMMITTS_BUFFERS,
LWTRANCHE_SUBTRANS_BUFFERS,
LWTRANCHE_MXACTOFFSET_BUFFERS,
LWTRANCHE_MXACTMEMBER_BUFFERS,
LWTRANCHE_ASYNC_BUFFERS,
LWTRANCHE_OLDSERXID_BUFFERS,
LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS,
LWTRANCHE_COMMITTS_BUFFER,
LWTRANCHE_SUBTRANS_BUFFER,
LWTRANCHE_MULTIXACTOFFSET_BUFFER,
LWTRANCHE_MULTIXACTMEMBER_BUFFER,
LWTRANCHE_NOTIFY_BUFFER,
LWTRANCHE_SERIAL_BUFFER,
LWTRANCHE_WAL_INSERT,
LWTRANCHE_BUFFER_CONTENT,
LWTRANCHE_BUFFER_IO_IN_PROGRESS,


@@ -27,8 +27,8 @@ extern int max_predicate_locks_per_relation;
extern int max_predicate_locks_per_page;
/* Number of SLRU buffers to use for predicate locking */
#define NUM_OLDSERXID_BUFFERS 16
/* Number of SLRU buffers to use for Serial SLRU */
#define NUM_SERIAL_BUFFERS 16
/*
* A handle used for sharing SERIALIZABLEXACT objects between the participants