diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index 4872cfb2d9..e6172ccdf7 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record) standby_desc_running_xacts(buf, xlrec); } + else if (info == XLOG_INVALIDATIONS) + { + xl_invalidations *xlrec = (xl_invalidations *) rec; + + standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, + xlrec->dbId, xlrec->tsId, + xlrec->relcacheInitFileInval); + } } const char * @@ -73,7 +81,53 @@ standby_identify(uint8 info) case XLOG_RUNNING_XACTS: id = "RUNNING_XACTS"; break; + case XLOG_INVALIDATIONS: + id = "INVALIDATIONS"; + break; } return id; } + +/* + * This routine is used by both standby_desc and xact_desc, because + * transaction commits and XLOG_INVALIDATIONS messages contain invalidations; + * it seems pointless to duplicate the code. + */ +void +standby_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval) +{ + int i; + + if (relcacheInitFileInval) + appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", + dbId, tsId); + + appendStringInfoString(buf, "; inval msgs:"); + for (i = 0; i < nmsgs; i++) + { + SharedInvalidationMessage *msg = &msgs[i]; + + if (msg->id >= 0) + appendStringInfo(buf, " catcache %d", msg->id); + else if (msg->id == SHAREDINVALCATALOG_ID) + appendStringInfo(buf, " catalog %u", msg->cat.catId); + else if (msg->id == SHAREDINVALRELCACHE_ID) + appendStringInfo(buf, " relcache %u", msg->rc.relId); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALSMGR_ID) + appendStringInfoString(buf, " smgr"); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfoString(buf, " relmap"); + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfo(buf, " relmap db %u", msg->rm.dbId); + else if (msg->id == SHAREDINVALSNAPSHOT_ID) + appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else + appendStringInfo(buf, " unknown id %d", msg->id); + } +} diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index e8a334c17d..6f07c5cfaa 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -18,6 +18,7 @@ #include "access/xact.h" #include "catalog/catalog.h" #include "storage/sinval.h" +#include "storage/standbydefs.h" #include "utils/timestamp.h" /* @@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId } if (parsed.nmsgs > 0) { - if (XactCompletionRelcacheInitFileInval(parsed.xinfo)) - appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", - parsed.dbId, parsed.tsId); - - appendStringInfoString(buf, "; inval msgs:"); - for (i = 0; i < parsed.nmsgs; i++) - { - SharedInvalidationMessage *msg = &parsed.msgs[i]; - - if (msg->id >= 0) - appendStringInfo(buf, " catcache %d", msg->id); - else if (msg->id == SHAREDINVALCATALOG_ID) - appendStringInfo(buf, " catalog %u", msg->cat.catId); - else if (msg->id == SHAREDINVALRELCACHE_ID) - appendStringInfo(buf, " relcache %u", msg->rc.relId); - /* not expected, but print something anyway */ - else if (msg->id == SHAREDINVALSMGR_ID) - appendStringInfoString(buf, " smgr"); - /* not expected, but print something anyway */ - else if (msg->id == SHAREDINVALRELMAP_ID) - appendStringInfoString(buf, " relmap"); - else if (msg->id == SHAREDINVALSNAPSHOT_ID) - appendStringInfo(buf, " snapshot %u", msg->sn.relId); - else - appendStringInfo(buf, " unknown id %d", msg->id); - } + standby_desc_invalidations( + buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId, + XactCompletionRelcacheInitFileInval(parsed.xinfo)); } if (XactCompletionForceSyncCommit(parsed.xinfo)) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 7e37331613..95690ff36c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1163,6 +1163,24 @@ RecordTransactionCommit(void) /* Can't have child XIDs either; AssignTransactionId enforces this */ Assert(nchildren == 0); + /* + * Transactions without an assigned xid can contain invalidation + * messages (e.g. explicit relcache invalidations or catcache + * invalidations for inplace updates); standbys need to process + * those. We can't emit a commit record without an xid, and we don't + * want to force assigning an xid, because that'd be problematic for + * e.g. vacuum. Hence we emit a bespoke record for the + * invalidations. We don't want to use that in case a commit record is + * emitted, so they happen synchronously with commits (besides not + * wanting to emit more WAL recoreds). + */ + if (nmsgs != 0) + { + LogStandbyInvalidations(nmsgs, invalMessages, + RelcacheInitFileInval); + wrote_xlog = true; /* not strictly necessary */ + } + /* * If we didn't create XLOG entries, we're done here; otherwise we * should trigger flushing those entries the same as a commit record diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 0cdb0b8a92..0c248f07e8 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_STANDBY_LOCK: break; + case XLOG_INVALIDATIONS: + { + xl_invalidations *invalidations = + (xl_invalidations *) XLogRecGetData(r); + + ReorderBufferImmediateInvalidation( + ctx->reorder, invalidations->nmsgs, invalidations->msgs); + } + break; default: elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 45207086ac..57821c3402 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) * catalog and we need to update the caches according to that. */ if (txn->base_snapshot != NULL && txn->ninvalidations > 0) - { - bool use_subtxn = IsTransactionOrTransactionBlock(); - - if (use_subtxn) - BeginInternalSubTransaction("replay"); - - /* - * Force invalidations to happen outside of a valid transaction - that - * way entries will just be marked as invalid without accessing the - * catalog. That's advantageous because we don't need to setup the - * full state necessary for catalog access. - */ - if (use_subtxn) - AbortCurrentTransaction(); - - ReorderBufferExecuteInvalidations(rb, txn); - - if (use_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - } + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); else Assert(txn->ninvalidations == 0); @@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) ReorderBufferCleanupTXN(rb, txn); } +/* + * Execute invalidations happening outside the context of a decoded + * transaction. That currently happens either for xid-less commits + * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting + * transactions (via ReorderBufferForget()). + */ +void +ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, + SharedInvalidationMessage *invalidations) +{ + bool use_subtxn = IsTransactionOrTransactionBlock(); + int i; + + if (use_subtxn) + BeginInternalSubTransaction("replay"); + + /* + * Force invalidations to happen outside of a valid transaction - that + * way entries will just be marked as invalid without accessing the + * catalog. That's advantageous because we don't need to setup the + * full state necessary for catalog access. + */ + if (use_subtxn) + AbortCurrentTransaction(); + + for (i = 0; i < ninvalidations; i++) + LocalExecuteInvalidationMessage(&invalidations[i]); + + if (use_subtxn) + RollbackAndReleaseCurrentSubTransaction(); +} /* * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 6a9bf842d3..762dfa65eb 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record) ProcArrayApplyRecoveryInfo(&running); } + else if (info == XLOG_INVALIDATIONS) + { + xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record); + + ProcessCommittedInvalidationMessages(xlrec->msgs, + xlrec->nmsgs, + xlrec->relcacheInitFileInval, + xlrec->dbId, + xlrec->tsId); + } else elog(PANIC, "standby_redo: unknown op code %u", info); } @@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void) */ (void) GetTopTransactionId(); } + +/* + * Emit WAL for invalidations. This currently is only used for commits without + * an xid but which contain invalidations. + */ +void +LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval) +{ + xl_invalidations xlrec; + + /* prepare record */ + memset(&xlrec, 0, sizeof(xlrec)); + xlrec.dbId = MyDatabaseId; + xlrec.tsId = MyDatabaseTableSpace; + xlrec.relcacheInitFileInval = relcacheInitFileInval; + xlrec.nmsgs = nmsgs; + + /* perform insertion */ + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations); + XLogRegisterData((char *) msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS); +} diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 924bebbac5..5803518229 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, } /* - * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() - * to process invalidation messages added to commit records. + * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or + * standby_redo() to process invalidation messages. Currently that happens + * only at end-of-xact. * * Relcache init file invalidation requires processing both * before and after we send the SI messages. See AtEOXact_Inval() diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4c54953a51..e0708940a0 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn CommandId cmin, CommandId cmax, CommandId combocid); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, + SharedInvalidationMessage *invalidations); void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index aafc9b8a48..52058840a5 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid); extern void LogAccessExclusiveLockPrepare(void); extern XLogRecPtr LogStandbySnapshot(void); +extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval); #endif /* STANDBY_H */ diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h index 609d06edee..bd3c97fe43 100644 --- a/src/include/storage/standbydefs.h +++ b/src/include/storage/standbydefs.h @@ -17,17 +17,23 @@ #include "access/xlogreader.h" #include "lib/stringinfo.h" #include "storage/lockdefs.h" +#include "storage/sinval.h" /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */ extern void standby_redo(XLogReaderState *record); extern void standby_desc(StringInfo buf, XLogReaderState *record); extern const char *standby_identify(uint8 info); +extern void standby_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval); /* * XLOG message types */ #define XLOG_STANDBY_LOCK 0x00 #define XLOG_RUNNING_XACTS 0x10 +#define XLOG_INVALIDATIONS 0x20 typedef struct xl_standby_locks { @@ -50,4 +56,19 @@ typedef struct xl_running_xacts TransactionId xids[FLEXIBLE_ARRAY_MEMBER]; } xl_running_xacts; +/* + * Invalidations for standby, currently only when transactions without an + * assigned xid commit. + */ +typedef struct xl_invalidations +{ + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + bool relcacheInitFileInval; /* invalidate relcache init file */ + int nmsgs; /* number of shared inval msgs */ + SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; +} xl_invalidations; + +#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs) + #endif /* STANDBYDEFS_H */