diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9732982d93..9f5bf4b639 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1156,3 +1156,56 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, *xid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4); } + +/* + * Get string representing LogicalRepMsgType. + */ +char * +logicalrep_message_type(LogicalRepMsgType action) +{ + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + return "BEGIN"; + case LOGICAL_REP_MSG_COMMIT: + return "COMMIT"; + case LOGICAL_REP_MSG_ORIGIN: + return "ORIGIN"; + case LOGICAL_REP_MSG_INSERT: + return "INSERT"; + case LOGICAL_REP_MSG_UPDATE: + return "UPDATE"; + case LOGICAL_REP_MSG_DELETE: + return "DELETE"; + case LOGICAL_REP_MSG_TRUNCATE: + return "TRUNCATE"; + case LOGICAL_REP_MSG_RELATION: + return "RELATION"; + case LOGICAL_REP_MSG_TYPE: + return "TYPE"; + case LOGICAL_REP_MSG_MESSAGE: + return "MESSAGE"; + case LOGICAL_REP_MSG_BEGIN_PREPARE: + return "BEGIN PREPARE"; + case LOGICAL_REP_MSG_PREPARE: + return "PREPARE"; + case LOGICAL_REP_MSG_COMMIT_PREPARED: + return "COMMIT PREPARED"; + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + return "ROLLBACK PREPARED"; + case LOGICAL_REP_MSG_STREAM_START: + return "STREAM START"; + case LOGICAL_REP_MSG_STREAM_STOP: + return "STREAM STOP"; + case LOGICAL_REP_MSG_STREAM_COMMIT: + return "STREAM COMMIT"; + case LOGICAL_REP_MSG_STREAM_ABORT: + return "STREAM ABORT"; + case LOGICAL_REP_MSG_STREAM_PREPARE: + return "STREAM PREPARE"; + } + + elog(ERROR, "invalid logical replication message type \"%c\"", action); + + return NULL; /* keep compiler quiet */ +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 38b493e4f5..295b1e06de 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -203,12 +203,6 @@ typedef struct FlushPosition static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); -typedef struct SlotErrCallbackArg -{ - LogicalRepRelMapEntry *rel; - int remote_attnum; -} SlotErrCallbackArg; - typedef struct ApplyExecutionData { EState *estate; /* executor state, used to track resources */ @@ -221,6 +215,27 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +/* Struct for saving and restoring apply errcontext information */ +typedef struct ApplyErrorCallbackArg +{ + LogicalRepMsgType command; /* 0 if invalid */ + LogicalRepRelMapEntry *rel; + + /* Remote node information */ + int remote_attnum; /* -1 if invalid */ + TransactionId remote_xid; + TimestampTz ts; /* commit, rollback, or prepare timestamp */ +} ApplyErrorCallbackArg; + +static ApplyErrorCallbackArg apply_error_callback_arg = +{ + .command = 0, + .rel = NULL, + .remote_attnum = -1, + .remote_xid = InvalidTransactionId, + .ts = 0, +}; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -335,6 +350,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for apply error callback */ +static void apply_error_callback(void *arg); +static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); +static inline void reset_apply_error_context_info(void); + /* * Should this worker apply changes for given relation. * @@ -580,26 +600,6 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]); } -/* - * Error callback to give more context info about data conversion failures - * while reading data from the remote server. - */ -static void -slot_store_error_callback(void *arg) -{ - SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; - LogicalRepRelMapEntry *rel; - - /* Nothing to do if remote attribute number is not set */ - if (errarg->remote_attnum < 0) - return; - - rel = errarg->rel; - errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"", - rel->remoterel.nspname, rel->remoterel.relname, - rel->remoterel.attnames[errarg->remote_attnum]); -} - /* * Store tuple data into slot. * @@ -611,19 +611,9 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, { int natts = slot->tts_tupleDescriptor->natts; int i; - SlotErrCallbackArg errarg; - ErrorContextCallback errcallback; ExecClearTuple(slot); - /* Push callback + info on the error context stack */ - errarg.rel = rel; - errarg.remote_attnum = -1; - errcallback.callback = slot_store_error_callback; - errcallback.arg = (void *) &errarg; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - /* Call the "in" function for each non-dropped, non-null attribute */ Assert(natts == rel->attrmap->maplen); for (i = 0; i < natts; i++) @@ -637,7 +627,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, Assert(remoteattnum < tupleData->ncols); - errarg.remote_attnum = remoteattnum; + /* Set attnum for error callback */ + apply_error_callback_arg.remote_attnum = remoteattnum; if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) { @@ -685,7 +676,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, slot->tts_isnull[i] = true; } - errarg.remote_attnum = -1; + /* Reset attnum for error callback */ + apply_error_callback_arg.remote_attnum = -1; } else { @@ -699,9 +691,6 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, } } - /* Pop the error context stack */ - error_context_stack = errcallback.previous; - ExecStoreVirtualTuple(slot); } @@ -724,8 +713,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, { int natts = slot->tts_tupleDescriptor->natts; int i; - SlotErrCallbackArg errarg; - ErrorContextCallback errcallback; /* We'll fill "slot" with a virtual tuple, so we must start with ... */ ExecClearTuple(slot); @@ -739,14 +726,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum)); memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool)); - /* For error reporting, push callback + info on the error context stack */ - errarg.rel = rel; - errarg.remote_attnum = -1; - errcallback.callback = slot_store_error_callback; - errcallback.arg = (void *) &errarg; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - /* Call the "in" function for each replaced attribute */ Assert(natts == rel->attrmap->maplen); for (i = 0; i < natts; i++) @@ -763,7 +742,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, { StringInfo colvalue = &tupleData->colvalues[remoteattnum]; - errarg.remote_attnum = remoteattnum; + /* Set attnum for error callback */ + apply_error_callback_arg.remote_attnum = remoteattnum; if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) { @@ -807,13 +787,11 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, slot->tts_isnull[i] = true; } - errarg.remote_attnum = -1; + /* Reset attnum for error callback */ + apply_error_callback_arg.remote_attnum = -1; } } - /* Pop the error context stack */ - error_context_stack = errcallback.previous; - /* And finally, declare that "slot" contains a valid virtual tuple */ ExecStoreVirtualTuple(slot); } @@ -827,6 +805,7 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); + set_apply_error_context_xact(begin_data.xid, begin_data.committime); remote_final_lsn = begin_data.final_lsn; @@ -860,6 +839,7 @@ apply_handle_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -877,6 +857,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); + set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time); remote_final_lsn = begin_data.prepare_lsn; @@ -962,6 +943,7 @@ apply_handle_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -974,6 +956,7 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); + set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -1001,6 +984,7 @@ apply_handle_commit_prepared(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1013,6 +997,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1050,6 +1035,7 @@ apply_handle_rollback_prepared(StringInfo s) process_syncing_tables(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1076,6 +1062,7 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); + set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1100,6 +1087,8 @@ apply_handle_stream_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1156,6 +1145,8 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + set_apply_error_context_xact(stream_xid, 0); + /* * Initialize the xidhash table if we haven't yet. This will be used for * the entire duration of the apply worker so create it in permanent @@ -1212,6 +1203,7 @@ apply_handle_stream_stop(StringInfo s) MemoryContextReset(LogicalStreamingContext); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1235,7 +1227,10 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. */ if (xid == subxid) + { + set_apply_error_context_xact(xid, 0); stream_cleanup_files(MyLogicalRepWorker->subid, xid); + } else { /* @@ -1260,6 +1255,8 @@ apply_handle_stream_abort(StringInfo s) char path[MAXPGPATH]; StreamXidHash *ent; + set_apply_error_context_xact(subxid, 0); + subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1284,6 +1281,7 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); + reset_apply_error_context_info(); return; } @@ -1315,6 +1313,8 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } + + reset_apply_error_context_info(); } /* @@ -1459,6 +1459,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); + set_apply_error_context_xact(xid, commit_data.committime); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -1473,6 +1474,8 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1592,6 +1595,9 @@ apply_handle_insert(StringInfo s) return; } + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -1615,6 +1621,9 @@ apply_handle_insert(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1713,6 +1722,9 @@ apply_handle_update(StringInfo s) return; } + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + /* Check if we can do the update. */ check_relation_updatable(rel); @@ -1766,6 +1778,9 @@ apply_handle_update(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1869,6 +1884,9 @@ apply_handle_delete(StringInfo s) return; } + /* Set relation for error callback */ + apply_error_callback_arg.rel = rel; + /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -1894,6 +1912,9 @@ apply_handle_delete(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + apply_error_callback_arg.rel = NULL; + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -2328,44 +2349,53 @@ static void apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + LogicalRepMsgType saved_command; + + /* + * Set the current command being applied. Since this function can be + * called recusively when applying spooled changes, save the current + * command. + */ + saved_command = apply_error_callback_arg.command; + apply_error_callback_arg.command = action; switch (action) { case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - return; + break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - return; + break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - return; + break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - return; + break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - return; + break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - return; + break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - return; + break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - return; + break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - return; + break; case LOGICAL_REP_MSG_MESSAGE: @@ -2374,49 +2404,52 @@ apply_dispatch(StringInfo s) * Although, it could be used by other applications that use this * output plugin. */ - return; + break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - return; + break; case LOGICAL_REP_MSG_STREAM_STOP: apply_handle_stream_stop(s); - return; + break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - return; + break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - return; + break; case LOGICAL_REP_MSG_BEGIN_PREPARE: apply_handle_begin_prepare(s); - return; + break; case LOGICAL_REP_MSG_PREPARE: apply_handle_prepare(s); - return; + break; case LOGICAL_REP_MSG_COMMIT_PREPARED: apply_handle_commit_prepared(s); - return; + break; case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); - return; + break; case LOGICAL_REP_MSG_STREAM_PREPARE: apply_handle_stream_prepare(s); - return; + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid logical replication message type \"%c\"", - action))); + /* Reset the current command */ + apply_error_callback_arg.command = saved_command; } /* @@ -2517,6 +2550,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; TimeLineID tli; + ErrorContextCallback errcallback; /* * Init the ApplyMessageContext which we clean up after each replication @@ -2537,6 +2571,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); + /* + * Push apply error context callback. Fields will be filled during + * applying a change. + */ + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + /* This outer loop iterates once per wait. */ for (;;) { @@ -2737,6 +2779,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } } + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + /* All done */ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } @@ -3649,3 +3694,59 @@ IsLogicalWorker(void) { return MyLogicalRepWorker != NULL; } + +/* Error callback to give more context info about the change being applied */ +static void +apply_error_callback(void *arg) +{ + StringInfoData buf; + ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + + if (apply_error_callback_arg.command == 0) + return; + + initStringInfo(&buf); + appendStringInfo(&buf, _("processing remote data during \"%s\""), + logicalrep_message_type(errarg->command)); + + /* append relation information */ + if (errarg->rel) + { + appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname); + if (errarg->remote_attnum >= 0) + appendStringInfo(&buf, _(" column \"%s\""), + errarg->rel->remoterel.attnames[errarg->remote_attnum]); + } + + /* append transaction information */ + if (TransactionIdIsNormal(errarg->remote_xid)) + { + appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid); + if (errarg->ts != 0) + appendStringInfo(&buf, _(" at %s"), + timestamptz_to_str(errarg->ts)); + } + + errcontext("%s", buf.data); + pfree(buf.data); +} + +/* Set transaction information of apply error callback */ +static inline void +set_apply_error_context_xact(TransactionId xid, TimestampTz ts) +{ + apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.ts = ts; +} + +/* Reset all information of apply error callback */ +static inline void +reset_apply_error_context_info(void) +{ + apply_error_callback_arg.command = 0; + apply_error_callback_arg.rel = NULL; + apply_error_callback_arg.remote_attnum = -1; + set_apply_error_context_xact(InvalidTransactionId, 0); +} diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 95c1561ca0..83741dcf42 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -246,5 +246,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern char *logicalrep_message_type(LogicalRepMsgType action); #endif /* LOGICAL_PROTO_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 37cf4b2f76..621d0cb4da 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -113,6 +113,7 @@ Append AppendPath AppendRelInfo AppendState +ApplyErrorCallbackArg ApplyExecutionData ApplySubXactData Archive @@ -2423,7 +2424,6 @@ SlabBlock SlabChunk SlabContext SlabSlot -SlotErrCallbackArg SlotNumber SlruCtl SlruCtlData