mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-10-03 05:26:50 +02:00
Use latch instead of select() in walreceiver
Replace use of poll()/select() by WaitLatchOrSocket(), which is more portable and flexible. Also change walreceiver to use its procLatch instead of a custom latch. From: Petr Jelinek <petr@2ndquadrant.com>
This commit is contained in:
parent
b999c247a5
commit
597a87ccc9
@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
|
|||||||
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
|
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
|
||||||
event_name = "WalReceiverWaitStart";
|
event_name = "WalReceiverWaitStart";
|
||||||
break;
|
break;
|
||||||
|
case WAIT_EVENT_LIBPQWALRECEIVER_READ:
|
||||||
|
event_name = "LibPQWalReceiverRead";
|
||||||
|
break;
|
||||||
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
|
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
|
||||||
event_name = "WalSenderWaitForWAL";
|
event_name = "WalSenderWaitForWAL";
|
||||||
break;
|
break;
|
||||||
|
@ -23,19 +23,11 @@
|
|||||||
#include "pqexpbuffer.h"
|
#include "pqexpbuffer.h"
|
||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
#include "pgstat.h"
|
||||||
#include "replication/walreceiver.h"
|
#include "replication/walreceiver.h"
|
||||||
|
#include "storage/proc.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
|
||||||
#ifdef HAVE_POLL_H
|
|
||||||
#include <poll.h>
|
|
||||||
#endif
|
|
||||||
#ifdef HAVE_SYS_POLL_H
|
|
||||||
#include <sys/poll.h>
|
|
||||||
#endif
|
|
||||||
#ifdef HAVE_SYS_SELECT_H
|
|
||||||
#include <sys/select.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
|
||||||
void _PG_init(void);
|
void _PG_init(void);
|
||||||
@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
|
|||||||
static void libpqrcv_disconnect(void);
|
static void libpqrcv_disconnect(void);
|
||||||
|
|
||||||
/* Prototypes for private functions */
|
/* Prototypes for private functions */
|
||||||
static bool libpq_select(int timeout_ms);
|
|
||||||
static PGresult *libpqrcv_PQexec(const char *query);
|
static PGresult *libpqrcv_PQexec(const char *query);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
|
|||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Wait until we can read WAL stream, or timeout.
|
|
||||||
*
|
|
||||||
* Returns true if data has become available for reading, false if timed out
|
|
||||||
* or interrupted by signal.
|
|
||||||
*
|
|
||||||
* This is based on pqSocketCheck.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
libpq_select(int timeout_ms)
|
|
||||||
{
|
|
||||||
int ret;
|
|
||||||
|
|
||||||
Assert(streamConn != NULL);
|
|
||||||
if (PQsocket(streamConn) < 0)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode_for_socket_access(),
|
|
||||||
errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
|
|
||||||
|
|
||||||
/* We use poll(2) if available, otherwise select(2) */
|
|
||||||
{
|
|
||||||
#ifdef HAVE_POLL
|
|
||||||
struct pollfd input_fd;
|
|
||||||
|
|
||||||
input_fd.fd = PQsocket(streamConn);
|
|
||||||
input_fd.events = POLLIN | POLLERR;
|
|
||||||
input_fd.revents = 0;
|
|
||||||
|
|
||||||
ret = poll(&input_fd, 1, timeout_ms);
|
|
||||||
#else /* !HAVE_POLL */
|
|
||||||
|
|
||||||
fd_set input_mask;
|
|
||||||
struct timeval timeout;
|
|
||||||
struct timeval *ptr_timeout;
|
|
||||||
|
|
||||||
FD_ZERO(&input_mask);
|
|
||||||
FD_SET(PQsocket(streamConn), &input_mask);
|
|
||||||
|
|
||||||
if (timeout_ms < 0)
|
|
||||||
ptr_timeout = NULL;
|
|
||||||
else
|
|
||||||
{
|
|
||||||
timeout.tv_sec = timeout_ms / 1000;
|
|
||||||
timeout.tv_usec = (timeout_ms % 1000) * 1000;
|
|
||||||
ptr_timeout = &timeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = select(PQsocket(streamConn) + 1, &input_mask,
|
|
||||||
NULL, NULL, ptr_timeout);
|
|
||||||
#endif /* HAVE_POLL */
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret == 0 || (ret < 0 && errno == EINTR))
|
|
||||||
return false;
|
|
||||||
if (ret < 0)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode_for_socket_access(),
|
|
||||||
errmsg("select() failed: %m")));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Send a query and wait for the results by using the asynchronous libpq
|
* Send a query and wait for the results by using the asynchronous libpq
|
||||||
* functions and the backend version of select().
|
* functions and the backend version of select().
|
||||||
@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
|
|||||||
*/
|
*/
|
||||||
while (PQisBusy(streamConn))
|
while (PQisBusy(streamConn))
|
||||||
{
|
{
|
||||||
|
int rc;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We don't need to break down the sleep into smaller increments,
|
* We don't need to break down the sleep into smaller increments,
|
||||||
* and check for interrupts after each nap, since we can just
|
* since we'll get interrupted by signals and can either handle
|
||||||
* elog(FATAL) within SIGTERM signal handler if the signal arrives
|
* interrupts here or elog(FATAL) within SIGTERM signal handler if
|
||||||
* in the middle of establishment of replication connection.
|
* the signal arrives in the middle of establishment of
|
||||||
|
* replication connection.
|
||||||
*/
|
*/
|
||||||
if (!libpq_select(-1))
|
ResetLatch(&MyProc->procLatch);
|
||||||
continue; /* interrupted */
|
rc = WaitLatchOrSocket(&MyProc->procLatch,
|
||||||
|
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
|
||||||
|
WL_LATCH_SET,
|
||||||
|
PQsocket(streamConn),
|
||||||
|
0,
|
||||||
|
WAIT_EVENT_LIBPQWALRECEIVER_READ);
|
||||||
|
if (rc & WL_POSTMASTER_DEATH)
|
||||||
|
exit(1);
|
||||||
|
|
||||||
|
/* interrupted */
|
||||||
|
if (rc & WL_LATCH_SET)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (PQconsumeInput(streamConn) == 0)
|
if (PQconsumeInput(streamConn) == 0)
|
||||||
return NULL; /* trouble */
|
return NULL; /* trouble */
|
||||||
}
|
}
|
||||||
|
@ -261,7 +261,7 @@ WalReceiverMain(void)
|
|||||||
/* Arrange to clean up at walreceiver exit */
|
/* Arrange to clean up at walreceiver exit */
|
||||||
on_shmem_exit(WalRcvDie, 0);
|
on_shmem_exit(WalRcvDie, 0);
|
||||||
|
|
||||||
OwnLatch(&walrcv->latch);
|
walrcv->latch = &MyProc->procLatch;
|
||||||
|
|
||||||
/* Properly accept or ignore signals the postmaster might send us */
|
/* Properly accept or ignore signals the postmaster might send us */
|
||||||
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
|
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
|
||||||
@ -483,7 +483,7 @@ WalReceiverMain(void)
|
|||||||
* avoiding some system calls.
|
* avoiding some system calls.
|
||||||
*/
|
*/
|
||||||
Assert(wait_fd != PGINVALID_SOCKET);
|
Assert(wait_fd != PGINVALID_SOCKET);
|
||||||
rc = WaitLatchOrSocket(&walrcv->latch,
|
rc = WaitLatchOrSocket(walrcv->latch,
|
||||||
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
|
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
|
||||||
WL_TIMEOUT | WL_LATCH_SET,
|
WL_TIMEOUT | WL_LATCH_SET,
|
||||||
wait_fd,
|
wait_fd,
|
||||||
@ -491,7 +491,7 @@ WalReceiverMain(void)
|
|||||||
WAIT_EVENT_WAL_RECEIVER_MAIN);
|
WAIT_EVENT_WAL_RECEIVER_MAIN);
|
||||||
if (rc & WL_LATCH_SET)
|
if (rc & WL_LATCH_SET)
|
||||||
{
|
{
|
||||||
ResetLatch(&walrcv->latch);
|
ResetLatch(walrcv->latch);
|
||||||
if (walrcv->force_reply)
|
if (walrcv->force_reply)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
|
|||||||
WakeupRecovery();
|
WakeupRecovery();
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
ResetLatch(&walrcv->latch);
|
ResetLatch(walrcv->latch);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Emergency bailout if postmaster has died. This is to avoid the
|
* Emergency bailout if postmaster has died. This is to avoid the
|
||||||
@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
|
|||||||
}
|
}
|
||||||
SpinLockRelease(&walrcv->mutex);
|
SpinLockRelease(&walrcv->mutex);
|
||||||
|
|
||||||
WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
|
WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
|
||||||
WAIT_EVENT_WAL_RECEIVER_WAIT_START);
|
WAIT_EVENT_WAL_RECEIVER_WAIT_START);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
|
|||||||
/* Ensure that all WAL records received are flushed to disk */
|
/* Ensure that all WAL records received are flushed to disk */
|
||||||
XLogWalRcvFlush(true);
|
XLogWalRcvFlush(true);
|
||||||
|
|
||||||
DisownLatch(&walrcv->latch);
|
walrcv->latch = NULL;
|
||||||
|
|
||||||
SpinLockAcquire(&walrcv->mutex);
|
SpinLockAcquire(&walrcv->mutex);
|
||||||
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
|
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
|
||||||
@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
|
|||||||
|
|
||||||
got_SIGTERM = true;
|
got_SIGTERM = true;
|
||||||
|
|
||||||
SetLatch(&WalRcv->latch);
|
if (WalRcv->latch)
|
||||||
|
SetLatch(WalRcv->latch);
|
||||||
|
|
||||||
/* Don't joggle the elbow of proc_exit */
|
/* Don't joggle the elbow of proc_exit */
|
||||||
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
|
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
|
||||||
@ -1297,7 +1298,8 @@ void
|
|||||||
WalRcvForceReply(void)
|
WalRcvForceReply(void)
|
||||||
{
|
{
|
||||||
WalRcv->force_reply = true;
|
WalRcv->force_reply = true;
|
||||||
SetLatch(&WalRcv->latch);
|
if (WalRcv->latch)
|
||||||
|
SetLatch(WalRcv->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -64,7 +64,7 @@ WalRcvShmemInit(void)
|
|||||||
MemSet(WalRcv, 0, WalRcvShmemSize());
|
MemSet(WalRcv, 0, WalRcvShmemSize());
|
||||||
WalRcv->walRcvState = WALRCV_STOPPED;
|
WalRcv->walRcvState = WALRCV_STOPPED;
|
||||||
SpinLockInit(&WalRcv->mutex);
|
SpinLockInit(&WalRcv->mutex);
|
||||||
InitSharedLatch(&WalRcv->latch);
|
WalRcv->latch = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
|
|||||||
|
|
||||||
if (launch)
|
if (launch)
|
||||||
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
|
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
|
||||||
else
|
else if (walrcv->latch)
|
||||||
SetLatch(&walrcv->latch);
|
SetLatch(walrcv->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -763,6 +763,7 @@ typedef enum
|
|||||||
WAIT_EVENT_CLIENT_WRITE,
|
WAIT_EVENT_CLIENT_WRITE,
|
||||||
WAIT_EVENT_SSL_OPEN_SERVER,
|
WAIT_EVENT_SSL_OPEN_SERVER,
|
||||||
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
|
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
|
||||||
|
WAIT_EVENT_LIBPQWALRECEIVER_READ,
|
||||||
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
|
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
|
||||||
WAIT_EVENT_WAL_SENDER_WRITE_DATA
|
WAIT_EVENT_WAL_SENDER_WRITE_DATA
|
||||||
} WaitEventClient;
|
} WaitEventClient;
|
||||||
|
@ -127,8 +127,9 @@ typedef struct
|
|||||||
* where to start streaming (after setting receiveStart and
|
* where to start streaming (after setting receiveStart and
|
||||||
* receiveStartTLI), and also to tell it to send apply feedback to the
|
* receiveStartTLI), and also to tell it to send apply feedback to the
|
||||||
* primary whenever specially marked commit records are applied.
|
* primary whenever specially marked commit records are applied.
|
||||||
|
* This is normally mapped to procLatch when walreceiver is running.
|
||||||
*/
|
*/
|
||||||
Latch latch;
|
Latch *latch;
|
||||||
} WalRcvData;
|
} WalRcvData;
|
||||||
|
|
||||||
extern WalRcvData *WalRcv;
|
extern WalRcvData *WalRcv;
|
||||||
|
Loading…
Reference in New Issue
Block a user