diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index d799458af1..1e30f338ff 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); static List *SyncRepGetSyncStandbysPriority(bool *am_sync); static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -398,9 +402,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -451,7 +456,11 @@ SyncRepReleaseWaiters(void) /* * Check whether we are a sync standby or not, and calculate the synced - * positions among all sync standbys. + * positions among all sync standbys. (Note: although this step does not + * of itself require holding SyncRepLock, it seems like a good idea to do + * it after acquiring the lock. This ensures that the WAL pointers we use + * to release waiters are newer than any previous execution of this + * routine used.) */ got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); @@ -526,25 +535,41 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; + int i; + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; *am_sync = false; + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); + + /* Am I among the candidate sync standbys? */ + for (i = 0; i < num_standbys; i++) + { + if (sync_standbys[i].is_me) + { + *am_sync = true; + break; + } + } /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if we are not managing a sync standby or there are + * not enough synchronous standbys. */ if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(sync_standbys); return false; } @@ -564,15 +589,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + sync_standbys, num_standbys); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + sync_standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(sync_standbys); return true; } @@ -580,27 +606,24 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; - - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -616,38 +639,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. */ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + /* Should have enough candidates, or somebody messed up */ + Assert(nth > 0 && nth <= num_standbys); - foreach(cell, sync_standbys) + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - i++; + write_array[i] = sync_standbys[i].write; + flush_array[i] = sync_standbys[i].flush; + apply_array[i] = sync_standbys[i].apply; } /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -676,6 +697,111 @@ cmp_lsn(const void *a, const void *b) return 1; } +/* + * Return data about walsenders that are candidates to be sync standbys. + * + * *standbys is set to a palloc'd array of structs of per-walsender data, + * and the number of valid entries (candidate sync senders) is returned. + * (This might be more or fewer than num_sync; caller must check.) + */ +int +SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys) +{ + int i; + int n; + + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return 0; + + /* Collect raw data from shared memory */ + n = 0; + for (i = 0; i < max_wal_senders; i++) + { + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ + + walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; + + SpinLockAcquire(&walsnd->mutex); + stby->pid = walsnd->pid; + state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; + SpinLockRelease(&walsnd->mutex); + + /* Must be active */ + if (stby->pid == 0) + continue; + + /* Must be streaming or stopping */ + if (state != WALSNDSTATE_STREAMING && + state != WALSNDSTATE_STOPPING) + continue; + + /* Must be synchronous */ + if (stby->sync_standby_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(stby->flush)) + continue; + + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; + } + + /* + * In quorum mode, we return all the candidates. In priority mode, if we + * have too many candidates then return only the num_sync ones of highest + * priority. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY && + n > SyncRepConfig->num_sync) + { + /* Sort by priority ... */ + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); + /* ... then report just the first num_sync ones */ + n = SyncRepConfig->num_sync; + } + + return n; +} + +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; + + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; + + /* + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) + */ + return sa->walsnd_index - sb->walsnd_index; +} + + /* * Return the list of sync standbys, or NIL if no sync standby is connected. * @@ -683,6 +809,10 @@ cmp_lsn(const void *a, const void *b) * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. + * + * XXX This function is BROKEN and should not be used in new code. It has + * an inherent race condition, since the returned list of integer indexes + * might no longer correspond to reality. */ List * SyncRepGetSyncStandbys(bool *am_sync) @@ -942,8 +1072,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) priority = next_highest_priority; } - /* never reached, but keep compiler quiet */ - Assert(false); + /* + * We might get here if the set of sync_standby_priority values in shared + * memory is inconsistent, as can happen transiently after a change in the + * synchronous_standby_names setting. In that case, just return the + * incomplete list we have so far. That will cause the caller to decide + * there aren't enough synchronous candidates, which should be a safe + * choice until the priority values become consistent again. + */ + list_free(pending); return result; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f95dc86296..ebe986559a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2270,14 +2270,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ @@ -3185,7 +3187,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3214,11 +3217,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active synchronous standbys. This could be out of + * date before we're done, but we'll use the data anyway. */ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3233,9 +3235,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3254,6 +3259,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); + /* + * Detect whether walsender is/was considered synchronous. We can + * provide some protection against stale data by checking the PID + * along with walsnd_index. + */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (sync_standbys[j].walsnd_index == i && + sync_standbys[j].pid == pid) + { + is_sync_standby = true; + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3323,7 +3344,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) + else if (is_sync_standby) values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index bc43b4e109..5b022ccb36 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,24 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetCandidateStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. * @@ -74,6 +92,9 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ +extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys); + +/* obsolete, do not use in new code */ extern List *SyncRepGetSyncStandbys(bool *am_sync); /* called by checkpointer */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 4b90477936..07b01ca17e 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. * - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,7 +59,7 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; - /* Protects shared variables shown above. */ + /* Protects shared variables shown above (and sync_standby_priority). */ slock_t mutex; /* @@ -71,8 +70,7 @@ typedef struct WalSnd /* * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. + * in synchronous_standby_names, or 0 if not-listed. */ int sync_standby_priority; } WalSnd;