Add logical change details to logical replication worker errcontext.

Previously, on the subscriber, we set the error context callback for the
tuple data conversion failures. This commit replaces the existing error
context callback with a comprehensive one so that it shows not only the
details of data conversion failures but also the details of logical change
being applied by the apply worker or table sync worker. The additional
information displayed will be the command, transaction id, and timestamp.

The error context is added to an error only when applying a change but not
while doing other work like receiving data etc.

This will help users in diagnosing the problems that occur during logical
replication. It also can be used for future work that allows skipping a
particular transaction on the subscriber.

Author: Masahiko Sawada
Reviewed-by: Hou Zhijie, Greg Nancarrow, Haiying Tang, Amit Kapila
Tested-by: Haiying Tang
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
This commit is contained in:
Amit Kapila 2021-08-27 08:30:23 +05:30
parent 191dce109b
commit abc0910e2e
4 changed files with 235 additions and 80 deletions

View File

@ -1156,3 +1156,56 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
*xid = pq_getmsgint(in, 4); *xid = pq_getmsgint(in, 4);
*subxid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4);
} }
/*
* Get string representing LogicalRepMsgType.
*/
char *
logicalrep_message_type(LogicalRepMsgType action)
{
switch (action)
{
case LOGICAL_REP_MSG_BEGIN:
return "BEGIN";
case LOGICAL_REP_MSG_COMMIT:
return "COMMIT";
case LOGICAL_REP_MSG_ORIGIN:
return "ORIGIN";
case LOGICAL_REP_MSG_INSERT:
return "INSERT";
case LOGICAL_REP_MSG_UPDATE:
return "UPDATE";
case LOGICAL_REP_MSG_DELETE:
return "DELETE";
case LOGICAL_REP_MSG_TRUNCATE:
return "TRUNCATE";
case LOGICAL_REP_MSG_RELATION:
return "RELATION";
case LOGICAL_REP_MSG_TYPE:
return "TYPE";
case LOGICAL_REP_MSG_MESSAGE:
return "MESSAGE";
case LOGICAL_REP_MSG_BEGIN_PREPARE:
return "BEGIN PREPARE";
case LOGICAL_REP_MSG_PREPARE:
return "PREPARE";
case LOGICAL_REP_MSG_COMMIT_PREPARED:
return "COMMIT PREPARED";
case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
return "ROLLBACK PREPARED";
case LOGICAL_REP_MSG_STREAM_START:
return "STREAM START";
case LOGICAL_REP_MSG_STREAM_STOP:
return "STREAM STOP";
case LOGICAL_REP_MSG_STREAM_COMMIT:
return "STREAM COMMIT";
case LOGICAL_REP_MSG_STREAM_ABORT:
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
}
elog(ERROR, "invalid logical replication message type \"%c\"", action);
return NULL; /* keep compiler quiet */
}

View File

@ -203,12 +203,6 @@ typedef struct FlushPosition
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
LogicalRepRelMapEntry *rel;
int remote_attnum;
} SlotErrCallbackArg;
typedef struct ApplyExecutionData typedef struct ApplyExecutionData
{ {
EState *estate; /* executor state, used to track resources */ EState *estate; /* executor state, used to track resources */
@ -221,6 +215,27 @@ typedef struct ApplyExecutionData
PartitionTupleRouting *proute; /* partition routing info */ PartitionTupleRouting *proute; /* partition routing info */
} ApplyExecutionData; } ApplyExecutionData;
/* Struct for saving and restoring apply errcontext information */
typedef struct ApplyErrorCallbackArg
{
LogicalRepMsgType command; /* 0 if invalid */
LogicalRepRelMapEntry *rel;
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.ts = 0,
};
/* /*
* Stream xid hash entry. Whenever we see a new xid we create this entry in the * Stream xid hash entry. Whenever we see a new xid we create this entry in the
* xidhash and along with it create the streaming file and store the fileset handle. * xidhash and along with it create the streaming file and store the fileset handle.
@ -335,6 +350,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
/* Common streaming function to apply all the spooled messages */ /* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
static inline void reset_apply_error_context_info(void);
/* /*
* Should this worker apply changes for given relation. * Should this worker apply changes for given relation.
* *
@ -580,26 +600,6 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]); ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
} }
/*
* Error callback to give more context info about data conversion failures
* while reading data from the remote server.
*/
static void
slot_store_error_callback(void *arg)
{
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
LogicalRepRelMapEntry *rel;
/* Nothing to do if remote attribute number is not set */
if (errarg->remote_attnum < 0)
return;
rel = errarg->rel;
errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
rel->remoterel.nspname, rel->remoterel.relname,
rel->remoterel.attnames[errarg->remote_attnum]);
}
/* /*
* Store tuple data into slot. * Store tuple data into slot.
* *
@ -611,19 +611,9 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
{ {
int natts = slot->tts_tupleDescriptor->natts; int natts = slot->tts_tupleDescriptor->natts;
int i; int i;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
ExecClearTuple(slot); ExecClearTuple(slot);
/* Push callback + info on the error context stack */
errarg.rel = rel;
errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* Call the "in" function for each non-dropped, non-null attribute */ /* Call the "in" function for each non-dropped, non-null attribute */
Assert(natts == rel->attrmap->maplen); Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
@ -637,7 +627,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
Assert(remoteattnum < tupleData->ncols); Assert(remoteattnum < tupleData->ncols);
errarg.remote_attnum = remoteattnum; /* Set attnum for error callback */
apply_error_callback_arg.remote_attnum = remoteattnum;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
{ {
@ -685,7 +676,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
slot->tts_isnull[i] = true; slot->tts_isnull[i] = true;
} }
errarg.remote_attnum = -1; /* Reset attnum for error callback */
apply_error_callback_arg.remote_attnum = -1;
} }
else else
{ {
@ -699,9 +691,6 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
} }
} }
/* Pop the error context stack */
error_context_stack = errcallback.previous;
ExecStoreVirtualTuple(slot); ExecStoreVirtualTuple(slot);
} }
@ -724,8 +713,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
{ {
int natts = slot->tts_tupleDescriptor->natts; int natts = slot->tts_tupleDescriptor->natts;
int i; int i;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
/* We'll fill "slot" with a virtual tuple, so we must start with ... */ /* We'll fill "slot" with a virtual tuple, so we must start with ... */
ExecClearTuple(slot); ExecClearTuple(slot);
@ -739,14 +726,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum)); memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool)); memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
/* For error reporting, push callback + info on the error context stack */
errarg.rel = rel;
errarg.remote_attnum = -1;
errcallback.callback = slot_store_error_callback;
errcallback.arg = (void *) &errarg;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* Call the "in" function for each replaced attribute */ /* Call the "in" function for each replaced attribute */
Assert(natts == rel->attrmap->maplen); Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
@ -763,7 +742,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
{ {
StringInfo colvalue = &tupleData->colvalues[remoteattnum]; StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.remote_attnum = remoteattnum; /* Set attnum for error callback */
apply_error_callback_arg.remote_attnum = remoteattnum;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
{ {
@ -807,13 +787,11 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
slot->tts_isnull[i] = true; slot->tts_isnull[i] = true;
} }
errarg.remote_attnum = -1; /* Reset attnum for error callback */
apply_error_callback_arg.remote_attnum = -1;
} }
} }
/* Pop the error context stack */
error_context_stack = errcallback.previous;
/* And finally, declare that "slot" contains a valid virtual tuple */ /* And finally, declare that "slot" contains a valid virtual tuple */
ExecStoreVirtualTuple(slot); ExecStoreVirtualTuple(slot);
} }
@ -827,6 +805,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data; LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data); logicalrep_read_begin(s, &begin_data);
set_apply_error_context_xact(begin_data.xid, begin_data.committime);
remote_final_lsn = begin_data.final_lsn; remote_final_lsn = begin_data.final_lsn;
@ -860,6 +839,7 @@ apply_handle_commit(StringInfo s)
process_syncing_tables(commit_data.end_lsn); process_syncing_tables(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -877,6 +857,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data); logicalrep_read_begin_prepare(s, &begin_data);
set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
remote_final_lsn = begin_data.prepare_lsn; remote_final_lsn = begin_data.prepare_lsn;
@ -962,6 +943,7 @@ apply_handle_prepare(StringInfo s)
process_syncing_tables(prepare_data.end_lsn); process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -974,6 +956,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE]; char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data); logicalrep_read_commit_prepared(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
/* Compute GID for two_phase transactions. */ /* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@ -1001,6 +984,7 @@ apply_handle_commit_prepared(StringInfo s)
process_syncing_tables(prepare_data.end_lsn); process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -1013,6 +997,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE]; char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data); logicalrep_read_rollback_prepared(s, &rollback_data);
set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
/* Compute GID for two_phase transactions. */ /* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@ -1050,6 +1035,7 @@ apply_handle_rollback_prepared(StringInfo s)
process_syncing_tables(rollback_data.rollback_end_lsn); process_syncing_tables(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -1076,6 +1062,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message"))); errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data); logicalrep_read_stream_prepare(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@ -1100,6 +1087,8 @@ apply_handle_stream_prepare(StringInfo s)
process_syncing_tables(prepare_data.end_lsn); process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -1156,6 +1145,8 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION), (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction"))); errmsg_internal("invalid transaction ID in streamed replication transaction")));
set_apply_error_context_xact(stream_xid, 0);
/* /*
* Initialize the xidhash table if we haven't yet. This will be used for * Initialize the xidhash table if we haven't yet. This will be used for
* the entire duration of the apply worker so create it in permanent * the entire duration of the apply worker so create it in permanent
@ -1212,6 +1203,7 @@ apply_handle_stream_stop(StringInfo s)
MemoryContextReset(LogicalStreamingContext); MemoryContextReset(LogicalStreamingContext);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -1235,7 +1227,10 @@ apply_handle_stream_abort(StringInfo s)
* just delete the files with serialized info. * just delete the files with serialized info.
*/ */
if (xid == subxid) if (xid == subxid)
{
set_apply_error_context_xact(xid, 0);
stream_cleanup_files(MyLogicalRepWorker->subid, xid); stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else else
{ {
/* /*
@ -1260,6 +1255,8 @@ apply_handle_stream_abort(StringInfo s)
char path[MAXPGPATH]; char path[MAXPGPATH];
StreamXidHash *ent; StreamXidHash *ent;
set_apply_error_context_xact(subxid, 0);
subidx = -1; subidx = -1;
begin_replication_step(); begin_replication_step();
subxact_info_read(MyLogicalRepWorker->subid, xid); subxact_info_read(MyLogicalRepWorker->subid, xid);
@ -1284,6 +1281,7 @@ apply_handle_stream_abort(StringInfo s)
cleanup_subxact_info(); cleanup_subxact_info();
end_replication_step(); end_replication_step();
CommitTransactionCommand(); CommitTransactionCommand();
reset_apply_error_context_info();
return; return;
} }
@ -1315,6 +1313,8 @@ apply_handle_stream_abort(StringInfo s)
end_replication_step(); end_replication_step();
CommitTransactionCommand(); CommitTransactionCommand();
} }
reset_apply_error_context_info();
} }
/* /*
@ -1459,6 +1459,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP"))); errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data); xid = logicalrep_read_stream_commit(s, &commit_data);
set_apply_error_context_xact(xid, commit_data.committime);
elog(DEBUG1, "received commit for streamed transaction %u", xid); elog(DEBUG1, "received commit for streamed transaction %u", xid);
@ -1473,6 +1474,8 @@ apply_handle_stream_commit(StringInfo s)
process_syncing_tables(commit_data.end_lsn); process_syncing_tables(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
} }
/* /*
@ -1592,6 +1595,9 @@ apply_handle_insert(StringInfo s)
return; return;
} }
/* Set relation for error callback */
apply_error_callback_arg.rel = rel;
/* Initialize the executor state. */ /* Initialize the executor state. */
edata = create_edata_for_relation(rel); edata = create_edata_for_relation(rel);
estate = edata->estate; estate = edata->estate;
@ -1615,6 +1621,9 @@ apply_handle_insert(StringInfo s)
finish_edata(edata); finish_edata(edata);
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
end_replication_step(); end_replication_step();
@ -1713,6 +1722,9 @@ apply_handle_update(StringInfo s)
return; return;
} }
/* Set relation for error callback */
apply_error_callback_arg.rel = rel;
/* Check if we can do the update. */ /* Check if we can do the update. */
check_relation_updatable(rel); check_relation_updatable(rel);
@ -1766,6 +1778,9 @@ apply_handle_update(StringInfo s)
finish_edata(edata); finish_edata(edata);
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
end_replication_step(); end_replication_step();
@ -1869,6 +1884,9 @@ apply_handle_delete(StringInfo s)
return; return;
} }
/* Set relation for error callback */
apply_error_callback_arg.rel = rel;
/* Check if we can do the delete. */ /* Check if we can do the delete. */
check_relation_updatable(rel); check_relation_updatable(rel);
@ -1894,6 +1912,9 @@ apply_handle_delete(StringInfo s)
finish_edata(edata); finish_edata(edata);
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
end_replication_step(); end_replication_step();
@ -2328,44 +2349,53 @@ static void
apply_dispatch(StringInfo s) apply_dispatch(StringInfo s)
{ {
LogicalRepMsgType action = pq_getmsgbyte(s); LogicalRepMsgType action = pq_getmsgbyte(s);
LogicalRepMsgType saved_command;
/*
* Set the current command being applied. Since this function can be
* called recusively when applying spooled changes, save the current
* command.
*/
saved_command = apply_error_callback_arg.command;
apply_error_callback_arg.command = action;
switch (action) switch (action)
{ {
case LOGICAL_REP_MSG_BEGIN: case LOGICAL_REP_MSG_BEGIN:
apply_handle_begin(s); apply_handle_begin(s);
return; break;
case LOGICAL_REP_MSG_COMMIT: case LOGICAL_REP_MSG_COMMIT:
apply_handle_commit(s); apply_handle_commit(s);
return; break;
case LOGICAL_REP_MSG_INSERT: case LOGICAL_REP_MSG_INSERT:
apply_handle_insert(s); apply_handle_insert(s);
return; break;
case LOGICAL_REP_MSG_UPDATE: case LOGICAL_REP_MSG_UPDATE:
apply_handle_update(s); apply_handle_update(s);
return; break;
case LOGICAL_REP_MSG_DELETE: case LOGICAL_REP_MSG_DELETE:
apply_handle_delete(s); apply_handle_delete(s);
return; break;
case LOGICAL_REP_MSG_TRUNCATE: case LOGICAL_REP_MSG_TRUNCATE:
apply_handle_truncate(s); apply_handle_truncate(s);
return; break;
case LOGICAL_REP_MSG_RELATION: case LOGICAL_REP_MSG_RELATION:
apply_handle_relation(s); apply_handle_relation(s);
return; break;
case LOGICAL_REP_MSG_TYPE: case LOGICAL_REP_MSG_TYPE:
apply_handle_type(s); apply_handle_type(s);
return; break;
case LOGICAL_REP_MSG_ORIGIN: case LOGICAL_REP_MSG_ORIGIN:
apply_handle_origin(s); apply_handle_origin(s);
return; break;
case LOGICAL_REP_MSG_MESSAGE: case LOGICAL_REP_MSG_MESSAGE:
@ -2374,49 +2404,52 @@ apply_dispatch(StringInfo s)
* Although, it could be used by other applications that use this * Although, it could be used by other applications that use this
* output plugin. * output plugin.
*/ */
return; break;
case LOGICAL_REP_MSG_STREAM_START: case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s); apply_handle_stream_start(s);
return; break;
case LOGICAL_REP_MSG_STREAM_STOP: case LOGICAL_REP_MSG_STREAM_STOP:
apply_handle_stream_stop(s); apply_handle_stream_stop(s);
return; break;
case LOGICAL_REP_MSG_STREAM_ABORT: case LOGICAL_REP_MSG_STREAM_ABORT:
apply_handle_stream_abort(s); apply_handle_stream_abort(s);
return; break;
case LOGICAL_REP_MSG_STREAM_COMMIT: case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s); apply_handle_stream_commit(s);
return; break;
case LOGICAL_REP_MSG_BEGIN_PREPARE: case LOGICAL_REP_MSG_BEGIN_PREPARE:
apply_handle_begin_prepare(s); apply_handle_begin_prepare(s);
return; break;
case LOGICAL_REP_MSG_PREPARE: case LOGICAL_REP_MSG_PREPARE:
apply_handle_prepare(s); apply_handle_prepare(s);
return; break;
case LOGICAL_REP_MSG_COMMIT_PREPARED: case LOGICAL_REP_MSG_COMMIT_PREPARED:
apply_handle_commit_prepared(s); apply_handle_commit_prepared(s);
return; break;
case LOGICAL_REP_MSG_ROLLBACK_PREPARED: case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
apply_handle_rollback_prepared(s); apply_handle_rollback_prepared(s);
return; break;
case LOGICAL_REP_MSG_STREAM_PREPARE: case LOGICAL_REP_MSG_STREAM_PREPARE:
apply_handle_stream_prepare(s); apply_handle_stream_prepare(s);
return; break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid logical replication message type \"%c\"", action)));
} }
ereport(ERROR, /* Reset the current command */
(errcode(ERRCODE_PROTOCOL_VIOLATION), apply_error_callback_arg.command = saved_command;
errmsg_internal("invalid logical replication message type \"%c\"",
action)));
} }
/* /*
@ -2517,6 +2550,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp(); TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false; bool ping_sent = false;
TimeLineID tli; TimeLineID tli;
ErrorContextCallback errcallback;
/* /*
* Init the ApplyMessageContext which we clean up after each replication * Init the ApplyMessageContext which we clean up after each replication
@ -2537,6 +2571,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* mark as idle, before starting to loop */ /* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
/*
* Push apply error context callback. Fields will be filled during
* applying a change.
*/
errcallback.callback = apply_error_callback;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* This outer loop iterates once per wait. */ /* This outer loop iterates once per wait. */
for (;;) for (;;)
{ {
@ -2737,6 +2779,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
} }
} }
/* Pop the error context stack */
error_context_stack = errcallback.previous;
/* All done */ /* All done */
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
} }
@ -3649,3 +3694,59 @@ IsLogicalWorker(void)
{ {
return MyLogicalRepWorker != NULL; return MyLogicalRepWorker != NULL;
} }
/* Error callback to give more context info about the change being applied */
static void
apply_error_callback(void *arg)
{
StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
initStringInfo(&buf);
appendStringInfo(&buf, _("processing remote data during \"%s\""),
logicalrep_message_type(errarg->command));
/* append relation information */
if (errarg->rel)
{
appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname);
if (errarg->remote_attnum >= 0)
appendStringInfo(&buf, _(" column \"%s\""),
errarg->rel->remoterel.attnames[errarg->remote_attnum]);
}
/* append transaction information */
if (TransactionIdIsNormal(errarg->remote_xid))
{
appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
if (errarg->ts != 0)
appendStringInfo(&buf, _(" at %s"),
timestamptz_to_str(errarg->ts));
}
errcontext("%s", buf.data);
pfree(buf.data);
}
/* Set transaction information of apply error callback */
static inline void
set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
{
apply_error_callback_arg.remote_xid = xid;
apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
static inline void
reset_apply_error_context_info(void)
{
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
set_apply_error_context_xact(InvalidTransactionId, 0);
}

View File

@ -246,5 +246,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid); TransactionId subxid);
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid); TransactionId *subxid);
extern char *logicalrep_message_type(LogicalRepMsgType action);
#endif /* LOGICAL_PROTO_H */ #endif /* LOGICAL_PROTO_H */

View File

@ -113,6 +113,7 @@ Append
AppendPath AppendPath
AppendRelInfo AppendRelInfo
AppendState AppendState
ApplyErrorCallbackArg
ApplyExecutionData ApplyExecutionData
ApplySubXactData ApplySubXactData
Archive Archive
@ -2423,7 +2424,6 @@ SlabBlock
SlabChunk SlabChunk
SlabContext SlabContext
SlabSlot SlabSlot
SlotErrCallbackArg
SlotNumber SlotNumber
SlruCtl SlruCtl
SlruCtlData SlruCtlData