diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 9eedab652d..3071c8eace 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11597,6 +11597,34 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
+     <varlistentry id="guc-logical-decoding-mode" xreflabel="logical_decoding_mode">
+      <term><varname>logical_decoding_mode</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Allows streaming or serializing changes immediately in logical decoding.
+        The allowed values of <varname>logical_decoding_mode</varname> are
+        <literal>buffered</literal> and <literal>immediate</literal>.  When set
+        to <literal>immediate</literal>, stream each change if the
+        <literal>streaming</literal> option (see optional parameters set by
+        <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>)
+        is enabled; otherwise, serialize each change.  When set to
+        <literal>buffered</literal>, which is the default, decoding will stream
+        or serialize changes when <varname>logical_decoding_work_mem</varname>
+        is reached.
+       </para>
+       <para>
+        This parameter is intended for testing logical decoding and
+        replication of large transactions, which would otherwise require
+        generating changes until <varname>logical_decoding_work_mem</varname>
+        is reached.
+       </para>
+      </listitem>
+     </varlistentry>
+
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b567b8b59e..92204bd9cd 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
 int			logical_decoding_work_mem;
 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 
+/* GUC variable */
+int			logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
- * disk until we reach under the memory limit.
+ * disk or send them to the output plugin until we are under the memory limit.
+ *
+ * If logical_decoding_mode is set to "immediate", stream or serialize the
+ * changes immediately.
  *
  * XXX At this point we select the transactions until we reach under the memory
  * limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
 
-	/* bail out if we haven't exceeded the memory limit */
-	if (rb->size < logical_decoding_work_mem * 1024L)
+	/*
+	 * Bail out if logical_decoding_mode is buffered and we haven't exceeded
+	 * the memory limit.
+	 */
+	if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED &&
+		rb->size < logical_decoding_work_mem * 1024L)
 		return;
 
 	/*
-	 * Loop until we reach under the memory limit. One might think that just
-	 * by evicting the largest (sub)transaction we will come under the memory
-	 * limit based on assumption that the selected transaction is at least as
-	 * large as the most recent change (which caused us to go over the memory
-	 * limit). However, that is not true because a user can reduce the
-	 * logical_decoding_work_mem to a smaller value before the most recent
+	 * If logical_decoding_mode is immediate, loop until no changes remain.
+	 * Otherwise, loop until we are under the memory limit. One might think
+	 * that just by evicting the largest (sub)transaction we will come under
+	 * the memory limit based on the assumption that the selected transaction
+	 * is at least as large as the most recent change (which caused us to go
+	 * over the memory limit). However, that is not true because a user can
+	 * reduce the logical_decoding_work_mem to a smaller value before the most recent
 	 * change.
 	 */
-	while (rb->size >= logical_decoding_work_mem * 1024L)
+	while (rb->size >= logical_decoding_work_mem * 1024L ||
+		   (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
+			rb->size > 0))
 	{
 		/*
 		 * Pick the largest transaction (or subtransaction) and evict it from
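For reviewers, a minimal way to exercise the new GUC by hand is through the
SQL decoding interface (a sketch, not part of the patch; it assumes
wal_level = logical and the test_decoding plugin, and the slot and table
names are illustrative):

    -- Serialize or stream each change immediately, regardless of
    -- logical_decoding_work_mem.  The GUC is PGC_USERSET, so SET suffices,
    -- and it takes effect in this backend, which performs the decoding.
    SET logical_decoding_mode = immediate;

    SELECT pg_create_logical_replication_slot('tap_slot', 'test_decoding');

    BEGIN;
    INSERT INTO t VALUES (1);   -- a single small change is enough
    COMMIT;

    -- With stream-changes enabled, test_decoding emits stream messages
    -- even for this tiny transaction.
    SELECT data FROM pg_logical_slot_get_changes('tap_slot', NULL, NULL,
           'include-xids', '0', 'stream-changes', '1');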
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 436afe1d21..a37c9f9844 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
 	{NULL, 0, false}
 };
 
+static const struct config_enum_entry logical_decoding_mode_options[] = {
+	{"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
+	{"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
+	{NULL, 0, false}
+};
+
 StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
 				 "array length mismatch");
 
@@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("Allows streaming or serializing each change in logical decoding."),
+			NULL,
+			GUC_NOT_IN_SAMPLE
+		},
+		&logical_decoding_mode,
+		LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index c700b55b1c..b27a43618a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -18,6 +18,14 @@
 #include "utils/timestamp.h"
 
 extern PGDLLIMPORT int logical_decoding_work_mem;
+extern PGDLLIMPORT int logical_decoding_mode;
+
+/* possible values for logical_decoding_mode */
+typedef enum
+{
+	LOGICAL_DECODING_MODE_BUFFERED,
+	LOGICAL_DECODING_MODE_IMMEDIATE
+} LogicalDecodingMode;
 
 /* an individual tuple, stored in one chunk of memory */
 typedef struct ReorderBufferTupleBuf
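Because the GUC is registered under DEVELOPER_OPTIONS with GUC_NOT_IN_SAMPLE,
it does not appear in postgresql.conf.sample, but it is still visible through
pg_settings.  A quick sanity check (a sketch; the expected row is an
assumption based on the entry above):

    SELECT name, setting, context, category
      FROM pg_settings
     WHERE name = 'logical_decoding_mode';
    -- expected (assumption): buffered, context "user",
    -- category "Developer Options"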
diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl
index bc0a9cd053..db29f089a0 100644
--- a/src/test/subscription/t/016_stream_subxact.pl
+++ b/src/test/subscription/t/016_stream_subxact.pl
@@ -1,7 +1,7 @@
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction containing large subtransactions
+# Test streaming of a transaction containing subtransactions
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+	'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -49,27 +49,27 @@ my $result =
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
 is($result, qq(2|2|2), 'check initial data was copied to subscriber');
 
-# Insert, update and delete enough rows to exceed 64kB limit.
+# Insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 SAVEPOINT s4;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
 UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 DELETE FROM test_tab WHERE mod(a,3) = 0;
 COMMIT;
 });
@@ -80,7 +80,7 @@ $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(1667|1667|1667),
+is($result, qq(12|12|12),
 	'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
 );
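The streaming tests above rely on the subscription side having the streaming
option enabled; roughly, the subscriber setup they assume looks like this
(a sketch with illustrative connection string and object names):

    -- On the subscriber: streaming must be enabled for "immediate" mode
    -- to stream rather than serialize each change.
    CREATE SUBSCRIPTION tap_sub
        CONNECTION 'host=publisher dbname=postgres application_name=tap_sub'
        PUBLICATION tap_pub
        WITH (streaming = on);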
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 551f16df6d..1458c3a0fc 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -1,7 +1,7 @@
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction containing multiple subtransactions and rollbacks
+# Test streaming of a transaction containing multiple subtransactions and rollbacks
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+	'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -48,25 +48,25 @@ my $result =
 	"SELECT count(*), count(c) FROM test_tab");
 is($result, qq(2|0), 'check initial data was copied to subscriber');
 
-# large (streamed) transaction with DDL, DML and ROLLBACKs
+# streamed transaction with DDL, DML and ROLLBACKs
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+INSERT INTO test_tab VALUES (3, md5(3::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
+INSERT INTO test_tab VALUES (5, md5(5::text));
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
+INSERT INTO test_tab VALUES (6, md5(6::text));
 ROLLBACK TO s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
+INSERT INTO test_tab VALUES (7, md5(7::text));
 ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
+INSERT INTO test_tab VALUES (8, md5(8::text));
 SAVEPOINT s4;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
+INSERT INTO test_tab VALUES (9, md5(9::text));
 SAVEPOINT s5;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
+INSERT INTO test_tab VALUES (10, md5(10::text));
 COMMIT;
 });
 
@@ -75,24 +75,24 @@ $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2000|0),
+is($result, qq(6|0),
 	'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
 );
 
-# large (streamed) transaction with subscriber receiving out of order
-# subtransaction ROLLBACKs
+# streamed transaction with subscriber receiving out of order subtransaction
+# ROLLBACKs
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
+INSERT INTO test_tab VALUES (11, md5(11::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
+INSERT INTO test_tab VALUES (12, md5(12::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
+INSERT INTO test_tab VALUES (13, md5(13::text));
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
+INSERT INTO test_tab VALUES (14, md5(14::text));
 RELEASE s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
+INSERT INTO test_tab VALUES (15, md5(15::text));
 ROLLBACK TO s1;
 COMMIT;
 });
@@ -102,18 +102,18 @@ $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2500|0),
+is($result, qq(7|0),
 	'check rollback to savepoint was reflected on subscriber');
 
-# large (streamed) transaction with subscriber receiving rollback
+# streamed transaction with subscriber receiving rollback
$node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
+INSERT INTO test_tab VALUES (16, md5(16::text));
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
+INSERT INTO test_tab VALUES (17, md5(17::text));
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
+INSERT INTO test_tab VALUES (18, md5(18::text));
 ROLLBACK;
 });
 
@@ -122,7 +122,7 @@ $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(2500|0), 'check rollback was reflected on subscriber');
+is($result, qq(7|0), 'check rollback was reflected on subscriber');
 
 $node_subscriber->stop;
 $node_publisher->stop;
diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
index 4d7da82b7a..c6719c1af8 100644
--- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
+++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
@@ -1,7 +1,7 @@
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
+# Test streaming of a transaction with subtransactions, DDLs, DMLs, and
 # rollbacks
 use strict;
 use warnings;
@@ -13,7 +13,7 @@ use Test::More;
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+	'logical_decoding_mode = immediate');
 $node_publisher->start;
 
 # Create subscriber node
@@ -49,23 +49,23 @@ my $result =
 	"SELECT count(*), count(c) FROM test_tab");
 is($result, qq(2|0), 'check initial data was copied to subscriber');
 
-# large (streamed) transaction with DDL, DML and ROLLBACKs
+# streamed transaction with DDL, DML and ROLLBACKs
 $node_publisher->safe_psql(
 	'postgres', q{
 BEGIN;
-INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+INSERT INTO test_tab VALUES (3, md5(3::text));
 ALTER TABLE test_tab ADD COLUMN c INT;
 SAVEPOINT s1;
-INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text), -4);
 ALTER TABLE test_tab ADD COLUMN d INT;
 SAVEPOINT s2;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+INSERT INTO test_tab VALUES (5, md5(5::text), -5, 5*2);
 ALTER TABLE test_tab ADD COLUMN e INT;
 SAVEPOINT s3;
-INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+INSERT INTO test_tab VALUES (6, md5(6::text), -6, 6*2, -6*3);
 ALTER TABLE test_tab DROP COLUMN c;
 ROLLBACK TO s1;
-INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+INSERT INTO test_tab VALUES (4, md5(4::text), 4);
 COMMIT;
 });
 
@@ -74,7 +74,7 @@ $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c) FROM test_tab");
-is($result, qq(1000|500),
+is($result, qq(4|1),
 	'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
 );
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 9b454106bd..a191129b9d 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -18,7 +18,7 @@ $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
 max_prepared_transactions = 10
-logical_decoding_work_mem = 64kB
+logical_decoding_mode = immediate
 ));
 $node_publisher->start;
 
@@ -80,11 +80,11 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber');
 
 ###############################
 # check that 2PC gets replicated to subscriber
-# Insert, update and delete enough rows to exceed the 64kB limit.
+# Insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -105,7 +105,7 @@ $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is committed on subscriber
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
 	'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
 );
 $result = $node_subscriber->safe_psql('postgres',
@@ -124,11 +124,11 @@ is($result, qq(0), 'transaction is committed on subscriber');
 
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -158,7 +158,7 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 
 ###############################
 # Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
-# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 1. insert, update and delete some rows.
 # 2. Then server crashes before the 2PC transaction is committed.
 # 3. After servers are restarted the pending transaction is committed.
 #
@@ -169,7 +169,7 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -188,7 +188,7 @@ $node_publisher->wait_for_catchup($appname);
 
 # check inserts are visible
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3334|3334|3334),
+is($result, qq(4|4|4),
 	'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
 );
 
@@ -206,11 +206,11 @@ is($result, qq(3334|3334|3334),
 
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -257,11 +257,11 @@ is($result, qq(0), 'transaction is aborted on subscriber');
 
 # First, delete the data except for 2 rows (will be replicated)
 $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
 
-# Then insert, update and delete enough rows to exceed the 64kB limit.
+# Then insert, update and delete some rows.
 $node_publisher->safe_psql(
 	'postgres', q{
 	BEGIN;
-	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
 	PREPARE TRANSACTION 'test_prepared_tab';});
@@ -287,7 +287,7 @@ $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is committed on subscriber
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
-is($result, qq(3335|3335|3335),
+is($result, qq(5|5|5),
 	'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
 );
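For the two-phase scenarios in 023_twophase_stream.pl, the subscription also
enables two-phase commit; a minimal sketch of that combination (connection
string and object names are illustrative):

    -- Streaming plus two-phase decoding: the PREPARE TRANSACTION is
    -- streamed to the subscriber and resolved by the later COMMIT PREPARED
    -- or ROLLBACK PREPARED.
    CREATE SUBSCRIPTION tap_sub_2pc
        CONNECTION 'host=publisher dbname=postgres application_name=tap_sub_2pc'
        PUBLICATION tap_pub
        WITH (streaming = on, two_phase = on);

With logical_decoding_mode = immediate on the publisher, even the three-row
transactions used above exercise the streamed two-phase code paths.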
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 60c71d05fe..50d86cb01b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1456,6 +1456,7 @@ LogicalDecodeStreamStopCB
 LogicalDecodeStreamTruncateCB
 LogicalDecodeTruncateCB
 LogicalDecodingContext
+LogicalDecodingMode
 LogicalErrorCallbackState
 LogicalOutputPluginInit
 LogicalOutputPluginWriterPrepareWrite