diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index de1b692658..e5cd84e85e 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -339,7 +339,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", - timestamptz_to_str(txn->commit_time)); + timestamptz_to_str(txn->xact_time.commit_time)); OutputPluginWrite(ctx, true); } @@ -382,7 +382,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", - timestamptz_to_str(txn->commit_time)); + timestamptz_to_str(txn->xact_time.prepare_time)); OutputPluginWrite(ctx, true); } @@ -404,7 +404,7 @@ pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", - timestamptz_to_str(txn->commit_time)); + timestamptz_to_str(txn->xact_time.commit_time)); OutputPluginWrite(ctx, true); } @@ -428,7 +428,7 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", - timestamptz_to_str(txn->commit_time)); + timestamptz_to_str(txn->xact_time.commit_time)); OutputPluginWrite(ctx, true); } @@ -853,7 +853,7 @@ pg_decode_stream_prepare(LogicalDecodingContext *ctx, if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", - timestamptz_to_str(txn->commit_time)); + timestamptz_to_str(txn->xact_time.prepare_time)); OutputPluginWrite(ctx, true); } @@ -882,7 +882,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", - timestamptz_to_str(txn->commit_time)); + timestamptz_to_str(txn->xact_time.commit_time)); OutputPluginWrite(ctx, true); } diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index f517a7d4af..0f5d25b948 100644 --- 
a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7641,6 +7641,18 @@ SCRAM-SHA-256$<iteration count>:&l + + + subtwophasestate char + + + State codes for two-phase mode: + d = disabled, + p = pending enablement, + e = enabled + + + subconninfo text diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index a3562f3d08..e8cb78ff1f 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2811,11 +2811,17 @@ The commands accepted in replication mode are: - Protocol version. Currently versions 1 and - 2 are supported. The version 2 - is supported only for server version 14 and above, and it allows - streaming of large in-progress transactions. - + Protocol version. Currently versions 1, 2, + and 3 are supported. + + + Version 2 is supported only for server version 14 + and above, and it allows streaming of large in-progress transactions. + + + Version 3 is supported only for server version 15 + and above, and it allows streaming of two-phase transactions. + @@ -2871,10 +2877,11 @@ The commands accepted in replication mode are: The logical replication protocol sends individual transactions one by one. This means that all messages between a pair of Begin and Commit messages - belong to the same transaction. It also sends changes of large in-progress - transactions between a pair of Stream Start and Stream Stop messages. The - last stream of such a transaction contains Stream Commit or Stream Abort - message. + belong to the same transaction. Similarly, all messages between a pair of + Begin Prepare and Prepare messages belong to the same transaction. + It also sends changes of large in-progress transactions between a pair of + Stream Start and Stream Stop messages. The last stream of such a transaction + contains a Stream Commit or Stream Abort message. @@ -7390,6 +7397,272 @@ Stream Abort + +The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared) +are available since protocol version 3. 
+ + + + + + +Begin Prepare + + + + + + +Byte1('b') + + Identifies the message as the beginning of a two-phase transaction message. + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the prepared transaction. + + + + +Int64 + + Prepare timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the transaction. + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + +Prepare + + + + + + +Byte1('P') + + Identifies the message as a two-phase prepared transaction message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the prepared transaction. + + + + +Int64 + + Prepare timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the transaction. + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + +Commit Prepared + + + + + + +Byte1('K') + + Identifies the message as the commit of a two-phase transaction message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The LSN of the commit prepared. + + + + +Int64 + + The end LSN of the commit prepared transaction. + + + + +Int64 + + Commit timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the transaction. + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + +Rollback Prepared + + + + + + +Byte1('r') + + Identifies the message as the rollback of a two-phase transaction message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The end LSN of the prepared transaction. + + + + +Int64 + + The end LSN of the rollback prepared transaction. + + + + +Int64 + + Prepare timestamp of the transaction. 
The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int64 + + Rollback timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the transaction. + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + The following message parts are shared by the above messages. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index b3d173179f..a6f994450d 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -67,6 +67,11 @@ ALTER SUBSCRIPTION name RENAME TO < Commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true cannot be executed inside a transaction block. + + These commands also cannot be executed when the subscription has + two_phase commit enabled, unless copy_data = false. + See column subtwophasestate of + pg_subscription to know the actual two-phase state. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e812beee37..143390593d 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -237,6 +237,43 @@ CREATE SUBSCRIPTION subscription_name + + + The streaming option cannot be used with the + two_phase option. + + + + + + two_phase (boolean) + + + Specifies whether two-phase commit is enabled for this subscription. + The default is false. + + + + When two-phase commit is enabled then the decoded transactions are sent + to the subscriber on the PREPARE TRANSACTION. By default, the transaction + prepared on the publisher is decoded as a normal transaction at commit. + + + + The two-phase commit implementation requires that the replication has + successfully passed the initial table synchronization phase.
This means + even when two_phase is enabled for the subscription, the internal + two-phase state remains temporarily "pending" until the initialization + phase is completed. See column + subtwophasestate of + pg_subscription to know the actual two-phase state. + + + + The two_phase option cannot be used with the + streaming option. + + diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index a6c0788592..7682226b99 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -1405,7 +1405,12 @@ CREATE DATABASE foo WITH TEMPLATE template0; servers. It is then up to the user to reactivate the subscriptions in a suitable way. If the involved hosts have changed, the connection information might have to be changed. It might also be appropriate to - truncate the target tables before initiating a new full table copy. + truncate the target tables before initiating a new full table copy. If users + intend to copy initial data during refresh they must create the slot with + two_phase = false. After the initial sync, the + two_phase option will be automatically enabled by the + subscriber if the subscription had been originally created with the + two_phase = true option. diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index f67d813c56..6d3efb49a4 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2458,3 +2458,71 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * LookupGXact + * Check if the prepared transaction with the given GID, lsn and timestamp + * exists. + * + * Note that we always compare with the LSN where prepare ends because that is + * what is stored as origin_lsn in the 2PC file. + * + * This function is primarily used to check if the prepared transaction + * received from the upstream (remote node) already exists.
Checking only GID + * is not sufficient because a different prepared xact with the same GID can + * exist on the same node. So, we are ensuring to match origin_lsn and + * origin_timestamp of prepared xact to avoid the possibility of a match of + * prepared xact from two different nodes. + */ +bool +LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, + TimestampTz origin_prepare_timestamp) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if (gxact->valid && strcmp(gxact->gid, gid) == 0) + { + char *buf; + TwoPhaseFileHeader *hdr; + + /* + * We are not expecting collisions of GXACTs (same gid) between + * publisher and subscribers, so we perform all I/O while holding + * TwoPhaseStateLock for simplicity. + * + * To move the I/O out of the lock, we need to ensure that no + * other backend commits the prepared xact in the meantime. We can + * do this optimization if we encounter many collisions in GID + * between publisher and subscriber. 
+ */ + if (gxact->ondisk) + buf = ReadTwoPhaseFile(gxact->xid, false); + else + { + Assert(gxact->prepare_start_lsn); + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + } + + hdr = (TwoPhaseFileHeader *) buf; + + if (hdr->origin_lsn == prepare_end_lsn && + hdr->origin_timestamp == origin_prepare_timestamp) + { + found = true; + pfree(buf); + break; + } + + pfree(buf); + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 29fc4218cd..25021e25a4 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; + sub->twophasestate = subform->subtwophasestate; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, @@ -450,6 +451,39 @@ RemoveSubscriptionRel(Oid subid, Oid relid) table_close(rel, RowExclusiveLock); } +/* + * Does the subscription have any relations? + * + * Use this function only to know true/false, and when you have no need for the + * List returned by GetSubscriptionRelations. + */ +bool +HasSubscriptionRelations(Oid subid) +{ + Relation rel; + ScanKeyData skey[1]; + SysScanDesc scan; + bool has_subrels; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 1, skey); + + /* If even a single tuple exists then the subscription has tables. */ + has_subrels = HeapTupleIsValid(systable_getnext(scan)); + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return has_subrels; +} /* * Get all relations for subscription. 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 999d984068..55f6e3711d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1255,5 +1255,5 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subslotname, subsynccommit, subpublications) + substream, subtwophasestate, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index eb88d877a5..5f834a9c30 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -59,6 +59,7 @@ #define SUBOPT_REFRESH 0x00000040 #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 +#define SUBOPT_TWOPHASE_COMMIT 0x00000200 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -79,6 +80,7 @@ typedef struct SubOpts bool refresh; bool binary; bool streaming; + bool twophase; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -123,6 +125,8 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o opts->binary = false; if (IsSet(supported_opts, SUBOPT_STREAMING)) opts->streaming = false; + if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) + opts->twophase = false; /* Parse options */ foreach(lc, stmt_options) @@ -237,6 +241,29 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + /* + * Do not allow toggling of two_phase option. 
Doing so could cause + * missing of transactions and lead to an inconsistent replica. + * See comments atop worker.c + * + * Note: Unsupported twophase indicates that this call originated + * from AlterSubscription. + */ + if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized subscription parameter: \"%s\"", defel->defname))); + + if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; + opts->twophase = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -325,6 +352,25 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } + + /* + * Do additional checking for the disallowed combination of two_phase and + * streaming. While streaming and two_phase can theoretically be + * supported, it needs more analysis to allow them together. 
+ */ + if (opts->twophase && + IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) && + IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) + { + if (opts->streaming && + IsSet(supported_opts, SUBOPT_STREAMING) && + IsSet(opts->specified_opts, SUBOPT_STREAMING)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ + errmsg("%s and %s are mutually exclusive options", + "two_phase = true", "streaming = true"))); + } } /* @@ -385,7 +431,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); parse_subscription_options(stmt->options, supported_opts, &opts); /* @@ -455,6 +501,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(opts.twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -532,10 +582,35 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (opts.create_slot) { + bool twophase_enabled = false; + Assert(opts.slot_name); - walrcv_create_slot(wrconn, opts.slot_name, false, + /* + * Even if two_phase is set, don't create the slot with + * two-phase enabled. Will enable it once all the tables are + * synced and ready. 
This avoids race-conditions like prepared + * transactions being skipped due to changes not being applied + * due to checks in should_apply_changes_for_rel() when + * tablesync for the corresponding tables are in progress. See + * comments atop worker.c. + * + * Note that if tables were specified but copy_data is false + * then it is safe to enable two_phase up-front because those + * tables are already initially in READY state. When the + * subscription has no tables, we leave the twophase state as + * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH + * PUBLICATION to work. + */ + if (opts.twophase && !opts.copy_data && tables != NIL) + twophase_enabled = true; + + walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, CRS_NOEXPORT_SNAPSHOT, NULL); + + if (twophase_enabled) + UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); + ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); @@ -865,6 +940,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { + if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set %s for two-phase enabled subscription", + "streaming = true"))); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; @@ -927,6 +1008,17 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + /* + * See ALTER_SUBSCRIPTION_REFRESH for details why this is + * not allowed. 
+ */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Make sure refresh sees the new list of publications. */ @@ -966,6 +1058,17 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + /* + * See ALTER_SUBSCRIPTION_REFRESH for details why this is + * not allowed. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Only refresh the added/dropped list of publications. */ @@ -986,6 +1089,30 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts); + /* + * The subscription option "two_phase" requires that + * replication has passed the initial table synchronization + * phase before the two_phase becomes properly enabled. + * + * But, having reached this two-phase commit "enabled" state + * we must not allow any subsequent table initialization to + * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed + * when the user had requested two_phase = on mode. 
+ * + * The exception to this restriction is when copy_data = + * false, because when copy_data is false the tablesync will + * start already in READY state and will exit directly without + * doing anything. + * + * For more details see comments atop worker.c. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); AlterSubscription_refresh(sub, opts.copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6eaa84a031..19ea159af4 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -73,6 +73,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, + bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); @@ -436,6 +437,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", streaming 'on'"); + if (options->proto.logical.twophase && + PQserverVersion(conn->streamConn) >= 150000) + appendStringInfoString(&cmd, ", two_phase 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) @@ -851,7 +856,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, 
CRSSnapshotAction snapshot_action, + bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; @@ -868,6 +873,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, if (conn->logical) { appendStringInfoString(&cmd, " LOGICAL pgoutput"); + if (two_phase) + appendStringInfoString(&cmd, " TWO_PHASE"); + switch (snapshot_action) { case CRS_EXPORT_SNAPSHOT: diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 453efc51e1..2874dc0612 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -374,11 +374,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * * XXX Now, this can even lead to a deadlock if the prepare * transaction is waiting to get it logically replicated for - * distributed 2PC. Currently, we don't have an in-core - * implementation of prepares for distributed 2PC but some - * out-of-core logical replication solution can have such an - * implementation. They need to inform users to not have locks - * on catalog tables in such transactions. + * distributed 2PC. This can be avoided by disallowing + * preparing transactions that have locked [user] catalog + * tables exclusively but as of now, we ask users not to do + * such an operation. 
*/ DecodePrepare(ctx, buf, &parsed); break; @@ -735,7 +734,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, - SnapBuildInitialConsistentPoint(ctx->snapshot_builder), + SnapBuildGetTwoPhaseAt(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d536a5f3ba..d61ef4cfad 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.initial_consistent_point); + need_full_snapshot, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start, provided the plugin supports all the + * callbacks for two-phase. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= slot->data.two_phase; ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. 
+ * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start, provided the plugin supports all the + * callbacks for two-phase. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + slot->data.two_phase_at = start_lsn; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; - slot->data.initial_consistent_point = ctx->reader->EndRecPtr; + if (slot->data.two_phase) + slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index cb42fcb34d..2c191dec04 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node, /* * Due to - harmless - race conditions during a checkpoint we could see - * values here that are older than the ones we already have in memory. - * Don't overwrite those. + * values here that are older than the ones we already have in memory. We + * could also see older values for prepared transactions when the prepare + * is sent at a later point of time along with commit prepared and there + * are other transactions commits between prepare and commit prepared. See + * ReorderBufferFinishPrepared. Don't overwrite those. 
*/ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1cf59e0fb0..13c8c3bd5b 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) /* fixed fields */ pq_sendint64(out, txn->final_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); pq_sendint32(out, txn->xid); } @@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, /* send fields */ pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); } /* @@ -106,6 +106,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) commit_data->committime = pq_getmsgint64(in); } +/* + * Write BEGIN PREPARE to the output stream. + */ +void +logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction BEGIN PREPARE from the stream. 
+ */ +void +logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data) +{ + /* read fields */ + begin_data->prepare_lsn = pq_getmsgint64(in); + if (begin_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn not set in begin prepare message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin prepare message"); + begin_data->prepare_time = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid; bounded copy because the string comes off the wire (assumes gid is an in-struct char array) */ + strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid)); +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream.
+ */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in prepare message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in prepare message"); + prepare_data->prepare_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid; bounded copy because the string comes off the wire (assumes gid is an in-struct char array) */ + strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); +} + +/* + * Write COMMIT PREPARED to the output stream. + */ +void +logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction COMMIT PREPARED from the stream.
+ */ +void +logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepared message", flags); + + /* read fields */ + prepare_data->commit_lsn = pq_getmsgint64(in); + if (prepare_data->commit_lsn == InvalidXLogRecPtr) + elog(ERROR, "commit_lsn is not set in commit prepared message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->commit_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (bounded copy into the pre-allocated buffer) */ + strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); +} + +/* + * Write ROLLBACK PREPARED to the output stream. + */ +void +logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_end_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); + pq_sendint64(out, txn->xact_time.commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction ROLLBACK PREPARED from the stream.
+ */ +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepared message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); + rollback_data->prepare_time = pq_getmsgint64(in); + rollback_data->rollback_time = pq_getmsgint64(in); + rollback_data->xid = pq_getmsgint(in, 4); + + /* read gid (bounded copy into the pre-allocated buffer) */ + strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid)); +} + /* * Write ORIGIN to the output stream. */ @@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, /* send fields */ pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1b4f4a528a..7378beb684 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2576,7 +2576,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn, txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; - txn->commit_time = commit_time; + txn->xact_time.commit_time = commit_time; txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; @@ -2667,7 +2667,7 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, */ txn->final_lsn = prepare_lsn; txn->end_lsn = end_lsn; - txn->commit_time = prepare_time; + txn->xact_time.prepare_time = prepare_time; txn->origin_id =
origin_id; txn->origin_lsn = origin_lsn; @@ -2714,7 +2714,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, Assert(txn->final_lsn != InvalidXLogRecPtr); ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, - txn->commit_time, txn->origin_id, txn->origin_lsn); + txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); /* * We send the prepare for the concurrently aborted xacts so that later @@ -2734,7 +2734,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { @@ -2753,19 +2753,20 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, * be later used for rollback. */ prepare_end_lsn = txn->end_lsn; - prepare_time = txn->commit_time; + prepare_time = txn->xact_time.prepare_time; /* add the gid in the txn */ txn->gid = pstrdup(gid); /* * It is possible that this transaction is not decoded at prepare time - * either because by that time we didn't have a consistent snapshot or it - * was decoded earlier but we have restarted. We only need to send the - * prepare if it was not decoded earlier. We don't need to decode the xact - * for aborts if it is not done already. + * either because by that time we didn't have a consistent snapshot, or + * two_phase was not enabled, or it was decoded earlier but we have + * restarted. We only need to send the prepare if it was not decoded + * earlier. We don't need to decode the xact for aborts if it is not done + * already. */ - if ((txn->final_lsn < initial_consistent_point) && is_commit) + if ((txn->final_lsn < two_phase_at) && is_commit) { txn->txn_flags |= RBTXN_PREPARE; @@ -2783,12 +2784,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, * prepared after the restart. 
*/ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, - txn->commit_time, txn->origin_id, txn->origin_lsn); + txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); } txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; - txn->commit_time = commit_time; + txn->xact_time.commit_time = commit_time; txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 04f3355f60..a14a3d6900 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,15 +165,15 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which two-phase decoding was enabled or LSN at which we found a + * consistent point at the time of slot creation. * - * The prepared transactions that are not covered by initial snapshot - * needs to be sent later along with commit prepared and they must be - * before this point. + * The prepared transactions, that were skipped because previously + * two-phase was not enabled or are not covered by initial snapshot, need + * to be sent later along with commit prepared and they must be before + * this point. 
*/ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Don't start decoding WAL until the "xl_running_xacts" information @@ -281,7 +281,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point) + XLogRecPtr two_phase_at) { MemoryContext context; MemoryContext oldcontext; @@ -309,7 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; - builder->initial_consistent_point = initial_consistent_point; + builder->two_phase_at = two_phase_at; MemoryContextSwitchTo(oldcontext); @@ -370,12 +370,21 @@ SnapBuildCurrentState(SnapBuild *builder) } /* - * Return the LSN at which the snapshot was exported + * Return the LSN at which the two-phase decoding was first enabled. */ XLogRecPtr -SnapBuildInitialConsistentPoint(SnapBuild *builder) +SnapBuildGetTwoPhaseAt(SnapBuild *builder) { - return builder->initial_consistent_point; + return builder->two_phase_at; +} + +/* + * Set the LSN at which two-phase decoding is enabled. 
+ */ +void +SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) +{ + builder->two_phase_at = ptr; } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 682c107e74..f07983a43c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -96,6 +96,7 @@ #include "access/table.h" #include "access/xact.h" +#include "catalog/indexing.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" @@ -114,8 +115,11 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" static bool table_states_valid = false; +static List *table_states_not_ready = NIL; +static bool FetchTableStates(bool *started_tx); StringInfo copybuf = NULL; @@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Oid relid; TimestampTz last_start_time; }; - static List *table_states = NIL; static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; @@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - if (!table_states_valid) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - /* Clean the old list. */ - list_free_deep(table_states); - table_states = NIL; - - StartTransactionCommand(); - started_tx = true; - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); - - /* Allocate the tracking info in a permanent memory context. 
*/ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states = lappend(table_states, rstate); - } - MemoryContextSwitchTo(oldctx); - - table_states_valid = true; - } + FetchTableStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ - if (table_states && !last_start_times) + if (table_states_not_ready && !last_start_times) { HASHCTL ctl; @@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Clean up the hash table when we're done with all tables (just to * release the bit of memory). */ - else if (!table_states && last_start_times) + else if (!table_states_not_ready && last_start_times) { hash_destroy(last_start_times); last_start_times = NULL; } + /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become 'enabled' at + * that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + + proc_exit(0); + } + /* * Process all tables that are being synchronized. 
*/ - foreach(lc, table_states) + foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. */ HOLD_INTERRUPTS(); - walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , + walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1158,3 +1156,134 @@ copy_table_done: wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +static bool +FetchTableStates(bool *started_tx) +{ + static bool has_subrels = false; + + *started_tx = false; + + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY relations found then we know it does. 
But + * if table_states_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subrels = (list_length(table_states_not_ready) > 0) || + HasSubscriptionRelations(MySubscription->oid); + + table_states_valid = true; + } + + return has_subrels; +} + +/* + * If the subscription has no tables then return false. + * + * Otherwise, are all tablesyncs READY? + * + * Note: This function is not suitable to be called from outside of apply or + * tablesync workers because MySubscription needs to be already initialized. + */ +bool +AllTablesyncsReady(void) +{ + bool started_tx = false; + bool has_subrels = false; + + /* We need up-to-date sync state info for subscription tables here. */ + has_subrels = FetchTableStates(&started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(false); + } + + /* + * Return false when there are no tables in subscription or not all tables + * are in ready state; true otherwise. + */ + return has_subrels && list_length(table_states_not_ready) == 0; +} + +/* + * Update the two_phase state of the specified subscription in pg_subscription. + */ +void +UpdateTwoPhaseState(Oid suboid, char new_state) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, + "cache lookup failed for subscription oid %u", + suboid); + + /* Form a new tuple.
*/ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* And update/set two_phase state */ + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5fc620c7f1..b9a7a7ffbb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -49,6 +49,79 @@ * a new way to pass filenames to BufFile APIs so that we are allowed to open * the file we desired across multiple stream-open calls for the same * transaction. + * + * TWO_PHASE TRANSACTIONS + * ---------------------- + * Two phase transactions are replayed at prepare and then committed or + * rolled back at commit prepared and rollback prepared respectively. It is + * possible to have a prepared transaction that arrives at the apply worker + * when the tablesync is busy doing the initial copy. In this case, the apply + * worker skips all the prepared operations [e.g. inserts] while the tablesync + * is still busy (see the condition of should_apply_changes_for_rel). The + * tablesync worker might not get such a prepared transaction because say it + * was prior to the initial consistent point but might have got some later + * commits. Now, the tablesync worker will exit without doing anything for the + * prepared transaction skipped by the apply worker as the sync location for it + * will be already ahead of the apply worker's current location. This would lead + * to an "empty prepare", because later when the apply worker does the commit + * prepare, there is nothing in it (the inserts were skipped earlier). 
+ * + * To avoid this, and similar prepare confusions, the subscription's two_phase + * commit is enabled only after the initial sync is over. The two_phase option + * has been implemented as a tri-state with values DISABLED, PENDING, and + * ENABLED. + * + * Even if the user specifies they want a subscription with two_phase = on, + * internally it will start with a tri-state of PENDING which only becomes + * ENABLED after all tablesync initializations are completed - i.e. when all + * tablesync workers have reached their READY state. In other words, the value + * PENDING is only a temporary state for subscription start-up. + * + * Until the two_phase is properly available (ENABLED) the subscription will + * behave as if two_phase = off. When the apply worker detects that all + * tablesyncs have become READY (while the tri-state was PENDING) it will + * restart the apply worker process. This happens in + * process_syncing_tables_for_apply. + * + * When the (re-started) apply worker finds that all tablesyncs are READY for a + * two_phase tri-state of PENDING it starts streaming messages with the + * two_phase option which in turn enables the decoding of two-phase commits at + * the publisher. Then, it updates the tri-state value from PENDING to ENABLED. + * Now, it is possible that during the time we have not enabled two_phase, the + * publisher (replication server) would have skipped some prepares but we + * ensure that such prepares are sent along with commit prepare, see + * ReorderBufferFinishPrepared. + * + * If the subscription has no tables then a two_phase tri-state PENDING is + * left unchanged. This lets the user still do an ALTER SUBSCRIPTION ... + * REFRESH PUBLICATION which might otherwise be disallowed (see below). + * + * If ever a user needs to be aware of the tri-state value, they can fetch it + * from the pg_subscription catalog (see column subtwophasestate).
+ * + * We don't allow toggling the two_phase option of a subscription because it can + * lead to an inconsistent replica. Consider: initially it was on and we have + * received some prepare, then we turn it off; now at commit time the server + * will send the entire transaction data along with the commit. With some more + * analysis, we can allow changing this option from off to on but not sure if + * that alone would be useful. + * + * Finally, to avoid problems mentioned in previous paragraphs from any + * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on' + * to 'off' and then again back to 'on') there is a restriction for + * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when + * the two_phase tri-state is ENABLED, except when copy_data = false. + * + * We can get prepare of the same GID more than once for the genuine cases + * where we have defined multiple subscriptions for publications on the same + * server and prepared transaction has operations on tables subscribed to those + * subscriptions. For such cases, if we use the GID sent by publisher one of + * the prepares will be successful and others will fail, in which case the + * server will send them again. Now, this can lead to a deadlock if user has + * set synchronous_standby_names for all the subscriptions on subscriber. To + * avoid such deadlocks, we generate a unique GID (consisting of the + * subscription oid and the xid of the prepared transaction) for each prepared + * transaction on the subscriber.
*------------------------------------------------------------------------- */ @@ -59,6 +132,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" @@ -256,6 +330,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, LogicalRepTupleData *newtup, CmdType operation); +/* Compute GID for two_phase transactions */ +static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); + + /* * Should this worker apply changes for given relation. * @@ -783,6 +861,185 @@ apply_handle_commit(StringInfo s) pgstat_report_activity(STATE_IDLE, NULL); } +/* + * Handle BEGIN PREPARE message. + */ +static void +apply_handle_begin_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData begin_data; + + /* Tablesync should never receive prepare. */ + if (am_tablesync_worker()) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); + + logicalrep_read_begin_prepare(s, &begin_data); + + remote_final_lsn = begin_data.prepare_lsn; + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_prepare(s, &prepare_data); + + if (prepare_data.prepare_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)", + LSN_FORMAT_ARGS(prepare_data.prepare_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. 
See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because at + * commit prepared time, we won't know whether we have skipped preparing a + * transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; + + PrepareTransactionBlock(gid); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a COMMIT PREPARED of a previously PREPARED transaction. + */ +static void +apply_handle_commit_prepared(StringInfo s) +{ + LogicalRepCommitPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_commit_prepared(s, &prepare_data); + + /* Compute GID for two_phase transactions. 
*/ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* There is no transaction when COMMIT PREPARED is called */ + begin_replication_step(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared(StringInfo s) +{ + LogicalRepRollbackPreparedTxnData rollback_data; + char gid[GIDSIZE]; + + logicalrep_read_rollback_prepared(s, &rollback_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, + gid, sizeof(gid)); + + /* + * It is possible that we haven't received prepare because it occurred + * before walsender reached a consistent point or the two_phase was still + * not enabled by that time, so in such cases, we need to skip rollback + * prepared. + */ + if (LookupGXact(gid, rollback_data.prepare_end_lsn, + rollback_data.prepare_time)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. 
+ */ + replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; + replorigin_session_origin_timestamp = rollback_data.rollback_time; + + /* There is no transaction when ABORT/ROLLBACK PREPARED is called */ + begin_replication_step(); + FinishPreparedTransaction(gid, false); + end_replication_step(); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(rollback_data.rollback_end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(rollback_data.rollback_end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + /* * Handle ORIGIN message. * @@ -2060,6 +2317,22 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); return; + + case LOGICAL_REP_MSG_BEGIN_PREPARE: + apply_handle_begin_prepare(s); + return; + + case LOGICAL_REP_MSG_PREPARE: + apply_handle_prepare(s); + return; + + case LOGICAL_REP_MSG_COMMIT_PREPARED: + apply_handle_commit_prepared(s); + return; + + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + apply_handle_rollback_prepared(s); + return; } ereport(ERROR, @@ -2539,6 +2812,9 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); + /* two-phase should not be altered */ + Assert(newsub->twophasestate == MySubscription->twophasestate); + /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker. @@ -3040,6 +3316,24 @@ cleanup_subxact_info() subxact_data.nsubxacts_max = 0; } +/* + * Form the prepared transaction GID for two_phase transactions. + * + * Return the GID in the supplied buffer. 
+ */ +static void +TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) +{ + Assert(OidIsValid(subid)); + + if (!TransactionIdIsValid(xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid two-phase transaction ID"))); + + snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3050,6 +3344,7 @@ ApplyWorkerMain(Datum main_arg) XLogRecPtr origin_startpos; char *myslotname; WalRcvStreamOptions options; + int server_version; /* Attach to slot */ logicalrep_worker_attach(worker_slot); @@ -3208,15 +3503,59 @@ ApplyWorkerMain(Datum main_arg) options.logical = true; options.startpoint = origin_startpos; options.slotname = myslotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); options.proto.logical.proto_version = - walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ? - LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; + options.proto.logical.twophase = false; - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + if (!am_tablesync_worker()) + { + /* + * Even when the two_phase mode is requested by the user, it remains + * as the tri-state PENDING until all tablesyncs have reached READY + * state. Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work.
+ */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + /* Start streaming with two_phase enabled */ + options.proto.logical.twophase = true; + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); + } + else + { + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + } + + ereport(DEBUG1, + (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + } + else + { + /* Start normal logical streaming replication. */ + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + } /* Run the main loop. 
*/ LogicalRepApplyLoop(origin_startpos); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index abd5217ab1..e4314af13a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -51,6 +51,16 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -70,6 +80,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static void send_repl_origin(LogicalDecodingContext *ctx, + RepOriginId origin_id, XLogRecPtr origin_lsn, + bool send_origin); /* * Entry in the map used to remember which relation schemas we sent. 
@@ -145,6 +158,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; + + cb->begin_prepare_cb = pgoutput_begin_prepare_txn; + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -156,6 +174,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; + /* transaction streaming - two-phase commit */ + cb->stream_prepare_cb = NULL; } static void @@ -167,10 +187,12 @@ parse_output_parameters(List *options, PGOutputData *data) bool binary_option_given = false; bool messages_option_given = false; bool streaming_given = false; + bool two_phase_option_given = false; data->binary = false; data->streaming = false; data->messages = false; + data->two_phase = false; foreach(lc, options) { @@ -246,8 +268,29 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_option_given = true; + + data->two_phase = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); + + /* + * Do additional checking for the disallowed combination of two_phase + * and streaming. While streaming and two_phase can theoretically be + * supported, it needs more analysis to allow them together. 
+ */ + if (data->two_phase && data->streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "two_phase", "streaming"))); } } @@ -319,6 +362,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Also remember we're currently not streaming any transaction. */ in_streaming = false; + /* + * Here, we just check whether the two-phase option is passed by + * plugin and decide whether to enable it at later point of time. It + * remains enabled if the previous start-up has done so. But we only + * allow the option to be passed in with sufficient version of the + * protocol, and when the output plugin supports it. + */ + if (!data->two_phase) + ctx->twophase_opt_given = false; + else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM))); + else if (!ctx->twophase) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("two-phase commit requested, but not supported by output plugin"))); + else + ctx->twophase_opt_given = true; + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -331,8 +395,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } else { - /* Disable the streaming during the slot initialization mode. */ + /* + * Disable the streaming and prepared transactions during the slot + * initialization mode. + */ ctx->streaming = false; + ctx->twophase = false; } } @@ -347,29 +415,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); - if (send_replication_origin) - { - char *origin; - - /*---------- - * XXX: which behaviour do we want here? 
- * - * Alternatives: - * - don't send origin message if origin name not found - * (that's what we do now) - * - throw error - that will break replication, not good - * - send some special "unknown" origin - *---------- - */ - if (replorigin_by_oid(txn->origin_id, true, &origin)) - { - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_origin(ctx->out, origin, txn->origin_lsn); - } - - } + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); OutputPluginWrite(ctx, true); } @@ -388,6 +435,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* + * BEGIN PREPARE callback + */ +static void +pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_begin_prepare(ctx->out, txn); + + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); + + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * ROLLBACK PREPARED callback + */ +static void +pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + 
OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, + prepare_time); + OutputPluginWrite(ctx, true); +} + /* * Write the current schema of the relation and its ancestor (if any) if not * done yet. @@ -839,18 +948,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); - if (send_replication_origin) - { - char *origin; - - if (replorigin_by_oid(txn->origin_id, true, &origin)) - { - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); - } - } + send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, + send_replication_origin); OutputPluginWrite(ctx, true); @@ -1270,3 +1369,33 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubtruncate = false; } } + +/* Send Replication origin */ +static void +send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, + XLogRecPtr origin_lsn, bool send_origin) +{ + if (send_origin) + { + char *origin; + + /*---------- + * XXX: which behaviour do we want here? 
+ * + * Alternatives: + * - don't send origin message if origin name not found + * (that's what we do now) + * - throw error - that will break replication, not good + * - send some special "unknown" origin + *---------- + */ + if (replorigin_by_oid(origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + + logicalrep_write_origin(ctx->out, origin, origin_lsn); + } + } +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 8c18b4ed05..33b85d86cc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -283,6 +283,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.database = db_specific ? MyDatabaseId : InvalidOid; slot->data.persistency = persistency; slot->data.two_phase = two_phase; + slot->data.two_phase_at = InvalidXLogRecPtr; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2be9ad967d..9a2bc37fd7 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -370,7 +370,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 321152151d..912144c43e 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -51,6 +51,7 @@ #include "catalog/pg_largeobject_d.h" #include "catalog/pg_largeobject_metadata_d.h" #include "catalog/pg_proc_d.h" +#include "catalog/pg_subscription.h" #include "catalog/pg_trigger_d.h" #include "catalog/pg_type_d.h" #include "common/connect.h" @@ -4320,6 +4321,7 @@ getSubscriptions(Archive *fout) int 
i_subname; int i_rolname; int i_substream; + int i_subtwophasestate; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4363,9 +4365,16 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " false AS subbinary,\n"); if (fout->remoteVersion >= 140000) - appendPQExpBufferStr(query, " s.substream\n"); + appendPQExpBufferStr(query, " s.substream,\n"); else - appendPQExpBufferStr(query, " false AS substream\n"); + appendPQExpBufferStr(query, " false AS substream,\n"); + + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, " s.subtwophasestate\n"); + else + appendPQExpBuffer(query, + " '%c' AS subtwophasestate\n", + LOGICALREP_TWOPHASE_STATE_DISABLED); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4386,6 +4395,7 @@ getSubscriptions(Archive *fout) i_subpublications = PQfnumber(res, "subpublications"); i_subbinary = PQfnumber(res, "subbinary"); i_substream = PQfnumber(res, "substream"); + i_subtwophasestate = PQfnumber(res, "subtwophasestate"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4411,6 +4421,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subbinary)); subinfo[i].substream = pg_strdup(PQgetvalue(res, i, i_substream)); + subinfo[i].subtwophasestate = + pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4438,6 +4450,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) char **pubnames = NULL; int npubnames = 0; int i; + char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'}; if (!(subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) return; @@ -4479,6 +4492,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->substream, "f") != 0) appendPQExpBufferStr(query, ", streaming = on"); + if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) + appendPQExpBufferStr(query, ", two_phase = on"); + if 
(strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index ba9bc6ddd2..efb8c30e71 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -639,6 +639,7 @@ typedef struct _SubscriptionInfo char *subslotname; char *subbinary; char *substream; + char *subtwophasestate; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 2abf255798..ba658f731b 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6389,7 +6389,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6423,6 +6423,12 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Binary"), gettext_noop("Streaming")); + /* Two_phase is only supported in v15 and higher */ + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ", subtwophasestate AS \"%s\"\n", + gettext_noop("Two phase commit")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 0ebd5aa41a..d6bf725971 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2764,7 +2764,7 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "enabled", "slot_name", "streaming", - "synchronous_commit"); + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 
91786da784..e27e1a8fe8 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -58,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); +extern bool LookupGXact(const char *gid, XLogRecPtr prepare_at_lsn, + TimestampTz origin_prepare_timestamp); #endif /* TWOPHASE_H */ diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index e92ecaf344..f2ecafa1da 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202107071 +#define CATALOG_VERSION_NO 202107141 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 750d46912a..21061493ea 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -22,6 +22,14 @@ #include "nodes/pg_list.h" +/* + * two_phase tri-state values. See comments atop worker.c to know more about + * these states. + */ +#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd' +#define LOGICALREP_TWOPHASE_STATE_PENDING 'p' +#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' + /* ---------------- * pg_subscription definition. cpp turns this into * typedef struct FormData_pg_subscription @@ -57,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + char subtwophasestate; /* Stream two-phase transactions */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -92,6 +102,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. 
*/ + char twophasestate; /* Allow streaming two-phase transactions */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 4d2056318d..632381b4e3 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -87,6 +87,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); +extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid); extern List *GetSubscriptionNotReadyRelations(Oid subid); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index af551d6f4e..e0f513b773 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -89,6 +89,16 @@ typedef struct LogicalDecodingContext */ bool twophase; + /* + * Is two-phase option given by output plugin? + * + * This flag indicates that the plugin passed in the two-phase option as + * part of the START_STREAMING command. We can't rely solely on the + * twophase flag which only tells whether the plugin provided all the + * necessary two-phase callbacks. + */ + bool twophase_opt_given; + /* * State for writing output. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 55b90c03ea..63de90d94a 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -13,6 +13,7 @@ #ifndef LOGICAL_PROTO_H #define LOGICAL_PROTO_H +#include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" @@ -26,12 +27,16 @@ * connect time. 
* * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with - * support for streaming large transactions. + * support for streaming large transactions. Introduced in PG14. + * + * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with + * support for two-phase commit decoding (at prepare time). Introduced in PG15. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM +#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3 +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM /* * Logical message types @@ -55,6 +60,10 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', + LOGICAL_REP_MSG_PREPARE = 'P', + LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', + LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', @@ -122,6 +131,48 @@ typedef struct LogicalRepCommitData TimestampTz committime; } LogicalRepCommitData; +/* + * Prepared transaction protocol information for begin_prepare, and prepare. + */ +typedef struct LogicalRepPreparedTxnData +{ + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz prepare_time; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepPreparedTxnData; + +/* + * Prepared transaction protocol information for commit prepared. + */ +typedef struct LogicalRepCommitPreparedTxnData +{ + XLogRecPtr commit_lsn; + XLogRecPtr end_lsn; + TimestampTz commit_time; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepCommitPreparedTxnData; + +/* + * Rollback Prepared transaction protocol information. 
The prepare information + * prepare_end_lsn and prepare_time are used to check if the downstream has + * received this prepared transaction in which case it can apply the rollback, + * otherwise, it can skip the rollback operation. The gid alone is not + * sufficient because the downstream node can have a prepared transaction with + * same identifier. + */ +typedef struct LogicalRepRollbackPreparedTxnData +{ + XLogRecPtr prepare_end_lsn; + XLogRecPtr rollback_end_lsn; + TimestampTz prepare_time; + TimestampTz rollback_time; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepRollbackPreparedTxnData; + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); @@ -129,6 +180,24 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin_prepare(StringInfo in, + LogicalRepPreparedTxnData *begin_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit_prepared(StringInfo in, + LogicalRepCommitPreparedTxnData *prepare_data); +extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); +extern void logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data); + + extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, 
XLogRecPtr *origin_lsn); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 51e7c0348d..0dc460fb70 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -27,6 +27,7 @@ typedef struct PGOutputData bool binary; bool streaming; bool messages; + bool two_phase; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ba257d81b5..5b40ff75f7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -297,7 +297,11 @@ typedef struct ReorderBufferTXN * Commit or Prepare time, only known when we read the actual commit or * prepare record. */ - TimestampTz commit_time; + union + { + TimestampTz commit_time; + TimestampTz prepare_time; + } xact_time; /* * The base snapshot is used to decode all changes until either this @@ -636,7 +640,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 2eb7e3a530..34d95eac8e 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -84,11 +84,10 @@ typedef struct ReplicationSlotPersistentData XLogRecPtr confirmed_flush; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which we enabled two_phase commit for this slot or LSN at which + * we found a consistent point at the time of slot creation. 
*/ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Allow decoding of prepared transactions? diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index fbabce6764..de7212464a 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -62,7 +62,7 @@ extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point); + XLogRecPtr two_phase_at); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); @@ -76,7 +76,8 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); -extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder); +extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder); +extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4fd7c25ea7..0b607ed777 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -181,6 +181,8 @@ typedef struct List *publication_names; /* String list of publications */ bool binary; /* Ask publisher to use binary */ bool streaming; /* Streaming of large transactions */ + bool twophase; /* Streaming of two-phase transactions at + * prepare time */ } logical; } proto; } WalRcvStreamOptions; @@ -347,6 +349,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, + bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); @@ -420,8 +423,8 @@ extern PGDLLIMPORT 
WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 179eb43900..41c7487393 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,9 @@ extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); +extern bool AllTablesyncsReady(void); +extern void UpdateTwoPhaseState(Oid suboid, char new_state); + void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 57f7dd9b0a..ad6b4e4bd3 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo 
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, 
remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -162,19 +162,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -185,19 +185,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not 
subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) -- fail - publication already exists @@ -212,10 +212,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, 
testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist (1 row) -- fail - publication used more then once @@ -233,10 +233,10 @@ ERROR: unrecognized subscription parameter: "copy_data" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) DROP 
SUBSCRIPTION regress_testsub; @@ -263,6 +263,43 @@ ALTER SUBSCRIPTION regress_testsub DISABLE; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; DROP FUNCTION func; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); +ERROR: two_phase requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist +(1 row) + +--fail - alter of two_phase option not supported. 
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); +ERROR: unrecognized subscription parameter: "two_phase" +--fail - cannot set streaming when two_phase enabled +ALTER SUBSCRIPTION regress_testsub SET (streaming = true); +ERROR: cannot set streaming = true for two-phase enabled subscription +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; +-- fail - two_phase and streaming are mutually exclusive. +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); +ERROR: two_phase = true and streaming = true are mutually exclusive options +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +------+-------+---------+-------------+--------+-----------+------------------+--------------------+---------- +(0 rows) + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 308c098c14..b732871407 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -202,6 +202,31 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; DROP FUNCTION func; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); + 
+-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); + +\dRs+ +--fail - alter of two_phase option not supported. +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); + +--fail - cannot set streaming when two_phase enabled +ALTER SUBSCRIPTION regress_testsub SET (streaming = true); + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + +-- fail - two_phase and streaming are mutually exclusive. +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); + +\dRs+ + + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl new file mode 100644 index 0000000000..c6ada92ff0 --- /dev/null +++ b/src/test/subscription/t/021_twophase.pl @@ -0,0 +1,359 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 24; + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', " + BEGIN; + 
INSERT INTO tab_full SELECT generate_series(1,10); + PREPARE TRANSACTION 'some_initial_data'; + COMMIT PREPARED 'some_initial_data';"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE tab_full"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (two_phase = on)"); + +# Wait for subscriber to finish initialization +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Also wait for two-phase to be enabled +my $twophase_query = + "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_subscriber->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +############################### +# check that 2PC gets replicated to subscriber +# then COMMIT PREPARED +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 
'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# check that 2PC gets replicated to subscriber +# then ROLLBACK PREPARED +############################### + +$node_publisher->safe_psql('postgres'," + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets aborted on subscriber +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Check that ROLLBACK PREPARED is decoded properly on crash restart +# (publisher and subscriber crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); 
+$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# rollback post the restart +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are rolled back +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(0), 'Rows rolled back are not on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (publisher and subscriber crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (subscriber only crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (14); + INSERT INTO tab_full VALUES (15); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (14,15);"); 
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (publisher only crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (16); + INSERT INTO tab_full VALUES (17); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_publisher->stop('immediate'); +$node_publisher->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (16,17);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Test nested transaction with 2PC +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# COMMIT +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_publisher->wait_for_catchup($appname); + +# check the transaction state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber'); + +# check inserts are visible. 22 should be rolled back. 21 should be committed. 
+$result = $node_subscriber->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);"); +is($result, qq(21), 'Rows committed are on the subscriber'); + +############################### +# Test using empty GID +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (51); + PREPARE TRANSACTION '';"); +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# ROLLBACK +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED '';"); + +# check that 2PC gets aborted on subscriber +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# copy_data=false and two_phase +############################### + +#create some test tables for copy tests +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_copy SELECT generate_series(1,5);"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', "INSERT INTO tab_copy VALUES (88);"); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); +is($result, qq(1), 'initial data in subscriber table'); + +# Setup logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_copy FOR TABLE tab_copy;"); + +my $appname_copy = 'appname_copy'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_copy + CONNECTION '$publisher_connstr application_name=$appname_copy' + PUBLICATION tap_pub_copy + WITH 
(two_phase=on, copy_data=false);"); + +# Wait for subscriber to finish initialization +$node_publisher->wait_for_catchup($appname_copy); + +# Also wait for initial table sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Also wait for two-phase to be enabled +$node_subscriber->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +# Check that the initial table data was NOT replicated (because we said copy_data=false) +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); +is($result, qq(1), 'initial data in subscriber table'); + +# Now do a prepare on publisher and check that it IS replicated +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_copy VALUES (99); + PREPARE TRANSACTION 'mygid';"); + +$node_publisher->wait_for_catchup($appname_copy); + +# Check that the transaction has been prepared on the subscriber, there will be 2 +# prepared transactions for the 2 subscriptions. 
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(2), 'transaction is prepared on subscriber'); + +# Now commit the insert and verify that it IS replicated +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';"); + +$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); +is($result, qq(6), 'publisher inserted data'); + +$node_publisher->wait_for_catchup($appname_copy); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); +is($result, qq(2), 'replicated data in subscriber table'); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); + +############################### +# check all the cleanup +############################### + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl new file mode 100644 index 0000000000..e61d28a48b --- /dev/null +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -0,0 +1,235 @@ + +# Copyright (c) 2021, PostgreSQL Global 
Development Group + +# Test cascading logical replication of 2PC. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 27; + +############################### +# Setup a cascade of pub/sub nodes. +# node_A -> node_B -> node_C +############################### + +# Initialize nodes +# node_A +my $node_A = get_new_node('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_A->start; +# node_B +my $node_B = get_new_node('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_B->start; +# node_C +my $node_C = get_new_node('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_C->start; + +# Create some pre-existing content on node_A +$node_A->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_A->safe_psql('postgres', " + INSERT INTO tab_full SELECT generate_series(1,10);"); + +# Create the same tables on node_B and node_C +$node_B->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_C->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication + +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); +my $appname_B = 'tap_sub_B'; +$node_B->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_B + CONNECTION '$node_A_connstr application_name=$appname_B' + PUBLICATION tap_pub_A + WITH (two_phase = on)"); + +# node_B (pub) -> node_C (sub) +my $node_B_connstr = $node_B->connstr .
' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); +my $appname_C = 'tap_sub_C'; +$node_C->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_C + CONNECTION '$node_B_connstr application_name=$appname_C' + PUBLICATION tap_pub_B + WITH (two_phase = on)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# Also wait for two-phase to be enabled +my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_B->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; +$node_C->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +is(1,1, "Cascade setup is complete"); + +my $result; + +############################### +# check that 2PC gets replicated to subscriber(s) +# then COMMIT PREPARED +############################### + +# 2PC PREPARE +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction was committed on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC 
has committed on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C'); + +# check the transaction state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber C'); + +############################### +# check that 2PC gets replicated to subscriber(s) +# then ROLLBACK PREPARED +############################### + +# 2PC PREPARE +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC ROLLBACK +$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction is aborted on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C'); + +# check the transaction state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM 
pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +############################### +# Test nested transactions with 2PC +############################### + +# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is ended on subscriber +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +# check inserts are visible at subscriber(s). +# 22 should be rolled back. +# 21 should be committed. 
+$result = $node_B->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);"); +is($result, qq(21), 'Rows committed are present on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);"); +is($result, qq(21), 'Rows committed are present on subscriber C'); + +############################### +# check all the cleanup +############################### + +# cleanup the node_B => node_C pub/sub +$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C"); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber node C'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node B'); + +# cleanup the node_A => node_B pub/sub +$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B"); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber node B'); +$result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node A'); + +# shutdown 
+$node_C->stop('fast'); +$node_B->stop('fast'); +$node_A->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b287c29f64..37cf4b2f76 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1390,12 +1390,15 @@ LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData LogicalRepCommitData +LogicalRepCommitPreparedTxnData LogicalRepCtxStruct LogicalRepMsgType LogicalRepPartMapEntry +LogicalRepPreparedTxnData LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation +LogicalRepRollbackPreparedTxnData LogicalRepTupleData LogicalRepTyp LogicalRepWorker