diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e8fb78b331..7f198c2e3e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1042,13 +1042,6 @@ EndPrepare(GlobalTransaction gxact) /* If we crash now, we have prepared: WAL replay will fix things */ - /* - * Wake up all walsenders to send WAL up to the PREPARE record immediately - * if replication is enabled - */ - if (max_wal_senders > 0) - WalSndWakeup(); - /* write correct CRC and close file */ if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32)) { @@ -2045,13 +2038,6 @@ RecordTransactionCommitPrepared(TransactionId xid, /* Flush XLOG to disk */ XLogFlush(recptr); - /* - * Wake up all walsenders to send WAL up to the COMMIT PREPARED record - * immediately if replication is enabled - */ - if (max_wal_senders > 0) - WalSndWakeup(); - /* Mark the transaction committed in pg_clog */ TransactionIdCommitTree(xid, nchildren, children); @@ -2132,13 +2118,6 @@ RecordTransactionAbortPrepared(TransactionId xid, /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); - /* - * Wake up all walsenders to send WAL up to the ABORT PREPARED record - * immediately if replication is enabled - */ - if (max_wal_senders > 0) - WalSndWakeup(); - /* * Mark the transaction aborted in clog. This is not absolutely necessary * but we may as well do it while we are here. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 4755ee6ee4..86b1afa80d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1141,13 +1141,6 @@ RecordTransactionCommit(void) XLogFlush(XactLastRecEnd); - /* - * Wake up all walsenders to send WAL up to the COMMIT record - * immediately if replication is enabled - */ - if (max_wal_senders > 0) - WalSndWakeup(); - /* * Now we may update the CLOG, if we wrote a COMMIT record above */ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cbfa68a4e7..a43e2eeaf3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1025,6 +1025,8 @@ begin:; END_CRIT_SECTION(); + /* wakeup the WalSnd now that we released the WALWriteLock */ + WalSndWakeupProcessRequests(); return RecPtr; } @@ -1208,6 +1210,9 @@ begin:; END_CRIT_SECTION(); + /* wakeup the WalSnd now that we outside contented locks */ + WalSndWakeupProcessRequests(); + return RecPtr; } @@ -1792,6 +1797,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) if (finishing_seg || (xlog_switch && last_iteration)) { issue_xlog_fsync(openLogFile, openLogSegNo); + + /* signal that we need to wakeup WalSnd later */ + WalSndWakeupRequest(); + LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) @@ -1854,7 +1863,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) openLogFile = XLogFileOpen(openLogSegNo); openLogOff = 0; } + issue_xlog_fsync(openLogFile, openLogSegNo); + + /* signal that we need to wakeup WalSnd later */ + WalSndWakeupRequest(); } LogwrtResult.Flush = LogwrtResult.Write; } @@ -2120,6 +2133,9 @@ XLogFlush(XLogRecPtr record) END_CRIT_SECTION(); + /* wakeup the WalSnd now that we released the WALWriteLock */ + WalSndWakeupProcessRequests(); + /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of XLOG. @@ -2245,13 +2261,8 @@ XLogBackgroundFlush(void) END_CRIT_SECTION(); - /* - * If we wrote something then we have something to send to standbys also, - * otherwise the replication delay become around 7s with just async - * commit. - */ - if (wrote_something) - WalSndWakeup(); + /* wakeup the WalSnd now that we released the WALWriteLock */ + WalSndWakeupProcessRequests(); return wrote_something; } diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c index 65b2fc56e0..335e9f66af 100644 --- a/src/backend/port/unix_latch.c +++ b/src/backend/port/unix_latch.c @@ -418,6 +418,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, * NB: when calling this in a signal handler, be sure to save and restore * errno around it. (That's standard practice in most signal handlers, of * course, but we used to omit it in handlers that only set a flag.) + * + * NB: this function is called from critical sections and signal handlers so + * throwing an error is not a good idea. */ void SetLatch(volatile Latch *latch) diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c index eb46dcad1b..1f1ed33dc2 100644 --- a/src/backend/port/win32_latch.c +++ b/src/backend/port/win32_latch.c @@ -247,6 +247,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, return result; } +/* + * The comments above the unix implementation (unix_latch.c) of this function + * apply here as well. + */ void SetLatch(volatile Latch *latch) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 616d4e73e3..912ce9d450 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -81,6 +81,10 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int replication_timeout = 60 * 1000; /* maximum time to send one * WAL data message */ +/* + * State for WalSndWakeupRequest + */ +bool wake_wal_senders = false; /* * These variables are used similarly to openLogFile/Id/Seg/Off, @@ -1395,7 +1399,12 @@ WalSndShmemInit(void) } } -/* Wake up all walsenders */ +/* + * Wake up all walsenders + * + * This will be called inside critical sections, so throwing an error is not + * adviseable. + */ void WalSndWakeup(void) { diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 65536016c2..bb85ccf7b2 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -21,6 +21,7 @@ extern bool am_walsender; extern bool am_cascading_walsender; extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_ready_to_stop; +extern bool wake_wal_senders; /* user-settable parameters */ extern int max_wal_senders; @@ -35,4 +36,27 @@ extern void WalSndRqstFileReload(void); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS); +/* + * Remember that we want to wakeup walsenders later + * + * This is separated from doing the actual wakeup because the writeout is done + * while holding contended locks. + */ +#define WalSndWakeupRequest() \ + do { wake_wal_senders = true; } while (0) + +/* + * wakeup walsenders if there is work to be done + */ +#define WalSndWakeupProcessRequests() \ + do \ + { \ + if (wake_wal_senders) \ + { \ + wake_wal_senders = false; \ + if (max_wal_senders > 0) \ + WalSndWakeup(); \ + } \ + } while (0) + #endif /* _WALSENDER_H */