diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 186057bd93..d8ff8664fb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -716,8 +716,8 @@ static void logicalrep_worker_onexit(int code, Datum arg) { /* Disconnect gracefully from the remote side. */ - if (wrconn) - walrcv_disconnect(wrconn); + if (LogRepWorkerWalRcvConn) + walrcv_disconnect(LogRepWorkerWalRcvConn); logicalrep_worker_detach(); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7881079e96..2db88dc41a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -295,7 +295,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - walrcv_endstreaming(wrconn, &tli); + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); finish_sync_worker(); } else @@ -591,7 +591,7 @@ copy_read_data(void *outbuf, int minread, int maxread) for (;;) { /* Try read the data. */ - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); CHECK_FOR_INTERRUPTS(); @@ -665,7 +665,8 @@ fetch_remote_table_info(char *nspname, char *relname, " AND c.relkind = 'r'", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(tableRow), tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -701,9 +702,11 @@ fetch_remote_table_info(char *nspname, char *relname, " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, - (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""), + (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? + "AND a.attgenerated = ''" : ""), lrel->remoteid); - res = walrcv_exec(wrconn, cmd.data, 4, attrRow); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrRow), attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -773,7 +776,7 @@ copy_table(Relation rel) initStringInfo(&cmd); appendStringInfo(&cmd, "COPY %s TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); - res = walrcv_exec(wrconn, cmd.data, 0, NULL); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, @@ -840,8 +843,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * application_name, so that it is different from the main apply worker, * so that synchronous replication can distinguish them. */ - wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); - if (wrconn == NULL) + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -886,7 +890,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * inside the transaction so that we can use the snapshot made * by the slot to get existing data. */ - res = walrcv_exec(wrconn, + res = walrcv_exec(LogRepWorkerWalRcvConn, "BEGIN READ ONLY ISOLATION LEVEL " "REPEATABLE READ", 0, NULL); if (res->status != WALRCV_OK_COMMAND) @@ -903,14 +907,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * that is consistent with the lsn used by the slot to start * decoding. */ - walrcv_create_slot(wrconn, slotname, true, + walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true, CRS_USE_SNAPSHOT, origin_startpos); PushActiveSnapshot(GetTransactionSnapshot()); copy_table(rel); PopActiveSnapshot(); - res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errmsg("table copy could not finish transaction on publisher"), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ce8f4106aa..9f0d13cbfa 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -96,7 +96,7 @@ typedef struct SlotErrCallbackArg static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; -WalReceiverConn *wrconn = NULL; +WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; @@ -1158,7 +1158,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextSwitchTo(ApplyMessageContext); - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); if (len != 0) { @@ -1238,7 +1238,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -1268,7 +1268,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { TimeLineID tli; - walrcv_endstreaming(wrconn, &tli); + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); break; } @@ -1431,7 +1431,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) (uint32) (flushpos >> 32), (uint32) flushpos ); - walrcv_send(wrconn, reply_message->data, reply_message->len); + walrcv_send(LogRepWorkerWalRcvConn, + reply_message->data, reply_message->len); if (recvpos > last_recvpos) last_recvpos = recvpos; @@ -1743,9 +1744,9 @@ ApplyWorkerMain(Datum main_arg) origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); - wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, - &err); - if (wrconn == NULL) + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + MySubscription->name, &err); + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -1753,8 +1754,7 @@ ApplyWorkerMain(Datum main_arg) * We don't really use the output identify_system for anything but it * does some initializations on the upstream so let's still call it. */ - (void) walrcv_identify_system(wrconn, &startpointTLI); - + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); } /* @@ -1773,7 +1773,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.publication_names = MySubscription->publications; /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options); + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 05f4936419..8d605445d8 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -60,7 +60,7 @@ typedef struct LogicalRepWorker extern MemoryContext ApplyContext; /* libpqreceiver connection */ -extern struct WalReceiverConn *wrconn; +extern struct WalReceiverConn *LogRepWorkerWalRcvConn; /* Worker and subscription objects. */ extern Subscription *MySubscription;