Make walsenders show their replication commands in pg_stat_activity.

A walsender process that has executed a SQL command left the text of
that command in pg_stat_activity.query indefinitely, which is quite
confusing if it's in RUNNING state but not doing that query.  An easy
and useful fix is to treat replication commands as if they were SQL
queries, and show them in pg_stat_activity according to the same rules
as for regular queries.  While we're at it, it seems also sensible to
set debug_query_string, allowing error logging and debugging to see
the replication command.

While here, clean up assorted silliness in exec_replication_command:
* Clean up SQLCmd code path, and fix its only-accidentally-not-buggy
  memory management.
* Remove useless duplicate call of SnapBuildClearExportedSnapshot().
* replication_scanner_finish() was never called.

Back-patch of commit f560209c6 into v10-v13.  I'd originally felt
that this didn't merit back-patching, but subsequent confusion
while debugging walsender problems suggests that it'll be useful.
Also, the original commit has now aged long enough to provide some
comfort that it won't cause problems.

Discussion: https://postgr.es/m/2673480.1624557299@sss.pgh.pa.us
Discussion: https://postgr.es/m/880181.1600026471@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2021-06-25 10:46:10 -04:00
parent c4c9c77e56
commit c39983600b
1 changed files with 37 additions and 31 deletions

View File

@ -1457,6 +1457,9 @@ exec_replication_command(const char *cmd_string)
CHECK_FOR_INTERRUPTS();
/*
* Parse the command.
*/
cmd_context = AllocSetContextCreate(CurrentMemoryContext,
"Replication command context",
ALLOCSET_DEFAULT_SIZES);
@ -1467,33 +1470,49 @@ exec_replication_command(const char *cmd_string)
if (parse_rc != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errmsg_internal("replication command parser returned %d",
parse_rc))));
errmsg_internal("replication command parser returned %d",
parse_rc)));
replication_scanner_finish();
cmd_node = replication_parse_result;
/*
* If it's a SQL command, just clean up our mess and return false; the
* caller will take care of executing it.
*/
if (IsA(cmd_node, SQLCmd))
{
if (MyDatabaseId == InvalidOid)
ereport(ERROR,
(errmsg("cannot execute SQL commands in WAL sender for physical replication")));
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
/* Tell the caller that this wasn't a WalSender command. */
return false;
}
/*
* Report query to various monitoring facilities. For this purpose, we
* report replication commands just like SQL commands.
*/
debug_query_string = cmd_string;
pgstat_report_activity(STATE_RUNNING, cmd_string);
/*
* Log replication command if log_replication_commands is enabled. Even
* when it's disabled, log the command with DEBUG1 level for backward
* compatibility. Note that SQL commands are not logged here, and will be
* logged later if log_statement is enabled.
* compatibility.
*/
if (cmd_node->type != T_SQLCmd)
ereport(log_replication_commands ? LOG : DEBUG1,
(errmsg("received replication command: %s", cmd_string)));
ereport(log_replication_commands ? LOG : DEBUG1,
(errmsg("received replication command: %s", cmd_string)));
/*
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
* called outside of transaction the snapshot should be cleared here.
* Disallow replication commands in aborted transaction blocks.
*/
if (!IsTransactionBlock())
SnapBuildClearExportedSnapshot();
/*
* For aborted transactions, don't allow anything except pure SQL, the
* exec_simple_query() will handle it correctly.
*/
if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
if (IsAbortedTransactionBlockState())
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
@ -1509,9 +1528,6 @@ exec_replication_command(const char *cmd_string)
initStringInfo(&reply_message);
initStringInfo(&tmpbuf);
/* Report to pgstat that this process is running */
pgstat_report_activity(STATE_RUNNING, NULL);
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
@ -1561,17 +1577,6 @@ exec_replication_command(const char *cmd_string)
}
break;
case T_SQLCmd:
if (MyDatabaseId == InvalidOid)
ereport(ERROR,
(errmsg("cannot execute SQL commands in WAL sender for physical replication")));
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
/* Tell the caller that this wasn't a WalSender command. */
return false;
default:
elog(ERROR, "unrecognized replication command node tag: %u",
cmd_node->type);
@ -1586,6 +1591,7 @@ exec_replication_command(const char *cmd_string)
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
debug_query_string = NULL;
return true;
}