Compare commits

...

4 Commits

Author SHA1 Message Date
Alexander Korotkov a60b8a58f4 Add SLRU tests for 64-bit page case
4ed8f0913b added 64-bit page numbering for SLRU.  This commit adds tests for
page numbers higher than 2^32.

Author: Maxim Orlov
Reviewed-by: Aleksander Alekseev, Alexander Korotkov
Discussion: https://postgr.es/m/CACG%3DezZe1NQSCnfHOr78AtAZxJZeCvxrts0ygrxYwe%3DpyyjVWA%40mail.gmail.com
Discussion: https://postgr.es/m/CAJ7c6TPDOYBYrnCAeyndkBktO0WG2xSdYduTF0nxq%2BvfkmTF5Q%40mail.gmail.com
2023-11-29 01:44:01 +02:00
Alexander Korotkov 5a1dfde833 Make use FullTransactionId in 2PC filenames
Switch from using TransactionId to FullTransactionId in naming of 2PC files.
Transaction state file in the pg_twophase directory now have extra 8 bytes in
the name to address an epoch of a given xid.

Author: Maxim Orlov, Aleksander Alekseev, Alexander Korotkov, Teodor Sigaev
Author: Nikita Glukhov, Pavel Borisov, Yura Sokolov
Reviewed-by: Jacob Champion, Heikki Linnakangas, Alexander Korotkov
Reviewed-by: Japin Li, Pavel Borisov, Tom Lane, Peter Eisentraut, Andres Freund
Reviewed-by: Andrey Borodin, Dilip Kumar, Aleksander Alekseev
Discussion: https://postgr.es/m/CACG%3DezZe1NQSCnfHOr78AtAZxJZeCvxrts0ygrxYwe%3DpyyjVWA%40mail.gmail.com
Discussion: https://postgr.es/m/CAJ7c6TPDOYBYrnCAeyndkBktO0WG2xSdYduTF0nxq%2BvfkmTF5Q%40mail.gmail.com
2023-11-29 01:43:36 +02:00
Alexander Korotkov 2cdf131c46 Use larger segment file names for pg_notify
This avoids the wraparound in async.c and removes the corresponding code
complexity. The maximum amount of allocated SLRU pages for NOTIFY / LISTEN
queue is now determined by the max_notify_queue_pages GUC. The default
value is 1048576. It allows to consume up to 8 GB of disk space which is
exactly the limit we had previously.

Author: Maxim Orlov, Aleksander Alekseev, Alexander Korotkov, Teodor Sigaev
Author: Nikita Glukhov, Pavel Borisov, Yura Sokolov
Reviewed-by: Jacob Champion, Heikki Linnakangas, Alexander Korotkov
Reviewed-by: Japin Li, Pavel Borisov, Tom Lane, Peter Eisentraut, Andres Freund
Reviewed-by: Andrey Borodin, Dilip Kumar, Aleksander Alekseev
Discussion: https://postgr.es/m/CACG%3DezZe1NQSCnfHOr78AtAZxJZeCvxrts0ygrxYwe%3DpyyjVWA%40mail.gmail.com
Discussion: https://postgr.es/m/CAJ7c6TPDOYBYrnCAeyndkBktO0WG2xSdYduTF0nxq%2BvfkmTF5Q%40mail.gmail.com
2023-11-29 01:41:48 +02:00
Alexander Korotkov 4ed8f0913b Index SLRUs by 64-bit integers rather than by 32-bit integers
We've had repeated bugs in the area of handling SLRU wraparound in the past,
some of which have caused data loss. Switching to an indexing system for SLRUs
that does not wrap around should allow us to get rid of a whole bunch
of problems and improve the overall reliability of the system.

This particular patch however only changes the indexing and doesn't address
the wraparound per se. This is going to be done in the following patches.

Author: Maxim Orlov, Aleksander Alekseev, Alexander Korotkov, Teodor Sigaev
Author: Nikita Glukhov, Pavel Borisov, Yura Sokolov
Reviewed-by: Jacob Champion, Heikki Linnakangas, Alexander Korotkov
Reviewed-by: Japin Li, Pavel Borisov, Tom Lane, Peter Eisentraut, Andres Freund
Reviewed-by: Andrey Borodin, Dilip Kumar, Aleksander Alekseev
Discussion: https://postgr.es/m/CACG%3DezZe1NQSCnfHOr78AtAZxJZeCvxrts0ygrxYwe%3DpyyjVWA%40mail.gmail.com
Discussion: https://postgr.es/m/CAJ7c6TPDOYBYrnCAeyndkBktO0WG2xSdYduTF0nxq%2BvfkmTF5Q%40mail.gmail.com
2023-11-29 01:40:56 +02:00
26 changed files with 578 additions and 294 deletions

View File

@ -2151,6 +2151,22 @@ include_dir 'conf.d'
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-max-notify-queue-pages" xreflabel="max_notify_queue_pages">
<term><varname>max_notify_queue_pages</varname> (<type>integer</type>)
<indexterm>
<primary><varname>max_notify_queue_pages</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the maximum amount of allocated pages for
<xref linkend="sql-notify"/> / <xref linkend="sql-listen"/> queue.
The default value is 1048576. For 8 KB pages it allows to consume
up to 8 GB of disk space.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</sect2> </sect2>

View File

@ -148,6 +148,7 @@ Asynchronous notification "virtual" received from server process with PID 8448.
<simplelist type="inline"> <simplelist type="inline">
<member><xref linkend="sql-notify"/></member> <member><xref linkend="sql-notify"/></member>
<member><xref linkend="sql-unlisten"/></member> <member><xref linkend="sql-unlisten"/></member>
<member><xref linkend="guc-max-notify-queue-pages"/></member>
</simplelist> </simplelist>
</refsect1> </refsect1>
</refentry> </refentry>

View File

@ -228,6 +228,7 @@ Asynchronous notification "foo" with payload "payload" received from server proc
<simplelist type="inline"> <simplelist type="inline">
<member><xref linkend="sql-listen"/></member> <member><xref linkend="sql-listen"/></member>
<member><xref linkend="sql-unlisten"/></member> <member><xref linkend="sql-unlisten"/></member>
<member><xref linkend="guc-max-notify-queue-pages"/></member>
</simplelist> </simplelist>
</refsect1> </refsect1>
</refentry> </refentry>

View File

@ -25,18 +25,18 @@ clog_desc(StringInfo buf, XLogReaderState *record)
if (info == CLOG_ZEROPAGE) if (info == CLOG_ZEROPAGE)
{ {
int pageno; int64 pageno;
memcpy(&pageno, rec, sizeof(int)); memcpy(&pageno, rec, sizeof(pageno));
appendStringInfo(buf, "page %d", pageno); appendStringInfo(buf, "page %lld", (long long) pageno);
} }
else if (info == CLOG_TRUNCATE) else if (info == CLOG_TRUNCATE)
{ {
xl_clog_truncate xlrec; xl_clog_truncate xlrec;
memcpy(&xlrec, rec, sizeof(xl_clog_truncate)); memcpy(&xlrec, rec, sizeof(xl_clog_truncate));
appendStringInfo(buf, "page %d; oldestXact %u", appendStringInfo(buf, "page %lld; oldestXact %u",
xlrec.pageno, xlrec.oldestXact); (long long) xlrec.pageno, xlrec.oldestXact);
} }
} }

View File

@ -26,17 +26,17 @@ commit_ts_desc(StringInfo buf, XLogReaderState *record)
if (info == COMMIT_TS_ZEROPAGE) if (info == COMMIT_TS_ZEROPAGE)
{ {
int pageno; int64 pageno;
memcpy(&pageno, rec, sizeof(int)); memcpy(&pageno, rec, sizeof(pageno));
appendStringInfo(buf, "%d", pageno); appendStringInfo(buf, "%lld", (long long) pageno);
} }
else if (info == COMMIT_TS_TRUNCATE) else if (info == COMMIT_TS_TRUNCATE)
{ {
xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) rec; xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) rec;
appendStringInfo(buf, "pageno %d, oldestXid %u", appendStringInfo(buf, "pageno %lld, oldestXid %u",
trunc->pageno, trunc->oldestXid); (long long) trunc->pageno, trunc->oldestXid);
} }
} }

View File

@ -55,10 +55,10 @@ multixact_desc(StringInfo buf, XLogReaderState *record)
if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE || if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE ||
info == XLOG_MULTIXACT_ZERO_MEM_PAGE) info == XLOG_MULTIXACT_ZERO_MEM_PAGE)
{ {
int pageno; int64 pageno;
memcpy(&pageno, rec, sizeof(int)); memcpy(&pageno, rec, sizeof(pageno));
appendStringInfo(buf, "%d", pageno); appendStringInfo(buf, "%lld", (long long) pageno);
} }
else if (info == XLOG_MULTIXACT_CREATE_ID) else if (info == XLOG_MULTIXACT_CREATE_ID)
{ {

View File

@ -62,7 +62,17 @@
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE) #define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1) #define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1)
#define TransactionIdToPage(xid) ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
/*
* Although we return an int64 the actual value can't currently exceed
* 0xFFFFFFFF/CLOG_XACTS_PER_PAGE.
*/
static inline int64
TransactionIdToPage(TransactionId xid)
{
return xid / (int64) CLOG_XACTS_PER_PAGE;
}
#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) #define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToByte(xid) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE) #define TransactionIdToByte(xid) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
#define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE) #define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
@ -89,24 +99,24 @@ static SlruCtlData XactCtlData;
#define XactCtl (&XactCtlData) #define XactCtl (&XactCtlData)
static int ZeroCLOGPage(int pageno, bool writeXlog); static int ZeroCLOGPage(int64 pageno, bool writeXlog);
static bool CLOGPagePrecedes(int page1, int page2); static bool CLOGPagePrecedes(int64 page1, int64 page2);
static void WriteZeroPageXlogRec(int pageno); static void WriteZeroPageXlogRec(int64 pageno);
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXact, static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXact,
Oid oldestXactDb); Oid oldestXactDb);
static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids, static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, TransactionId *subxids, XidStatus status,
XLogRecPtr lsn, int pageno, XLogRecPtr lsn, int64 pageno,
bool all_xact_same_page); bool all_xact_same_page);
static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status, static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status,
XLogRecPtr lsn, int slotno); XLogRecPtr lsn, int slotno);
static void set_status_by_pages(int nsubxids, TransactionId *subxids, static void set_status_by_pages(int nsubxids, TransactionId *subxids,
XidStatus status, XLogRecPtr lsn); XidStatus status, XLogRecPtr lsn);
static bool TransactionGroupUpdateXidStatus(TransactionId xid, static bool TransactionGroupUpdateXidStatus(TransactionId xid,
XidStatus status, XLogRecPtr lsn, int pageno); XidStatus status, XLogRecPtr lsn, int64 pageno);
static void TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, static void TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, TransactionId *subxids, XidStatus status,
XLogRecPtr lsn, int pageno); XLogRecPtr lsn, int64 pageno);
/* /*
@ -162,7 +172,7 @@ void
TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, XLogRecPtr lsn) TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
{ {
int pageno = TransactionIdToPage(xid); /* get page of parent */ int64 pageno = TransactionIdToPage(xid); /* get page of parent */
int i; int i;
Assert(status == TRANSACTION_STATUS_COMMITTED || Assert(status == TRANSACTION_STATUS_COMMITTED ||
@ -236,7 +246,7 @@ static void
set_status_by_pages(int nsubxids, TransactionId *subxids, set_status_by_pages(int nsubxids, TransactionId *subxids,
XidStatus status, XLogRecPtr lsn) XidStatus status, XLogRecPtr lsn)
{ {
int pageno = TransactionIdToPage(subxids[0]); int64 pageno = TransactionIdToPage(subxids[0]);
int offset = 0; int offset = 0;
int i = 0; int i = 0;
@ -245,7 +255,7 @@ set_status_by_pages(int nsubxids, TransactionId *subxids,
while (i < nsubxids) while (i < nsubxids)
{ {
int num_on_page = 0; int num_on_page = 0;
int nextpageno; int64 nextpageno;
do do
{ {
@ -271,7 +281,7 @@ set_status_by_pages(int nsubxids, TransactionId *subxids,
static void static void
TransactionIdSetPageStatus(TransactionId xid, int nsubxids, TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, TransactionId *subxids, XidStatus status,
XLogRecPtr lsn, int pageno, XLogRecPtr lsn, int64 pageno,
bool all_xact_same_page) bool all_xact_same_page)
{ {
/* Can't use group update when PGPROC overflows. */ /* Can't use group update when PGPROC overflows. */
@ -337,7 +347,7 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
static void static void
TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, TransactionId *subxids, XidStatus status,
XLogRecPtr lsn, int pageno) XLogRecPtr lsn, int64 pageno)
{ {
int slotno; int slotno;
int i; int i;
@ -411,7 +421,7 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
*/ */
static bool static bool
TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
XLogRecPtr lsn, int pageno) XLogRecPtr lsn, int64 pageno)
{ {
volatile PROC_HDR *procglobal = ProcGlobal; volatile PROC_HDR *procglobal = ProcGlobal;
PGPROC *proc = MyProc; PGPROC *proc = MyProc;
@ -637,7 +647,7 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
XidStatus XidStatus
TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
{ {
int pageno = TransactionIdToPage(xid); int64 pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid); int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT; int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
int slotno; int slotno;
@ -697,7 +707,7 @@ CLOGShmemInit(void)
XactCtl->PagePrecedes = CLOGPagePrecedes; XactCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE, SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER, XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER,
SYNC_HANDLER_CLOG); SYNC_HANDLER_CLOG, false);
SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE); SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
} }
@ -734,7 +744,7 @@ BootStrapCLOG(void)
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
static int static int
ZeroCLOGPage(int pageno, bool writeXlog) ZeroCLOGPage(int64 pageno, bool writeXlog)
{ {
int slotno; int slotno;
@ -754,7 +764,7 @@ void
StartupCLOG(void) StartupCLOG(void)
{ {
TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid); TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
int pageno = TransactionIdToPage(xid); int64 pageno = TransactionIdToPage(xid);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
@ -773,7 +783,7 @@ void
TrimCLOG(void) TrimCLOG(void)
{ {
TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid); TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
int pageno = TransactionIdToPage(xid); int64 pageno = TransactionIdToPage(xid);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
@ -838,7 +848,7 @@ CheckPointCLOG(void)
void void
ExtendCLOG(TransactionId newestXact) ExtendCLOG(TransactionId newestXact)
{ {
int pageno; int64 pageno;
/* /*
* No work except at first XID of a page. But beware: just after * No work except at first XID of a page. But beware: just after
@ -877,7 +887,7 @@ ExtendCLOG(TransactionId newestXact)
void void
TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid) TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
{ {
int cutoffPage; int64 cutoffPage;
/* /*
* The cutoff point is the start of the segment containing oldestXact. We * The cutoff point is the start of the segment containing oldestXact. We
@ -930,7 +940,7 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
* don't optimize that edge case. * don't optimize that edge case.
*/ */
static bool static bool
CLOGPagePrecedes(int page1, int page2) CLOGPagePrecedes(int64 page1, int64 page2)
{ {
TransactionId xid1; TransactionId xid1;
TransactionId xid2; TransactionId xid2;
@ -949,10 +959,10 @@ CLOGPagePrecedes(int page1, int page2)
* Write a ZEROPAGE xlog record * Write a ZEROPAGE xlog record
*/ */
static void static void
WriteZeroPageXlogRec(int pageno) WriteZeroPageXlogRec(int64 pageno)
{ {
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) (&pageno), sizeof(int)); XLogRegisterData((char *) (&pageno), sizeof(pageno));
(void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE); (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE);
} }
@ -963,7 +973,7 @@ WriteZeroPageXlogRec(int pageno)
* in TruncateCLOG(). * in TruncateCLOG().
*/ */
static void static void
WriteTruncateXlogRec(int pageno, TransactionId oldestXact, Oid oldestXactDb) WriteTruncateXlogRec(int64 pageno, TransactionId oldestXact, Oid oldestXactDb)
{ {
XLogRecPtr recptr; XLogRecPtr recptr;
xl_clog_truncate xlrec; xl_clog_truncate xlrec;
@ -991,10 +1001,10 @@ clog_redo(XLogReaderState *record)
if (info == CLOG_ZEROPAGE) if (info == CLOG_ZEROPAGE)
{ {
int pageno; int64 pageno;
int slotno; int slotno;
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);

View File

@ -65,8 +65,17 @@ typedef struct CommitTimestampEntry
#define COMMIT_TS_XACTS_PER_PAGE \ #define COMMIT_TS_XACTS_PER_PAGE \
(BLCKSZ / SizeOfCommitTimestampEntry) (BLCKSZ / SizeOfCommitTimestampEntry)
#define TransactionIdToCTsPage(xid) \
((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE) /*
* Although we return an int64 the actual value can't currently exceed
* 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
*/
static inline int64
TransactionIdToCTsPage(TransactionId xid)
{
return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
}
#define TransactionIdToCTsEntry(xid) \ #define TransactionIdToCTsEntry(xid) \
((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE) ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
@ -103,16 +112,16 @@ bool track_commit_timestamp;
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts, TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int pageno); RepOriginId nodeid, int64 pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
RepOriginId nodeid, int slotno); RepOriginId nodeid, int slotno);
static void error_commit_ts_disabled(void); static void error_commit_ts_disabled(void);
static int ZeroCommitTsPage(int pageno, bool writeXlog); static int ZeroCommitTsPage(int64 pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2); static bool CommitTsPagePrecedes(int64 page1, int64 page2);
static void ActivateCommitTs(void); static void ActivateCommitTs(void);
static void DeactivateCommitTs(void); static void DeactivateCommitTs(void);
static void WriteZeroPageXlogRec(int pageno); static void WriteZeroPageXlogRec(int64 pageno);
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid); static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
/* /*
* TransactionTreeSetCommitTsData * TransactionTreeSetCommitTsData
@ -170,7 +179,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
i = 0; i = 0;
for (;;) for (;;)
{ {
int pageno = TransactionIdToCTsPage(headxid); int64 pageno = TransactionIdToCTsPage(headxid);
int j; int j;
for (j = i; j < nsubxids; j++) for (j = i; j < nsubxids; j++)
@ -214,7 +223,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
static void static void
SetXidCommitTsInPage(TransactionId xid, int nsubxids, SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts, TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int pageno) RepOriginId nodeid, int64 pageno)
{ {
int slotno; int slotno;
int i; int i;
@ -266,7 +275,7 @@ bool
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
RepOriginId *nodeid) RepOriginId *nodeid)
{ {
int pageno = TransactionIdToCTsPage(xid); int64 pageno = TransactionIdToCTsPage(xid);
int entryno = TransactionIdToCTsEntry(xid); int entryno = TransactionIdToCTsEntry(xid);
int slotno; int slotno;
CommitTimestampEntry entry; CommitTimestampEntry entry;
@ -523,7 +532,8 @@ CommitTsShmemInit(void)
SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0, SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
CommitTsSLRULock, "pg_commit_ts", CommitTsSLRULock, "pg_commit_ts",
LWTRANCHE_COMMITTS_BUFFER, LWTRANCHE_COMMITTS_BUFFER,
SYNC_HANDLER_COMMIT_TS); SYNC_HANDLER_COMMIT_TS,
false);
SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE); SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
commitTsShared = ShmemInitStruct("CommitTs shared", commitTsShared = ShmemInitStruct("CommitTs shared",
@ -569,7 +579,7 @@ BootStrapCommitTs(void)
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
static int static int
ZeroCommitTsPage(int pageno, bool writeXlog) ZeroCommitTsPage(int64 pageno, bool writeXlog)
{ {
int slotno; int slotno;
@ -662,7 +672,7 @@ static void
ActivateCommitTs(void) ActivateCommitTs(void)
{ {
TransactionId xid; TransactionId xid;
int pageno; int64 pageno;
/* If we've done this already, there's nothing to do */ /* If we've done this already, there's nothing to do */
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
@ -795,7 +805,7 @@ CheckPointCommitTs(void)
void void
ExtendCommitTs(TransactionId newestXact) ExtendCommitTs(TransactionId newestXact)
{ {
int pageno; int64 pageno;
/* /*
* Nothing to do if module not enabled. Note we do an unlocked read of * Nothing to do if module not enabled. Note we do an unlocked read of
@ -833,7 +843,7 @@ ExtendCommitTs(TransactionId newestXact)
void void
TruncateCommitTs(TransactionId oldestXact) TruncateCommitTs(TransactionId oldestXact)
{ {
int cutoffPage; int64 cutoffPage;
/* /*
* The cutoff point is the start of the segment containing oldestXact. We * The cutoff point is the start of the segment containing oldestXact. We
@ -918,7 +928,7 @@ AdvanceOldestCommitTsXid(TransactionId oldestXact)
* oldestXact=N+2.1, it would be precious at oldestXact=N+2.9. * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
*/ */
static bool static bool
CommitTsPagePrecedes(int page1, int page2) CommitTsPagePrecedes(int64 page1, int64 page2)
{ {
TransactionId xid1; TransactionId xid1;
TransactionId xid2; TransactionId xid2;
@ -937,10 +947,10 @@ CommitTsPagePrecedes(int page1, int page2)
* Write a ZEROPAGE xlog record * Write a ZEROPAGE xlog record
*/ */
static void static void
WriteZeroPageXlogRec(int pageno) WriteZeroPageXlogRec(int64 pageno)
{ {
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) (&pageno), sizeof(int)); XLogRegisterData((char *) (&pageno), sizeof(pageno));
(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
} }
@ -948,7 +958,7 @@ WriteZeroPageXlogRec(int pageno)
* Write a TRUNCATE xlog record * Write a TRUNCATE xlog record
*/ */
static void static void
WriteTruncateXlogRec(int pageno, TransactionId oldestXid) WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
{ {
xl_commit_ts_truncate xlrec; xl_commit_ts_truncate xlrec;
@ -973,10 +983,10 @@ commit_ts_redo(XLogReaderState *record)
if (info == COMMIT_TS_ZEROPAGE) if (info == COMMIT_TS_ZEROPAGE)
{ {
int pageno; int64 pageno;
int slotno; int slotno;
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);

View File

@ -354,10 +354,10 @@ static void mXactCachePut(MultiXactId multi, int nmembers,
static char *mxstatus_to_string(MultiXactStatus status); static char *mxstatus_to_string(MultiXactStatus status);
/* management of SLRU infrastructure */ /* management of SLRU infrastructure */
static int ZeroMultiXactOffsetPage(int pageno, bool writeXlog); static int ZeroMultiXactOffsetPage(int64 pageno, bool writeXlog);
static int ZeroMultiXactMemberPage(int pageno, bool writeXlog); static int ZeroMultiXactMemberPage(int64 pageno, bool writeXlog);
static bool MultiXactOffsetPagePrecedes(int page1, int page2); static bool MultiXactOffsetPagePrecedes(int64 page1, int64 page2);
static bool MultiXactMemberPagePrecedes(int page1, int page2); static bool MultiXactMemberPagePrecedes(int64 page1, int64 page2);
static bool MultiXactOffsetPrecedes(MultiXactOffset offset1, static bool MultiXactOffsetPrecedes(MultiXactOffset offset1,
MultiXactOffset offset2); MultiXactOffset offset2);
static void ExtendMultiXactOffset(MultiXactId multi); static void ExtendMultiXactOffset(MultiXactId multi);
@ -366,7 +366,7 @@ static bool MultiXactOffsetWouldWrap(MultiXactOffset boundary,
MultiXactOffset start, uint32 distance); MultiXactOffset start, uint32 distance);
static bool SetOffsetVacuumLimit(bool is_startup); static bool SetOffsetVacuumLimit(bool is_startup);
static bool find_multixact_start(MultiXactId multi, MultiXactOffset *result); static bool find_multixact_start(MultiXactId multi, MultiXactOffset *result);
static void WriteMZeroPageXlogRec(int pageno, uint8 info); static void WriteMZeroPageXlogRec(int64 pageno, uint8 info);
static void WriteMTruncateXlogRec(Oid oldestMultiDB, static void WriteMTruncateXlogRec(Oid oldestMultiDB,
MultiXactId startTruncOff, MultiXactId startTruncOff,
MultiXactId endTruncOff, MultiXactId endTruncOff,
@ -864,8 +864,8 @@ static void
RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
int nmembers, MultiXactMember *members) int nmembers, MultiXactMember *members)
{ {
int pageno; int64 pageno;
int prev_pageno; int64 prev_pageno;
int entryno; int entryno;
int slotno; int slotno;
MultiXactOffset *offptr; MultiXactOffset *offptr;
@ -1225,8 +1225,8 @@ int
GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
bool from_pgupgrade, bool isLockOnly) bool from_pgupgrade, bool isLockOnly)
{ {
int pageno; int64 pageno;
int prev_pageno; int64 prev_pageno;
int entryno; int entryno;
int slotno; int slotno;
MultiXactOffset *offptr; MultiXactOffset *offptr;
@ -1854,13 +1854,15 @@ MultiXactShmemInit(void)
"MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0, "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
MultiXactOffsetSLRULock, "pg_multixact/offsets", MultiXactOffsetSLRULock, "pg_multixact/offsets",
LWTRANCHE_MULTIXACTOFFSET_BUFFER, LWTRANCHE_MULTIXACTOFFSET_BUFFER,
SYNC_HANDLER_MULTIXACT_OFFSET); SYNC_HANDLER_MULTIXACT_OFFSET,
false);
SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE); SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
SimpleLruInit(MultiXactMemberCtl, SimpleLruInit(MultiXactMemberCtl,
"MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0, "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0,
MultiXactMemberSLRULock, "pg_multixact/members", MultiXactMemberSLRULock, "pg_multixact/members",
LWTRANCHE_MULTIXACTMEMBER_BUFFER, LWTRANCHE_MULTIXACTMEMBER_BUFFER,
SYNC_HANDLER_MULTIXACT_MEMBER); SYNC_HANDLER_MULTIXACT_MEMBER,
false);
/* doesn't call SimpleLruTruncate() or meet criteria for unit tests */ /* doesn't call SimpleLruTruncate() or meet criteria for unit tests */
/* Initialize our shared state struct */ /* Initialize our shared state struct */
@ -1928,7 +1930,7 @@ BootStrapMultiXact(void)
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
static int static int
ZeroMultiXactOffsetPage(int pageno, bool writeXlog) ZeroMultiXactOffsetPage(int64 pageno, bool writeXlog)
{ {
int slotno; int slotno;
@ -1944,7 +1946,7 @@ ZeroMultiXactOffsetPage(int pageno, bool writeXlog)
* Ditto, for MultiXactMember * Ditto, for MultiXactMember
*/ */
static int static int
ZeroMultiXactMemberPage(int pageno, bool writeXlog) ZeroMultiXactMemberPage(int64 pageno, bool writeXlog)
{ {
int slotno; int slotno;
@ -1974,7 +1976,7 @@ ZeroMultiXactMemberPage(int pageno, bool writeXlog)
static void static void
MaybeExtendOffsetSlru(void) MaybeExtendOffsetSlru(void)
{ {
int pageno; int64 pageno;
pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact); pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
@ -2009,7 +2011,7 @@ StartupMultiXact(void)
{ {
MultiXactId multi = MultiXactState->nextMXact; MultiXactId multi = MultiXactState->nextMXact;
MultiXactOffset offset = MultiXactState->nextOffset; MultiXactOffset offset = MultiXactState->nextOffset;
int pageno; int64 pageno;
/* /*
* Initialize offset's idea of the latest page number. * Initialize offset's idea of the latest page number.
@ -2034,7 +2036,7 @@ TrimMultiXact(void)
MultiXactOffset offset; MultiXactOffset offset;
MultiXactId oldestMXact; MultiXactId oldestMXact;
Oid oldestMXactDB; Oid oldestMXactDB;
int pageno; int64 pageno;
int entryno; int entryno;
int flagsoff; int flagsoff;
@ -2403,7 +2405,7 @@ MultiXactAdvanceOldest(MultiXactId oldestMulti, Oid oldestMultiDB)
static void static void
ExtendMultiXactOffset(MultiXactId multi) ExtendMultiXactOffset(MultiXactId multi)
{ {
int pageno; int64 pageno;
/* /*
* No work except at first MultiXactId of a page. But beware: just after * No work except at first MultiXactId of a page. But beware: just after
@ -2452,7 +2454,7 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
flagsbit = MXOffsetToFlagsBitShift(offset); flagsbit = MXOffsetToFlagsBitShift(offset);
if (flagsoff == 0 && flagsbit == 0) if (flagsoff == 0 && flagsbit == 0)
{ {
int pageno; int64 pageno;
pageno = MXOffsetToMemberPage(offset); pageno = MXOffsetToMemberPage(offset);
@ -2735,7 +2737,7 @@ static bool
find_multixact_start(MultiXactId multi, MultiXactOffset *result) find_multixact_start(MultiXactId multi, MultiXactOffset *result)
{ {
MultiXactOffset offset; MultiXactOffset offset;
int pageno; int64 pageno;
int entryno; int entryno;
int slotno; int slotno;
MultiXactOffset *offptr; MultiXactOffset *offptr;
@ -2854,7 +2856,7 @@ MultiXactMemberFreezeThreshold(void)
typedef struct mxtruncinfo typedef struct mxtruncinfo
{ {
int earliestExistingPage; int64 earliestExistingPage;
} mxtruncinfo; } mxtruncinfo;
/* /*
@ -2862,7 +2864,7 @@ typedef struct mxtruncinfo
* This callback determines the earliest existing page number. * This callback determines the earliest existing page number.
*/ */
static bool static bool
SlruScanDirCbFindEarliest(SlruCtl ctl, char *filename, int segpage, void *data) SlruScanDirCbFindEarliest(SlruCtl ctl, char *filename, int64 segpage, void *data)
{ {
mxtruncinfo *trunc = (mxtruncinfo *) data; mxtruncinfo *trunc = (mxtruncinfo *) data;
@ -3113,7 +3115,7 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
* translational symmetry. * translational symmetry.
*/ */
static bool static bool
MultiXactOffsetPagePrecedes(int page1, int page2) MultiXactOffsetPagePrecedes(int64 page1, int64 page2)
{ {
MultiXactId multi1; MultiXactId multi1;
MultiXactId multi2; MultiXactId multi2;
@ -3133,7 +3135,7 @@ MultiXactOffsetPagePrecedes(int page1, int page2)
* purposes. There is no "invalid offset number" so use the numbers verbatim. * purposes. There is no "invalid offset number" so use the numbers verbatim.
*/ */
static bool static bool
MultiXactMemberPagePrecedes(int page1, int page2) MultiXactMemberPagePrecedes(int64 page1, int64 page2)
{ {
MultiXactOffset offset1; MultiXactOffset offset1;
MultiXactOffset offset2; MultiXactOffset offset2;
@ -3191,10 +3193,10 @@ MultiXactOffsetPrecedes(MultiXactOffset offset1, MultiXactOffset offset2)
* OFFSETs page (info shows which) * OFFSETs page (info shows which)
*/ */
static void static void
WriteMZeroPageXlogRec(int pageno, uint8 info) WriteMZeroPageXlogRec(int64 pageno, uint8 info)
{ {
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) (&pageno), sizeof(int)); XLogRegisterData((char *) (&pageno), sizeof(pageno));
(void) XLogInsert(RM_MULTIXACT_ID, info); (void) XLogInsert(RM_MULTIXACT_ID, info);
} }
@ -3239,10 +3241,10 @@ multixact_redo(XLogReaderState *record)
if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE) if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE)
{ {
int pageno; int64 pageno;
int slotno; int slotno;
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
@ -3254,10 +3256,10 @@ multixact_redo(XLogReaderState *record)
} }
else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE) else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE)
{ {
int pageno; int64 pageno;
int slotno; int slotno;
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
@ -3299,7 +3301,7 @@ multixact_redo(XLogReaderState *record)
else if (info == XLOG_MULTIXACT_TRUNCATE_ID) else if (info == XLOG_MULTIXACT_TRUNCATE_ID)
{ {
xl_multixact_truncate xlrec; xl_multixact_truncate xlrec;
int pageno; int64 pageno;
memcpy(&xlrec, XLogRecGetData(record), memcpy(&xlrec, XLogRecGetData(record),
SizeOfMultiXactTruncate); SizeOfMultiXactTruncate);

View File

@ -60,8 +60,33 @@
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#define SlruFileName(ctl, path, seg) \ static int inline
snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg) SlruFileName(SlruCtl ctl, char *path, int64 segno)
{
if (ctl->long_segment_names)
{
/*
* We could use 16 characters here but the disadvantage would be that
* the SLRU segments will be hard to distinguish from WAL segments.
*
* For this reason we use 15 characters. It is enough but also means
* that in the future we can't decrease SLRU_PAGES_PER_SEGMENT easily.
*/
Assert(segno >= 0 && segno <= INT64CONST(0xFFFFFFFFFFFFFFF));
return snprintf(path, MAXPGPATH, "%s/%015llX", ctl->Dir,
(long long) segno);
}
else
{
/*
* Despite the fact that %04X format string is used up to 24 bit
* integers are allowed. See SlruCorrectSegmentFilenameLength()
*/
Assert(segno >= 0 && segno <= INT64CONST(0xFFFFFF));
return snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir,
(unsigned int) segno);
}
}
/* /*
* During SimpleLruWriteAll(), we will usually not need to write more than one * During SimpleLruWriteAll(), we will usually not need to write more than one
@ -75,7 +100,7 @@ typedef struct SlruWriteAllData
{ {
int num_files; /* # files actually open */ int num_files; /* # files actually open */
int fd[MAX_WRITEALL_BUFFERS]; /* their FD's */ int fd[MAX_WRITEALL_BUFFERS]; /* their FD's */
int segno[MAX_WRITEALL_BUFFERS]; /* their log seg#s */ int64 segno[MAX_WRITEALL_BUFFERS]; /* their log seg#s */
} SlruWriteAllData; } SlruWriteAllData;
typedef struct SlruWriteAllData *SlruWriteAll; typedef struct SlruWriteAllData *SlruWriteAll;
@ -138,15 +163,16 @@ static int slru_errno;
static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
static void SimpleLruWaitIO(SlruCtl ctl, int slotno); static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata); static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata);
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno); static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, static bool SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno,
SlruWriteAll fdata); SlruWriteAll fdata);
static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid); static void SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid);
static int SlruSelectLRUPage(SlruCtl ctl, int pageno); static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
int segpage, void *data); int64 segpage, void *data);
static void SlruInternalDeleteSegment(SlruCtl ctl, int segno); static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
/* /*
* Initialization of shared memory * Initialization of shared memory
@ -162,7 +188,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
sz += MAXALIGN(nslots * sizeof(char *)); /* page_buffer[] */ sz += MAXALIGN(nslots * sizeof(char *)); /* page_buffer[] */
sz += MAXALIGN(nslots * sizeof(SlruPageStatus)); /* page_status[] */ sz += MAXALIGN(nslots * sizeof(SlruPageStatus)); /* page_status[] */
sz += MAXALIGN(nslots * sizeof(bool)); /* page_dirty[] */ sz += MAXALIGN(nslots * sizeof(bool)); /* page_dirty[] */
sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */ sz += MAXALIGN(nslots * sizeof(int64)); /* page_number[] */
sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */ sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */
@ -187,7 +213,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
void void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLock *ctllock, const char *subdir, int tranche_id, LWLock *ctllock, const char *subdir, int tranche_id,
SyncRequestHandler sync_handler) SyncRequestHandler sync_handler, bool long_segment_names)
{ {
SlruShared shared; SlruShared shared;
bool found; bool found;
@ -226,8 +252,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
offset += MAXALIGN(nslots * sizeof(SlruPageStatus)); offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
shared->page_dirty = (bool *) (ptr + offset); shared->page_dirty = (bool *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(bool)); offset += MAXALIGN(nslots * sizeof(bool));
shared->page_number = (int *) (ptr + offset); shared->page_number = (int64 *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(int)); offset += MAXALIGN(nslots * sizeof(int64));
shared->page_lru_count = (int *) (ptr + offset); shared->page_lru_count = (int *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(int)); offset += MAXALIGN(nslots * sizeof(int));
@ -266,6 +292,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
*/ */
ctl->shared = shared; ctl->shared = shared;
ctl->sync_handler = sync_handler; ctl->sync_handler = sync_handler;
ctl->long_segment_names = long_segment_names;
strlcpy(ctl->Dir, subdir, sizeof(ctl->Dir)); strlcpy(ctl->Dir, subdir, sizeof(ctl->Dir));
} }
@ -278,7 +305,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
int int
SimpleLruZeroPage(SlruCtl ctl, int pageno) SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int slotno; int slotno;
@ -393,7 +420,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
int int
SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
TransactionId xid) TransactionId xid)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
@ -493,7 +520,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
* It is unspecified whether the lock will be shared or exclusive. * It is unspecified whether the lock will be shared or exclusive.
*/ */
int int
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int slotno; int slotno;
@ -540,7 +567,7 @@ static void
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int pageno = shared->page_number[slotno]; int64 pageno = shared->page_number[slotno];
bool ok; bool ok;
/* If a write is in progress, wait for it to finish */ /* If a write is in progress, wait for it to finish */
@ -624,9 +651,9 @@ SimpleLruWritePage(SlruCtl ctl, int slotno)
* large enough to contain the given page. * large enough to contain the given page.
*/ */
bool bool
SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno) SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
{ {
int segno = pageno / SLRU_PAGES_PER_SEGMENT; int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT; int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ; int offset = rpageno * BLCKSZ;
char path[MAXPGPATH]; char path[MAXPGPATH];
@ -682,10 +709,10 @@ SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
* read/write operations. We could cache one virtual file pointer ... * read/write operations. We could cache one virtual file pointer ...
*/ */
static bool static bool
SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno) SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT; int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT; int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
off_t offset = rpageno * BLCKSZ; off_t offset = rpageno * BLCKSZ;
char path[MAXPGPATH]; char path[MAXPGPATH];
@ -754,10 +781,10 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
* SimpleLruWriteAll. * SimpleLruWriteAll.
*/ */
static bool static bool
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata) SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruWriteAll fdata)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT; int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT; int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
off_t offset = rpageno * BLCKSZ; off_t offset = rpageno * BLCKSZ;
char path[MAXPGPATH]; char path[MAXPGPATH];
@ -929,9 +956,9 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruWriteAll fdata)
* SlruPhysicalWritePage. Call this after cleaning up shared-memory state. * SlruPhysicalWritePage. Call this after cleaning up shared-memory state.
*/ */
static void static void
SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid) SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
{ {
int segno = pageno / SLRU_PAGES_PER_SEGMENT; int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT; int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ; int offset = rpageno * BLCKSZ;
char path[MAXPGPATH]; char path[MAXPGPATH];
@ -1014,7 +1041,7 @@ SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
static int static int
SlruSelectLRUPage(SlruCtl ctl, int pageno) SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
@ -1025,10 +1052,10 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
int cur_count; int cur_count;
int bestvalidslot = 0; /* keep compiler quiet */ int bestvalidslot = 0; /* keep compiler quiet */
int best_valid_delta = -1; int best_valid_delta = -1;
int best_valid_page_number = 0; /* keep compiler quiet */ int64 best_valid_page_number = 0; /* keep compiler quiet */
int bestinvalidslot = 0; /* keep compiler quiet */ int bestinvalidslot = 0; /* keep compiler quiet */
int best_invalid_delta = -1; int best_invalid_delta = -1;
int best_invalid_page_number = 0; /* keep compiler quiet */ int64 best_invalid_page_number = 0; /* keep compiler quiet */
/* See if page already has a buffer assigned */ /* See if page already has a buffer assigned */
for (slotno = 0; slotno < shared->num_slots; slotno++) for (slotno = 0; slotno < shared->num_slots; slotno++)
@ -1069,7 +1096,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
for (slotno = 0; slotno < shared->num_slots; slotno++) for (slotno = 0; slotno < shared->num_slots; slotno++)
{ {
int this_delta; int this_delta;
int this_page_number; int64 this_page_number;
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
return slotno; return slotno;
@ -1159,7 +1186,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
SlruWriteAllData fdata; SlruWriteAllData fdata;
int slotno; int slotno;
int pageno = 0; int64 pageno = 0;
int i; int i;
bool ok; bool ok;
@ -1224,7 +1251,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
* after it has accrued freshly-written data. * after it has accrued freshly-written data.
*/ */
void void
SimpleLruTruncate(SlruCtl ctl, int cutoffPage) SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int slotno; int slotno;
@ -1302,7 +1329,7 @@ restart:
* they either can't yet contain anything, or have already been cleaned out. * they either can't yet contain anything, or have already been cleaned out.
*/ */
static void static void
SlruInternalDeleteSegment(SlruCtl ctl, int segno) SlruInternalDeleteSegment(SlruCtl ctl, int64 segno)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
@ -1325,7 +1352,7 @@ SlruInternalDeleteSegment(SlruCtl ctl, int segno)
* Delete an individual SLRU segment, identified by the segment number. * Delete an individual SLRU segment, identified by the segment number.
*/ */
void void
SlruDeleteSegment(SlruCtl ctl, int segno) SlruDeleteSegment(SlruCtl ctl, int64 segno)
{ {
SlruShared shared = ctl->shared; SlruShared shared = ctl->shared;
int slotno; int slotno;
@ -1389,9 +1416,9 @@ restart:
* first>=cutoff && last>=cutoff: no; every page of this segment is too young * first>=cutoff && last>=cutoff: no; every page of this segment is too young
*/ */
static bool static bool
SlruMayDeleteSegment(SlruCtl ctl, int segpage, int cutoffPage) SlruMayDeleteSegment(SlruCtl ctl, int64 segpage, int64 cutoffPage)
{ {
int seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1; int64 seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0); Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0);
@ -1405,7 +1432,7 @@ SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
{ {
TransactionId lhs, TransactionId lhs,
rhs; rhs;
int newestPage, int64 newestPage,
oldestPage; oldestPage;
TransactionId newestXact, TransactionId newestXact,
oldestXact; oldestXact;
@ -1498,9 +1525,10 @@ SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page)
* one containing the page passed as "data". * one containing the page passed as "data".
*/ */
bool bool
SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data) SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage,
void *data)
{ {
int cutoffPage = *(int *) data; int64 cutoffPage = *(int64 *) data;
if (SlruMayDeleteSegment(ctl, segpage, cutoffPage)) if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
return true; /* found one; don't iterate any more */ return true; /* found one; don't iterate any more */
@ -1513,9 +1541,10 @@ SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data
* This callback deletes segments prior to the one passed in as "data". * This callback deletes segments prior to the one passed in as "data".
*/ */
static bool static bool
SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data) SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int64 segpage,
void *data)
{ {
int cutoffPage = *(int *) data; int64 cutoffPage = *(int64 *) data;
if (SlruMayDeleteSegment(ctl, segpage, cutoffPage)) if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
SlruInternalDeleteSegment(ctl, segpage / SLRU_PAGES_PER_SEGMENT); SlruInternalDeleteSegment(ctl, segpage / SLRU_PAGES_PER_SEGMENT);
@ -1528,13 +1557,37 @@ SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
* This callback deletes all segments. * This callback deletes all segments.
*/ */
bool bool
SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data) SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
{ {
SlruInternalDeleteSegment(ctl, segpage / SLRU_PAGES_PER_SEGMENT); SlruInternalDeleteSegment(ctl, segpage / SLRU_PAGES_PER_SEGMENT);
return false; /* keep going */ return false; /* keep going */
} }
/*
* An internal function used by SlruScanDirectory().
*
* Returns true if a file with a name of a given length may be a correct
* SLRU segment.
*/
static inline bool
SlruCorrectSegmentFilenameLength(SlruCtl ctl, size_t len)
{
if (ctl->long_segment_names)
return (len == 15); /* see SlruFileName() */
else
/*
* Commit 638cf09e76d allowed 5-character lengths. Later commit
* 73c986adde5 allowed 6-character length.
*
* Note: There is an ongoing plan to migrate all SLRUs to 64-bit page
* numbers, and the corresponding 15-character file names, which may
* eventually deprecate the support for 4, 5, and 6-character names.
*/
return (len == 4 || len == 5 || len == 6);
}
/* /*
* Scan the SimpleLru directory and apply a callback to each file found in it. * Scan the SimpleLru directory and apply a callback to each file found in it.
* *
@ -1556,8 +1609,8 @@ SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
bool retval = false; bool retval = false;
DIR *cldir; DIR *cldir;
struct dirent *clde; struct dirent *clde;
int segno; int64 segno;
int segpage; int64 segpage;
cldir = AllocateDir(ctl->Dir); cldir = AllocateDir(ctl->Dir);
while ((clde = ReadDir(cldir, ctl->Dir)) != NULL) while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
@ -1566,10 +1619,10 @@ SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
len = strlen(clde->d_name); len = strlen(clde->d_name);
if ((len == 4 || len == 5 || len == 6) && if (SlruCorrectSegmentFilenameLength(ctl, len) &&
strspn(clde->d_name, "0123456789ABCDEF") == len) strspn(clde->d_name, "0123456789ABCDEF") == len)
{ {
segno = (int) strtol(clde->d_name, NULL, 16); segno = strtoi64(clde->d_name, NULL, 16);
segpage = segno * SLRU_PAGES_PER_SEGMENT; segpage = segno * SLRU_PAGES_PER_SEGMENT;
elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s", elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",

View File

@ -51,7 +51,16 @@
/* We need four bytes per xact */ /* We need four bytes per xact */
#define SUBTRANS_XACTS_PER_PAGE (BLCKSZ / sizeof(TransactionId)) #define SUBTRANS_XACTS_PER_PAGE (BLCKSZ / sizeof(TransactionId))
#define TransactionIdToPage(xid) ((xid) / (TransactionId) SUBTRANS_XACTS_PER_PAGE) /*
* Although we return an int64 the actual value can't currently exceed
* 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE.
*/
static inline int64
TransactionIdToPage(TransactionId xid)
{
return xid / (int64) SUBTRANS_XACTS_PER_PAGE;
}
#define TransactionIdToEntry(xid) ((xid) % (TransactionId) SUBTRANS_XACTS_PER_PAGE) #define TransactionIdToEntry(xid) ((xid) % (TransactionId) SUBTRANS_XACTS_PER_PAGE)
@ -63,8 +72,8 @@ static SlruCtlData SubTransCtlData;
#define SubTransCtl (&SubTransCtlData) #define SubTransCtl (&SubTransCtlData)
static int ZeroSUBTRANSPage(int pageno); static int ZeroSUBTRANSPage(int64 pageno);
static bool SubTransPagePrecedes(int page1, int page2); static bool SubTransPagePrecedes(int64 page1, int64 page2);
/* /*
@ -73,7 +82,7 @@ static bool SubTransPagePrecedes(int page1, int page2);
void void
SubTransSetParent(TransactionId xid, TransactionId parent) SubTransSetParent(TransactionId xid, TransactionId parent)
{ {
int pageno = TransactionIdToPage(xid); int64 pageno = TransactionIdToPage(xid);
int entryno = TransactionIdToEntry(xid); int entryno = TransactionIdToEntry(xid);
int slotno; int slotno;
TransactionId *ptr; TransactionId *ptr;
@ -108,7 +117,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
TransactionId TransactionId
SubTransGetParent(TransactionId xid) SubTransGetParent(TransactionId xid)
{ {
int pageno = TransactionIdToPage(xid); int64 pageno = TransactionIdToPage(xid);
int entryno = TransactionIdToEntry(xid); int entryno = TransactionIdToEntry(xid);
int slotno; int slotno;
TransactionId *ptr; TransactionId *ptr;
@ -193,7 +202,8 @@ SUBTRANSShmemInit(void)
SubTransCtl->PagePrecedes = SubTransPagePrecedes; SubTransCtl->PagePrecedes = SubTransPagePrecedes;
SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0, SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0,
SubtransSLRULock, "pg_subtrans", SubtransSLRULock, "pg_subtrans",
LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE); LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE,
false);
SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE); SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
} }
@ -233,7 +243,7 @@ BootStrapSUBTRANS(void)
* Control lock must be held at entry, and will be held at exit. * Control lock must be held at entry, and will be held at exit.
*/ */
static int static int
ZeroSUBTRANSPage(int pageno) ZeroSUBTRANSPage(int64 pageno)
{ {
return SimpleLruZeroPage(SubTransCtl, pageno); return SimpleLruZeroPage(SubTransCtl, pageno);
} }
@ -249,8 +259,8 @@ void
StartupSUBTRANS(TransactionId oldestActiveXID) StartupSUBTRANS(TransactionId oldestActiveXID)
{ {
FullTransactionId nextXid; FullTransactionId nextXid;
int startPage; int64 startPage;
int endPage; int64 endPage;
/* /*
* Since we don't expect pg_subtrans to be valid across crashes, we * Since we don't expect pg_subtrans to be valid across crashes, we
@ -307,7 +317,7 @@ CheckPointSUBTRANS(void)
void void
ExtendSUBTRANS(TransactionId newestXact) ExtendSUBTRANS(TransactionId newestXact)
{ {
int pageno; int64 pageno;
/* /*
* No work except at first XID of a page. But beware: just after * No work except at first XID of a page. But beware: just after
@ -337,7 +347,7 @@ ExtendSUBTRANS(TransactionId newestXact)
void void
TruncateSUBTRANS(TransactionId oldestXact) TruncateSUBTRANS(TransactionId oldestXact)
{ {
int cutoffPage; int64 cutoffPage;
/* /*
* The cutoff point is the start of the segment containing oldestXact. We * The cutoff point is the start of the segment containing oldestXact. We
@ -359,7 +369,7 @@ TruncateSUBTRANS(TransactionId oldestXact)
* Analogous to CLOGPagePrecedes(). * Analogous to CLOGPagePrecedes().
*/ */
static bool static bool
SubTransPagePrecedes(int page1, int page2) SubTransPagePrecedes(int64 page1, int64 page2)
{ {
TransactionId xid1; TransactionId xid1;
TransactionId xid2; TransactionId xid2;

View File

@ -942,8 +942,46 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
/* State file support */ /* State file support */
/************************************************************************/ /************************************************************************/
#define TwoPhaseFilePath(path, xid) \ /*
snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid) * Compute the FullTransactionId for the given TransactionId.
*
* The wrap logic is safe here because the span of active xids cannot exceed one
* epoch at any given time.
*/
static inline FullTransactionId
AdjustToFullTransactionId(TransactionId xid)
{
FullTransactionId nextFullXid;
TransactionId nextXid;
uint32 epoch;
Assert(TransactionIdIsValid(xid));
LWLockAcquire(XidGenLock, LW_SHARED);
nextFullXid = ShmemVariableCache->nextXid;
LWLockRelease(XidGenLock);
nextXid = XidFromFullTransactionId(nextFullXid);
epoch = EpochFromFullTransactionId(nextFullXid);
if (unlikely(xid > nextXid))
{
/* Wraparound occured, must be from a prev epoch. */
Assert(epoch > 0);
epoch--;
}
return FullTransactionIdFromEpochAndXid(epoch, xid);
}
static inline int
TwoPhaseFilePath(char *path, TransactionId xid)
{
FullTransactionId fxid = AdjustToFullTransactionId(xid);
return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
EpochFromFullTransactionId(fxid),
XidFromFullTransactionId(fxid));
}
/* /*
* 2PC state file format: * 2PC state file format:
@ -1882,13 +1920,15 @@ restoreTwoPhaseData(void)
cldir = AllocateDir(TWOPHASE_DIR); cldir = AllocateDir(TWOPHASE_DIR);
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
{ {
if (strlen(clde->d_name) == 8 && if (strlen(clde->d_name) == 16 &&
strspn(clde->d_name, "0123456789ABCDEF") == 8) strspn(clde->d_name, "0123456789ABCDEF") == 16)
{ {
TransactionId xid; TransactionId xid;
FullTransactionId fxid;
char *buf; char *buf;
xid = (TransactionId) strtoul(clde->d_name, NULL, 16); fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
xid = XidFromFullTransactionId(fxid);
buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr, buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
true, false, false); true, false, false);

View File

@ -103,12 +103,11 @@
* until we reach either a notification from an uncommitted transaction or * until we reach either a notification from an uncommitted transaction or
* the head pointer's position. * the head pointer's position.
* *
* 6. To avoid SLRU wraparound and limit disk space consumption, the tail * 6. To limit disk space consumption, the tail pointer needs to be advanced
* pointer needs to be advanced so that old pages can be truncated. * so that old pages can be truncated. This is relatively expensive
* This is relatively expensive (notably, it requires an exclusive lock), * (notably, it requires an exclusive lock), so we don't want to do it
* so we don't want to do it often. We make sending backends do this work * often. We make sending backends do this work if they advanced the queue
* if they advanced the queue head into a new page, but only once every * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
* QUEUE_CLEANUP_DELAY pages.
* *
* An application that listens on the same channel it notifies will get * An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@ -120,7 +119,7 @@
* The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS) * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
* can be varied without affecting anything but performance. The maximum * can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined * amount of notification data that can be queued at one time is determined
* by slru.c's wraparound limit; see QUEUE_MAX_PAGE below. * by max_notify_queue_pages GUC.
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -196,7 +195,7 @@ typedef struct AsyncQueueEntry
*/ */
typedef struct QueuePosition typedef struct QueuePosition
{ {
int page; /* SLRU page number */ int64 page; /* SLRU page number */
int offset; /* byte offset within page */ int offset; /* byte offset within page */
} QueuePosition; } QueuePosition;
@ -312,23 +311,8 @@ static SlruCtlData NotifyCtlData;
#define NotifyCtl (&NotifyCtlData) #define NotifyCtl (&NotifyCtlData)
#define QUEUE_PAGESIZE BLCKSZ #define QUEUE_PAGESIZE BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/* #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
* Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages
* which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
* We could use as many segments as SlruScanDirectory() allows, but this gives
* us so much space already that it doesn't seem worth the trouble.
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
* was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
/* /*
* listenChannels identifies the channels we are actually listening to * listenChannels identifies the channels we are actually listening to
@ -439,12 +423,15 @@ static bool amRegisteredListener = false;
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false; static bool tryAdvanceTail = false;
/* GUC parameter */ /* GUC parameters */
bool Trace_notify = false; bool Trace_notify = false;
/* For 8 KB pages this gives 8 GB of disk space */
int max_notify_queue_pages = 1048576;
/* local function prototypes */ /* local function prototypes */
static int asyncQueuePageDiff(int p, int q); static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static bool asyncQueuePagePrecedes(int p, int q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel); static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg); static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void); static void Exec_ListenPreCommit(void);
@ -474,39 +461,23 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void); static void ClearPendingActionsAndNotifies(void);
/* /*
* Compute the difference between two queue page numbers (i.e., p - q), * Compute the difference between two queue page numbers.
* accounting for wraparound. * Previously this function accounted for a wraparound.
*/ */
static int static inline int64
asyncQueuePageDiff(int p, int q) asyncQueuePageDiff(int64 p, int64 q)
{ {
int diff; return p - q;
/*
* We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
* in the range 0..QUEUE_MAX_PAGE.
*/
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
diff = p - q;
if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
return diff;
} }
/* /*
* Is p < q, accounting for wraparound? * Determines whether p precedes q.
* * Previously this function accounted for a wraparound.
* Since asyncQueueIsFull() blocks creation of a page that could precede any
* extant page, we need not assess entries within a page.
*/ */
static bool static inline bool
asyncQueuePagePrecedes(int p, int q) asyncQueuePagePrecedes(int64 p, int64 q)
{ {
return asyncQueuePageDiff(p, q) < 0; return p < q;
} }
/* /*
@ -566,12 +537,13 @@ AsyncShmemInit(void)
} }
/* /*
* Set up SLRU management of the pg_notify data. * Set up SLRU management of the pg_notify data. Note that long segment
* names are used in order to avoid wraparound.
*/ */
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0, SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER, NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
SYNC_HANDLER_NONE); SYNC_HANDLER_NONE, true);
if (!found) if (!found)
{ {
@ -1305,27 +1277,11 @@ asyncQueueUnregister(void)
static bool static bool
asyncQueueIsFull(void) asyncQueueIsFull(void)
{ {
int nexthead; int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int boundary; int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied = headPage - tailPage;
/* return occupied >= max_notify_queue_pages;
* The queue is full if creating a new head page would create a page that
* logically precedes the current global tail pointer, ie, the head
* pointer would wrap around compared to the tail. We cannot create such
* a head page for fear of confusing slru.c. For safety we round the tail
* pointer back to a segment boundary (truncation logic in
* asyncQueueAdvanceTail does not do this, so doing it here is optional).
*
* Note that this test is *not* dependent on how much space there is on
* the current head page. This is necessary because asyncQueueAddEntries
* might try to create the next head page in any case.
*/
nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
if (nexthead > QUEUE_MAX_PAGE)
nexthead = 0; /* wrap around */
boundary = QUEUE_STOP_PAGE;
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
return asyncQueuePagePrecedes(nexthead, boundary);
} }
/* /*
@ -1336,7 +1292,7 @@ asyncQueueIsFull(void)
static bool static bool
asyncQueueAdvance(volatile QueuePosition *position, int entryLength) asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
{ {
int pageno = QUEUE_POS_PAGE(*position); int64 pageno = QUEUE_POS_PAGE(*position);
int offset = QUEUE_POS_OFFSET(*position); int offset = QUEUE_POS_OFFSET(*position);
bool pageJump = false; bool pageJump = false;
@ -1355,8 +1311,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
{ {
pageno++; pageno++;
if (pageno > QUEUE_MAX_PAGE)
pageno = 0; /* wrap around */
offset = 0; offset = 0;
pageJump = true; pageJump = true;
} }
@ -1409,7 +1363,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
{ {
AsyncQueueEntry qe; AsyncQueueEntry qe;
QueuePosition queue_head; QueuePosition queue_head;
int pageno; int64 pageno;
int offset; int offset;
int slotno; int slotno;
@ -1433,9 +1387,6 @@ asyncQueueAddEntries(ListCell *nextNotify)
* If this is the first write since the postmaster started, we need to * If this is the first write since the postmaster started, we need to
* initialize the first page of the async SLRU. Otherwise, the current * initialize the first page of the async SLRU. Otherwise, the current
* page should be initialized already, so just fetch it. * page should be initialized already, so just fetch it.
*
* (We could also take the first path when the SLRU position has just
* wrapped around, but re-zeroing the page is harmless in that case.)
*/ */
pageno = QUEUE_POS_PAGE(queue_head); pageno = QUEUE_POS_PAGE(queue_head);
if (QUEUE_POS_IS_ZERO(queue_head)) if (QUEUE_POS_IS_ZERO(queue_head))
@ -1548,20 +1499,12 @@ asyncQueueUsage(void)
{ {
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied; int occupied = headPage - tailPage;
occupied = headPage - tailPage;
if (occupied == 0) if (occupied == 0)
return (double) 0; /* fast exit for common case */ return (double) 0; /* fast exit for common case */
if (occupied < 0) return (double) occupied / (double) max_notify_queue_pages;
{
/* head has wrapped around, tail not yet */
occupied += QUEUE_MAX_PAGE + 1;
}
return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
} }
/* /*
@ -2209,11 +2152,6 @@ asyncQueueAdvanceTail(void)
*/ */
SimpleLruTruncate(NotifyCtl, newtailpage); SimpleLruTruncate(NotifyCtl, newtailpage);
/*
* Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
* for the segment immediately prior to the old tail, allowing fresh
* data into that segment.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
QUEUE_STOP_PAGE = newtailpage; QUEUE_STOP_PAGE = newtailpage;
LWLockRelease(NotifyQueueLock); LWLockRelease(NotifyQueueLock);

View File

@ -437,7 +437,7 @@ static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT
static void ReleaseRWConflict(RWConflict conflict); static void ReleaseRWConflict(RWConflict conflict);
static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact); static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
static bool SerialPagePrecedesLogically(int page1, int page2); static bool SerialPagePrecedesLogically(int64 page1, int64 page2);
static void SerialInit(void); static void SerialInit(void);
static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo); static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid); static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid);
@ -724,7 +724,7 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
* Analogous to CLOGPagePrecedes(). * Analogous to CLOGPagePrecedes().
*/ */
static bool static bool
SerialPagePrecedesLogically(int page1, int page2) SerialPagePrecedesLogically(int64 page1, int64 page2)
{ {
TransactionId xid1; TransactionId xid1;
TransactionId xid2; TransactionId xid2;
@ -744,7 +744,7 @@ SerialPagePrecedesLogicallyUnitTests(void)
{ {
int per_page = SERIAL_ENTRIESPERPAGE, int per_page = SERIAL_ENTRIESPERPAGE,
offset = per_page / 2; offset = per_page / 2;
int newestPage, int64 newestPage,
oldestPage, oldestPage,
headPage, headPage,
targetPage; targetPage;
@ -809,7 +809,8 @@ SerialInit(void)
SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically; SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
SimpleLruInit(SerialSlruCtl, "Serial", SimpleLruInit(SerialSlruCtl, "Serial",
NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial", NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial",
LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE); LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE,
false);
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
SerialPagePrecedesLogicallyUnitTests(); SerialPagePrecedesLogicallyUnitTests();
#endif #endif
@ -842,9 +843,9 @@ static void
SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
{ {
TransactionId tailXid; TransactionId tailXid;
int targetPage; int64 targetPage;
int slotno; int slotno;
int firstZeroPage; int64 firstZeroPage;
bool isNewPage; bool isNewPage;
Assert(TransactionIdIsValid(xid)); Assert(TransactionIdIsValid(xid));

View File

@ -2687,6 +2687,16 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL NULL, NULL, NULL
}, },
{
{"max_notify_queue_pages", PGC_POSTMASTER, RESOURCES_DISK,
gettext_noop("Sets the maximum number of allocated pages for NOTIFY / LISTEN queue."),
NULL,
},
&max_notify_queue_pages,
1048576, 64, INT_MAX,
NULL, NULL, NULL
},
{ {
{"wal_decode_buffer_size", PGC_POSTMASTER, WAL_RECOVERY, {"wal_decode_buffer_size", PGC_POSTMASTER, WAL_RECOVERY,
gettext_noop("Buffer size for reading ahead in the WAL during recovery."), gettext_noop("Buffer size for reading ahead in the WAL during recovery."),

View File

@ -166,6 +166,9 @@
#temp_file_limit = -1 # limits per-process temp file space #temp_file_limit = -1 # limits per-process temp file space
# in kilobytes, or -1 for no limit # in kilobytes, or -1 for no limit
#max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated
# for NOTIFY / LISTEN queue
# - Kernel Resources - # - Kernel Resources -
#max_files_per_process = 1000 # min 64 #max_files_per_process = 1000 # min 64

View File

@ -31,7 +31,7 @@ typedef int XidStatus;
typedef struct xl_clog_truncate typedef struct xl_clog_truncate
{ {
int pageno; int64 pageno;
TransactionId oldestXact; TransactionId oldestXact;
Oid oldestXactDb; Oid oldestXactDb;
} xl_clog_truncate; } xl_clog_truncate;

View File

@ -60,7 +60,7 @@ typedef struct xl_commit_ts_set
typedef struct xl_commit_ts_truncate typedef struct xl_commit_ts_truncate
{ {
int pageno; int64 pageno;
TransactionId oldestXid; TransactionId oldestXid;
} xl_commit_ts_truncate; } xl_commit_ts_truncate;

View File

@ -64,7 +64,7 @@ typedef struct SlruSharedData
char **page_buffer; char **page_buffer;
SlruPageStatus *page_status; SlruPageStatus *page_status;
bool *page_dirty; bool *page_dirty;
int *page_number; int64 *page_number;
int *page_lru_count; int *page_lru_count;
LWLockPadded *buffer_locks; LWLockPadded *buffer_locks;
@ -95,7 +95,7 @@ typedef struct SlruSharedData
* this is not critical data, since we use it only to avoid swapping out * this is not critical data, since we use it only to avoid swapping out
* the latest page. * the latest page.
*/ */
int latest_page_number; int64 latest_page_number;
/* SLRU's index for statistics purposes (might not be unique) */ /* SLRU's index for statistics purposes (might not be unique) */
int slru_stats_idx; int slru_stats_idx;
@ -127,7 +127,15 @@ typedef struct SlruCtlData
* the behavior of this callback has no functional implications.) Use * the behavior of this callback has no functional implications.) Use
* SlruPagePrecedesUnitTests() in SLRUs meeting its criteria. * SlruPagePrecedesUnitTests() in SLRUs meeting its criteria.
*/ */
bool (*PagePrecedes) (int, int); bool (*PagePrecedes) (int64, int64);
/*
* If true, use long segment filenames formed from lower 48 bits of the
* segment number, e.g. pg_xact/000000001234. Otherwise, use short
* filenames formed from lower 16 bits of the segment number e.g.
* pg_xact/1234.
*/
bool long_segment_names;
/* /*
* Dir is set during SimpleLruInit and does not change thereafter. Since * Dir is set during SimpleLruInit and does not change thereafter. Since
@ -142,11 +150,12 @@ typedef SlruCtlData *SlruCtl;
extern Size SimpleLruShmemSize(int nslots, int nlsns); extern Size SimpleLruShmemSize(int nslots, int nlsns);
extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLock *ctllock, const char *subdir, int tranche_id, LWLock *ctllock, const char *subdir, int tranche_id,
SyncRequestHandler sync_handler); SyncRequestHandler sync_handler,
extern int SimpleLruZeroPage(SlruCtl ctl, int pageno); bool long_segment_names);
extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, extern int SimpleLruZeroPage(SlruCtl ctl, int64 pageno);
extern int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
TransactionId xid); TransactionId xid);
extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno,
TransactionId xid); TransactionId xid);
extern void SimpleLruWritePage(SlruCtl ctl, int slotno); extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
extern void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied); extern void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied);
@ -155,20 +164,20 @@ extern void SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page);
#else #else
#define SlruPagePrecedesUnitTests(ctl, per_page) do {} while (0) #define SlruPagePrecedesUnitTests(ctl, per_page) do {} while (0)
#endif #endif
extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage); extern void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage);
extern bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno); extern bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno);
typedef bool (*SlruScanCallback) (SlruCtl ctl, char *filename, int segpage, typedef bool (*SlruScanCallback) (SlruCtl ctl, char *filename, int64 segpage,
void *data); void *data);
extern bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data); extern bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data);
extern void SlruDeleteSegment(SlruCtl ctl, int segno); extern void SlruDeleteSegment(SlruCtl ctl, int64 segno);
extern int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path); extern int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path);
/* SlruScanDirectory public callbacks */ /* SlruScanDirectory public callbacks */
extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename,
int segpage, void *data); int64 segpage, void *data);
extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage,
void *data); void *data);
#endif /* SLRU_H */ #endif /* SLRU_H */

View File

@ -21,6 +21,7 @@
#define NUM_NOTIFY_BUFFERS 8 #define NUM_NOTIFY_BUFFERS 8
extern PGDLLIMPORT bool Trace_notify; extern PGDLLIMPORT bool Trace_notify;
extern PGDLLIMPORT int max_notify_queue_pages;
extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending; extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
extern Size AsyncShmemSize(void); extern Size AsyncShmemSize(void);

View File

@ -281,7 +281,7 @@ struct PGPROC
TransactionId clogGroupMemberXid; /* transaction id of clog group member */ TransactionId clogGroupMemberXid; /* transaction id of clog group member */
XidStatus clogGroupMemberXidStatus; /* transaction status of clog XidStatus clogGroupMemberXidStatus; /* transaction status of clog
* group member */ * group member */
int clogGroupMemberPage; /* clog page corresponding to int64 clogGroupMemberPage; /* clog page corresponding to
* transaction id of clog group member */ * transaction id of clog group member */
XLogRecPtr clogGroupMemberLsn; /* WAL location of commit record for clog XLogRecPtr clogGroupMemberLsn; /* WAL location of commit record for clog
* group member */ * group member */

View File

@ -52,7 +52,7 @@ typedef struct FileTag
int16 handler; /* SyncRequestHandler value, saving space */ int16 handler; /* SyncRequestHandler value, saving space */
int16 forknum; /* ForkNumber, saving space */ int16 forknum; /* ForkNumber, saving space */
RelFileLocator rlocator; RelFileLocator rlocator;
uint32 segno; uint64 segno;
} FileTag; } FileTag;
extern void InitSync(void); extern void InitSync(void);

View File

@ -61,7 +61,7 @@ SELECT test_slru_page_writeall();
-- Flush the last page written out. -- Flush the last page written out.
SELECT test_slru_page_sync(12393); SELECT test_slru_page_sync(12393);
NOTICE: Called SlruSyncFileTag() for segment 387 on path pg_test_slru/0183 NOTICE: Called SlruSyncFileTag() for segment 387 on path pg_test_slru/000000000000183
test_slru_page_sync test_slru_page_sync
--------------------- ---------------------
@ -132,4 +132,139 @@ SELECT test_slru_page_exists(12393);
f f
(1 row) (1 row)
--
-- Test 64-bit pages
--
SELECT test_slru_page_exists(0x1234500000000);
test_slru_page_exists
-----------------------
f
(1 row)
SELECT test_slru_page_write(0x1234500000000, 'Test SLRU 64-bit');
test_slru_page_write
----------------------
(1 row)
SELECT test_slru_page_read(0x1234500000000);
test_slru_page_read
---------------------
Test SLRU 64-bit
(1 row)
SELECT test_slru_page_exists(0x1234500000000);
test_slru_page_exists
-----------------------
t
(1 row)
-- 48 extra pages
SELECT count(test_slru_page_write(a, 'Test SLRU 64-bit'))
FROM generate_series(0x1234500000001, 0x1234500000030, 1) as a;
count
-------
48
(1 row)
-- Reading page in buffer for read and write
SELECT test_slru_page_read(0x1234500000020, true);
test_slru_page_read
---------------------
Test SLRU 64-bit
(1 row)
-- Reading page in buffer for read-only
SELECT test_slru_page_readonly(0x1234500000020);
test_slru_page_readonly
-------------------------
Test SLRU 64-bit
(1 row)
-- Reading page not in buffer with read-only
SELECT test_slru_page_readonly(0x1234500000001);
test_slru_page_readonly
-------------------------
Test SLRU 64-bit
(1 row)
-- Write all the pages in buffers
SELECT test_slru_page_writeall();
test_slru_page_writeall
-------------------------
(1 row)
-- Flush the last page written out.
SELECT test_slru_page_sync(0x1234500000030);
NOTICE: Called SlruSyncFileTag() for segment 10007944888321 on path pg_test_slru/000091A28000001
test_slru_page_sync
---------------------
(1 row)
SELECT test_slru_page_exists(0x1234500000030);
test_slru_page_exists
-----------------------
t
(1 row)
-- Segment deletion
SELECT test_slru_page_delete(0x1234500000030);
NOTICE: Called SlruDeleteSegment() for segment 10007944888321
test_slru_page_delete
-----------------------
(1 row)
SELECT test_slru_page_exists(0x1234500000030);
test_slru_page_exists
-----------------------
f
(1 row)
-- Page truncation
SELECT test_slru_page_exists(0x1234500000020);
test_slru_page_exists
-----------------------
f
(1 row)
SELECT test_slru_page_truncate(0x1234500000020);
test_slru_page_truncate
-------------------------
(1 row)
SELECT test_slru_page_exists(0x1234500000020);
test_slru_page_exists
-----------------------
f
(1 row)
-- Full deletion
SELECT test_slru_delete_all();
test_slru_delete_all
----------------------
(1 row)
SELECT test_slru_page_exists(0x1234500000000);
test_slru_page_exists
-----------------------
f
(1 row)
SELECT test_slru_page_exists(0x1234500000020);
test_slru_page_exists
-----------------------
f
(1 row)
SELECT test_slru_page_exists(0x1234500000030);
test_slru_page_exists
-----------------------
f
(1 row)
DROP EXTENSION test_slru; DROP EXTENSION test_slru;

View File

@ -35,4 +35,42 @@ SELECT test_slru_page_exists(12345);
SELECT test_slru_page_exists(12377); SELECT test_slru_page_exists(12377);
SELECT test_slru_page_exists(12393); SELECT test_slru_page_exists(12393);
--
-- Test 64-bit pages
--
SELECT test_slru_page_exists(0x1234500000000);
SELECT test_slru_page_write(0x1234500000000, 'Test SLRU 64-bit');
SELECT test_slru_page_read(0x1234500000000);
SELECT test_slru_page_exists(0x1234500000000);
-- 48 extra pages
SELECT count(test_slru_page_write(a, 'Test SLRU 64-bit'))
FROM generate_series(0x1234500000001, 0x1234500000030, 1) as a;
-- Reading page in buffer for read and write
SELECT test_slru_page_read(0x1234500000020, true);
-- Reading page in buffer for read-only
SELECT test_slru_page_readonly(0x1234500000020);
-- Reading page not in buffer with read-only
SELECT test_slru_page_readonly(0x1234500000001);
-- Write all the pages in buffers
SELECT test_slru_page_writeall();
-- Flush the last page written out.
SELECT test_slru_page_sync(0x1234500000030);
SELECT test_slru_page_exists(0x1234500000030);
-- Segment deletion
SELECT test_slru_page_delete(0x1234500000030);
SELECT test_slru_page_exists(0x1234500000030);
-- Page truncation
SELECT test_slru_page_exists(0x1234500000020);
SELECT test_slru_page_truncate(0x1234500000020);
SELECT test_slru_page_exists(0x1234500000020);
-- Full deletion
SELECT test_slru_delete_all();
SELECT test_slru_page_exists(0x1234500000000);
SELECT test_slru_page_exists(0x1234500000020);
SELECT test_slru_page_exists(0x1234500000030);
DROP EXTENSION test_slru; DROP EXTENSION test_slru;

View File

@ -1,21 +1,21 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION -- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_slru" to load this file. \quit \echo Use "CREATE EXTENSION test_slru" to load this file. \quit
CREATE OR REPLACE FUNCTION test_slru_page_write(int, text) RETURNS VOID CREATE OR REPLACE FUNCTION test_slru_page_write(bigint, text) RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_write' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_write' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_writeall() RETURNS VOID CREATE OR REPLACE FUNCTION test_slru_page_writeall() RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_writeall' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_writeall' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_sync(int) RETURNS VOID CREATE OR REPLACE FUNCTION test_slru_page_sync(bigint) RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_sync' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_sync' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_read(int, bool DEFAULT true) RETURNS text CREATE OR REPLACE FUNCTION test_slru_page_read(bigint, bool DEFAULT true) RETURNS text
AS 'MODULE_PATHNAME', 'test_slru_page_read' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_read' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_readonly(int) RETURNS text CREATE OR REPLACE FUNCTION test_slru_page_readonly(bigint) RETURNS text
AS 'MODULE_PATHNAME', 'test_slru_page_readonly' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_readonly' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_exists(int) RETURNS bool CREATE OR REPLACE FUNCTION test_slru_page_exists(bigint) RETURNS bool
AS 'MODULE_PATHNAME', 'test_slru_page_exists' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_exists' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_delete(int) RETURNS VOID CREATE OR REPLACE FUNCTION test_slru_page_delete(bigint) RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_delete' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_delete' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_truncate(int) RETURNS VOID CREATE OR REPLACE FUNCTION test_slru_page_truncate(bigint) RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_truncate' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_page_truncate' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_delete_all() RETURNS VOID CREATE OR REPLACE FUNCTION test_slru_delete_all() RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_delete_all' LANGUAGE C; AS 'MODULE_PATHNAME', 'test_slru_delete_all' LANGUAGE C;

View File

@ -51,7 +51,7 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool static bool
test_slru_scan_cb(SlruCtl ctl, char *filename, int segpage, void *data) test_slru_scan_cb(SlruCtl ctl, char *filename, int64 segpage, void *data)
{ {
elog(NOTICE, "Calling test_slru_scan_cb()"); elog(NOTICE, "Calling test_slru_scan_cb()");
return SlruScanDirCbDeleteAll(ctl, filename, segpage, data); return SlruScanDirCbDeleteAll(ctl, filename, segpage, data);
@ -60,7 +60,7 @@ test_slru_scan_cb(SlruCtl ctl, char *filename, int segpage, void *data)
Datum Datum
test_slru_page_write(PG_FUNCTION_ARGS) test_slru_page_write(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
char *data = text_to_cstring(PG_GETARG_TEXT_PP(1)); char *data = text_to_cstring(PG_GETARG_TEXT_PP(1));
int slotno; int slotno;
@ -95,7 +95,7 @@ test_slru_page_writeall(PG_FUNCTION_ARGS)
Datum Datum
test_slru_page_read(PG_FUNCTION_ARGS) test_slru_page_read(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
bool write_ok = PG_GETARG_BOOL(1); bool write_ok = PG_GETARG_BOOL(1);
char *data = NULL; char *data = NULL;
int slotno; int slotno;
@ -113,7 +113,7 @@ test_slru_page_read(PG_FUNCTION_ARGS)
Datum Datum
test_slru_page_readonly(PG_FUNCTION_ARGS) test_slru_page_readonly(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
char *data = NULL; char *data = NULL;
int slotno; int slotno;
@ -131,7 +131,7 @@ test_slru_page_readonly(PG_FUNCTION_ARGS)
Datum Datum
test_slru_page_exists(PG_FUNCTION_ARGS) test_slru_page_exists(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
bool found; bool found;
LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
@ -144,7 +144,7 @@ test_slru_page_exists(PG_FUNCTION_ARGS)
Datum Datum
test_slru_page_sync(PG_FUNCTION_ARGS) test_slru_page_sync(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
FileTag ftag; FileTag ftag;
char path[MAXPGPATH]; char path[MAXPGPATH];
@ -152,8 +152,8 @@ test_slru_page_sync(PG_FUNCTION_ARGS)
ftag.segno = pageno / SLRU_PAGES_PER_SEGMENT; ftag.segno = pageno / SLRU_PAGES_PER_SEGMENT;
SlruSyncFileTag(TestSlruCtl, &ftag, path); SlruSyncFileTag(TestSlruCtl, &ftag, path);
elog(NOTICE, "Called SlruSyncFileTag() for segment %u on path %s", elog(NOTICE, "Called SlruSyncFileTag() for segment %lld on path %s",
ftag.segno, path); (long long) ftag.segno, path);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -161,13 +161,14 @@ test_slru_page_sync(PG_FUNCTION_ARGS)
Datum Datum
test_slru_page_delete(PG_FUNCTION_ARGS) test_slru_page_delete(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
FileTag ftag; FileTag ftag;
ftag.segno = pageno / SLRU_PAGES_PER_SEGMENT; ftag.segno = pageno / SLRU_PAGES_PER_SEGMENT;
SlruDeleteSegment(TestSlruCtl, ftag.segno); SlruDeleteSegment(TestSlruCtl, ftag.segno);
elog(NOTICE, "Called SlruDeleteSegment() for segment %u", ftag.segno); elog(NOTICE, "Called SlruDeleteSegment() for segment %lld",
(long long) ftag.segno);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -175,7 +176,7 @@ test_slru_page_delete(PG_FUNCTION_ARGS)
Datum Datum
test_slru_page_truncate(PG_FUNCTION_ARGS) test_slru_page_truncate(PG_FUNCTION_ARGS)
{ {
int pageno = PG_GETARG_INT32(0); int64 pageno = PG_GETARG_INT64(0);
SimpleLruTruncate(TestSlruCtl, pageno); SimpleLruTruncate(TestSlruCtl, pageno);
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -205,7 +206,7 @@ test_slru_shmem_request(void)
} }
static bool static bool
test_slru_page_precedes_logically(int page1, int page2) test_slru_page_precedes_logically(int64 page1, int64 page2)
{ {
return page1 < page2; return page1 < page2;
} }
@ -213,6 +214,11 @@ test_slru_page_precedes_logically(int page1, int page2)
static void static void
test_slru_shmem_startup(void) test_slru_shmem_startup(void)
{ {
/*
* Short segments names are well tested elsewhere so in this test we are
* focusing on long names.
*/
const bool long_segment_names = true;
const char slru_dir_name[] = "pg_test_slru"; const char slru_dir_name[] = "pg_test_slru";
int test_tranche_id; int test_tranche_id;
@ -233,7 +239,7 @@ test_slru_shmem_startup(void)
TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically; TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically;
SimpleLruInit(TestSlruCtl, "TestSLRU", SimpleLruInit(TestSlruCtl, "TestSLRU",
NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name, NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name,
test_tranche_id, SYNC_HANDLER_NONE); test_tranche_id, SYNC_HANDLER_NONE, long_segment_names);
} }
void void