2010-01-20 10:16:24 +01:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* libpqwalreceiver.c
|
|
|
|
*
|
|
|
|
* This file contains the libpq-specific parts of walreceiver. It's
|
|
|
|
* loaded as a dynamic module to avoid linking the main server binary with
|
|
|
|
* libpq.
|
|
|
|
*
|
|
|
|
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
2010-04-19 16:10:45 +02:00
|
|
|
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
|
2010-01-20 10:16:24 +01:00
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
|
|
|
|
#include <unistd.h>
|
|
|
|
#include <sys/time.h>
|
|
|
|
|
|
|
|
#include "libpq-fe.h"
|
|
|
|
#include "access/xlog.h"
|
|
|
|
#include "miscadmin.h"
|
|
|
|
#include "replication/walreceiver.h"
|
|
|
|
#include "utils/builtins.h"
|
|
|
|
|
|
|
|
#ifdef HAVE_POLL_H
|
|
|
|
#include <poll.h>
|
|
|
|
#endif
|
|
|
|
#ifdef HAVE_SYS_POLL_H
|
|
|
|
#include <sys/poll.h>
|
|
|
|
#endif
|
|
|
|
#ifdef HAVE_SYS_SELECT_H
|
|
|
|
#include <sys/select.h>
|
|
|
|
#endif
|
|
|
|
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
|
|
|
|
void _PG_init(void);
|
|
|
|
|
|
|
|
/* Current connection to the primary, if any */
|
|
|
|
static PGconn *streamConn = NULL;
|
|
|
|
static bool justconnected = false;
|
|
|
|
|
|
|
|
/* Buffer for currently read records */
|
|
|
|
static char *recvBuf = NULL;
|
|
|
|
|
|
|
|
/* Prototypes for interface functions */
|
|
|
|
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
|
2010-02-03 10:47:19 +01:00
|
|
|
static bool libpqrcv_receive(int timeout, unsigned char *type,
|
2010-02-26 03:01:40 +01:00
|
|
|
char **buffer, int *len);
|
2010-01-20 10:16:24 +01:00
|
|
|
static void libpqrcv_disconnect(void);
|
|
|
|
|
|
|
|
/* Prototypes for private functions */
|
|
|
|
static bool libpq_select(int timeout_ms);
|
2010-04-19 16:10:45 +02:00
|
|
|
static PGresult *libpqrcv_PQexec(const char *query);
|
2010-01-20 10:16:24 +01:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Module load callback
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
_PG_init(void)
|
|
|
|
{
|
|
|
|
/* Tell walreceiver how to reach us */
|
2010-01-20 12:58:44 +01:00
|
|
|
if (walrcv_connect != NULL || walrcv_receive != NULL ||
|
|
|
|
walrcv_disconnect != NULL)
|
2010-01-20 10:16:24 +01:00
|
|
|
elog(ERROR, "libpqwalreceiver already loaded");
|
|
|
|
walrcv_connect = libpqrcv_connect;
|
|
|
|
walrcv_receive = libpqrcv_receive;
|
|
|
|
walrcv_disconnect = libpqrcv_disconnect;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Establish the connection to the primary server for XLOG streaming
|
|
|
|
*/
|
|
|
|
static bool
|
|
|
|
libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
|
|
|
|
{
|
2010-03-19 18:51:42 +01:00
|
|
|
char conninfo_repl[MAXCONNINFO + 18];
|
2010-01-20 10:16:24 +01:00
|
|
|
char *primary_sysid;
|
|
|
|
char standby_sysid[32];
|
|
|
|
TimeLineID primary_tli;
|
|
|
|
TimeLineID standby_tli;
|
|
|
|
PGresult *res;
|
|
|
|
char cmd[64];
|
|
|
|
|
2010-03-19 18:51:42 +01:00
|
|
|
/* Connect using deliberately undocumented parameter: replication */
|
2010-01-20 10:16:24 +01:00
|
|
|
snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
|
|
|
|
|
|
|
|
streamConn = PQconnectdb(conninfo_repl);
|
|
|
|
if (PQstatus(streamConn) != CONNECTION_OK)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not connect to the primary server : %s",
|
|
|
|
PQerrorMessage(streamConn))));
|
|
|
|
|
|
|
|
/*
|
2010-02-26 03:01:40 +01:00
|
|
|
* Get the system identifier and timeline ID as a DataRow message from the
|
|
|
|
* primary server.
|
2010-01-20 10:16:24 +01:00
|
|
|
*/
|
2010-04-19 16:10:45 +02:00
|
|
|
res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
|
2010-01-20 10:16:24 +01:00
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
2010-02-26 03:01:40 +01:00
|
|
|
{
|
2010-01-20 10:16:24 +01:00
|
|
|
PQclear(res);
|
|
|
|
ereport(ERROR,
|
2010-03-21 01:17:59 +01:00
|
|
|
(errmsg("could not receive database system identifier and timeline ID from "
|
2010-01-20 10:16:24 +01:00
|
|
|
"the primary server: %s",
|
|
|
|
PQerrorMessage(streamConn))));
|
2010-02-26 03:01:40 +01:00
|
|
|
}
|
2010-01-20 10:16:24 +01:00
|
|
|
if (PQnfields(res) != 2 || PQntuples(res) != 1)
|
|
|
|
{
|
2010-02-26 03:01:40 +01:00
|
|
|
int ntuples = PQntuples(res);
|
|
|
|
int nfields = PQnfields(res);
|
|
|
|
|
2010-01-20 10:16:24 +01:00
|
|
|
PQclear(res);
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("invalid response from primary server"),
|
2010-03-21 01:17:59 +01:00
|
|
|
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
|
2010-01-20 10:16:24 +01:00
|
|
|
ntuples, nfields)));
|
|
|
|
}
|
|
|
|
primary_sysid = PQgetvalue(res, 0, 0);
|
|
|
|
primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
|
|
|
|
|
|
|
|
/*
|
2010-02-26 03:01:40 +01:00
|
|
|
* Confirm that the system identifier of the primary is the same as ours.
|
2010-01-20 10:16:24 +01:00
|
|
|
*/
|
|
|
|
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
|
|
|
|
GetSystemIdentifier());
|
|
|
|
if (strcmp(primary_sysid, standby_sysid) != 0)
|
|
|
|
{
|
|
|
|
PQclear(res);
|
|
|
|
ereport(ERROR,
|
2010-03-21 01:17:59 +01:00
|
|
|
(errmsg("database system identifier differs between the primary and standby"),
|
|
|
|
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
|
2010-01-20 10:16:24 +01:00
|
|
|
primary_sysid, standby_sysid)));
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2010-02-26 03:01:40 +01:00
|
|
|
* Confirm that the current timeline of the primary is the same as the
|
|
|
|
* recovery target timeline.
|
2010-01-20 10:16:24 +01:00
|
|
|
*/
|
|
|
|
standby_tli = GetRecoveryTargetTLI();
|
|
|
|
PQclear(res);
|
|
|
|
if (primary_tli != standby_tli)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("timeline %u of the primary does not match recovery target timeline %u",
|
|
|
|
primary_tli, standby_tli)));
|
|
|
|
ThisTimeLineID = primary_tli;
|
|
|
|
|
|
|
|
/* Start streaming from the point requested by startup process */
|
|
|
|
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
|
|
|
|
startpoint.xlogid, startpoint.xrecoff);
|
2010-04-19 16:10:45 +02:00
|
|
|
res = libpqrcv_PQexec(cmd);
|
2010-01-20 10:16:24 +01:00
|
|
|
if (PQresultStatus(res) != PGRES_COPY_OUT)
|
2010-04-19 16:10:45 +02:00
|
|
|
{
|
|
|
|
PQclear(res);
|
2010-01-20 10:16:24 +01:00
|
|
|
ereport(ERROR,
|
2010-03-21 01:17:59 +01:00
|
|
|
(errmsg("could not start WAL streaming: %s",
|
2010-01-20 10:16:24 +01:00
|
|
|
PQerrorMessage(streamConn))));
|
2010-04-19 16:10:45 +02:00
|
|
|
}
|
2010-01-20 10:16:24 +01:00
|
|
|
PQclear(res);
|
|
|
|
|
|
|
|
justconnected = true;
|
2010-03-19 20:19:38 +01:00
|
|
|
ereport(LOG,
|
|
|
|
(errmsg("streaming replication successfully connected to primary")));
|
2010-01-20 10:16:24 +01:00
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Wait until we can read WAL stream, or timeout.
|
|
|
|
*
|
|
|
|
* Returns true if data has become available for reading, false if timed out
|
|
|
|
* or interrupted by signal.
|
|
|
|
*
|
|
|
|
* This is based on pqSocketCheck.
|
|
|
|
*/
|
|
|
|
static bool
|
|
|
|
libpq_select(int timeout_ms)
|
|
|
|
{
|
2010-02-26 03:01:40 +01:00
|
|
|
int ret;
|
2010-01-20 10:16:24 +01:00
|
|
|
|
|
|
|
Assert(streamConn != NULL);
|
|
|
|
if (PQsocket(streamConn) < 0)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode_for_socket_access(),
|
|
|
|
errmsg("socket not open")));
|
|
|
|
|
|
|
|
/* We use poll(2) if available, otherwise select(2) */
|
|
|
|
{
|
|
|
|
#ifdef HAVE_POLL
|
|
|
|
struct pollfd input_fd;
|
|
|
|
|
|
|
|
input_fd.fd = PQsocket(streamConn);
|
|
|
|
input_fd.events = POLLIN | POLLERR;
|
|
|
|
input_fd.revents = 0;
|
|
|
|
|
|
|
|
ret = poll(&input_fd, 1, timeout_ms);
|
|
|
|
#else /* !HAVE_POLL */
|
|
|
|
|
|
|
|
fd_set input_mask;
|
|
|
|
struct timeval timeout;
|
|
|
|
struct timeval *ptr_timeout;
|
|
|
|
|
|
|
|
FD_ZERO(&input_mask);
|
2010-02-26 03:01:40 +01:00
|
|
|
FD_SET (PQsocket(streamConn), &input_mask);
|
2010-01-20 10:16:24 +01:00
|
|
|
|
|
|
|
if (timeout_ms < 0)
|
|
|
|
ptr_timeout = NULL;
|
|
|
|
else
|
|
|
|
{
|
2010-02-26 03:01:40 +01:00
|
|
|
timeout.tv_sec = timeout_ms / 1000;
|
|
|
|
timeout.tv_usec = (timeout_ms % 1000) * 1000;
|
|
|
|
ptr_timeout = &timeout;
|
2010-01-20 10:16:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
ret = select(PQsocket(streamConn) + 1, &input_mask,
|
|
|
|
NULL, NULL, ptr_timeout);
|
|
|
|
#endif /* HAVE_POLL */
|
|
|
|
}
|
|
|
|
|
|
|
|
if (ret == 0 || (ret < 0 && errno == EINTR))
|
|
|
|
return false;
|
|
|
|
if (ret < 0)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode_for_socket_access(),
|
|
|
|
errmsg("select() failed: %m")));
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2010-04-19 16:10:45 +02:00
|
|
|
/*
|
|
|
|
* Send a query and wait for the results by using the asynchronous libpq
|
|
|
|
* functions and the backend version of select().
|
|
|
|
*
|
|
|
|
* We must not use the regular blocking libpq functions like PQexec()
|
|
|
|
* since they are uninterruptible by signals on some platforms, such as
|
|
|
|
* Windows.
|
|
|
|
*
|
|
|
|
* We must also not use vanilla select() here since it cannot handle the
|
|
|
|
* signal emulation layer on Windows.
|
|
|
|
*
|
|
|
|
* The function is modeled on PQexec() in libpq, but only implements
|
|
|
|
* those parts that are in use in the walreceiver.
|
|
|
|
*
|
|
|
|
* Queries are always executed on the connection in streamConn.
|
|
|
|
*/
|
|
|
|
static PGresult *
|
|
|
|
libpqrcv_PQexec(const char *query)
|
|
|
|
{
|
|
|
|
PGresult *result = NULL;
|
|
|
|
PGresult *lastResult = NULL;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* PQexec() silently discards any prior query results on the
|
|
|
|
* connection. This is not required for walreceiver since it's
|
|
|
|
* expected that walsender won't generate any such junk results.
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Submit a query. Since we don't use non-blocking mode, this also
|
|
|
|
* can block. But its risk is relatively small, so we ignore that
|
|
|
|
* for now.
|
|
|
|
*/
|
|
|
|
if (!PQsendQuery(streamConn, query))
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* Receive data until PQgetResult is ready to get the result
|
|
|
|
* without blocking.
|
|
|
|
*/
|
|
|
|
while (PQisBusy(streamConn))
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* We don't need to break down the sleep into smaller increments,
|
|
|
|
* and check for interrupts after each nap, since we can just
|
|
|
|
* elog(FATAL) within SIGTERM signal handler if the signal
|
|
|
|
* arrives in the middle of establishment of replication connection.
|
|
|
|
*/
|
|
|
|
if (!libpq_select(-1))
|
|
|
|
continue; /* interrupted */
|
|
|
|
if (PQconsumeInput(streamConn) == 0)
|
|
|
|
return NULL; /* trouble */
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Emulate the PQexec()'s behavior of returning the last result
|
|
|
|
* when there are many.
|
|
|
|
* Since walsender will never generate multiple results, we skip
|
|
|
|
* the concatenation of error messages.
|
|
|
|
*/
|
|
|
|
result = PQgetResult(streamConn);
|
|
|
|
if (result == NULL)
|
|
|
|
break; /* query is complete */
|
|
|
|
|
|
|
|
PQclear(lastResult);
|
|
|
|
lastResult = result;
|
|
|
|
|
|
|
|
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
|
|
|
|
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
|
|
|
|
PQstatus(streamConn) == CONNECTION_BAD)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
return lastResult;
|
|
|
|
}
|
|
|
|
|
2010-01-20 10:16:24 +01:00
|
|
|
/*
|
|
|
|
* Disconnect connection to primary, if any.
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
libpqrcv_disconnect(void)
|
|
|
|
{
|
|
|
|
PQfinish(streamConn);
|
|
|
|
streamConn = NULL;
|
|
|
|
justconnected = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2010-02-03 10:47:19 +01:00
|
|
|
* Receive a message available from XLOG stream, blocking for
|
2010-01-20 10:16:24 +01:00
|
|
|
* maximum of 'timeout' ms.
|
|
|
|
*
|
|
|
|
* Returns:
|
|
|
|
*
|
2010-02-26 03:01:40 +01:00
|
|
|
* True if data was received. *type, *buffer and *len are set to
|
|
|
|
* the type of the received data, buffer holding it, and length,
|
|
|
|
* respectively.
|
2010-01-20 10:16:24 +01:00
|
|
|
*
|
2010-02-26 03:01:40 +01:00
|
|
|
* False if no data was available within timeout, or wait was interrupted
|
|
|
|
* by signal.
|
2010-01-20 10:16:24 +01:00
|
|
|
*
|
|
|
|
* The buffer returned is only valid until the next call of this function or
|
|
|
|
* libpq_connect/disconnect.
|
|
|
|
*
|
|
|
|
* ereports on error.
|
|
|
|
*/
|
|
|
|
static bool
|
2010-02-03 10:47:19 +01:00
|
|
|
libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
|
2010-01-20 10:16:24 +01:00
|
|
|
{
|
|
|
|
int rawlen;
|
|
|
|
|
|
|
|
if (recvBuf != NULL)
|
|
|
|
PQfreemem(recvBuf);
|
|
|
|
recvBuf = NULL;
|
|
|
|
|
|
|
|
/*
|
2010-02-26 03:01:40 +01:00
|
|
|
* If the caller requested to block, wait for data to arrive. But if this
|
|
|
|
* is the first call after connecting, don't wait, because there might
|
|
|
|
* already be some data in libpq buffer that we haven't returned to
|
|
|
|
* caller.
|
2010-01-20 10:16:24 +01:00
|
|
|
*/
|
|
|
|
if (timeout > 0 && !justconnected)
|
|
|
|
{
|
|
|
|
if (!libpq_select(timeout))
|
|
|
|
return false;
|
|
|
|
|
|
|
|
if (PQconsumeInput(streamConn) == 0)
|
|
|
|
ereport(ERROR,
|
2010-03-21 01:17:59 +01:00
|
|
|
(errmsg("could not receive data from WAL stream: %s",
|
2010-01-20 10:16:24 +01:00
|
|
|
PQerrorMessage(streamConn))));
|
|
|
|
}
|
|
|
|
justconnected = false;
|
|
|
|
|
|
|
|
/* Receive CopyData message */
|
|
|
|
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
|
2010-02-26 03:01:40 +01:00
|
|
|
if (rawlen == 0) /* no data available yet, then return */
|
2010-01-20 10:16:24 +01:00
|
|
|
return false;
|
2010-02-26 03:01:40 +01:00
|
|
|
if (rawlen == -1) /* end-of-streaming or error */
|
2010-01-20 10:16:24 +01:00
|
|
|
{
|
2010-02-26 03:01:40 +01:00
|
|
|
PGresult *res;
|
2010-01-20 10:16:24 +01:00
|
|
|
|
|
|
|
res = PQgetResult(streamConn);
|
|
|
|
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
|
|
|
{
|
|
|
|
PQclear(res);
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("replication terminated by primary server")));
|
|
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
ereport(ERROR,
|
2010-03-21 01:17:59 +01:00
|
|
|
(errmsg("could not receive data from WAL stream: %s",
|
2010-01-20 10:16:24 +01:00
|
|
|
PQerrorMessage(streamConn))));
|
|
|
|
}
|
|
|
|
if (rawlen < -1)
|
|
|
|
ereport(ERROR,
|
2010-03-21 01:17:59 +01:00
|
|
|
(errmsg("could not receive data from WAL stream: %s",
|
2010-01-20 10:16:24 +01:00
|
|
|
PQerrorMessage(streamConn))));
|
|
|
|
|
2010-02-03 10:47:19 +01:00
|
|
|
/* Return received messages to caller */
|
|
|
|
*type = *((unsigned char *) recvBuf);
|
|
|
|
*buffer = recvBuf + sizeof(*type);
|
|
|
|
*len = rawlen - sizeof(*type);
|
2010-01-20 10:16:24 +01:00
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|