From c6ff84b06a68b71719aa1aaa5f6704d8db1b51f8 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sat, 23 Apr 2016 19:18:00 -0700 Subject: [PATCH] Emit invalidations to standby for transactions without xid. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit So far, when a transaction with pending invalidations, but without an assigned xid, committed, we simply ignored those invalidation messages. That's problematic, because those are actually sent for a reason. Known symptoms of this include that existing sessions on a hot-standby replica sometimes fail to notice new concurrently built indexes and visibility map updates. The solution is to WAL log such invalidations in transactions without an xid. We considered to alternatively force-assign an xid, but that'd be problematic for vacuum, which might be run in systems with few xids. Important: This adds a new WAL record, but as the patch has to be back-patched, we can't bump the WAL page magic. This means that standbys have to be updated before primaries; otherwise "PANIC: standby_redo: unknown op code 32" errors can be encountered. XXX: Reported-By: Васильев Дмитрий, Masahiko Sawada Discussion: CAB-SwXY6oH=9twBkXJtgR4UC1NqT-vpYAtxCseME62ADwyK5OA@mail.gmail.com CAD21AoDpZ6Xjg=gFrGPnSn4oTRRcwK1EBrWCq9OqOHuAcMMC=w@mail.gmail.com --- src/backend/access/rmgrdesc/standbydesc.c | 54 +++++++++++++++++++ src/backend/access/rmgrdesc/xactdesc.c | 30 ++--------- src/backend/access/transam/xact.c | 18 +++++++ src/backend/replication/logical/decode.c | 9 ++++ .../replication/logical/reorderbuffer.c | 53 +++++++++++------- src/backend/storage/ipc/standby.c | 35 ++++++++++++ src/backend/utils/cache/inval.c | 5 +- src/include/replication/reorderbuffer.h | 2 + src/include/storage/standby.h | 2 + src/include/storage/standbydefs.h | 21 ++++++++ 10 files changed, 181 insertions(+), 48 deletions(-) diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index 4872cfb2d9..e6172ccdf7 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record) standby_desc_running_xacts(buf, xlrec); } + else if (info == XLOG_INVALIDATIONS) + { + xl_invalidations *xlrec = (xl_invalidations *) rec; + + standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, + xlrec->dbId, xlrec->tsId, + xlrec->relcacheInitFileInval); + } } const char * @@ -73,7 +81,53 @@ standby_identify(uint8 info) case XLOG_RUNNING_XACTS: id = "RUNNING_XACTS"; break; + case XLOG_INVALIDATIONS: + id = "INVALIDATIONS"; + break; } return id; } + +/* + * This routine is used by both standby_desc and xact_desc, because + * transaction commits and XLOG_INVALIDATIONS messages contain invalidations; + * it seems pointless to duplicate the code. + */ +void +standby_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval) +{ + int i; + + if (relcacheInitFileInval) + appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", + dbId, tsId); + + appendStringInfoString(buf, "; inval msgs:"); + for (i = 0; i < nmsgs; i++) + { + SharedInvalidationMessage *msg = &msgs[i]; + + if (msg->id >= 0) + appendStringInfo(buf, " catcache %d", msg->id); + else if (msg->id == SHAREDINVALCATALOG_ID) + appendStringInfo(buf, " catalog %u", msg->cat.catId); + else if (msg->id == SHAREDINVALRELCACHE_ID) + appendStringInfo(buf, " relcache %u", msg->rc.relId); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALSMGR_ID) + appendStringInfoString(buf, " smgr"); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfoString(buf, " relmap"); + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfo(buf, " relmap db %u", msg->rm.dbId); + else if (msg->id == SHAREDINVALSNAPSHOT_ID) + appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else + appendStringInfo(buf, " unknown id %d", msg->id); + } +} diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index e8a334c17d..6f07c5cfaa 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -18,6 +18,7 @@ #include "access/xact.h" #include "catalog/catalog.h" #include "storage/sinval.h" +#include "storage/standbydefs.h" #include "utils/timestamp.h" /* @@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId } if (parsed.nmsgs > 0) { - if (XactCompletionRelcacheInitFileInval(parsed.xinfo)) - appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", - parsed.dbId, parsed.tsId); - - appendStringInfoString(buf, "; inval msgs:"); - for (i = 0; i < parsed.nmsgs; i++) - { - SharedInvalidationMessage *msg = &parsed.msgs[i]; - - if (msg->id >= 0) - appendStringInfo(buf, " catcache %d", msg->id); - else if (msg->id == SHAREDINVALCATALOG_ID) - appendStringInfo(buf, " catalog %u", msg->cat.catId); - else if (msg->id == SHAREDINVALRELCACHE_ID) - appendStringInfo(buf, " relcache %u", msg->rc.relId); - /* not expected, but print something anyway */ - else if (msg->id == SHAREDINVALSMGR_ID) - appendStringInfoString(buf, " smgr"); - /* not expected, but print something anyway */ - else if (msg->id == SHAREDINVALRELMAP_ID) - appendStringInfoString(buf, " relmap"); - else if (msg->id == SHAREDINVALSNAPSHOT_ID) - appendStringInfo(buf, " snapshot %u", msg->sn.relId); - else - appendStringInfo(buf, " unknown id %d", msg->id); - } + standby_desc_invalidations( + buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId, + XactCompletionRelcacheInitFileInval(parsed.xinfo)); } if (XactCompletionForceSyncCommit(parsed.xinfo)) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 7e37331613..95690ff36c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1163,6 +1163,24 @@ RecordTransactionCommit(void) /* Can't have child XIDs either; AssignTransactionId enforces this */ Assert(nchildren == 0); + /* + * Transactions without an assigned xid can contain invalidation + * messages (e.g. explicit relcache invalidations or catcache + * invalidations for inplace updates); standbys need to process + * those. We can't emit a commit record without an xid, and we don't + * want to force assigning an xid, because that'd be problematic for + * e.g. vacuum. Hence we emit a bespoke record for the + * invalidations. We don't want to use that in case a commit record is + * emitted, so they happen synchronously with commits (besides not + * wanting to emit more WAL recoreds). + */ + if (nmsgs != 0) + { + LogStandbyInvalidations(nmsgs, invalMessages, + RelcacheInitFileInval); + wrote_xlog = true; /* not strictly necessary */ + } + /* * If we didn't create XLOG entries, we're done here; otherwise we * should trigger flushing those entries the same as a commit record diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 0cdb0b8a92..0c248f07e8 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_STANDBY_LOCK: break; + case XLOG_INVALIDATIONS: + { + xl_invalidations *invalidations = + (xl_invalidations *) XLogRecGetData(r); + + ReorderBufferImmediateInvalidation( + ctx->reorder, invalidations->nmsgs, invalidations->msgs); + } + break; default: elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 45207086ac..57821c3402 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) * catalog and we need to update the caches according to that. */ if (txn->base_snapshot != NULL && txn->ninvalidations > 0) - { - bool use_subtxn = IsTransactionOrTransactionBlock(); - - if (use_subtxn) - BeginInternalSubTransaction("replay"); - - /* - * Force invalidations to happen outside of a valid transaction - that - * way entries will just be marked as invalid without accessing the - * catalog. That's advantageous because we don't need to setup the - * full state necessary for catalog access. - */ - if (use_subtxn) - AbortCurrentTransaction(); - - ReorderBufferExecuteInvalidations(rb, txn); - - if (use_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - } + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); else Assert(txn->ninvalidations == 0); @@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) ReorderBufferCleanupTXN(rb, txn); } +/* + * Execute invalidations happening outside the context of a decoded + * transaction. That currently happens either for xid-less commits + * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting + * transactions (via ReorderBufferForget()). + */ +void +ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, + SharedInvalidationMessage *invalidations) +{ + bool use_subtxn = IsTransactionOrTransactionBlock(); + int i; + + if (use_subtxn) + BeginInternalSubTransaction("replay"); + + /* + * Force invalidations to happen outside of a valid transaction - that + * way entries will just be marked as invalid without accessing the + * catalog. That's advantageous because we don't need to setup the + * full state necessary for catalog access. + */ + if (use_subtxn) + AbortCurrentTransaction(); + + for (i = 0; i < ninvalidations; i++) + LocalExecuteInvalidationMessage(&invalidations[i]); + + if (use_subtxn) + RollbackAndReleaseCurrentSubTransaction(); +} /* * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 6a9bf842d3..762dfa65eb 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record) ProcArrayApplyRecoveryInfo(&running); } + else if (info == XLOG_INVALIDATIONS) + { + xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record); + + ProcessCommittedInvalidationMessages(xlrec->msgs, + xlrec->nmsgs, + xlrec->relcacheInitFileInval, + xlrec->dbId, + xlrec->tsId); + } else elog(PANIC, "standby_redo: unknown op code %u", info); } @@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void) */ (void) GetTopTransactionId(); } + +/* + * Emit WAL for invalidations. This currently is only used for commits without + * an xid but which contain invalidations. + */ +void +LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval) +{ + xl_invalidations xlrec; + + /* prepare record */ + memset(&xlrec, 0, sizeof(xlrec)); + xlrec.dbId = MyDatabaseId; + xlrec.tsId = MyDatabaseTableSpace; + xlrec.relcacheInitFileInval = relcacheInitFileInval; + xlrec.nmsgs = nmsgs; + + /* perform insertion */ + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations); + XLogRegisterData((char *) msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS); +} diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 924bebbac5..5803518229 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, } /* - * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() - * to process invalidation messages added to commit records. + * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or + * standby_redo() to process invalidation messages. Currently that happens + * only at end-of-xact. * * Relcache init file invalidation requires processing both * before and after we send the SI messages. See AtEOXact_Inval() diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4c54953a51..e0708940a0 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn CommandId cmin, CommandId cmax, CommandId combocid); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, + SharedInvalidationMessage *invalidations); void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index aafc9b8a48..52058840a5 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid); extern void LogAccessExclusiveLockPrepare(void); extern XLogRecPtr LogStandbySnapshot(void); +extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval); #endif /* STANDBY_H */ diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h index 609d06edee..bd3c97fe43 100644 --- a/src/include/storage/standbydefs.h +++ b/src/include/storage/standbydefs.h @@ -17,17 +17,23 @@ #include "access/xlogreader.h" #include "lib/stringinfo.h" #include "storage/lockdefs.h" +#include "storage/sinval.h" /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */ extern void standby_redo(XLogReaderState *record); extern void standby_desc(StringInfo buf, XLogReaderState *record); extern const char *standby_identify(uint8 info); +extern void standby_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval); /* * XLOG message types */ #define XLOG_STANDBY_LOCK 0x00 #define XLOG_RUNNING_XACTS 0x10 +#define XLOG_INVALIDATIONS 0x20 typedef struct xl_standby_locks { @@ -50,4 +56,19 @@ typedef struct xl_running_xacts TransactionId xids[FLEXIBLE_ARRAY_MEMBER]; } xl_running_xacts; +/* + * Invalidations for standby, currently only when transactions without an + * assigned xid commit. + */ +typedef struct xl_invalidations +{ + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + bool relcacheInitFileInval; /* invalidate relcache init file */ + int nmsgs; /* number of shared inval msgs */ + SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; +} xl_invalidations; + +#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs) + #endif /* STANDBYDEFS_H */