diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a5f558353f..80cdb508dd 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1457,6 +1457,9 @@ exec_replication_command(const char *cmd_string) CHECK_FOR_INTERRUPTS(); + /* + * Parse the command. + */ cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_SIZES); @@ -1467,33 +1470,49 @@ exec_replication_command(const char *cmd_string) if (parse_rc != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - (errmsg_internal("replication command parser returned %d", - parse_rc)))); + errmsg_internal("replication command parser returned %d", + parse_rc))); + replication_scanner_finish(); cmd_node = replication_parse_result; + /* + * If it's a SQL command, just clean up our mess and return false; the + * caller will take care of executing it. + */ + if (IsA(cmd_node, SQLCmd)) + { + if (MyDatabaseId == InvalidOid) + ereport(ERROR, + (errmsg("cannot execute SQL commands in WAL sender for physical replication"))); + + MemoryContextSwitchTo(old_context); + MemoryContextDelete(cmd_context); + + /* Tell the caller that this wasn't a WalSender command. */ + return false; + } + + /* + * Report query to various monitoring facilities. For this purpose, we + * report replication commands just like SQL commands. + */ + debug_query_string = cmd_string; + + pgstat_report_activity(STATE_RUNNING, cmd_string); + /* * Log replication command if log_replication_commands is enabled. Even * when it's disabled, log the command with DEBUG1 level for backward - * compatibility. Note that SQL commands are not logged here, and will be - * logged later if log_statement is enabled. + * compatibility. */ - if (cmd_node->type != T_SQLCmd) - ereport(log_replication_commands ? LOG : DEBUG1, - (errmsg("received replication command: %s", cmd_string))); + ereport(log_replication_commands ? LOG : DEBUG1, + (errmsg("received replication command: %s", cmd_string))); /* - * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was - * called outside of transaction the snapshot should be cleared here. + * Disallow replication commands in aborted transaction blocks. */ - if (!IsTransactionBlock()) - SnapBuildClearExportedSnapshot(); - - /* - * For aborted transactions, don't allow anything except pure SQL, the - * exec_simple_query() will handle it correctly. - */ - if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd)) + if (IsAbortedTransactionBlockState()) ereport(ERROR, (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), errmsg("current transaction is aborted, " @@ -1509,9 +1528,6 @@ exec_replication_command(const char *cmd_string) initStringInfo(&reply_message); initStringInfo(&tmpbuf); - /* Report to pgstat that this process is running */ - pgstat_report_activity(STATE_RUNNING, NULL); - switch (cmd_node->type) { case T_IdentifySystemCmd: @@ -1561,17 +1577,6 @@ exec_replication_command(const char *cmd_string) } break; - case T_SQLCmd: - if (MyDatabaseId == InvalidOid) - ereport(ERROR, - (errmsg("cannot execute SQL commands in WAL sender for physical replication"))); - - /* Report to pgstat that this process is now idle */ - pgstat_report_activity(STATE_IDLE, NULL); - - /* Tell the caller that this wasn't a WalSender command. */ - return false; - default: elog(ERROR, "unrecognized replication command node tag: %u", cmd_node->type); @@ -1586,6 +1591,7 @@ exec_replication_command(const char *cmd_string) /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); + debug_query_string = NULL; return true; }