diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8aa6de1785..75e195f286 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); if (!wrconn) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); PG_TRY(); { @@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); if (!wrconn) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); PG_TRY(); { @@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { if (sub->enabled && !slotname) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot set %s for enabled subscription", "slot_name = NONE"))); @@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (!sub->slotname && enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); values[Anum_pg_subscription_subenabled - 1] = @@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); @@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); @@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (!sub->enabled) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(stmt->options, @@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi { /* ERROR. */ ereport(ERROR, - (errmsg("could not drop replication slot \"%s\" on publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not drop replication slot \"%s\" on publisher: %s", slotname, res->err))); } @@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not receive list of replicated tables from the publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", res->err))); /* Process tables. */ @@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) } ereport(ERROR, - (errmsg("could not connect to publisher when attempting to " + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to publisher when attempting to " "drop replication slot \"%s\": %s", slotname, err), /* translator: %s is an SQL ALTER command */ errhint("Use %s to disassociate the subscription from the slot.", @@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums) if (strcmp(name, pname) == 0) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("publication name \"%s\" used more than once", pname))); } @@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char * oldpublist = lappend(oldpublist, makeString(name)); else if (!addpub && !found) ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("publication \"%s\" is not in subscription \"%s\"", name, subname))); } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 021c1b36f3..6eaa84a031 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn) if (conn_opts == NULL) ereport(ERROR, - (errmsg("could not parse connection string: %s", + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("could not parse connection string: %s", _("out of memory")))); /* build a clean connection string from pieces */ @@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) { PQclear(res); ereport(ERROR, - (errmsg("could not receive database system identifier and timeline ID from " + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive database system identifier and timeline ID from " "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) PQclear(res); ereport(ERROR, - (errmsg("invalid response from primary server"), + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid response from primary server"), errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", ntuples, nfields, 3, 1))); } @@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn, pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) ereport(ERROR, - (errmsg("could not start WAL streaming: %s", + (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */ + errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str, strlen(pubnames_str)); if (!pubnames_literal) ereport(ERROR, - (errmsg("could not start WAL streaming: %s", + (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */ + errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); @@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn, { PQclear(res); ereport(ERROR, - (errmsg("could not start WAL streaming: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not start WAL streaming: %s", pchomp(PQerrorMessage(conn->streamConn))))); } PQclear(res); @@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || PQflush(conn->streamConn)) ereport(ERROR, - (errmsg("could not send end-of-streaming message to primary: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send end-of-streaming message to primary: %s", pchomp(PQerrorMessage(conn->streamConn))))); *next_tli = 0; @@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) */ if (PQnfields(res) < 2 || PQntuples(res) != 1) ereport(ERROR, - (errmsg("unexpected result set after end-of-streaming"))); + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected result set after end-of-streaming"))); *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0)); PQclear(res); @@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) /* End the copy */ if (PQendcopy(conn->streamConn)) ereport(ERROR, - (errmsg("error while shutting down streaming COPY: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("error while shutting down streaming COPY: %s", pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ @@ -540,7 +549,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) if (PQresultStatus(res) != PGRES_COMMAND_OK) ereport(ERROR, - (errmsg("error reading result of streaming command: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("error reading result of streaming command: %s", pchomp(PQerrorMessage(conn->streamConn))))); PQclear(res); @@ -548,7 +558,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) res = libpqrcv_PQgetResult(conn->streamConn); if (res != NULL) ereport(ERROR, - (errmsg("unexpected result after CommandComplete: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected result after CommandComplete: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, { PQclear(res); ereport(ERROR, - (errmsg("could not receive timeline history file from " + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive timeline history file from " "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, PQclear(res); ereport(ERROR, - (errmsg("invalid response from primary server"), + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid response from primary server"), errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", ntuples, nfields))); } @@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, /* Try consuming some data. */ if (PQconsumeInput(conn->streamConn) == 0) ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); /* Now that we've consumed some input, try again */ @@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, return -1; ereport(ERROR, - (errmsg("unexpected result after CommandComplete: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected result after CommandComplete: %s", PQerrorMessage(conn->streamConn)))); } @@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PQclear(res); ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); } } if (rawlen < -1) ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not receive data from WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); /* Return received messages to caller */ @@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 || PQflush(conn->streamConn)) ereport(ERROR, - (errmsg("could not send data to WAL stream: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send data to WAL stream: %s", pchomp(PQerrorMessage(conn->streamConn))))); } @@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, { PQclear(res); ereport(ERROR, - (errmsg("could not create replication slot \"%s\": %s", + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not create replication slot \"%s\": %s", slotname, pchomp(PQerrorMessage(conn->streamConn))))); } @@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, /* Make sure we got expected number of fields. */ if (nfields != nRetTypes) ereport(ERROR, - (errmsg("invalid query response"), + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid query response"), errdetail("Expected %d fields, got %d fields.", nRetTypes, nfields))); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 67f907cdd9..cc50eb875b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname, if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) ereport(ERROR, - (errmsg("table \"%s.%s\" not found on publisher", + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" not found on publisher", nspname, relname))); lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); @@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname, if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); /* We don't know the number of rows coming, so allocate enough space. */ @@ -851,7 +854,8 @@ copy_table(Relation rel) pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, - (errmsg("could not start initial contents copy for table \"%s.%s\": %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not start initial contents copy for table \"%s.%s\": %s", lrel.nspname, lrel.relname, res->err))); walrcv_clear_result(res); @@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) walrcv_connect(MySubscription->conninfo, true, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || @@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, - (errmsg("table copy could not start transaction on publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not start transaction on publisher: %s", res->err))); walrcv_clear_result(res); @@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, - (errmsg("table copy could not finish transaction on publisher: %s", + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not finish transaction on publisher: %s", res->err))); walrcv_clear_result(res); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4b112593c6..bbb659dad0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (now >= timeout) ereport(ERROR, - (errmsg("terminating logical replication worker due to timeout"))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating logical replication worker due to timeout"))); /* Check to see if it's time for a ping. */ if (!ping_sent) @@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg) MySubscription->name, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); /* * We don't really use the output identify_system for anything but it diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b94910bfe9..faeea9f0cc 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -279,10 +279,13 @@ WalReceiverMain(void) PG_SETMASK(&UnBlockSig); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err); + wrconn = walrcv_connect(conninfo, false, + cluster_name[0] ? cluster_name : "walreceiver", + &err); if (!wrconn) ereport(ERROR, - (errmsg("could not connect to the primary server: %s", err))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err))); /* * Save user-visible connection string. This clobbers the original @@ -328,7 +331,8 @@ WalReceiverMain(void) if (strcmp(primary_sysid, standby_sysid) != 0) { ereport(ERROR, - (errmsg("database system identifier differs between the primary and standby"), + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("database system identifier differs between the primary and standby"), errdetail("The primary's identifier is %s, the standby's identifier is %s.", primary_sysid, standby_sysid))); } @@ -339,7 +343,8 @@ WalReceiverMain(void) */ if (primaryTLI < startpointTLI) ereport(ERROR, - (errmsg("highest timeline %u of the primary is behind recovery timeline %u", + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("highest timeline %u of the primary is behind recovery timeline %u", primaryTLI, startpointTLI))); /* @@ -425,7 +430,8 @@ WalReceiverMain(void) */ if (!RecoveryInProgress()) ereport(FATAL, - (errmsg("cannot continue WAL streaming, recovery has already ended"))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot continue WAL streaming, recovery has already ended"))); /* Process any requests or signals received recently */ ProcessWalRcvInterrupts(); @@ -551,7 +557,8 @@ WalReceiverMain(void) if (now >= timeout) ereport(ERROR, - (errmsg("terminating walreceiver due to timeout"))); + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating walreceiver due to timeout"))); /* * We didn't receive anything new, for half of