Logical decoding of TRUNCATE
Add a new WAL record type for TRUNCATE, which is only used when wal_level >= logical. (For physical replication, TRUNCATE is already replicated via SMGR records.) Add new callback for logical decoding output plugins to receive TRUNCATE actions. Author: Simon Riggs <simon@2ndquadrant.com> Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it> Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com> Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
This commit is contained in:
parent
b508a56f2f
commit
5dfd1e5a66
|
@ -39,7 +39,7 @@ submake-test_decoding:
|
||||||
|
|
||||||
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
|
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
|
||||||
decoding_into_rel binary prepared replorigin time messages \
|
decoding_into_rel binary prepared replorigin time messages \
|
||||||
spill slot
|
spill slot truncate
|
||||||
|
|
||||||
regresscheck: | submake-regress submake-test_decoding temp-install
|
regresscheck: | submake-regress submake-test_decoding temp-install
|
||||||
$(pg_regress_check) \
|
$(pg_regress_check) \
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
||||||
|
?column?
|
||||||
|
----------
|
||||||
|
init
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE tab1 (id serial unique, data int);
|
||||||
|
CREATE TABLE tab2 (a int primary key, b int);
|
||||||
|
TRUNCATE tab1;
|
||||||
|
TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
|
||||||
|
TRUNCATE tab1, tab2;
|
||||||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||||
|
data
|
||||||
|
------------------------------------------------------
|
||||||
|
BEGIN
|
||||||
|
table public.tab1: TRUNCATE: (no-flags)
|
||||||
|
COMMIT
|
||||||
|
BEGIN
|
||||||
|
table public.tab1: TRUNCATE: restart_seqs cascade
|
||||||
|
COMMIT
|
||||||
|
BEGIN
|
||||||
|
table public.tab1, public.tab2: TRUNCATE: (no-flags)
|
||||||
|
COMMIT
|
||||||
|
(9 rows)
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
||||||
|
|
||||||
|
CREATE TABLE tab1 (id serial unique, data int);
|
||||||
|
CREATE TABLE tab2 (a int primary key, b int);
|
||||||
|
|
||||||
|
TRUNCATE tab1;
|
||||||
|
TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
|
||||||
|
TRUNCATE tab1, tab2;
|
||||||
|
|
||||||
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||||
static void pg_decode_change(LogicalDecodingContext *ctx,
|
static void pg_decode_change(LogicalDecodingContext *ctx,
|
||||||
ReorderBufferTXN *txn, Relation rel,
|
ReorderBufferTXN *txn, Relation rel,
|
||||||
ReorderBufferChange *change);
|
ReorderBufferChange *change);
|
||||||
|
static void pg_decode_truncate(LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn,
|
||||||
|
int nrelations, Relation relations[],
|
||||||
|
ReorderBufferChange *change);
|
||||||
static bool pg_decode_filter(LogicalDecodingContext *ctx,
|
static bool pg_decode_filter(LogicalDecodingContext *ctx,
|
||||||
RepOriginId origin_id);
|
RepOriginId origin_id);
|
||||||
static void pg_decode_message(LogicalDecodingContext *ctx,
|
static void pg_decode_message(LogicalDecodingContext *ctx,
|
||||||
|
@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
|
||||||
cb->startup_cb = pg_decode_startup;
|
cb->startup_cb = pg_decode_startup;
|
||||||
cb->begin_cb = pg_decode_begin_txn;
|
cb->begin_cb = pg_decode_begin_txn;
|
||||||
cb->change_cb = pg_decode_change;
|
cb->change_cb = pg_decode_change;
|
||||||
|
cb->truncate_cb = pg_decode_truncate;
|
||||||
cb->commit_cb = pg_decode_commit_txn;
|
cb->commit_cb = pg_decode_commit_txn;
|
||||||
cb->filter_by_origin_cb = pg_decode_filter;
|
cb->filter_by_origin_cb = pg_decode_filter;
|
||||||
cb->shutdown_cb = pg_decode_shutdown;
|
cb->shutdown_cb = pg_decode_shutdown;
|
||||||
|
@ -480,6 +485,59 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
|
int nrelations, Relation relations[], ReorderBufferChange *change)
|
||||||
|
{
|
||||||
|
TestDecodingData *data;
|
||||||
|
MemoryContext old;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
|
||||||
|
/* output BEGIN if we haven't yet */
|
||||||
|
if (data->skip_empty_xacts && !data->xact_wrote_changes)
|
||||||
|
{
|
||||||
|
pg_output_begin(ctx, data, txn, false);
|
||||||
|
}
|
||||||
|
data->xact_wrote_changes = true;
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
|
|
||||||
|
appendStringInfoString(ctx->out, "table ");
|
||||||
|
|
||||||
|
for (i = 0; i < nrelations; i++)
|
||||||
|
{
|
||||||
|
if (i > 0)
|
||||||
|
appendStringInfoString(ctx->out, ", ");
|
||||||
|
|
||||||
|
appendStringInfoString(ctx->out,
|
||||||
|
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
|
||||||
|
NameStr(relations[i]->rd_rel->relname)));
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfoString(ctx->out, ": TRUNCATE:");
|
||||||
|
|
||||||
|
if (change->data.truncate.restart_seqs
|
||||||
|
|| change->data.truncate.cascade)
|
||||||
|
{
|
||||||
|
if (change->data.truncate.restart_seqs)
|
||||||
|
appendStringInfo(ctx->out, " restart_seqs");
|
||||||
|
if (change->data.truncate.cascade)
|
||||||
|
appendStringInfo(ctx->out, " cascade");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
appendStringInfoString(ctx->out, " (no-flags)");
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(old);
|
||||||
|
MemoryContextReset(data->context);
|
||||||
|
|
||||||
|
OutputPluginWrite(ctx, true);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
pg_decode_message(LogicalDecodingContext *ctx,
|
pg_decode_message(LogicalDecodingContext *ctx,
|
||||||
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
||||||
|
|
|
@ -383,6 +383,7 @@ typedef struct OutputPluginCallbacks
|
||||||
LogicalDecodeStartupCB startup_cb;
|
LogicalDecodeStartupCB startup_cb;
|
||||||
LogicalDecodeBeginCB begin_cb;
|
LogicalDecodeBeginCB begin_cb;
|
||||||
LogicalDecodeChangeCB change_cb;
|
LogicalDecodeChangeCB change_cb;
|
||||||
|
LogicalDecodeTruncateCB truncate_cb;
|
||||||
LogicalDecodeCommitCB commit_cb;
|
LogicalDecodeCommitCB commit_cb;
|
||||||
LogicalDecodeMessageCB message_cb;
|
LogicalDecodeMessageCB message_cb;
|
||||||
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
||||||
|
@ -394,8 +395,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
|
||||||
The <function>begin_cb</function>, <function>change_cb</function>
|
The <function>begin_cb</function>, <function>change_cb</function>
|
||||||
and <function>commit_cb</function> callbacks are required,
|
and <function>commit_cb</function> callbacks are required,
|
||||||
while <function>startup_cb</function>,
|
while <function>startup_cb</function>,
|
||||||
<function>filter_by_origin_cb</function>
|
<function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
|
||||||
and <function>shutdown_cb</function> are optional.
|
and <function>shutdown_cb</function> are optional.
|
||||||
|
If <function>truncate_cb</function> is not set but a
|
||||||
|
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
|
||||||
</para>
|
</para>
|
||||||
</sect2>
|
</sect2>
|
||||||
|
|
||||||
|
@ -590,6 +593,28 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
|
||||||
</note>
|
</note>
|
||||||
</sect3>
|
</sect3>
|
||||||
|
|
||||||
|
<sect3 id="logicaldecoding-output-plugin-truncate">
|
||||||
|
<title>Truncate Callback</title>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The <function>truncate_cb</function> callback is called for a
|
||||||
|
<command>TRUNCATE</command> command.
|
||||||
|
<programlisting>
|
||||||
|
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn,
|
||||||
|
int nrelations,
|
||||||
|
Relation relations[],
|
||||||
|
ReorderBufferChange *change);
|
||||||
|
</programlisting>
|
||||||
|
The parameters are analogous to the <function>change_cb</function>
|
||||||
|
callback. However, because <command>TRUNCATE</command> actions on
|
||||||
|
tables connected by foreign keys need to be executed together, this
|
||||||
|
callback receives an array of relations instead of just a single one.
|
||||||
|
See the description of the <xref linkend="sql-truncate"/> statement for
|
||||||
|
details.
|
||||||
|
</para>
|
||||||
|
</sect3>
|
||||||
|
|
||||||
<sect3 id="logicaldecoding-output-plugin-filter-origin">
|
<sect3 id="logicaldecoding-output-plugin-filter-origin">
|
||||||
<title>Origin Filter Callback</title>
|
<title>Origin Filter Callback</title>
|
||||||
|
|
||||||
|
|
|
@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
|
||||||
case XLOG_HEAP_UPDATE:
|
case XLOG_HEAP_UPDATE:
|
||||||
heap_xlog_update(record, false);
|
heap_xlog_update(record, false);
|
||||||
break;
|
break;
|
||||||
|
case XLOG_HEAP_TRUNCATE:
|
||||||
|
/*
|
||||||
|
* TRUNCATE is a no-op because the actions are already logged as
|
||||||
|
* SMGR WAL records. TRUNCATE WAL record only exists for logical
|
||||||
|
* decoding.
|
||||||
|
*/
|
||||||
|
break;
|
||||||
case XLOG_HEAP_HOT_UPDATE:
|
case XLOG_HEAP_HOT_UPDATE:
|
||||||
heap_xlog_update(record, true);
|
heap_xlog_update(record, true);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -75,6 +75,19 @@ heap_desc(StringInfo buf, XLogReaderState *record)
|
||||||
xlrec->new_offnum,
|
xlrec->new_offnum,
|
||||||
xlrec->new_xmax);
|
xlrec->new_xmax);
|
||||||
}
|
}
|
||||||
|
else if (info == XLOG_HEAP_TRUNCATE)
|
||||||
|
{
|
||||||
|
xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
if (xlrec->flags & XLH_TRUNCATE_CASCADE)
|
||||||
|
appendStringInfo(buf, "cascade ");
|
||||||
|
if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
|
||||||
|
appendStringInfo(buf, "restart_seqs ");
|
||||||
|
appendStringInfo(buf, "nrelids %u relids", xlrec->nrelids);
|
||||||
|
for (i = 0; i < xlrec->nrelids; i++)
|
||||||
|
appendStringInfo(buf, " %u", xlrec->relids[i]);
|
||||||
|
}
|
||||||
else if (info == XLOG_HEAP_CONFIRM)
|
else if (info == XLOG_HEAP_CONFIRM)
|
||||||
{
|
{
|
||||||
xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
|
xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
|
||||||
|
@ -186,6 +199,9 @@ heap_identify(uint8 info)
|
||||||
case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
|
case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
|
||||||
id = "HOT_UPDATE+INIT";
|
id = "HOT_UPDATE+INIT";
|
||||||
break;
|
break;
|
||||||
|
case XLOG_HEAP_TRUNCATE:
|
||||||
|
id = "TRUNCATE";
|
||||||
|
break;
|
||||||
case XLOG_HEAP_CONFIRM:
|
case XLOG_HEAP_CONFIRM:
|
||||||
id = "HEAP_CONFIRM";
|
id = "HEAP_CONFIRM";
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
#include "access/genam.h"
|
#include "access/genam.h"
|
||||||
#include "access/heapam.h"
|
#include "access/heapam.h"
|
||||||
|
#include "access/heapam_xlog.h"
|
||||||
#include "access/multixact.h"
|
#include "access/multixact.h"
|
||||||
#include "access/reloptions.h"
|
#include "access/reloptions.h"
|
||||||
#include "access/relscan.h"
|
#include "access/relscan.h"
|
||||||
|
@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
{
|
{
|
||||||
List *rels = NIL;
|
List *rels = NIL;
|
||||||
List *relids = NIL;
|
List *relids = NIL;
|
||||||
List *seq_relids = NIL;
|
List *relids_logged = NIL;
|
||||||
EState *estate;
|
|
||||||
ResultRelInfo *resultRelInfos;
|
|
||||||
ResultRelInfo *resultRelInfo;
|
|
||||||
SubTransactionId mySubid;
|
|
||||||
ListCell *cell;
|
ListCell *cell;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
truncate_check_rel(rel);
|
truncate_check_rel(rel);
|
||||||
rels = lappend(rels, rel);
|
rels = lappend(rels, rel);
|
||||||
relids = lappend_oid(relids, myrelid);
|
relids = lappend_oid(relids, myrelid);
|
||||||
|
/* Log this relation only if needed for logical decoding */
|
||||||
|
if (RelationIsLogicallyLogged(rel))
|
||||||
|
relids_logged = lappend_oid(relids_logged, myrelid);
|
||||||
|
|
||||||
if (recurse)
|
if (recurse)
|
||||||
{
|
{
|
||||||
|
@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
truncate_check_rel(rel);
|
truncate_check_rel(rel);
|
||||||
rels = lappend(rels, rel);
|
rels = lappend(rels, rel);
|
||||||
relids = lappend_oid(relids, childrelid);
|
relids = lappend_oid(relids, childrelid);
|
||||||
|
/* Log this relation only if needed for logical decoding */
|
||||||
|
if (RelationIsLogicallyLogged(rel))
|
||||||
|
relids_logged = lappend_oid(relids_logged, childrelid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
||||||
|
@ -1379,7 +1382,47 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
|
errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ExecuteTruncateGuts(rels, relids, relids_logged,
|
||||||
|
stmt->behavior, stmt->restart_seqs);
|
||||||
|
|
||||||
|
/* And close the rels */
|
||||||
|
foreach(cell, rels)
|
||||||
|
{
|
||||||
|
Relation rel = (Relation) lfirst(cell);
|
||||||
|
|
||||||
|
heap_close(rel, NoLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteTruncateGuts
|
||||||
|
*
|
||||||
|
* Internal implementation of TRUNCATE. This is called by the actual TRUNCATE
|
||||||
|
* command (see above) as well as replication subscribers that execute a
|
||||||
|
* replicated TRUNCATE action.
|
||||||
|
*
|
||||||
|
* explicit_rels is the list of Relations to truncate that the command
|
||||||
|
* specified. relids is the list of Oids corresponding to explicit_rels.
|
||||||
|
* relids_logged is the list of Oids (a subset of relids) that require
|
||||||
|
* WAL-logging. This is all a bit redundant, but the existing callers have
|
||||||
|
* this information handy in this form.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
|
||||||
|
DropBehavior behavior, bool restart_seqs)
|
||||||
|
{
|
||||||
|
List *rels;
|
||||||
|
List *seq_relids = NIL;
|
||||||
|
EState *estate;
|
||||||
|
ResultRelInfo *resultRelInfos;
|
||||||
|
ResultRelInfo *resultRelInfo;
|
||||||
|
SubTransactionId mySubid;
|
||||||
|
ListCell *cell;
|
||||||
|
Oid *logrelids;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
* Open, exclusive-lock, and check all the explicitly-specified relations
|
||||||
|
*
|
||||||
* In CASCADE mode, suck in all referencing relations as well. This
|
* In CASCADE mode, suck in all referencing relations as well. This
|
||||||
* requires multiple iterations to find indirectly-dependent relations. At
|
* requires multiple iterations to find indirectly-dependent relations. At
|
||||||
* each phase, we need to exclusive-lock new rels before looking for their
|
* each phase, we need to exclusive-lock new rels before looking for their
|
||||||
|
@ -1387,7 +1430,8 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
* soon as we open it, to avoid a faux pas such as holding lock for a long
|
* soon as we open it, to avoid a faux pas such as holding lock for a long
|
||||||
* time on a rel we have no permissions for.
|
* time on a rel we have no permissions for.
|
||||||
*/
|
*/
|
||||||
if (stmt->behavior == DROP_CASCADE)
|
rels = list_copy(explicit_rels);
|
||||||
|
if (behavior == DROP_CASCADE)
|
||||||
{
|
{
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
|
@ -1409,6 +1453,9 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
truncate_check_rel(rel);
|
truncate_check_rel(rel);
|
||||||
rels = lappend(rels, rel);
|
rels = lappend(rels, rel);
|
||||||
relids = lappend_oid(relids, relid);
|
relids = lappend_oid(relids, relid);
|
||||||
|
/* Log this relation only if needed for logical decoding */
|
||||||
|
if (RelationIsLogicallyLogged(rel))
|
||||||
|
relids_logged = lappend_oid(relids_logged, relid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1421,7 +1468,7 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
#ifdef USE_ASSERT_CHECKING
|
#ifdef USE_ASSERT_CHECKING
|
||||||
heap_truncate_check_FKs(rels, false);
|
heap_truncate_check_FKs(rels, false);
|
||||||
#else
|
#else
|
||||||
if (stmt->behavior == DROP_RESTRICT)
|
if (behavior == DROP_RESTRICT)
|
||||||
heap_truncate_check_FKs(rels, false);
|
heap_truncate_check_FKs(rels, false);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -1431,7 +1478,7 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
* We want to do this early since it's pointless to do all the truncation
|
* We want to do this early since it's pointless to do all the truncation
|
||||||
* work only to fail on sequence permissions.
|
* work only to fail on sequence permissions.
|
||||||
*/
|
*/
|
||||||
if (stmt->restart_seqs)
|
if (restart_seqs)
|
||||||
{
|
{
|
||||||
foreach(cell, rels)
|
foreach(cell, rels)
|
||||||
{
|
{
|
||||||
|
@ -1586,6 +1633,41 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
ResetSequence(seq_relid);
|
ResetSequence(seq_relid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Write a WAL record to allow this set of actions to be logically decoded.
|
||||||
|
*
|
||||||
|
* Assemble an array of relids so we can write a single WAL record for the
|
||||||
|
* whole action.
|
||||||
|
*/
|
||||||
|
if (list_length(relids_logged) > 0)
|
||||||
|
{
|
||||||
|
xl_heap_truncate xlrec;
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
/* should only get here if wal_level >= logical */
|
||||||
|
Assert(XLogLogicalInfoActive());
|
||||||
|
|
||||||
|
logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
|
||||||
|
foreach (cell, relids_logged)
|
||||||
|
logrelids[i++] = lfirst_oid(cell);
|
||||||
|
|
||||||
|
xlrec.dbId = MyDatabaseId;
|
||||||
|
xlrec.nrelids = list_length(relids_logged);
|
||||||
|
xlrec.flags = 0;
|
||||||
|
if (behavior == DROP_CASCADE)
|
||||||
|
xlrec.flags |= XLH_TRUNCATE_CASCADE;
|
||||||
|
if (restart_seqs)
|
||||||
|
xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
|
||||||
|
|
||||||
|
XLogBeginInsert();
|
||||||
|
XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
|
||||||
|
XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid));
|
||||||
|
|
||||||
|
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
|
||||||
|
|
||||||
|
(void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Process all AFTER STATEMENT TRUNCATE triggers.
|
* Process all AFTER STATEMENT TRUNCATE triggers.
|
||||||
*/
|
*/
|
||||||
|
@ -1603,7 +1685,11 @@ ExecuteTruncate(TruncateStmt *stmt)
|
||||||
/* We can clean up the EState now */
|
/* We can clean up the EState now */
|
||||||
FreeExecutorState(estate);
|
FreeExecutorState(estate);
|
||||||
|
|
||||||
/* And close the rels (can't do this while EState still holds refs) */
|
/*
|
||||||
|
* Close any rels opened by CASCADE (can't do this while EState still
|
||||||
|
* holds refs)
|
||||||
|
*/
|
||||||
|
rels = list_difference_ptr(rels, explicit_rels);
|
||||||
foreach(cell, rels)
|
foreach(cell, rels)
|
||||||
{
|
{
|
||||||
Relation rel = (Relation) lfirst(cell);
|
Relation rel = (Relation) lfirst(cell);
|
||||||
|
|
|
@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu
|
||||||
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||||
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||||
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||||
|
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||||
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||||
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||||
|
|
||||||
|
@ -450,6 +451,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||||
DecodeDelete(ctx, buf);
|
DecodeDelete(ctx, buf);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case XLOG_HEAP_TRUNCATE:
|
||||||
|
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||||
|
DecodeTruncate(ctx, buf);
|
||||||
|
break;
|
||||||
|
|
||||||
case XLOG_HEAP_INPLACE:
|
case XLOG_HEAP_INPLACE:
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -826,6 +832,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||||
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
|
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Parse XLOG_HEAP_TRUNCATE from wal
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||||
|
{
|
||||||
|
XLogReaderState *r = buf->record;
|
||||||
|
xl_heap_truncate *xlrec;
|
||||||
|
ReorderBufferChange *change;
|
||||||
|
|
||||||
|
xlrec = (xl_heap_truncate *) XLogRecGetData(r);
|
||||||
|
|
||||||
|
/* only interested in our database */
|
||||||
|
if (xlrec->dbId != ctx->slot->data.database)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* output plugin doesn't look for this origin, no need to queue */
|
||||||
|
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
|
||||||
|
return;
|
||||||
|
|
||||||
|
change = ReorderBufferGetChange(ctx->reorder);
|
||||||
|
change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
|
||||||
|
change->origin_id = XLogRecGetOrigin(r);
|
||||||
|
if (xlrec->flags & XLH_TRUNCATE_CASCADE)
|
||||||
|
change->data.truncate.cascade = true;
|
||||||
|
if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
|
||||||
|
change->data.truncate.restart_seqs = true;
|
||||||
|
change->data.truncate.nrelids = xlrec->nrelids;
|
||||||
|
change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid));
|
||||||
|
memcpy(change->data.truncate.relids, xlrec->relids,
|
||||||
|
xlrec->nrelids * sizeof(Oid));
|
||||||
|
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
|
||||||
|
buf->origptr, change);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
|
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
|
||||||
*
|
*
|
||||||
|
|
|
@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||||
XLogRecPtr commit_lsn);
|
XLogRecPtr commit_lsn);
|
||||||
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||||
Relation relation, ReorderBufferChange *change);
|
Relation relation, ReorderBufferChange *change);
|
||||||
|
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||||
|
int nrelations, Relation relations[], ReorderBufferChange *change);
|
||||||
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||||
XLogRecPtr message_lsn, bool transactional,
|
XLogRecPtr message_lsn, bool transactional,
|
||||||
const char *prefix, Size message_size, const char *message);
|
const char *prefix, Size message_size, const char *message);
|
||||||
|
@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options,
|
||||||
/* wrap output plugin callbacks, so we can add error context information */
|
/* wrap output plugin callbacks, so we can add error context information */
|
||||||
ctx->reorder->begin = begin_cb_wrapper;
|
ctx->reorder->begin = begin_cb_wrapper;
|
||||||
ctx->reorder->apply_change = change_cb_wrapper;
|
ctx->reorder->apply_change = change_cb_wrapper;
|
||||||
|
ctx->reorder->apply_truncate = truncate_cb_wrapper;
|
||||||
ctx->reorder->commit = commit_cb_wrapper;
|
ctx->reorder->commit = commit_cb_wrapper;
|
||||||
ctx->reorder->message = message_cb_wrapper;
|
ctx->reorder->message = message_cb_wrapper;
|
||||||
|
|
||||||
|
@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||||
error_context_stack = errcallback.previous;
|
error_context_stack = errcallback.previous;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||||
|
int nrelations, Relation relations[], ReorderBufferChange *change)
|
||||||
|
{
|
||||||
|
LogicalDecodingContext *ctx = cache->private_data;
|
||||||
|
LogicalErrorCallbackState state;
|
||||||
|
ErrorContextCallback errcallback;
|
||||||
|
|
||||||
|
Assert(!ctx->fast_forward);
|
||||||
|
|
||||||
|
if (!ctx->callbacks.truncate_cb)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* Push callback + info on the error context stack */
|
||||||
|
state.ctx = ctx;
|
||||||
|
state.callback_name = "truncate";
|
||||||
|
state.report_location = change->lsn;
|
||||||
|
errcallback.callback = output_plugin_error_callback;
|
||||||
|
errcallback.arg = (void *) &state;
|
||||||
|
errcallback.previous = error_context_stack;
|
||||||
|
error_context_stack = &errcallback;
|
||||||
|
|
||||||
|
/* set output state */
|
||||||
|
ctx->accept_writes = true;
|
||||||
|
ctx->write_xid = txn->xid;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* report this change's lsn so replies from clients can give an up2date
|
||||||
|
* answer. This won't ever be enough (and shouldn't be!) to confirm
|
||||||
|
* receipt of this transaction, but it might allow another transaction's
|
||||||
|
* commit to be confirmed with one message.
|
||||||
|
*/
|
||||||
|
ctx->write_location = change->lsn;
|
||||||
|
|
||||||
|
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
|
||||||
|
|
||||||
|
/* Pop the error context stack */
|
||||||
|
error_context_stack = errcallback.previous;
|
||||||
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||||
{
|
{
|
||||||
|
|
|
@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
||||||
|
case REORDER_BUFFER_CHANGE_TRUNCATE:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||||
specinsert = change;
|
specinsert = change;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case REORDER_BUFFER_CHANGE_TRUNCATE:
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
int nrelids = change->data.truncate.nrelids;
|
||||||
|
int nrelations = 0;
|
||||||
|
Relation *relations;
|
||||||
|
|
||||||
|
relations = palloc0(nrelids * sizeof(Relation));
|
||||||
|
for (i = 0; i < nrelids; i++)
|
||||||
|
{
|
||||||
|
Oid relid = change->data.truncate.relids[i];
|
||||||
|
Relation relation;
|
||||||
|
|
||||||
|
relation = RelationIdGetRelation(relid);
|
||||||
|
|
||||||
|
if (relation == NULL)
|
||||||
|
elog(ERROR, "could not open relation with OID %u", relid);
|
||||||
|
|
||||||
|
if (!RelationIsLogicallyLogged(relation))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
relations[nrelations++] = relation;
|
||||||
|
}
|
||||||
|
|
||||||
|
rb->apply_truncate(rb, txn, nrelations, relations, change);
|
||||||
|
|
||||||
|
for (i = 0; i < nrelations; i++)
|
||||||
|
RelationClose(relations[i]);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case REORDER_BUFFER_CHANGE_MESSAGE:
|
case REORDER_BUFFER_CHANGE_MESSAGE:
|
||||||
rb->message(rb, txn, change->lsn, true,
|
rb->message(rb, txn, change->lsn, true,
|
||||||
change->data.msg.prefix,
|
change->data.msg.prefix,
|
||||||
|
@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case REORDER_BUFFER_CHANGE_TRUNCATE:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
||||||
|
@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
/* the base struct contains all the data, easy peasy */
|
/* the base struct contains all the data, easy peasy */
|
||||||
|
case REORDER_BUFFER_CHANGE_TRUNCATE:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
||||||
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
#define XLOG_HEAP_INSERT 0x00
|
#define XLOG_HEAP_INSERT 0x00
|
||||||
#define XLOG_HEAP_DELETE 0x10
|
#define XLOG_HEAP_DELETE 0x10
|
||||||
#define XLOG_HEAP_UPDATE 0x20
|
#define XLOG_HEAP_UPDATE 0x20
|
||||||
/* 0x030 is free, was XLOG_HEAP_MOVE */
|
#define XLOG_HEAP_TRUNCATE 0x30
|
||||||
#define XLOG_HEAP_HOT_UPDATE 0x40
|
#define XLOG_HEAP_HOT_UPDATE 0x40
|
||||||
#define XLOG_HEAP_CONFIRM 0x50
|
#define XLOG_HEAP_CONFIRM 0x50
|
||||||
#define XLOG_HEAP_LOCK 0x60
|
#define XLOG_HEAP_LOCK 0x60
|
||||||
|
@ -109,6 +109,27 @@ typedef struct xl_heap_delete
|
||||||
|
|
||||||
#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
|
#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
|
||||||
|
|
||||||
|
/*
|
||||||
|
* xl_heap_delete flag values, 8 bits are available.
|
||||||
|
*/
|
||||||
|
#define XLH_TRUNCATE_CASCADE (1<<0)
|
||||||
|
#define XLH_TRUNCATE_RESTART_SEQS (1<<1)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For truncate we list all truncated relids in an array, followed by all
|
||||||
|
* sequence relids that need to be restarted, if any.
|
||||||
|
* All rels are always within the same database, so we just list dbid once.
|
||||||
|
*/
|
||||||
|
typedef struct xl_heap_truncate
|
||||||
|
{
|
||||||
|
Oid dbId;
|
||||||
|
uint32 nrelids;
|
||||||
|
uint8 flags;
|
||||||
|
Oid relids[FLEXIBLE_ARRAY_MEMBER];
|
||||||
|
} xl_heap_truncate;
|
||||||
|
|
||||||
|
#define SizeOfHeapTruncate (offsetof(xl_heap_truncate, relids))
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
|
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
|
||||||
* or updated tuple in WAL; we can save a few bytes by reconstructing the
|
* or updated tuple in WAL; we can save a few bytes by reconstructing the
|
||||||
|
|
|
@ -54,6 +54,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
|
||||||
extern void CheckTableNotInUse(Relation rel, const char *stmt);
|
extern void CheckTableNotInUse(Relation rel, const char *stmt);
|
||||||
|
|
||||||
extern void ExecuteTruncate(TruncateStmt *stmt);
|
extern void ExecuteTruncate(TruncateStmt *stmt);
|
||||||
|
extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
|
||||||
|
DropBehavior behavior, bool restart_seqs);
|
||||||
|
|
||||||
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
|
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,15 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
|
||||||
Relation relation,
|
Relation relation,
|
||||||
ReorderBufferChange *change);
|
ReorderBufferChange *change);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Callback for every TRUNCATE in a successful transaction.
|
||||||
|
*/
|
||||||
|
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn,
|
||||||
|
int nrelations,
|
||||||
|
Relation relations[],
|
||||||
|
ReorderBufferChange *change);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Called for every (explicit or implicit) COMMIT of a successful transaction.
|
* Called for every (explicit or implicit) COMMIT of a successful transaction.
|
||||||
*/
|
*/
|
||||||
|
@ -98,6 +107,7 @@ typedef struct OutputPluginCallbacks
|
||||||
LogicalDecodeStartupCB startup_cb;
|
LogicalDecodeStartupCB startup_cb;
|
||||||
LogicalDecodeBeginCB begin_cb;
|
LogicalDecodeBeginCB begin_cb;
|
||||||
LogicalDecodeChangeCB change_cb;
|
LogicalDecodeChangeCB change_cb;
|
||||||
|
LogicalDecodeTruncateCB truncate_cb;
|
||||||
LogicalDecodeCommitCB commit_cb;
|
LogicalDecodeCommitCB commit_cb;
|
||||||
LogicalDecodeMessageCB message_cb;
|
LogicalDecodeMessageCB message_cb;
|
||||||
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
||||||
|
|
|
@ -59,7 +59,8 @@ enum ReorderBufferChangeType
|
||||||
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
|
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
|
||||||
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
|
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
|
||||||
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
|
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
|
||||||
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
|
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
|
||||||
|
REORDER_BUFFER_CHANGE_TRUNCATE
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -99,6 +100,18 @@ typedef struct ReorderBufferChange
|
||||||
ReorderBufferTupleBuf *newtuple;
|
ReorderBufferTupleBuf *newtuple;
|
||||||
} tp;
|
} tp;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
|
||||||
|
* one set of relations to be truncated.
|
||||||
|
*/
|
||||||
|
struct
|
||||||
|
{
|
||||||
|
Size nrelids;
|
||||||
|
bool cascade;
|
||||||
|
bool restart_seqs;
|
||||||
|
Oid *relids;
|
||||||
|
} truncate;
|
||||||
|
|
||||||
/* Message with arbitrary data. */
|
/* Message with arbitrary data. */
|
||||||
struct
|
struct
|
||||||
{
|
{
|
||||||
|
@ -283,6 +296,14 @@ typedef void (*ReorderBufferApplyChangeCB) (
|
||||||
Relation relation,
|
Relation relation,
|
||||||
ReorderBufferChange *change);
|
ReorderBufferChange *change);
|
||||||
|
|
||||||
|
/* truncate callback signature */
|
||||||
|
typedef void (*ReorderBufferApplyTruncateCB) (
|
||||||
|
ReorderBuffer *rb,
|
||||||
|
ReorderBufferTXN *txn,
|
||||||
|
int nrelations,
|
||||||
|
Relation relations[],
|
||||||
|
ReorderBufferChange *change);
|
||||||
|
|
||||||
/* begin callback signature */
|
/* begin callback signature */
|
||||||
typedef void (*ReorderBufferBeginCB) (
|
typedef void (*ReorderBufferBeginCB) (
|
||||||
ReorderBuffer *rb,
|
ReorderBuffer *rb,
|
||||||
|
@ -328,6 +349,7 @@ struct ReorderBuffer
|
||||||
*/
|
*/
|
||||||
ReorderBufferBeginCB begin;
|
ReorderBufferBeginCB begin;
|
||||||
ReorderBufferApplyChangeCB apply_change;
|
ReorderBufferApplyChangeCB apply_change;
|
||||||
|
ReorderBufferApplyTruncateCB apply_truncate;
|
||||||
ReorderBufferCommitCB commit;
|
ReorderBufferCommitCB commit;
|
||||||
ReorderBufferMessageCB message;
|
ReorderBufferMessageCB message;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue