Reduce memory consumption for pending invalidation messages.

The existing data structures in inval.c are fairly inefficient for
the common case of a command or subtransaction that registers a small
number of cache invalidation events.  While this doesn't matter if we
commit right away, it can build up to a lot of bloat in a transaction
that contains many DDL operations.  By making a few more assumptions
about the expected use-case, we can switch to a representation using
densely-packed arrays.  Although this eliminates some data-copying,
it doesn't seem to make much difference time-wise.  But the space
consumption decreases substantially.

Patch by me; thanks to Nathan Bossart for review.

Discussion: https://postgr.es/m/2380555.1622395376@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2021-08-16 16:48:25 -04:00
parent 069d33d0c5
commit 3aafc030a5
1 changed files with 283 additions and 225 deletions

View File

@ -71,11 +71,6 @@
* manipulating the init file is in relcache.c, but we keep track of the
* need for it here.
*
* The request lists proper are kept in CurTransactionContext of their
* creating (sub)transaction, since they can be forgotten on abort of that
* transaction but must be kept till top-level commit otherwise. For
* simplicity we keep the controlling list-of-lists in TopTransactionContext.
*
* Currently, inval messages are sent without regard for the possibility
* that the object described by the catalog tuple might be a session-local
* object such as a temporary table. This is because (1) this code has
@ -106,7 +101,6 @@
#include "catalog/catalog.h"
#include "catalog/pg_constraint.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "utils/catcache.h"
@ -121,35 +115,86 @@
/*
* To minimize palloc traffic, we keep pending requests in successively-
* larger chunks (a slightly more sophisticated version of an expansible
* array). All request types can be stored as SharedInvalidationMessage
* records. The ordering of requests within a list is never significant.
* Pending requests are stored as ready-to-send SharedInvalidationMessages.
* We keep the messages themselves in arrays in TopTransactionContext
* (there are separate arrays for catcache and relcache messages). Control
* information is kept in a chain of TransInvalidationInfo structs, also
* allocated in TopTransactionContext. (We could keep a subtransaction's
* TransInvalidationInfo in its CurTransactionContext; but that's more
* wasteful not less so, since in very many scenarios it'd be the only
* allocation in the subtransaction's CurTransactionContext.)
*
* We can store the message arrays densely, and yet avoid moving data around
* within an array, because within any one subtransaction we need only
* distinguish between messages emitted by prior commands and those emitted
* by the current command. Once a command completes and we've done local
* processing on its messages, we can fold those into the prior-commands
* messages just by changing array indexes in the TransInvalidationInfo
* struct. Similarly, we need distinguish messages of prior subtransactions
* from those of the current subtransaction only until the subtransaction
* completes, after which we adjust the array indexes in the parent's
* TransInvalidationInfo to include the subtransaction's messages.
*
* The ordering of the individual messages within a command's or
* subtransaction's output is not considered significant, although this
* implementation happens to preserve the order in which they were queued.
* (Previous versions of this code did not preserve it.)
*
* For notational convenience, control information is kept in two-element
* arrays, the first for catcache messages and the second for relcache
* messages.
*/
typedef struct InvalidationChunk
{
struct InvalidationChunk *next; /* list link */
int nitems; /* # items currently stored in chunk */
int maxitems; /* size of allocated array in this chunk */
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
} InvalidationChunk;
#define CatCacheMsgs 0
#define RelCacheMsgs 1
typedef struct InvalidationListHeader
/* Pointers to main arrays in TopTransactionContext */
typedef struct InvalMessageArray
{
InvalidationChunk *cclist; /* list of chunks holding catcache msgs */
InvalidationChunk *rclist; /* list of chunks holding relcache msgs */
} InvalidationListHeader;
SharedInvalidationMessage *msgs; /* palloc'd array (can be expanded) */
int maxmsgs; /* current allocated size of array */
} InvalMessageArray;
static InvalMessageArray InvalMessageArrays[2];
/* Control information for one logical group of messages */
typedef struct InvalidationMsgsGroup
{
int firstmsg[2]; /* first index in relevant array */
int nextmsg[2]; /* last+1 index */
} InvalidationMsgsGroup;
/* Macros to help preserve InvalidationMsgsGroup abstraction */
#define SetSubGroupToFollow(targetgroup, priorgroup, subgroup) \
do { \
(targetgroup)->firstmsg[subgroup] = \
(targetgroup)->nextmsg[subgroup] = \
(priorgroup)->nextmsg[subgroup]; \
} while (0)
#define SetGroupToFollow(targetgroup, priorgroup) \
do { \
SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \
SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \
} while (0)
#define NumMessagesInSubGroup(group, subgroup) \
((group)->nextmsg[subgroup] - (group)->firstmsg[subgroup])
#define NumMessagesInGroup(group) \
(NumMessagesInSubGroup(group, CatCacheMsgs) + \
NumMessagesInSubGroup(group, RelCacheMsgs))
/*----------------
* Invalidation info is divided into two lists:
* Invalidation messages are divided into two groups:
* 1) events so far in current command, not yet reflected to caches.
* 2) events in previous commands of current transaction; these have
* been reflected to local caches, and must be either broadcast to
* other backends or rolled back from local cache when we commit
* or abort the transaction.
* Actually, we need two such lists for each level of nested transaction,
* Actually, we need such groups for each level of nested transaction,
* so that we can discard events from an aborted subtransaction. When
* a subtransaction commits, we append its lists to the parent's lists.
* a subtransaction commits, we append its events to the parent's groups.
*
* The relcache-file-invalidated flag can just be a simple boolean,
* since we only act on it at transaction commit; we don't care which
@ -165,11 +210,11 @@ typedef struct TransInvalidationInfo
/* Subtransaction nesting depth */
int my_level;
/* head of current-command event list */
InvalidationListHeader CurrentCmdInvalidMsgs;
/* Events emitted by current command */
InvalidationMsgsGroup CurrentCmdInvalidMsgs;
/* head of previous-commands event list */
InvalidationListHeader PriorCmdInvalidMsgs;
/* Events emitted by previous commands of this (sub)transaction */
InvalidationMsgsGroup PriorCmdInvalidMsgs;
/* init file must be invalidated? */
bool RelcacheInitFileInval;
@ -177,10 +222,6 @@ typedef struct TransInvalidationInfo
static TransInvalidationInfo *transInvalInfo = NULL;
static SharedInvalidationMessage *SharedInvalidMessagesArray;
static int numSharedInvalidMessagesArray;
static int maxSharedInvalidMessagesArray;
/* GUC storage */
int debug_discard_caches = 0;
@ -218,124 +259,118 @@ static struct RELCACHECALLBACK
static int relcache_callback_count = 0;
/* ----------------------------------------------------------------
* Invalidation list support functions
*
* These three routines encapsulate processing of the "chunked"
* representation of what is logically just a list of messages.
* Invalidation subgroup support functions
* ----------------------------------------------------------------
*/
/*
* AddInvalidationMessage
* Add an invalidation message to a list (of chunks).
* Add an invalidation message to a (sub)group.
*
* Note that we do not pay any great attention to maintaining the original
* ordering of the messages.
* The group must be the last active one, since we assume we can add to the
* end of the relevant InvalMessageArray.
*
* subgroup must be CatCacheMsgs or RelCacheMsgs.
*/
static void
AddInvalidationMessage(InvalidationChunk **listHdr,
SharedInvalidationMessage *msg)
AddInvalidationMessage(InvalidationMsgsGroup *group, int subgroup,
const SharedInvalidationMessage *msg)
{
InvalidationChunk *chunk = *listHdr;
InvalMessageArray *ima = &InvalMessageArrays[subgroup];
int nextindex = group->nextmsg[subgroup];
if (chunk == NULL)
if (nextindex >= ima->maxmsgs)
{
/* First time through; create initial chunk */
#define FIRSTCHUNKSIZE 32
chunk = (InvalidationChunk *)
MemoryContextAlloc(CurTransactionContext,
offsetof(InvalidationChunk, msgs) +
FIRSTCHUNKSIZE * sizeof(SharedInvalidationMessage));
chunk->nitems = 0;
chunk->maxitems = FIRSTCHUNKSIZE;
chunk->next = *listHdr;
*listHdr = chunk;
}
else if (chunk->nitems >= chunk->maxitems)
{
/* Need another chunk; double size of last chunk */
int chunksize = 2 * chunk->maxitems;
if (ima->msgs == NULL)
{
/* Create new storage array in TopTransactionContext */
int reqsize = 32; /* arbitrary */
chunk = (InvalidationChunk *)
MemoryContextAlloc(CurTransactionContext,
offsetof(InvalidationChunk, msgs) +
chunksize * sizeof(SharedInvalidationMessage));
chunk->nitems = 0;
chunk->maxitems = chunksize;
chunk->next = *listHdr;
*listHdr = chunk;
ima->msgs = (SharedInvalidationMessage *)
MemoryContextAlloc(TopTransactionContext,
reqsize * sizeof(SharedInvalidationMessage));
ima->maxmsgs = reqsize;
Assert(nextindex == 0);
}
else
{
/* Enlarge storage array */
int reqsize = 2 * ima->maxmsgs;
ima->msgs = (SharedInvalidationMessage *)
repalloc(ima->msgs,
reqsize * sizeof(SharedInvalidationMessage));
ima->maxmsgs = reqsize;
}
}
/* Okay, add message to current chunk */
chunk->msgs[chunk->nitems] = *msg;
chunk->nitems++;
/* Okay, add message to current group */
ima->msgs[nextindex] = *msg;
group->nextmsg[subgroup]++;
}
/*
* Append one list of invalidation message chunks to another, resetting
* the source chunk-list pointer to NULL.
* Append one subgroup of invalidation messages to another, resetting
* the source subgroup to empty.
*/
static void
AppendInvalidationMessageList(InvalidationChunk **destHdr,
InvalidationChunk **srcHdr)
AppendInvalidationMessageSubGroup(InvalidationMsgsGroup *dest,
InvalidationMsgsGroup *src,
int subgroup)
{
InvalidationChunk *chunk = *srcHdr;
/* Messages must be adjacent in main array */
Assert(dest->nextmsg[subgroup] == src->firstmsg[subgroup]);
if (chunk == NULL)
return; /* nothing to do */
/* ... which makes this easy: */
dest->nextmsg[subgroup] = src->nextmsg[subgroup];
while (chunk->next != NULL)
chunk = chunk->next;
chunk->next = *destHdr;
*destHdr = *srcHdr;
*srcHdr = NULL;
/*
* This is handy for some callers and irrelevant for others. But we do it
* always, reasoning that it's bad to leave different groups pointing at
* the same fragment of the message array.
*/
SetSubGroupToFollow(src, dest, subgroup);
}
/*
* Process a list of invalidation messages.
* Process a subgroup of invalidation messages.
*
* This is a macro that executes the given code fragment for each message in
* a message chunk list. The fragment should refer to the message as *msg.
* a message subgroup. The fragment should refer to the message as *msg.
*/
#define ProcessMessageList(listHdr, codeFragment) \
#define ProcessMessageSubGroup(group, subgroup, codeFragment) \
do { \
InvalidationChunk *_chunk; \
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
int _msgindex = (group)->firstmsg[subgroup]; \
int _endmsg = (group)->nextmsg[subgroup]; \
for (; _msgindex < _endmsg; _msgindex++) \
{ \
int _cindex; \
for (_cindex = 0; _cindex < _chunk->nitems; _cindex++) \
{ \
SharedInvalidationMessage *msg = &_chunk->msgs[_cindex]; \
codeFragment; \
} \
SharedInvalidationMessage *msg = \
&InvalMessageArrays[subgroup].msgs[_msgindex]; \
codeFragment; \
} \
} while (0)
/*
* Process a list of invalidation messages group-wise.
* Process a subgroup of invalidation messages as an array.
*
* As above, but the code fragment can handle an array of messages.
* The fragment should refer to the messages as msgs[], with n entries.
*/
#define ProcessMessageListMulti(listHdr, codeFragment) \
#define ProcessMessageSubGroupMulti(group, subgroup, codeFragment) \
do { \
InvalidationChunk *_chunk; \
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
{ \
SharedInvalidationMessage *msgs = _chunk->msgs; \
int n = _chunk->nitems; \
int n = NumMessagesInSubGroup(group, subgroup); \
if (n > 0) { \
SharedInvalidationMessage *msgs = \
&InvalMessageArrays[subgroup].msgs[(group)->firstmsg[subgroup]]; \
codeFragment; \
} \
} while (0)
/* ----------------------------------------------------------------
* Invalidation set support functions
* Invalidation group support functions
*
* These routines understand about the division of a logical invalidation
* list into separate physical lists for catcache and relcache entries.
* group into separate physical arrays for catcache and relcache entries.
* ----------------------------------------------------------------
*/
@ -343,7 +378,7 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
* Add a catcache inval entry
*/
static void
AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
AddCatcacheInvalidationMessage(InvalidationMsgsGroup *group,
int id, uint32 hashValue, Oid dbId)
{
SharedInvalidationMessage msg;
@ -364,14 +399,14 @@ AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
*/
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->cclist, &msg);
AddInvalidationMessage(group, CatCacheMsgs, &msg);
}
/*
* Add a whole-catalog inval entry
*/
static void
AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
AddCatalogInvalidationMessage(InvalidationMsgsGroup *group,
Oid dbId, Oid catId)
{
SharedInvalidationMessage msg;
@ -382,14 +417,14 @@ AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
/* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->cclist, &msg);
AddInvalidationMessage(group, CatCacheMsgs, &msg);
}
/*
* Add a relcache inval entry
*/
static void
AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
Oid dbId, Oid relId)
{
SharedInvalidationMessage msg;
@ -399,11 +434,11 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
* it will never change. InvalidOid for relId means all relations so we
* don't need to add individual ones when it is present.
*/
ProcessMessageList(hdr->rclist,
if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
(msg->rc.relId == relId ||
msg->rc.relId == InvalidOid))
return);
ProcessMessageSubGroup(group, RelCacheMsgs,
if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
(msg->rc.relId == relId ||
msg->rc.relId == InvalidOid))
return);
/* OK, add the item */
msg.rc.id = SHAREDINVALRELCACHE_ID;
@ -412,24 +447,26 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
/* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->rclist, &msg);
AddInvalidationMessage(group, RelCacheMsgs, &msg);
}
/*
* Add a snapshot inval entry
*
* We put these into the relcache subgroup for simplicity.
*/
static void
AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
AddSnapshotInvalidationMessage(InvalidationMsgsGroup *group,
Oid dbId, Oid relId)
{
SharedInvalidationMessage msg;
/* Don't add a duplicate item */
/* We assume dbId need not be checked because it will never change */
ProcessMessageList(hdr->rclist,
if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
msg->sn.relId == relId)
return);
ProcessMessageSubGroup(group, RelCacheMsgs,
if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
msg->sn.relId == relId)
return);
/* OK, add the item */
msg.sn.id = SHAREDINVALSNAPSHOT_ID;
@ -438,33 +475,33 @@ AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
/* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
AddInvalidationMessage(&hdr->rclist, &msg);
AddInvalidationMessage(group, RelCacheMsgs, &msg);
}
/*
* Append one list of invalidation messages to another, resetting
* the source list to empty.
* Append one group of invalidation messages to another, resetting
* the source group to empty.
*/
static void
AppendInvalidationMessages(InvalidationListHeader *dest,
InvalidationListHeader *src)
AppendInvalidationMessages(InvalidationMsgsGroup *dest,
InvalidationMsgsGroup *src)
{
AppendInvalidationMessageList(&dest->cclist, &src->cclist);
AppendInvalidationMessageList(&dest->rclist, &src->rclist);
AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs);
AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs);
}
/*
* Execute the given function for all the messages in an invalidation list.
* The list is not altered.
* Execute the given function for all the messages in an invalidation group.
* The group is not altered.
*
* catcache entries are processed first, for reasons mentioned above.
*/
static void
ProcessInvalidationMessages(InvalidationListHeader *hdr,
ProcessInvalidationMessages(InvalidationMsgsGroup *group,
void (*func) (SharedInvalidationMessage *msg))
{
ProcessMessageList(hdr->cclist, func(msg));
ProcessMessageList(hdr->rclist, func(msg));
ProcessMessageSubGroup(group, CatCacheMsgs, func(msg));
ProcessMessageSubGroup(group, RelCacheMsgs, func(msg));
}
/*
@ -472,11 +509,11 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
* rather than just one at a time.
*/
static void
ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
void (*func) (const SharedInvalidationMessage *msgs, int n))
{
ProcessMessageListMulti(hdr->cclist, func(msgs, n));
ProcessMessageListMulti(hdr->rclist, func(msgs, n));
ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n));
ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n));
}
/* ----------------------------------------------------------------
@ -731,7 +768,7 @@ AcceptInvalidationMessages(void)
/*
* PrepareInvalidationState
* Initialize inval lists for the current (sub)transaction.
* Initialize inval data for the current (sub)transaction.
*/
static void
PrepareInvalidationState(void)
@ -748,12 +785,45 @@ PrepareInvalidationState(void)
myInfo->parent = transInvalInfo;
myInfo->my_level = GetCurrentTransactionNestLevel();
/*
* If there's any previous entry, this one should be for a deeper nesting
* level.
*/
Assert(transInvalInfo == NULL ||
myInfo->my_level > transInvalInfo->my_level);
/* Now, do we have a previous stack entry? */
if (transInvalInfo != NULL)
{
/* Yes; this one should be for a deeper nesting level. */
Assert(myInfo->my_level > transInvalInfo->my_level);
/*
* The parent (sub)transaction must not have any current (i.e.,
* not-yet-locally-processed) messages. If it did, we'd have a
* semantic problem: the new subtransaction presumably ought not be
* able to see those events yet, but since the CommandCounter is
* linear, that can't work once the subtransaction advances the
* counter. This is a convenient place to check for that, as well as
* being important to keep management of the message arrays simple.
*/
if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0)
elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages");
/*
* MemoryContextAllocZero set firstmsg = nextmsg = 0 in each group,
* which is fine for the first (sub)transaction, but otherwise we need
* to update them to follow whatever is already in the arrays.
*/
SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
&transInvalInfo->CurrentCmdInvalidMsgs);
SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
&myInfo->PriorCmdInvalidMsgs);
}
else
{
/*
* Here, we need only clear any array pointers left over from a prior
* transaction.
*/
InvalMessageArrays[CatCacheMsgs].msgs = NULL;
InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
InvalMessageArrays[RelCacheMsgs].msgs = NULL;
InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
}
transInvalInfo = myInfo;
}
@ -777,47 +847,8 @@ PostPrepare_Inval(void)
}
/*
* Collect invalidation messages into SharedInvalidMessagesArray array.
*/
static void
MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n)
{
/*
* Initialise array first time through in each commit
*/
if (SharedInvalidMessagesArray == NULL)
{
maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE;
numSharedInvalidMessagesArray = 0;
/*
* Although this is being palloc'd we don't actually free it directly.
* We're so close to EOXact that we now we're going to lose it anyhow.
*/
SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray
* sizeof(SharedInvalidationMessage));
}
if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
{
maxSharedInvalidMessagesArray = pg_nextpower2_32(numSharedInvalidMessagesArray + n);
SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray,
maxSharedInvalidMessagesArray
* sizeof(SharedInvalidationMessage));
}
/*
* Append the next chunk onto the array
*/
memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray,
msgs, n * sizeof(SharedInvalidationMessage));
numSharedInvalidMessagesArray += n;
}
/*
* xactGetCommittedInvalidationMessages() is executed by
* RecordTransactionCommit() to add invalidation messages onto the
* xactGetCommittedInvalidationMessages() is called by
* RecordTransactionCommit() to collect invalidation messages to add to the
* commit record. This applies only to commit message types, never to
* abort records. Must always run before AtEOXact_Inval(), since that
* removes the data we need to see.
@ -832,7 +863,9 @@ int
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
bool *RelcacheInitFileInval)
{
MemoryContext oldcontext;
SharedInvalidationMessage *msgarray;
int nummsgs;
int nmsgs;
/* Quick exit if we haven't done anything with invalidation messages. */
if (transInvalInfo == NULL)
@ -853,27 +886,48 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
*RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;
/*
* Walk through TransInvalidationInfo to collect all the messages into a
* single contiguous array of invalidation messages. It must be contiguous
* so we can copy directly into WAL message. Maintain the order that they
* would be processed in by AtEOXact_Inval(), to ensure emulated behaviour
* in redo is as similar as possible to original. We want the same bugs,
* if any, not new ones.
* Collect all the pending messages into a single contiguous array of
* invalidation messages, to simplify what needs to happen while building
* the commit WAL message. Maintain the order that they would be
* processed in by AtEOXact_Inval(), to ensure emulated behaviour in redo
* is as similar as possible to original. We want the same bugs, if any,
* not new ones.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
MakeSharedInvalidMessagesArray);
ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
MakeSharedInvalidMessagesArray);
MemoryContextSwitchTo(oldcontext);
*msgs = msgarray = (SharedInvalidationMessage *)
MemoryContextAlloc(CurTransactionContext,
nummsgs * sizeof(SharedInvalidationMessage));
Assert(!(numSharedInvalidMessagesArray > 0 &&
SharedInvalidMessagesArray == NULL));
nmsgs = 0;
ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
CatCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
CatCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
RelCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
RelCacheMsgs,
(memcpy(msgarray + nmsgs,
msgs,
n * sizeof(SharedInvalidationMessage)),
nmsgs += n));
Assert(nmsgs == nummsgs);
*msgs = SharedInvalidMessagesArray;
return numSharedInvalidMessagesArray;
return nmsgs;
}
/*
@ -942,7 +996,7 @@ ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs,
* about CurrentCmdInvalidMsgs too, since those changes haven't touched
* the caches yet.
*
* In any case, reset the various lists to empty. We need not physically
* In any case, reset our state to empty. We need not physically
* free memory here, since TopTransactionContext is about to be emptied
* anyway.
*
@ -986,8 +1040,6 @@ AtEOXact_Inval(bool isCommit)
/* Need not free anything explicitly */
transInvalInfo = NULL;
SharedInvalidMessagesArray = NULL;
numSharedInvalidMessagesArray = 0;
}
/*
@ -1043,10 +1095,21 @@ AtEOSubXact_Inval(bool isCommit)
return;
}
/* Pass up my inval messages to parent */
/*
* Pass up my inval messages to parent. Notice that we stick them in
* PriorCmdInvalidMsgs, not CurrentCmdInvalidMsgs, since they've
* already been locally processed. (This would trigger the Assert in
* AppendInvalidationMessageSubGroup if the parent's
* CurrentCmdInvalidMsgs isn't empty; but we already checked that in
* PrepareInvalidationState.)
*/
AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs,
&myInfo->PriorCmdInvalidMsgs);
/* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
&myInfo->parent->PriorCmdInvalidMsgs);
/* Pending relcache inval becomes parent's problem too */
if (myInfo->RelcacheInitFileInval)
myInfo->parent->RelcacheInitFileInval = true;
@ -1514,31 +1577,24 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
/*
* LogLogicalInvalidations
*
* Emit WAL for invalidations. This is currently only used for logging
* invalidations at the command end or at commit time if any invalidations
* are pending.
* Emit WAL for invalidations caused by the current command.
*
* This is currently only used for logging invalidations at the command end
* or at commit time if any invalidations are pending.
*/
void
LogLogicalInvalidations()
LogLogicalInvalidations(void)
{
xl_xact_invals xlrec;
SharedInvalidationMessage *invalMessages;
int nmsgs = 0;
InvalidationMsgsGroup *group;
int nmsgs;
/* Quick exit if we haven't done anything with invalidation messages. */
if (transInvalInfo == NULL)
return;
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
MakeSharedInvalidMessagesArray);
Assert(!(numSharedInvalidMessagesArray > 0 &&
SharedInvalidMessagesArray == NULL));
invalMessages = SharedInvalidMessagesArray;
nmsgs = numSharedInvalidMessagesArray;
SharedInvalidMessagesArray = NULL;
numSharedInvalidMessagesArray = 0;
group = &transInvalInfo->CurrentCmdInvalidMsgs;
nmsgs = NumMessagesInGroup(group);
if (nmsgs > 0)
{
@ -1549,10 +1605,12 @@ LogLogicalInvalidations()
/* perform insertion */
XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
XLogRegisterData((char *) invalMessages,
nmsgs * sizeof(SharedInvalidationMessage));
ProcessMessageSubGroupMulti(group, CatCacheMsgs,
XLogRegisterData((char *) msgs,
n * sizeof(SharedInvalidationMessage)));
ProcessMessageSubGroupMulti(group, RelCacheMsgs,
XLogRegisterData((char *) msgs,
n * sizeof(SharedInvalidationMessage)));
XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
pfree(invalMessages);
}
}