From 5737c12df0581b3298e3e9586bdef170811ce176 Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Sat, 25 Mar 2017 14:07:27 +0000 Subject: [PATCH] Report catalog_xmin separately in hot_standby_feedback If the upstream walsender is using a physical replication slot, store the catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a slot and has only a PGPROC entry behaviour doesn't change, as we store the combined xmin and catalog_xmin in the PGPROC entry. Author: Craig Ringer --- doc/src/sgml/protocol.sgml | 33 +++++- src/backend/replication/walreceiver.c | 43 +++++-- src/backend/replication/walsender.c | 108 +++++++++++++----- src/backend/storage/ipc/procarray.c | 12 +- src/include/storage/proc.h | 5 + src/include/storage/procarray.h | 11 ++ .../t/010_logical_decoding_timelines.pl | 38 +++++- 7 files changed, 199 insertions(+), 51 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 48ca414031..b3a50261c3 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1916,10 +1916,11 @@ The commands accepted in walsender mode are: - The standby's current xmin. This may be 0, if the standby is - sending notification that Hot Standby feedback will no longer - be sent on this connection. Later non-zero messages may - reinitiate the feedback mechanism. + The standby's current global xmin, excluding the catalog_xmin from any + replication slots. If both this value and the following + catalog_xmin are 0 this is treated as a notification that Hot Standby + feedback will no longer be sent on this connection. Later non-zero + messages may reinitiate the feedback mechanism. @@ -1929,7 +1930,29 @@ The commands accepted in walsender mode are: - The standby's current epoch. + The epoch of the global xmin xid on the standby. + + + + + + Int32 + + + + The lowest catalog_xmin of any replication slots on the standby. Set to 0 + if no catalog_xmin exists on the standby or if hot standby feedback is being + disabled. + + + + + + Int32 + + + + The epoch of the catalog_xmin xid on the standby. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 31c567b37e..771ac305c3 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed) { TimestampTz now; TransactionId nextXid; - uint32 nextEpoch; - TransactionId xmin; + uint32 xmin_epoch, catalog_xmin_epoch; + TransactionId xmin, catalog_xmin; static TimestampTz sendTime = 0; /* initially true so we always send at least one feedback message */ static bool master_has_standby_xmin = true; @@ -1221,29 +1221,54 @@ XLogWalRcvSendHSFeedback(bool immed) * everything else has been checked. */ if (hot_standby_feedback) - xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT); + { + TransactionId slot_xmin; + + /* + * Usually GetOldestXmin() would include both global replication slot + * xmin and catalog_xmin in its calculations, but we want to derive + * separate values for each of those. So we ask for an xmin that + * excludes the catalog_xmin. + */ + xmin = GetOldestXmin(NULL, + PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN); + + ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin); + + if (TransactionIdIsValid(slot_xmin) && + TransactionIdPrecedes(slot_xmin, xmin)) + xmin = slot_xmin; + } else + { xmin = InvalidTransactionId; + catalog_xmin = InvalidTransactionId; + } /* * Get epoch and adjust if nextXid and oldestXmin are different sides of * the epoch boundary. */ - GetNextXidAndEpoch(&nextXid, &nextEpoch); + GetNextXidAndEpoch(&nextXid, &xmin_epoch); + catalog_xmin_epoch = xmin_epoch; if (nextXid < xmin) - nextEpoch--; + xmin_epoch --; + if (nextXid < catalog_xmin) + catalog_xmin_epoch --; - elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u", - xmin, nextEpoch); + elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u", + xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch); /* Construct the message and send it. */ resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'h'); pq_sendint64(&reply_message, GetCurrentTimestamp()); pq_sendint(&reply_message, xmin, 4); - pq_sendint(&reply_message, nextEpoch, 4); + pq_sendint(&reply_message, xmin_epoch, 4); + pq_sendint(&reply_message, catalog_xmin, 4); + pq_sendint(&reply_message, catalog_xmin_epoch, 4); walrcv_send(wrconn, reply_message.data, reply_message.len); - if (TransactionIdIsValid(xmin)) + if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin)) master_has_standby_xmin = true; else master_has_standby_xmin = false; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a29d0e7cf4..59ae22df8c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -242,6 +242,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); +static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); @@ -1756,7 +1757,7 @@ ProcessStandbyReplyMessage(void) /* compute new replication slot xmin horizon if needed */ static void -PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) +PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin) { bool changed = false; ReplicationSlot *slot = MyReplicationSlot; @@ -1777,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) slot->data.xmin = feedbackXmin; slot->effective_xmin = feedbackXmin; } + if (!TransactionIdIsNormal(slot->data.catalog_xmin) || + !TransactionIdIsNormal(feedbackCatalogXmin) || + TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) + { + changed = true; + slot->data.catalog_xmin = feedbackCatalogXmin; + slot->effective_catalog_xmin = feedbackCatalogXmin; + } SpinLockRelease(&slot->mutex); if (changed) @@ -1786,60 +1795,93 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) } } +/* + * Check that the provided xmin/epoch are sane, that is, not in the future + * and not so far back as to be already wrapped around. + * + * Epoch of nextXid should be same as standby, or if the counter has + * wrapped, then one greater than standby. + * + * This check doesn't care about whether clog exists for these xids + * at all. + */ +static bool +TransactionIdInRecentPast(TransactionId xid, uint32 epoch) +{ + TransactionId nextXid; + uint32 nextEpoch; + + GetNextXidAndEpoch(&nextXid, &nextEpoch); + + if (xid <= nextXid) + { + if (epoch != nextEpoch) + return false; + } + else + { + if (epoch + 1 != nextEpoch) + return false; + } + + if (!TransactionIdPrecedesOrEquals(xid, nextXid)) + return false; /* epoch OK, but it's wrapped around */ + + return true; +} + /* * Hot Standby feedback */ static void ProcessStandbyHSFeedbackMessage(void) { - TransactionId nextXid; - uint32 nextEpoch; TransactionId feedbackXmin; uint32 feedbackEpoch; + TransactionId feedbackCatalogXmin; + uint32 feedbackCatalogEpoch; /* * Decipher the reply message. The caller already consumed the msgtype - * byte. + * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation + * of this message. */ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); + feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); + feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u", + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", feedbackXmin, - feedbackEpoch); + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch); - /* Unset WalSender's xmin if the feedback message value is invalid */ - if (!TransactionIdIsNormal(feedbackXmin)) + /* + * Unset WalSender's xmins if the feedback message values are invalid. + * This happens when the downstream turned hot_standby_feedback off. + */ + if (!TransactionIdIsNormal(feedbackXmin) + && !TransactionIdIsNormal(feedbackCatalogXmin)) { MyPgXact->xmin = InvalidTransactionId; if (MyReplicationSlot != NULL) - PhysicalReplicationSlotNewXmin(feedbackXmin); + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); return; } /* * Check that the provided xmin/epoch are sane, that is, not in the future * and not so far back as to be already wrapped around. Ignore if not. - * - * Epoch of nextXid should be same as standby, or if the counter has - * wrapped, then one greater than standby. */ - GetNextXidAndEpoch(&nextXid, &nextEpoch); + if (TransactionIdIsNormal(feedbackXmin) && + !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch)) + return; - if (feedbackXmin <= nextXid) - { - if (feedbackEpoch != nextEpoch) - return; - } - else - { - if (feedbackEpoch + 1 != nextEpoch) - return; - } - - if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid)) - return; /* epoch OK, but it's wrapped around */ + if (TransactionIdIsNormal(feedbackCatalogXmin) && + !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch)) + return; /* * Set the WalSender's xmin equal to the standby's requested xmin, so that @@ -1864,15 +1906,23 @@ ProcessStandbyHSFeedbackMessage(void) * already since a VACUUM could have just finished calling GetOldestXmin.) * * If we're using a replication slot we reserve the xmin via that, - * otherwise via the walsender's PGXACT entry. + * otherwise via the walsender's PGXACT entry. We can only track the + * catalog xmin separately when using a slot, so we store the least + * of the two provided when not using a slot. * * XXX: It might make sense to generalize the ephemeral slot concept and * always use the slot mechanism to handle the feedback xmin. */ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ - PhysicalReplicationSlotNewXmin(feedbackXmin); + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); else - MyPgXact->xmin = feedbackXmin; + { + if (TransactionIdIsNormal(feedbackCatalogXmin) + && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)) + MyPgXact->xmin = feedbackCatalogXmin; + else + MyPgXact->xmin = feedbackXmin; + } } /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 40c3247d4b..7c2e1e1c85 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1264,6 +1264,10 @@ TransactionIdIsActive(TransactionId xid) * corresponding flags is set. Typically, if you want to ignore ones with * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM. * + * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and + * catalog_xmin of any replication slots that exist in the system when + * calculating the oldest xmin. + * * This is used by VACUUM to decide which deleted tuples must be preserved in * the passed in table. For shared relations backends in all databases must be * considered, but for non-shared relations that's not required, since only @@ -1342,7 +1346,7 @@ GetOldestXmin(Relation rel, int flags) volatile PGPROC *proc = &allProcs[pgprocno]; volatile PGXACT *pgxact = &allPgXact[pgprocno]; - if (pgxact->vacuumFlags & flags) + if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK)) continue; if (allDbs || @@ -1418,7 +1422,8 @@ GetOldestXmin(Relation rel, int flags) /* * Check whether there are replication slots requiring an older xmin. */ - if (TransactionIdIsValid(replication_slot_xmin) && + if (!(flags & PROCARRAY_SLOTS_XMIN) && + TransactionIdIsValid(replication_slot_xmin) && NormalTransactionIdPrecedes(replication_slot_xmin, result)) result = replication_slot_xmin; @@ -1428,7 +1433,8 @@ GetOldestXmin(Relation rel, int flags) * possible. We need to do so if we're computing the global limit (rel = * NULL) or if the passed relation is a catalog relation of some kind. */ - if ((rel == NULL || + if (!(flags & PROCARRAY_SLOTS_XMIN) && + (rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)) && TransactionIdIsValid(replication_slot_catalog_xmin) && NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result)) diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 945dd1d592..1b345faa2d 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -44,6 +44,10 @@ struct XidCache * * Note: If you modify these flags, you need to modify PROCARRAY_XXX flags * in src/include/storage/procarray.h. + * + * PROC_RESERVED may later be assigned for use in vacuumFlags, but its value is + * used for PROCARRAY_SLOTS_XMIN in procarray.h, so GetOldestXmin won't be able + * to match and ignore processes with this flag set. */ #define PROC_IS_AUTOVACUUM 0x01 /* is it an autovac worker? */ #define PROC_IN_VACUUM 0x02 /* currently running lazy vacuum */ @@ -51,6 +55,7 @@ struct XidCache #define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical * decoding outside xact */ +#define PROC_RESERVED 0x20 /* reserved for procarray */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index c8e1ae517c..9b42e49524 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -32,6 +32,17 @@ #define PROCARRAY_LOGICAL_DECODING_FLAG 0x10 /* currently doing logical * decoding outside xact */ +#define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin, + * catalog_xmin */ +/* + * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching + * PGXACT->vacuumFlags. Other flags are used for different purposes and + * have no corresponding PROC flag equivalent. + */ +#define PROCARRAY_PROC_FLAGS_MASK (PROCARRAY_VACUUM_FLAG | \ + PROCARRAY_ANALYZE_FLAG | \ + PROCARRAY_LOGICAL_DECODING_FLAG) + /* Use the following flags as an input "flags" to GetOldestXmin function */ /* Consider all backends except for logical decoding ones which manage xmin separately */ #define PROCARRAY_FLAGS_DEFAULT PROCARRAY_LOGICAL_DECODING_FLAG diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl index 09830dc39c..4561a06143 100644 --- a/src/test/recovery/t/010_logical_decoding_timelines.pl +++ b/src/test/recovery/t/010_logical_decoding_timelines.pl @@ -20,7 +20,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 10; use RecursiveCopy; use File::Copy; use IPC::Run (); @@ -31,10 +31,14 @@ my ($stdout, $stderr, $ret); # Initialize master node my $node_master = get_new_node('master'); $node_master->init(allows_streaming => 1, has_archiving => 1); -$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); -$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); -$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); -$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n"); +$node_master->append_conf('postgresql.conf', q[ +wal_level = 'logical' +max_replication_slots = 3 +max_wal_senders = 2 +log_min_messages = 'debug2' +hot_standby_feedback = on +wal_receiver_status_interval = 1 +]); $node_master->dump_info; $node_master->start; @@ -51,11 +55,17 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;'); my $backup_name = 'b1'; $node_master->backup_fs_hot($backup_name); +$node_master->safe_psql('postgres', + q[SELECT pg_create_physical_replication_slot('phys_slot');]); + my $node_replica = get_new_node('replica'); $node_replica->init_from_backup( $node_master, $backup_name, has_streaming => 1, has_restoring => 1); +$node_replica->append_conf( + 'recovery.conf', q[primary_slot_name = 'phys_slot']); + $node_replica->start; $node_master->safe_psql('postgres', @@ -71,6 +81,24 @@ $stdout = $node_replica->safe_psql('postgres', is($stdout, 'before_basebackup', 'Expected to find only slot before_basebackup on replica'); +# Examine the physical slot the replica uses to stream changes +# from the master to make sure its hot_standby_feedback +# has locked in a catalog_xmin on the physical slot, and that +# any xmin is < the catalog_xmin +$node_master->poll_query_until('postgres', q[ + SELECT catalog_xmin IS NOT NULL + FROM pg_replication_slots + WHERE slot_name = 'phys_slot' + ]); +my $phys_slot = $node_master->slot('phys_slot'); +isnt($phys_slot->{'xmin'}, '', + 'xmin assigned on physical slot of master'); +isnt($phys_slot->{'catalog_xmin'}, '', + 'catalog_xmin assigned on physical slot of master'); +# Ignore wrap-around here, we're on a new cluster: +cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'}, + 'xmin on physical slot must not be lower than catalog_xmin'); + # Boom, crash $node_master->stop('immediate');