Add flush option to pg_logical_emit_message()

Since its introduction, LogLogicalMessage() (via the SQL interface
pg_logical_emit_message()) has never included a call to XLogFlush(),
causing it to potentially lose messages on a crash when used in
non-transactional mode.  This has come up to me as a problem while
playing with ideas to design a test suite for what has become
039_end_of_wal.pl introduced in bae868caf2 by Thomas Munro, because
there are no direct ways to force a WAL flush via SQL.

The default is false, to not flush messages and influence existing
use-cases where this function could be used.  If set to true, the
message emitted is flushed before returning back to the caller, making
the message durable on crash.  This new option has no effect when using
pg_logical_emit_message() in transactional mode, as the record's flush
is guaranteed by the WAL record generated by the transaction committed.

Two queries of test_decoding are tweaked to cover the new code path for
the flush.

Bump catalog version.

Author: Michael Paquier
Reviewed-by: Andres Freund, Amit Kapila, Fujii Masao, Tung Nguyen, Tomas
Vondra
Discussion: https://postgr.es/m/ZNsdThSe2qgsfs7R@paquier.xyz
This commit is contained in:
Michael Paquier 2023-10-18 11:24:59 +09:00
parent 19fa977311
commit 173b56f1ef
9 changed files with 51 additions and 13 deletions

View File

@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
init
(1 row)
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
-- These two cover the path for the flush variant.
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
?column?
----------
msg1
(1 row)
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
?column?
----------
msg2

View File

@ -3,8 +3,9 @@ SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
-- These two cover the path for the flush variant.
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
BEGIN;
SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');

View File

@ -27740,11 +27740,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_logical_emit_message</primary>
</indexterm>
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> )
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
<returnvalue>pg_lsn</returnvalue>
</para>
<para role="func_signature">
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> )
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
<returnvalue>pg_lsn</returnvalue>
</para>
<para>
@ -27758,6 +27758,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
recognize messages that are interesting for them.
The <parameter>content</parameter> parameter is the content of the
message, given either in text or binary form.
The <parameter>flush</parameter> parameter (default set to
<literal>false</literal>) controls if the message is immediately
flushed to WAL or not. <parameter>flush</parameter> has no effect
with <parameter>transactional</parameter>, as the message's WAL
record is flushed along with its transaction.
</para></entry>
</row>
</tbody>

View File

@ -446,6 +446,26 @@ LANGUAGE INTERNAL
VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes';
CREATE OR REPLACE FUNCTION pg_logical_emit_message(
transactional boolean,
prefix text,
message text,
flush boolean DEFAULT false)
RETURNS pg_lsn
LANGUAGE INTERNAL
STRICT VOLATILE
AS 'pg_logical_emit_message_text';
CREATE OR REPLACE FUNCTION pg_logical_emit_message(
transactional boolean,
prefix text,
message bytea,
flush boolean DEFAULT false)
RETURNS pg_lsn
LANGUAGE INTERNAL
STRICT VOLATILE
AS 'pg_logical_emit_message_bytea';
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
IN temporary boolean DEFAULT false,

View File

@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
bool transactional = PG_GETARG_BOOL(0);
char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
bytea *data = PG_GETARG_BYTEA_PP(2);
bool flush = PG_GETARG_BOOL(3);
XLogRecPtr lsn;
lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
transactional);
transactional, flush);
PG_RETURN_LSN(lsn);
}

View File

@ -44,9 +44,10 @@
*/
XLogRecPtr
LogLogicalMessage(const char *prefix, const char *message, size_t size,
bool transactional)
bool transactional, bool flush)
{
xl_logical_message xlrec;
XLogRecPtr lsn;
/*
* Force xid to be allocated if we're emitting a transactional message.
@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
/* allow origin filtering */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
/*
* Make sure that the message hits disk before leaving if emitting a
* non-transactional message when flush is requested.
*/
if (!transactional && flush)
XLogFlush(lsn);
return lsn;
}
/*

View File

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202310161
#define CATALOG_VERSION_NO 202310181
#endif

View File

@ -11167,11 +11167,11 @@
prosrc => 'pg_replication_slot_advance' },
{ oid => '3577', descr => 'emit a textual logical decoding message',
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
prorettype => 'pg_lsn', proargtypes => 'bool text text',
prorettype => 'pg_lsn', proargtypes => 'bool text text bool',
prosrc => 'pg_logical_emit_message_text' },
{ oid => '3578', descr => 'emit a binary logical decoding message',
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
prosrc => 'pg_logical_emit_message_bytea' },
# event triggers

View File

@ -30,7 +30,8 @@ typedef struct xl_logical_message
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
size_t size, bool transactional);
size_t size, bool transactional,
bool flush);
/* RMGR API */
#define XLOG_LOGICAL_MESSAGE 0x00