diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 7123d4169d..765d58d120 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -99,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { /* Prototypes for private functions */ static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); +static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); + ProcessWalRcvInterrupts(); } /* If socket is ready, advance the libpq state machine */ @@ -456,6 +457,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) { PGresult *res; + /* + * Send copy-end message. As in libpqrcv_PQexec, this could theoretically + * block, but the risk seems small. + */ if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || PQflush(conn->streamConn)) ereport(ERROR, @@ -472,7 +477,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -486,7 +491,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -499,7 +504,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -509,7 +514,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (res != NULL) ereport(ERROR, (errmsg("unexpected result after CommandComplete: %s", @@ -572,12 +577,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * The function is modeled on PQexec() in libpq, but only implements * those parts that are in use in the walreceiver api. * - * Queries are always executed on the connection in streamConn. + * May return NULL, rather than an error result, on failure. */ static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query) { - PGresult *result = NULL; PGresult *lastResult = NULL; /* @@ -588,60 +592,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) */ /* - * Submit a query. Since we don't use non-blocking mode, this also can - * block. But its risk is relatively small, so we ignore that for now. + * Submit the query. Since we don't use non-blocking mode, this could + * theoretically block. In practice, since we don't send very long query + * strings, the risk seems negligible. */ if (!PQsendQuery(streamConn, query)) return NULL; for (;;) { - /* - * Receive data until PQgetResult is ready to get the result without - * blocking. - */ - while (PQisBusy(streamConn)) - { - int rc; + /* Wait for, and collect, the next PGresult. */ + PGresult *result; - /* - * We don't need to break down the sleep into smaller increments, - * since we'll get interrupted by signals and can either handle - * interrupts here or elog(FATAL) within SIGTERM signal handler if - * the signal arrives in the middle of establishment of - * replication connection. - */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - - /* Consume whatever data is available from the socket */ - if (PQconsumeInput(streamConn) == 0) - { - /* trouble; drop whatever we had and return NULL */ - PQclear(lastResult); - return NULL; - } - } + result = libpqrcv_PQgetResult(streamConn); + if (result == NULL) + break; /* query is complete, or failure */ /* * Emulate PQexec()'s behavior of returning the last result when there * are many. We are fine with returning just last error message. */ - result = PQgetResult(streamConn); - if (result == NULL) - break; /* query is complete */ - PQclear(lastResult); lastResult = result; @@ -655,6 +625,51 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) return lastResult; } +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static PGresult * +libpqrcv_PQgetResult(PGconn *streamConn) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(streamConn)) + { + int rc; + + /* + * We don't need to break down the sleep into smaller increments, + * since we'll get interrupted by signals and can handle any + * interrupts here. + */ + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | + WL_LATCH_SET, + PQsocket(streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessWalRcvInterrupts(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(streamConn) == 0) + { + /* trouble; return NULL */ + return NULL; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(streamConn); +} + /* * Disconnect connection to primary, if any. */ @@ -716,13 +731,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (res != NULL) { PQclear(res); @@ -886,7 +901,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, { char *cstrs[MaxTupleAttributeNumber]; - CHECK_FOR_INTERRUPTS(); + ProcessWalRcvInterrupts(); /* Do the allocations in temporary context. */ oldcontext = MemoryContextSwitchTo(rowcontext); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f32cf91ffb..d52ec7b2cf 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -112,28 +112,7 @@ static struct static StringInfoData reply_message; static StringInfoData incoming_message; -/* - * About SIGTERM handling: - * - * We can't just exit(1) within SIGTERM signal handler, because the signal - * might arrive in the middle of some critical operation, like while we're - * holding a spinlock. We also can't just set a flag in signal handler and - * check it in the main loop, because we perform some blocking operations - * like libpqrcv_PQexec(), which can take a long time to finish. - * - * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's - * safe for the signal handler to elog(FATAL) immediately. Otherwise it just - * sets got_SIGTERM flag, which is checked in the main loop when convenient. - * - * This is very much like what regular backends do with ImmediateInterruptOK, - * ProcessInterrupts() etc. - */ -static volatile bool WalRcvImmediateInterruptOK = false; - /* Prototypes for private functions */ -static void ProcessWalRcvInterrupts(void); -static void EnableWalRcvImmediateExit(void); -static void DisableWalRcvImmediateExit(void); static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); @@ -151,7 +130,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); -static void +/* + * Process any interrupts the walreceiver process may have received. + * This should be called any time the process's latch has become set. + * + * Currently, only SIGTERM is of interest. We can't just exit(1) within the + * SIGTERM signal handler, because the signal might arrive in the middle of + * some critical operation, like while we're holding a spinlock. Instead, the + * signal handler sets a flag variable as well as setting the process's latch. + * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the + * latch has become set. Operations that could block for a long time, such as + * reading from a remote server, must pay attention to the latch too; see + * libpqrcv_PQgetResult for example. + */ +void ProcessWalRcvInterrupts(void) { /* @@ -163,26 +155,12 @@ ProcessWalRcvInterrupts(void) if (got_SIGTERM) { - WalRcvImmediateInterruptOK = false; ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating walreceiver process due to administrator command"))); } } -static void -EnableWalRcvImmediateExit(void) -{ - WalRcvImmediateInterruptOK = true; - ProcessWalRcvInterrupts(); -} - -static void -DisableWalRcvImmediateExit(void) -{ - WalRcvImmediateInterruptOK = false; - ProcessWalRcvInterrupts(); -} /* Main entry point for walreceiver process */ void @@ -292,12 +270,10 @@ WalReceiverMain(void) PG_SETMASK(&UnBlockSig); /* Establish the connection to the primary for XLOG streaming */ - EnableWalRcvImmediateExit(); wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) ereport(ERROR, (errmsg("could not connect to the primary server: %s", err))); - DisableWalRcvImmediateExit(); /* * Save user-visible connection string. This clobbers the original @@ -336,7 +312,6 @@ WalReceiverMain(void) * Check that we're connected to a valid server using the * IDENTIFY_SYSTEM replication command. */ - EnableWalRcvImmediateExit(); primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, @@ -348,7 +323,6 @@ WalReceiverMain(void) errdetail("The primary's identifier is %s, the standby's identifier is %s.", primary_sysid, standby_sysid))); } - DisableWalRcvImmediateExit(); /* * Confirm that the current timeline of the primary is the same or @@ -509,6 +483,8 @@ WalReceiverMain(void) if (rc & WL_LATCH_SET) { ResetLatch(walrcv->latch); + ProcessWalRcvInterrupts(); + if (walrcv->force_reply) { /* @@ -577,9 +553,7 @@ WalReceiverMain(void) * The backend finished streaming. Exit streaming COPY-mode from * our side, too. */ - EnableWalRcvImmediateExit(); walrcv_endstreaming(wrconn, &primaryTLI); - DisableWalRcvImmediateExit(); /* * If the server had switched to a new timeline that we didn't @@ -726,9 +700,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) (errmsg("fetching timeline history file for timeline %u from primary server", tli))); - EnableWalRcvImmediateExit(); walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); - DisableWalRcvImmediateExit(); /* * Check that the filename on the master matches what we @@ -805,7 +777,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) errno = save_errno; } -/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ +/* SIGTERM: set flag for ProcessWalRcvInterrupts */ static void WalRcvShutdownHandler(SIGNAL_ARGS) { @@ -816,10 +788,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS) if (WalRcv->latch) SetLatch(WalRcv->latch); - /* Don't joggle the elbow of proc_exit */ - if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) - ProcessWalRcvInterrupts(); - errno = save_errno; } diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 33e89cae36..7f2927cb46 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -302,6 +302,7 @@ walrcv_clear_result(WalRcvExecResult *walres) /* prototypes for functions in walreceiver.c */ extern void WalReceiverMain(void) pg_attribute_noreturn(); +extern void ProcessWalRcvInterrupts(void); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void);