diff --git a/contrib/amcheck/t/003_cic_2pc.pl b/contrib/amcheck/t/003_cic_2pc.pl new file mode 100644 index 0000000000..803b99a738 --- /dev/null +++ b/contrib/amcheck/t/003_cic_2pc.pl @@ -0,0 +1,188 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications +use strict; +use warnings; + +use Config; +use PostgresNode; +use TestLib; + +use Test::More tests => 6; + +my ($node, $result); + +# +# Test set-up +# +$node = PostgresNode->new('CIC_2PC_test'); +$node->init; +$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10'); +$node->append_conf('postgresql.conf', 'lock_timeout = 180000'); +$node->start; +$node->safe_psql('postgres', q(CREATE EXTENSION amcheck)); +$node->safe_psql('postgres', q(CREATE TABLE tbl(i int))); + + +# +# Run 3 overlapping 2PC transactions with CIC +# +# We have two concurrent background psql processes: $main_h for INSERTs and +# $cic_h for CIC. Also, we use non-background psql for some COMMIT PREPARED +# statements. +# + +my $main_in = ''; +my $main_out = ''; +my $main_timer = IPC::Run::timeout(180); + +my $main_h = + $node->background_psql('postgres', \$main_in, \$main_out, + $main_timer, on_error_stop => 1); +$main_in .= q( +BEGIN; +INSERT INTO tbl VALUES(0); +\echo syncpoint1 +); +pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired; + +my $cic_in = ''; +my $cic_out = ''; +my $cic_timer = IPC::Run::timeout(180); +my $cic_h = + $node->background_psql('postgres', \$cic_in, \$cic_out, + $cic_timer, on_error_stop => 1); +$cic_in .= q( +\echo start +CREATE INDEX CONCURRENTLY idx ON tbl(i); +); +pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired; + +$main_in .= q( +PREPARE TRANSACTION 'a'; +); + +$main_in .= q( +BEGIN; +INSERT INTO tbl VALUES(0); +\echo syncpoint2 +); +pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired; + +$node->safe_psql('postgres', q(COMMIT PREPARED 'a';)); + +$main_in .= q( +PREPARE TRANSACTION 'b'; +BEGIN; +INSERT INTO tbl VALUES(0); +\echo syncpoint3 +); +pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired; + +$node->safe_psql('postgres', q(COMMIT PREPARED 'b';)); + +$main_in .= q( +PREPARE TRANSACTION 'c'; +COMMIT PREPARED 'c'; +); +$main_h->pump_nb; + +$main_h->finish; +$cic_h->finish; + +$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true))); +is($result, '0', 'bt_index_check after overlapping 2PC'); + + +# +# Server restart shall not change whether prepared xact blocks CIC +# + +$node->safe_psql( + 'postgres', q( +BEGIN; +INSERT INTO tbl VALUES(0); +PREPARE TRANSACTION 'spans_restart'; +BEGIN; +CREATE TABLE unused (); +PREPARE TRANSACTION 'persists_forever'; +)); +$node->restart; + +my $reindex_in = ''; +my $reindex_out = ''; +my $reindex_timer = IPC::Run::timeout(180); +my $reindex_h = + $node->background_psql('postgres', \$reindex_in, \$reindex_out, + $reindex_timer, on_error_stop => 1); +$reindex_in .= q( +\echo start +DROP INDEX CONCURRENTLY idx; +CREATE INDEX CONCURRENTLY idx ON tbl(i); +); +pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired; + +$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'"); +$reindex_h->finish; +$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true))); +is($result, '0', 'bt_index_check after 2PC and restart'); + + +# +# Stress CIC+2PC with pgbench +# + +# Fix broken index first +$node->safe_psql('postgres', q(REINDEX TABLE tbl;)); + +# Run background pgbench with CIC. We cannot mix-in this script into single +# pgbench: CIC will deadlock with itself occasionally. +my $pgbench_out = ''; +my $pgbench_timer = IPC::Run::timeout(180); +my $pgbench_h = $node->background_pgbench( + '--no-vacuum --client=1 --transactions=100', + { + '002_pgbench_concurrent_cic' => q( + DROP INDEX CONCURRENTLY idx; + CREATE INDEX CONCURRENTLY idx ON tbl(i); + SELECT bt_index_check('idx',true); + ) + }, + \$pgbench_out, + $pgbench_timer); + +# Run pgbench. +$node->pgbench( + '--no-vacuum --client=5 --transactions=100', + 0, + [qr{actually processed}], + [qr{^$}], + 'concurrent INSERTs w/ 2PC', + { + '002_pgbench_concurrent_2pc' => q( + BEGIN; + INSERT INTO tbl VALUES(0); + PREPARE TRANSACTION 'c:client_id'; + COMMIT PREPARED 'c:client_id'; + ), + '002_pgbench_concurrent_2pc_savepoint' => q( + BEGIN; + SAVEPOINT s1; + INSERT INTO tbl VALUES(0); + PREPARE TRANSACTION 'c:client_id'; + COMMIT PREPARED 'c:client_id'; + ) + }); + +$pgbench_h->pump_nb; +$pgbench_h->finish(); +$result = + ($Config{osname} eq "MSWin32") + ? ($pgbench_h->full_results)[0] + : $pgbench_h->result(0); +is($result, 0, "pgbench with CIC works"); + +# done +$node->stop; +done_testing(); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2156de187c..f6e7fa71d8 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -459,14 +459,24 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, proc->pgprocno = gxact->pgprocno; SHMQueueElemInit(&(proc->links)); proc->waitStatus = PROC_WAIT_STATUS_OK; - /* We set up the gxact's VXID as InvalidBackendId/XID */ - proc->lxid = (LocalTransactionId) xid; + if (LocalTransactionIdIsValid(MyProc->lxid)) + { + /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */ + proc->lxid = MyProc->lxid; + proc->backendId = MyBackendId; + } + else + { + Assert(AmStartupProcess() || !IsPostmasterEnvironment); + /* GetLockConflicts() uses this to specify a wait on the XID */ + proc->lxid = xid; + proc->backendId = InvalidBackendId; + } proc->xid = xid; Assert(proc->xmin == InvalidTransactionId); proc->delayChkpt = false; proc->statusFlags = 0; proc->pid = 0; - proc->backendId = InvalidBackendId; proc->databaseId = databaseid; proc->roleId = owner; proc->tempNamespaceId = InvalidOid; @@ -846,6 +856,53 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) return result; } +/* + * TwoPhaseGetXidByVirtualXID + * Lookup VXID among xacts prepared since last startup. + * + * (This won't find recovered xacts.) If more than one matches, return any + * and set "have_more" to true. To witness multiple matches, a single + * BackendId must consume 2^32 LXIDs, with no intervening database restart. + */ +TransactionId +TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, + bool *have_more) +{ + int i; + TransactionId result = InvalidTransactionId; + + Assert(VirtualTransactionIdIsValid(vxid)); + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGPROC *proc; + VirtualTransactionId proc_vxid; + + if (!gxact->valid) + continue; + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + GET_VXID_FROM_PGPROC(proc_vxid, *proc); + if (VirtualTransactionIdEquals(vxid, proc_vxid)) + { + /* Startup process sets proc->backendId to InvalidBackendId. */ + Assert(!gxact->inredo); + + if (result != InvalidTransactionId) + { + *have_more = true; + break; + } + result = gxact->xid; + } + } + + LWLockRelease(TwoPhaseStateLock); + + return result; +} + /* * TwoPhaseGetDummyBackendId * Get the dummy backend ID for prepared transaction specified by XID diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d96881c0de..ca6f6d57d3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2506,6 +2506,13 @@ PrepareTransaction(void) /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; + /* + * Transfer our locks to a dummy PGPROC. This has to be done before + * ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would + * conclude "xact already committed or aborted" for our locks. + */ + PostPrepare_Locks(xid); + /* * Let others know about no transaction in progress by me. This has to be * done *after* the prepared transaction has been marked valid, else @@ -2545,7 +2552,6 @@ PrepareTransaction(void) PostPrepare_MultiXact(xid); - PostPrepare_Locks(xid); PostPrepare_PredicateLocks(xid); ResourceOwnerRelease(TopTransactionResourceOwner, diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index cdf2266d6d..2db0424ad9 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -871,9 +871,10 @@ XactLockTableWaitErrorCb(void *arg) * To do this, obtain the current list of lockers, and wait on their VXIDs * until they are finished. * - * Note we don't try to acquire the locks on the given locktags, only the VXIDs - * of its lock holders; if somebody grabs a conflicting lock on the objects - * after we obtained our initial list of lockers, we will not wait for them. + * Note we don't try to acquire the locks on the given locktags, only the + * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock + * on the objects after we obtained our initial list of lockers, we will not + * wait for them. */ void WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress) diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 364654e106..c25af7fe09 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -2900,8 +2900,12 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) * The result array is palloc'd and is terminated with an invalid VXID. * *countp, if not null, is updated to the number of items set. * - * Of course, the result could be out of date by the time it's returned, - * so use of this function has to be thought about carefully. + * Of course, the result could be out of date by the time it's returned, so + * use of this function has to be thought about carefully. Similarly, a + * PGPROC with no "lxid" will be considered non-conflicting regardless of any + * lock it holds. Existing callers don't care about a locker after that + * locker's pg_xact updates complete. CommitTransaction() clears "lxid" after + * pg_xact updates and before releasing locks. * * Note we never include the current xact's vxid in the result array, * since an xact never blocks itself. @@ -4529,37 +4533,80 @@ VirtualXactLockTableCleanup(void) } } +/* + * XactLockForVirtualXact + * + * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid, + * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those + * functions, it assumes "xid" is never a subtransaction and that "xid" is + * prepared, committed, or aborted. + * + * If !TransactionIdIsValid(xid), this locks every prepared XID having been + * known as "vxid" before its PREPARE TRANSACTION. + */ +static bool +XactLockForVirtualXact(VirtualTransactionId vxid, + TransactionId xid, bool wait) +{ + bool more = false; + + /* There is no point to wait for 2PCs if you have no 2PCs. */ + if (max_prepared_xacts == 0) + return true; + + do + { + LockAcquireResult lar; + LOCKTAG tag; + + /* Clear state from previous iterations. */ + if (more) + { + xid = InvalidTransactionId; + more = false; + } + + /* If we have no xid, try to find one. */ + if (!TransactionIdIsValid(xid)) + xid = TwoPhaseGetXidByVirtualXID(vxid, &more); + if (!TransactionIdIsValid(xid)) + { + Assert(!more); + return true; + } + + /* Check or wait for XID completion. */ + SET_LOCKTAG_TRANSACTION(tag, xid); + lar = LockAcquire(&tag, ShareLock, false, !wait); + if (lar == LOCKACQUIRE_NOT_AVAIL) + return false; + LockRelease(&tag, ShareLock, false); + } while (more); + + return true; +} + /* * VirtualXactLock * - * If wait = true, wait until the given VXID has been released, and then - * return true. + * If wait = true, wait as long as the given VXID or any XID acquired by the + * same transaction is still running. Then, return true. * - * If wait = false, just check whether the VXID is still running, and return - * true or false. + * If wait = false, just check whether that VXID or one of those XIDs is still + * running, and return true or false. */ bool VirtualXactLock(VirtualTransactionId vxid, bool wait) { LOCKTAG tag; PGPROC *proc; + TransactionId xid = InvalidTransactionId; Assert(VirtualTransactionIdIsValid(vxid)); - if (VirtualTransactionIdIsPreparedXact(vxid)) - { - LockAcquireResult lar; - - /* - * Prepared transactions don't hold vxid locks. The - * LocalTransactionId is always a normal, locked XID. - */ - SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId); - lar = LockAcquire(&tag, ShareLock, false, !wait); - if (lar != LOCKACQUIRE_NOT_AVAIL) - LockRelease(&tag, ShareLock, false); - return lar != LOCKACQUIRE_NOT_AVAIL; - } + if (VirtualTransactionIdIsRecoveredPreparedXact(vxid)) + /* no vxid lock; localTransactionId is a normal, locked XID */ + return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait); SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid); @@ -4573,7 +4620,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) */ proc = BackendIdGetProc(vxid.backendId); if (proc == NULL) - return true; + return XactLockForVirtualXact(vxid, InvalidTransactionId, wait); /* * We must acquire this lock before checking the backendId and lxid @@ -4582,12 +4629,12 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) */ LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE); - /* If the transaction has ended, our work here is done. */ if (proc->backendId != vxid.backendId || proc->fpLocalTransactionId != vxid.localTransactionId) { + /* VXID ended */ LWLockRelease(&proc->fpInfoLock); - return true; + return XactLockForVirtualXact(vxid, InvalidTransactionId, wait); } /* @@ -4634,6 +4681,16 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) proc->fpVXIDLock = false; } + /* + * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID() + * search. The proc might have assigned this XID but not yet locked it, + * in which case the proc will lock this XID before releasing the VXID. + * The fpInfoLock critical section excludes VirtualXactLockTableCleanup(), + * so we won't save an XID of a different VXID. It doesn't matter whether + * we save this before or after setting up the primary lock table entry. + */ + xid = proc->xid; + /* Done with proc->fpLockBits */ LWLockRelease(&proc->fpInfoLock); @@ -4641,7 +4698,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) (void) LockAcquire(&tag, ShareLock, false, false); LockRelease(&tag, ShareLock, false); - return true; + return XactLockForVirtualXact(vxid, xid, wait); } /* diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index dd8586ab4d..777fab4915 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -65,6 +65,20 @@ * (XXX is it worth testing likewise for duplicate catcache flush entries? * Probably not.) * + * Many subsystems own higher-level caches that depend on relcache and/or + * catcache, and they register callbacks here to invalidate their caches. + * While building a higher-level cache entry, a backend may receive a + * callback for the being-built entry or one of its dependencies. This + * implies the new higher-level entry would be born stale, and it might + * remain stale for the life of the backend. Many caches do not prevent + * that. They rely on DDL for can't-miss catalog changes taking + * AccessExclusiveLock on suitable objects. (For a change made with less + * locking, backends might never read the change.) The relation cache, + * however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later + * than the beginning of the next transaction. Hence, when a relevant + * invalidation callback arrives during a build, relcache.c reattempts that + * build. Caches with similar needs could do likewise. + * * If a relcache flush is issued for a system relation that we preload * from the relcache init file, we must also delete the init file so that * it will be rebuilt during the next backend restart. The actual work of diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index e27e1a8fe8..2d758422a7 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -34,6 +34,8 @@ extern void TwoPhaseShmemInit(void); extern void AtAbort_Twophase(void); extern void PostPrepare_Twophase(void); +extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, + bool *have_more); extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held); extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 9b2a421c32..a5286fab89 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -47,10 +47,11 @@ extern bool Debug_deadlocks; /* * Top-level transactions are identified by VirtualTransactionIDs comprising - * PGPROC fields backendId and lxid. For prepared transactions, the - * LocalTransactionId is an ordinary XID. These are guaranteed unique over - * the short term, but will be reused after a database restart or XID - * wraparound; hence they should never be stored on disk. + * PGPROC fields backendId and lxid. For recovered prepared transactions, the + * LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never + * refers to that kind. These are guaranteed unique over the short term, but + * will be reused after a database restart or XID wraparound; hence they + * should never be stored on disk. * * Note that struct VirtualTransactionId can not be assumed to be atomically * assignable as a whole. However, type LocalTransactionId is assumed to @@ -70,7 +71,7 @@ typedef struct #define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId) #define VirtualTransactionIdIsValid(vxid) \ (LocalTransactionIdIsValid((vxid).localTransactionId)) -#define VirtualTransactionIdIsPreparedXact(vxid) \ +#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \ ((vxid).backendId == InvalidBackendId) #define VirtualTransactionIdEquals(vxid1, vxid2) \ ((vxid1).backendId == (vxid2).backendId && \