diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6c01e7b991..c3b0bf50de 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -32,59 +32,72 @@ PG_MODULE_MAGIC; void _PG_init(void); -/* Current connection to the primary, if any */ -static PGconn *streamConn = NULL; - -/* Buffer for currently read records */ -static char *recvBuf = NULL; +struct WalReceiverConn +{ + /* Current connection to the primary, if any */ + PGconn *streamConn; + /* Used to remember if the connection is logical or physical */ + bool logical; + /* Buffer for currently read records */ + char *recvBuf; +}; /* Prototypes for interface functions */ -static void libpqrcv_connect(char *conninfo); -static char *libpqrcv_get_conninfo(void); -static void libpqrcv_identify_system(TimeLineID *primary_tli); -static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); -static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, - char *slotname); -static void libpqrcv_endstreaming(TimeLineID *next_tli); -static int libpqrcv_receive(char **buffer, pgsocket *wait_fd); -static void libpqrcv_send(const char *buffer, int nbytes); -static void libpqrcv_disconnect(void); +static WalReceiverConn *libpqrcv_connect(const char *conninfo, + bool logical, const char *appname); +static char *libpqrcv_get_conninfo(WalReceiverConn *conn); +static char *libpqrcv_identify_system(WalReceiverConn *conn, + TimeLineID *primary_tli); +static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, + TimeLineID tli, char **filename, + char **content, int *len); +static bool libpqrcv_startstreaming(WalReceiverConn *conn, + TimeLineID tli, XLogRecPtr startpoint, + const char *slotname); +static void libpqrcv_endstreaming(WalReceiverConn *conn, + TimeLineID *next_tli); +static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, + pgsocket *wait_fd); +static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, + int nbytes); +static void libpqrcv_disconnect(WalReceiverConn *conn); + +static WalReceiverFunctionsType PQWalReceiverFunctions = { + libpqrcv_connect, + libpqrcv_get_conninfo, + libpqrcv_identify_system, + libpqrcv_readtimelinehistoryfile, + libpqrcv_startstreaming, + libpqrcv_endstreaming, + libpqrcv_receive, + libpqrcv_send, + libpqrcv_disconnect +}; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(const char *query); +static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); /* - * Module load callback + * Module initialization function */ void _PG_init(void) { - /* Tell walreceiver how to reach us */ - if (walrcv_connect != NULL || walrcv_identify_system != NULL || - walrcv_readtimelinehistoryfile != NULL || - walrcv_startstreaming != NULL || walrcv_endstreaming != NULL || - walrcv_receive != NULL || walrcv_send != NULL || - walrcv_disconnect != NULL) + if (WalReceiverFunctions != NULL) elog(ERROR, "libpqwalreceiver already loaded"); - walrcv_connect = libpqrcv_connect; - walrcv_get_conninfo = libpqrcv_get_conninfo; - walrcv_identify_system = libpqrcv_identify_system; - walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile; - walrcv_startstreaming = libpqrcv_startstreaming; - walrcv_endstreaming = libpqrcv_endstreaming; - walrcv_receive = libpqrcv_receive; - walrcv_send = libpqrcv_send; - walrcv_disconnect = libpqrcv_disconnect; + WalReceiverFunctions = &PQWalReceiverFunctions; } /* * Establish the connection to the primary server for XLOG streaming */ -static void -libpqrcv_connect(char *conninfo) +static WalReceiverConn * +libpqrcv_connect(const char *conninfo, bool logical, const char *appname) { + WalReceiverConn *conn; const char *keys[5]; const char *vals[5]; + int i = 0; /* * We use the expand_dbname parameter to process the connection string (or @@ -93,22 +106,29 @@ libpqrcv_connect(char *conninfo) * database name is ignored by the server in replication mode, but specify * "replication" for .pgpass lookup. */ - keys[0] = "dbname"; - vals[0] = conninfo; - keys[1] = "replication"; - vals[1] = "true"; - keys[2] = "dbname"; - vals[2] = "replication"; - keys[3] = "fallback_application_name"; - vals[3] = "walreceiver"; - keys[4] = NULL; - vals[4] = NULL; + keys[i] = "dbname"; + vals[i] = conninfo; + keys[++i] = "replication"; + vals[i] = logical ? "database" : "true"; + if (!logical) + { + keys[++i] = "dbname"; + vals[i] = "replication"; + } + keys[++i] = "fallback_application_name"; + vals[i] = appname; + keys[++i] = NULL; + vals[i] = NULL; - streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); - if (PQstatus(streamConn) != CONNECTION_OK) + conn = palloc0(sizeof(WalReceiverConn)); + conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); + if (PQstatus(conn->streamConn) != CONNECTION_OK) ereport(ERROR, (errmsg("could not connect to the primary server: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); + conn->logical = logical; + + return conn; } /* @@ -116,17 +136,17 @@ libpqrcv_connect(char *conninfo) * are obfuscated. */ static char * -libpqrcv_get_conninfo(void) +libpqrcv_get_conninfo(WalReceiverConn *conn) { PQconninfoOption *conn_opts; PQconninfoOption *conn_opt; PQExpBufferData buf; char *retval; - Assert(streamConn != NULL); + Assert(conn->streamConn != NULL); initPQExpBuffer(&buf); - conn_opts = PQconninfo(streamConn); + conn_opts = PQconninfo(conn->streamConn); if (conn_opts == NULL) ereport(ERROR, @@ -164,25 +184,24 @@ libpqrcv_get_conninfo(void) * Check that primary's system identifier matches ours, and fetch the current * timeline ID of the primary. */ -static void -libpqrcv_identify_system(TimeLineID *primary_tli) +static char * +libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) { PGresult *res; char *primary_sysid; - char standby_sysid[32]; /* * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = libpqrcv_PQexec("IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); ereport(ERROR, (errmsg("could not receive database system identifier and timeline ID from " "the primary server: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); } if (PQnfields(res) < 3 || PQntuples(res) != 1) { @@ -195,24 +214,11 @@ libpqrcv_identify_system(TimeLineID *primary_tli) errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", ntuples, nfields, 3, 1))); } - primary_sysid = PQgetvalue(res, 0, 0); + primary_sysid = pstrdup(PQgetvalue(res, 0, 0)); *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); - - /* - * Confirm that the system identifier of the primary is the same as ours. - */ - snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, - GetSystemIdentifier()); - if (strcmp(primary_sysid, standby_sysid) != 0) - { - primary_sysid = pstrdup(primary_sysid); - PQclear(res); - ereport(ERROR, - (errmsg("database system identifier differs between the primary and standby"), - errdetail("The primary's identifier is %s, the standby's identifier is %s.", - primary_sysid, standby_sysid))); - } PQclear(res); + + return primary_sysid; } /* @@ -226,21 +232,30 @@ libpqrcv_identify_system(TimeLineID *primary_tli) * throws an ERROR. */ static bool -libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) +libpqrcv_startstreaming(WalReceiverConn *conn, + TimeLineID tli, XLogRecPtr startpoint, + const char *slotname) { - char cmd[256]; + StringInfoData cmd; PGresult *res; + Assert(!conn->logical); + + initStringInfo(&cmd); + /* Start streaming from the point requested by startup process */ if (slotname != NULL) - snprintf(cmd, sizeof(cmd), - "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname, - (uint32) (startpoint >> 32), (uint32) startpoint, tli); + appendStringInfo(&cmd, + "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", + slotname, + (uint32) (startpoint >> 32), (uint32) startpoint, + tli); else - snprintf(cmd, sizeof(cmd), - "START_REPLICATION %X/%X TIMELINE %u", - (uint32) (startpoint >> 32), (uint32) startpoint, tli); - res = libpqrcv_PQexec(cmd); + appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + tli); + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) { @@ -252,7 +267,7 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) PQclear(res); ereport(ERROR, (errmsg("could not start WAL streaming: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); } PQclear(res); return true; @@ -263,14 +278,17 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) * reported by the server, or 0 if it did not report it. */ static void -libpqrcv_endstreaming(TimeLineID *next_tli) +libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) { PGresult *res; - if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn)) + if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || + PQflush(conn->streamConn)) ereport(ERROR, (errmsg("could not send end-of-streaming message to primary: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); + + *next_tli = 0; /* * After COPY is finished, we should receive a result set indicating the @@ -282,7 +300,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli) * called after receiving CopyDone from the backend - the walreceiver * never terminates replication on its own initiative. */ - res = PQgetResult(streamConn); + res = PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -296,47 +314,58 @@ libpqrcv_endstreaming(TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = PQgetResult(streamConn); + res = PQgetResult(conn->streamConn); + } + else if (PQresultStatus(res) == PGRES_COPY_OUT) + { + PQclear(res); + + /* End the copy */ + PQendcopy(conn->streamConn); + + /* CommandComplete should follow */ + res = PQgetResult(conn->streamConn); } - else - *next_tli = 0; if (PQresultStatus(res) != PGRES_COMMAND_OK) ereport(ERROR, (errmsg("error reading result of streaming command: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); PQclear(res); /* Verify that there are no more results */ - res = PQgetResult(streamConn); + res = PQgetResult(conn->streamConn); if (res != NULL) ereport(ERROR, (errmsg("unexpected result after CommandComplete: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); } /* * Fetch the timeline history file for 'tli' from primary. */ static void -libpqrcv_readtimelinehistoryfile(TimeLineID tli, - char **filename, char **content, int *len) +libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, + TimeLineID tli, char **filename, + char **content, int *len) { PGresult *res; char cmd[64]; + Assert(!conn->logical); + /* * Request the primary to send over the history file for given timeline. */ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(cmd); + res = libpqrcv_PQexec(conn->streamConn, cmd); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); ereport(ERROR, (errmsg("could not receive timeline history file from " "the primary server: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); } if (PQnfields(res) != 2 || PQntuples(res) != 1) { @@ -374,7 +403,7 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli, * Queries are always executed on the connection in streamConn. */ static PGresult * -libpqrcv_PQexec(const char *query) +libpqrcv_PQexec(PGconn *streamConn, const char *query) { PGresult *result = NULL; PGresult *lastResult = NULL; @@ -455,10 +484,12 @@ libpqrcv_PQexec(const char *query) * Disconnect connection to primary, if any. */ static void -libpqrcv_disconnect(void) +libpqrcv_disconnect(WalReceiverConn *conn) { - PQfinish(streamConn); - streamConn = NULL; + PQfinish(conn->streamConn); + if (conn->recvBuf != NULL) + PQfreemem(conn->recvBuf); + pfree(conn); } /* @@ -478,30 +509,31 @@ libpqrcv_disconnect(void) * ereports on error. */ static int -libpqrcv_receive(char **buffer, pgsocket *wait_fd) +libpqrcv_receive(WalReceiverConn *conn, char **buffer, + pgsocket *wait_fd) { int rawlen; - if (recvBuf != NULL) - PQfreemem(recvBuf); - recvBuf = NULL; + if (conn->recvBuf != NULL) + PQfreemem(conn->recvBuf); + conn->recvBuf = NULL; /* Try to receive a CopyData message */ - rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1); if (rawlen == 0) { /* Try consuming some data. */ - if (PQconsumeInput(streamConn) == 0) + if (PQconsumeInput(conn->streamConn) == 0) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); /* Now that we've consumed some input, try again */ - rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1); if (rawlen == 0) { /* Tell caller to try again when our socket is ready. */ - *wait_fd = PQsocket(streamConn); + *wait_fd = PQsocket(conn->streamConn); return 0; } } @@ -509,7 +541,7 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd) { PGresult *res; - res = PQgetResult(streamConn); + res = PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_COPY_IN) { @@ -521,16 +553,16 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd) PQclear(res); ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); } } if (rawlen < -1) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); /* Return received messages to caller */ - *buffer = recvBuf; + *buffer = conn->recvBuf; return rawlen; } @@ -540,11 +572,11 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd) * ereports on error. */ static void -libpqrcv_send(const char *buffer, int nbytes) +libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) { - if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || - PQflush(streamConn)) + if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 || + PQflush(conn->streamConn)) ereport(ERROR, (errmsg("could not send data to WAL stream: %s", - PQerrorMessage(streamConn)))); + PQerrorMessage(conn->streamConn)))); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8bfb041560..cc3cf7d214 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -74,16 +74,9 @@ int wal_receiver_status_interval; int wal_receiver_timeout; bool hot_standby_feedback; -/* libpqreceiver hooks to these when loaded */ -walrcv_connect_type walrcv_connect = NULL; -walrcv_get_conninfo_type walrcv_get_conninfo = NULL; -walrcv_identify_system_type walrcv_identify_system = NULL; -walrcv_startstreaming_type walrcv_startstreaming = NULL; -walrcv_endstreaming_type walrcv_endstreaming = NULL; -walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL; -walrcv_receive_type walrcv_receive = NULL; -walrcv_send_type walrcv_send = NULL; -walrcv_disconnect_type walrcv_disconnect = NULL; +/* libpqwalreceiver connection */ +static WalReceiverConn *wrconn = NULL; +WalReceiverFunctionsType *WalReceiverFunctions = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -286,14 +279,7 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || - walrcv_get_conninfo == NULL || - walrcv_startstreaming == NULL || - walrcv_endstreaming == NULL || - walrcv_identify_system == NULL || - walrcv_readtimelinehistoryfile == NULL || - walrcv_receive == NULL || walrcv_send == NULL || - walrcv_disconnect == NULL) + if (WalReceiverFunctions == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* @@ -307,14 +293,14 @@ WalReceiverMain(void) /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); - walrcv_connect(conninfo); + wrconn = walrcv_connect(conninfo, false, "walreceiver"); DisableWalRcvImmediateExit(); /* * Save user-visible connection string. This clobbers the original * conninfo, for security. */ - tmp_conninfo = walrcv_get_conninfo(); + tmp_conninfo = walrcv_get_conninfo(wrconn); SpinLockAcquire(&walrcv->mutex); memset(walrcv->conninfo, 0, MAXCONNINFO); if (tmp_conninfo) @@ -328,12 +314,25 @@ WalReceiverMain(void) first_stream = true; for (;;) { + char *primary_sysid; + char standby_sysid[32]; + /* * Check that we're connected to a valid server using the * IDENTIFY_SYSTEM replication command, */ EnableWalRcvImmediateExit(); - walrcv_identify_system(&primaryTLI); + primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); + + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + ereport(ERROR, + (errmsg("database system identifier differs between the primary and standby"), + errdetail("The primary's identifier is %s, the standby's identifier is %s.", + primary_sysid, standby_sysid))); + } DisableWalRcvImmediateExit(); /* @@ -370,7 +369,7 @@ WalReceiverMain(void) * on the new timeline. */ ThisTimeLineID = startpointTLI; - if (walrcv_startstreaming(startpointTLI, startpoint, + if (walrcv_startstreaming(wrconn, startpointTLI, startpoint, slotname[0] != '\0' ? slotname : NULL)) { if (first_stream) @@ -422,7 +421,7 @@ WalReceiverMain(void) } /* See if we can read data immediately */ - len = walrcv_receive(&buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd); if (len != 0) { /* @@ -453,7 +452,7 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(&buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd); } /* Let the master know that we received some data. */ @@ -570,7 +569,7 @@ WalReceiverMain(void) * our side, too. */ EnableWalRcvImmediateExit(); - walrcv_endstreaming(&primaryTLI); + walrcv_endstreaming(wrconn, &primaryTLI); DisableWalRcvImmediateExit(); /* @@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) tli))); EnableWalRcvImmediateExit(); - walrcv_readtimelinehistoryfile(tli, &fname, &content, &len); + walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); DisableWalRcvImmediateExit(); /* @@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg) SpinLockRelease(&walrcv->mutex); /* Terminate the connection gracefully. */ - if (walrcv_disconnect != NULL) - walrcv_disconnect(); + if (wrconn != NULL) + walrcv_disconnect(wrconn); /* Wake up the startup process to notice promptly that we're gone */ WakeupRecovery(); @@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) (uint32) (applyPtr >> 32), (uint32) applyPtr, requestReply ? " (reply requested)" : ""); - walrcv_send(reply_message.data, reply_message.len); + walrcv_send(wrconn, reply_message.data, reply_message.len); } /* @@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed) pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint(&reply_message, xmin, 4); pq_sendint(&reply_message, nextEpoch, 4); - walrcv_send(reply_message.data, reply_message.len); + walrcv_send(wrconn, reply_message.data, reply_message.len); if (TransactionIdIsValid(xmin)) master_has_standby_xmin = true; else diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index afbb8d8b95..edb14b5f75 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -134,33 +134,64 @@ typedef struct extern WalRcvData *WalRcv; +struct WalReceiverConn; +typedef struct WalReceiverConn WalReceiverConn; + /* libpqwalreceiver hooks */ -typedef void (*walrcv_connect_type) (char *conninfo); -extern PGDLLIMPORT walrcv_connect_type walrcv_connect; +typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical, + const char *appname); +typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); +typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, + TimeLineID *primary_tli); +typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn, + TimeLineID tli, + char **filename, + char **content, int *size); +typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn, + TimeLineID tli, + XLogRecPtr startpoint, + const char *slotname); +typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn, + TimeLineID *next_tli); +typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer, + pgsocket *wait_fd); +typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, + int nbytes); +typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); -typedef char *(*walrcv_get_conninfo_type) (void); -extern PGDLLIMPORT walrcv_get_conninfo_type walrcv_get_conninfo; +typedef struct WalReceiverFunctionsType +{ + walrcv_connect_fn connect; + walrcv_get_conninfo_fn get_conninfo; + walrcv_identify_system_fn identify_system; + walrcv_readtimelinehistoryfile_fn readtimelinehistoryfile; + walrcv_startstreaming_fn startstreaming; + walrcv_endstreaming_fn endstreaming; + walrcv_receive_fn receive; + walrcv_send_fn send; + walrcv_disconnect_fn disconnect; +} WalReceiverFunctionsType; -typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli); -extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system; +extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size); -extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile; - -typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname); -extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming; - -typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli); -extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming; - -typedef int (*walrcv_receive_type) (char **buffer, pgsocket *wait_fd); -extern PGDLLIMPORT walrcv_receive_type walrcv_receive; - -typedef void (*walrcv_send_type) (const char *buffer, int nbytes); -extern PGDLLIMPORT walrcv_send_type walrcv_send; - -typedef void (*walrcv_disconnect_type) (void); -extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; +#define walrcv_connect(conninfo, logical, appname) \ + WalReceiverFunctions->connect(conninfo, logical, appname) +#define walrcv_get_conninfo(conn) \ + WalReceiverFunctions->get_conninfo(conn) +#define walrcv_identify_system(conn, primary_tli) \ + WalReceiverFunctions->identify_system(conn, primary_tli) +#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ + WalReceiverFunctions->readtimelinehistoryfile(conn, tli, filename, content, size) +#define walrcv_startstreaming(conn, tli, startpoint, slotname) \ + WalReceiverFunctions->startstreaming(conn, tli, startpoint, slotname) +#define walrcv_endstreaming(conn, next_tli) \ + WalReceiverFunctions->endstreaming(conn, next_tli) +#define walrcv_receive(conn, buffer, wait_fd) \ + WalReceiverFunctions->receive(conn, buffer, wait_fd) +#define walrcv_send(conn, buffer, nbytes) \ + WalReceiverFunctions->send(conn, buffer, nbytes) +#define walrcv_disconnect(conn) \ + WalReceiverFunctions->disconnect(conn) /* prototypes for functions in walreceiver.c */ extern void WalReceiverMain(void) pg_attribute_noreturn();