Fix CREATE INDEX CONCURRENTLY for the newest prepared transactions.

The purpose of commit 8a54e12a38 was to
fix this, and it sufficed when the PREPARE TRANSACTION completed before
the CIC looked for lock conflicts.  Otherwise, things still broke.  As
before, in a cluster having used CIC while having enabled prepared
transactions, queries that use the resulting index can silently fail to
find rows.  It may be necessary to reindex to recover from past
occurrences; REINDEX CONCURRENTLY suffices.  Fix this for future index
builds by making CIC wait for arbitrarily-recent prepared transactions
and for ordinary transactions that may yet PREPARE TRANSACTION.  As part
of that, have PREPARE TRANSACTION transfer locks to its dummy PGPROC
before it calls ProcArrayClearTransaction().  Back-patch to 9.6 (all
supported versions).

Andrey Borodin, reviewed (in earlier versions) by Andres Freund.

Discussion: https://postgr.es/m/01824242-AA92-4FE9-9BA7-AEBAFFEA3D0C@yandex-team.ru
This commit is contained in:
Noah Misch 2021-10-23 18:36:38 -07:00
parent e428699cb3
commit 5184932432
9 changed files with 466 additions and 36 deletions

View File

@ -410,14 +410,24 @@ MarkAsPreparing(TransactionId xid, const char *gid,
proc->pgprocno = gxact->pgprocno;
SHMQueueElemInit(&(proc->links));
proc->waitStatus = STATUS_OK;
/* We set up the gxact's VXID as InvalidBackendId/XID */
proc->lxid = (LocalTransactionId) xid;
if (LocalTransactionIdIsValid(MyProc->lxid))
{
/* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
proc->lxid = MyProc->lxid;
proc->backendId = MyBackendId;
}
else
{
Assert(AmStartupProcess() || !IsPostmasterEnvironment);
/* GetLockConflicts() uses this to specify a wait on the XID */
proc->lxid = xid;
proc->backendId = InvalidBackendId;
}
pgxact->xid = xid;
pgxact->xmin = InvalidTransactionId;
pgxact->delayChkpt = false;
pgxact->vacuumFlags = 0;
proc->pid = 0;
proc->backendId = InvalidBackendId;
proc->databaseId = databaseid;
proc->roleId = owner;
proc->isBackgroundWorker = false;
@ -801,6 +811,50 @@ TwoPhaseGetGXact(TransactionId xid)
return result;
}
/*
* TwoPhaseGetXidByVirtualXID
* Lookup VXID among xacts prepared since last startup.
*
* (This won't find recovered xacts.) If more than one matches, return any
* and set "have_more" to true. To witness multiple matches, a single
* BackendId must consume 2^32 LXIDs, with no intervening database restart.
*/
TransactionId
TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
bool *have_more)
{
int i;
TransactionId result = InvalidTransactionId;
Assert(VirtualTransactionIdIsValid(vxid));
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGPROC *proc;
VirtualTransactionId proc_vxid;
if (!gxact->valid)
continue;
proc = &ProcGlobal->allProcs[gxact->pgprocno];
GET_VXID_FROM_PGPROC(proc_vxid, *proc);
if (VirtualTransactionIdEquals(vxid, proc_vxid))
{
if (result != InvalidTransactionId)
{
*have_more = true;
break;
}
result = ProcGlobal->allPgXact[gxact->pgprocno].xid;
}
}
LWLockRelease(TwoPhaseStateLock);
return result;
}
/*
* TwoPhaseGetDummyProc
* Get the dummy backend ID for prepared transaction specified by XID

View File

@ -2368,6 +2368,13 @@ PrepareTransaction(void)
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
/*
* Transfer our locks to a dummy PGPROC. This has to be done before
* ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would
* conclude "xact already committed or aborted" for our locks.
*/
PostPrepare_Locks(xid);
/*
* Let others know about no transaction in progress by me. This has to be
* done *after* the prepared transaction has been marked valid, else
@ -2407,7 +2414,6 @@ PrepareTransaction(void)
PostPrepare_MultiXact(xid);
PostPrepare_Locks(xid);
PostPrepare_PredicateLocks(xid);
ResourceOwnerRelease(TopTransactionResourceOwner,

View File

@ -822,9 +822,10 @@ XactLockTableWaitErrorCb(void *arg)
* To do this, obtain the current list of lockers, and wait on their VXIDs
* until they are finished.
*
* Note we don't try to acquire the locks on the given locktags, only the VXIDs
* of its lock holders; if somebody grabs a conflicting lock on the objects
* after we obtained our initial list of lockers, we will not wait for them.
* Note we don't try to acquire the locks on the given locktags, only the
* VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
* on the objects after we obtained our initial list of lockers, we will not
* wait for them.
*/
void
WaitForLockersMultiple(List *locktags, LOCKMODE lockmode)

View File

@ -2787,8 +2787,12 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
*
* The result array is palloc'd and is terminated with an invalid VXID.
*
* Of course, the result could be out of date by the time it's returned,
* so use of this function has to be thought about carefully.
* Of course, the result could be out of date by the time it's returned, so
* use of this function has to be thought about carefully. Similarly, a
* PGPROC with no "lxid" will be considered non-conflicting regardless of any
* lock it holds. Existing callers don't care about a locker after that
* locker's pg_xact updates complete. CommitTransaction() clears "lxid" after
* pg_xact updates and before releasing locks.
*
* Note we never include the current xact's vxid in the result array,
* since an xact never blocks itself.
@ -4405,37 +4409,80 @@ VirtualXactLockTableCleanup(void)
}
}
/*
* XactLockForVirtualXact
*
* If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
* NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those
* functions, it assumes "xid" is never a subtransaction and that "xid" is
* prepared, committed, or aborted.
*
* If !TransactionIdIsValid(xid), this locks every prepared XID having been
* known as "vxid" before its PREPARE TRANSACTION.
*/
static bool
XactLockForVirtualXact(VirtualTransactionId vxid,
TransactionId xid, bool wait)
{
bool more = false;
/* There is no point to wait for 2PCs if you have no 2PCs. */
if (max_prepared_xacts == 0)
return true;
do
{
LockAcquireResult lar;
LOCKTAG tag;
/* Clear state from previous iterations. */
if (more)
{
xid = InvalidTransactionId;
more = false;
}
/* If we have no xid, try to find one. */
if (!TransactionIdIsValid(xid))
xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
if (!TransactionIdIsValid(xid))
{
Assert(!more);
return true;
}
/* Check or wait for XID completion. */
SET_LOCKTAG_TRANSACTION(tag, xid);
lar = LockAcquire(&tag, ShareLock, false, !wait);
if (lar == LOCKACQUIRE_NOT_AVAIL)
return false;
LockRelease(&tag, ShareLock, false);
} while (more);
return true;
}
/*
* VirtualXactLock
*
* If wait = true, wait until the given VXID has been released, and then
* return true.
* If wait = true, wait as long as the given VXID or any XID acquired by the
* same transaction is still running. Then, return true.
*
* If wait = false, just check whether the VXID is still running, and return
* true or false.
* If wait = false, just check whether that VXID or one of those XIDs is still
* running, and return true or false.
*/
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
LOCKTAG tag;
PGPROC *proc;
TransactionId xid = InvalidTransactionId;
Assert(VirtualTransactionIdIsValid(vxid));
if (VirtualTransactionIdIsPreparedXact(vxid))
{
LockAcquireResult lar;
/*
* Prepared transactions don't hold vxid locks. The
* LocalTransactionId is always a normal, locked XID.
*/
SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
lar = LockAcquire(&tag, ShareLock, false, !wait);
if (lar != LOCKACQUIRE_NOT_AVAIL)
LockRelease(&tag, ShareLock, false);
return lar != LOCKACQUIRE_NOT_AVAIL;
}
if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
/* no vxid lock; localTransactionId is a normal, locked XID */
return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
@ -4449,7 +4496,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
*/
proc = BackendIdGetProc(vxid.backendId);
if (proc == NULL)
return true;
return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
/*
* We must acquire this lock before checking the backendId and lxid
@ -4458,12 +4505,12 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
*/
LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
/* If the transaction has ended, our work here is done. */
if (proc->backendId != vxid.backendId
|| proc->fpLocalTransactionId != vxid.localTransactionId)
{
/* VXID ended */
LWLockRelease(&proc->backendLock);
return true;
return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
}
/*
@ -4510,6 +4557,16 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
proc->fpVXIDLock = false;
}
/*
* If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
* search. The proc might have assigned this XID but not yet locked it,
* in which case the proc will lock this XID before releasing the VXID.
* The backendLock critical section excludes VirtualXactLockTableCleanup(),
* so we won't save an XID of a different VXID. It doesn't matter whether
* we save this before or after setting up the primary lock table entry.
*/
xid = ProcGlobal->allPgXact[proc->pgprocno].xid;
/* Done with proc->fpLockBits */
LWLockRelease(&proc->backendLock);
@ -4517,7 +4574,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
(void) LockAcquire(&tag, ShareLock, false, false);
LockRelease(&tag, ShareLock, false);
return true;
return XactLockForVirtualXact(vxid, xid, wait);
}
/*

View File

@ -64,6 +64,20 @@
* (XXX is it worth testing likewise for duplicate catcache flush entries?
* Probably not.)
*
* Many subsystems own higher-level caches that depend on relcache and/or
* catcache, and they register callbacks here to invalidate their caches.
* While building a higher-level cache entry, a backend may receive a
* callback for the being-built entry or one of its dependencies. This
* implies the new higher-level entry would be born stale, and it might
* remain stale for the life of the backend. Many caches do not prevent
* that. They rely on DDL for can't-miss catalog changes taking
* AccessExclusiveLock on suitable objects. (For a change made with less
* locking, backends might never read the change.) The relation cache,
* however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later
* than the beginning of the next transaction. Hence, when a relevant
* invalidation callback arrives during a build, relcache.c reattempts that
* build. Caches with similar needs could do likewise.
*
* If a relcache flush is issued for a system relation that we preload
* from the relcache init file, we must also delete the init file so that
* it will be rebuilt during the next backend restart. The actual work of

View File

@ -0,0 +1,205 @@
# Copyright (c) 2021, PostgreSQL Global Development Group
# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications
use strict;
use warnings;
use Config;
use PostgresNode;
use TestLib;
use Test::More tests => 6;
my ($node, $result);
#
# Test set-up
#
$node = get_new_node('CIC_2PC_test');
$node->init;
$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10');
$node->append_conf('postgresql.conf', 'lock_timeout = 180000');
$node->start;
$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
$node->safe_psql(
'postgres', q(
CREATE FUNCTION heapallindexed() RETURNS void AS $$
DECLARE
count_seqscan int;
count_idxscan int;
BEGIN
count_seqscan := (SELECT count(*) FROM tbl);
SET enable_seqscan = off;
count_idxscan := (SELECT count(*) FROM tbl);
RESET enable_seqscan;
IF count_seqscan <> count_idxscan THEN
RAISE 'seqscan found % rows, but idxscan found % rows',
count_seqscan, count_idxscan;
END IF;
END
$$ LANGUAGE plpgsql;
));
#
# Run 3 overlapping 2PC transactions with CIC
#
# We have two concurrent background psql processes: $main_h for INSERTs and
# $cic_h for CIC. Also, we use non-background psql for some COMMIT PREPARED
# statements.
#
my $main_in = '';
my $main_out = '';
my $main_timer = IPC::Run::timeout(180);
my $main_h =
$node->background_psql('postgres', \$main_in, \$main_out,
$main_timer, on_error_stop => 1);
$main_in .= q(
BEGIN;
INSERT INTO tbl VALUES(0);
\echo syncpoint1
);
pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;
my $cic_in = '';
my $cic_out = '';
my $cic_timer = IPC::Run::timeout(180);
my $cic_h =
$node->background_psql('postgres', \$cic_in, \$cic_out,
$cic_timer, on_error_stop => 1);
$cic_in .= q(
\echo start
CREATE INDEX CONCURRENTLY idx ON tbl(i);
);
pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;
$main_in .= q(
PREPARE TRANSACTION 'a';
);
$main_in .= q(
BEGIN;
INSERT INTO tbl VALUES(0);
\echo syncpoint2
);
pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;
$node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
$main_in .= q(
PREPARE TRANSACTION 'b';
BEGIN;
INSERT INTO tbl VALUES(0);
\echo syncpoint3
);
pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;
$node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
$main_in .= q(
PREPARE TRANSACTION 'c';
COMMIT PREPARED 'c';
);
$main_h->pump_nb;
$main_h->finish;
$cic_h->finish;
$result = $node->psql('postgres',
q(BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT heapallindexed()));
is($result, '0', 'all indexed after overlapping 2PC');
#
# Server restart shall not change whether prepared xact blocks CIC
#
$node->safe_psql(
'postgres', q(
BEGIN;
INSERT INTO tbl VALUES(0);
PREPARE TRANSACTION 'spans_restart';
BEGIN;
CREATE TABLE unused ();
PREPARE TRANSACTION 'persists_forever';
));
$node->restart;
my $reindex_in = '';
my $reindex_out = '';
my $reindex_timer = IPC::Run::timeout(180);
my $reindex_h =
$node->background_psql('postgres', \$reindex_in, \$reindex_out,
$reindex_timer, on_error_stop => 1);
$reindex_in .= q(
\echo start
DROP INDEX CONCURRENTLY idx;
CREATE INDEX CONCURRENTLY idx ON tbl(i);
);
pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;
$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
$reindex_h->finish;
$result = $node->psql('postgres',
q(BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT heapallindexed()));
is($result, '0', 'all indexed after 2PC and restart');
#
# Stress CIC+2PC with pgbench
#
# Fix broken index first
$node->safe_psql('postgres', q(REINDEX TABLE tbl;));
# Run background pgbench with CIC. We cannot mix-in this script into single
# pgbench: CIC will deadlock with itself occasionally.
my $pgbench_out = '';
my $pgbench_timer = IPC::Run::timeout(180);
my $pgbench_h = $node->background_pgbench(
'--no-vacuum --client=1 --transactions=100',
{
'002_pgbench_concurrent_cic' => q(
DROP INDEX CONCURRENTLY idx;
CREATE INDEX CONCURRENTLY idx ON tbl(i);
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT heapallindexed();
ROLLBACK;
)
},
\$pgbench_out,
$pgbench_timer);
# Run pgbench.
$node->pgbench(
'--no-vacuum --client=5 --transactions=100',
0,
[qr{actually processed}],
[qr{^$}],
'concurrent INSERTs w/ 2PC',
{
'002_pgbench_concurrent_2pc' => q(
BEGIN;
INSERT INTO tbl VALUES(0);
PREPARE TRANSACTION 'c:client_id';
COMMIT PREPARED 'c:client_id';
),
'002_pgbench_concurrent_2pc_savepoint' => q(
BEGIN;
SAVEPOINT s1;
INSERT INTO tbl VALUES(0);
PREPARE TRANSACTION 'c:client_id';
COMMIT PREPARED 'c:client_id';
)
});
$pgbench_h->pump_nb;
$pgbench_h->finish();
unlike($pgbench_out, qr/aborted in/, "pgbench with CIC works");
# done
$node->stop;
done_testing();

View File

@ -33,6 +33,8 @@ extern void TwoPhaseShmemInit(void);
extern void AtAbort_Twophase(void);
extern void PostPrepare_Twophase(void);
extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
bool *have_more);
extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid);
extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid);

View File

@ -47,10 +47,11 @@ extern bool Debug_deadlocks;
/*
* Top-level transactions are identified by VirtualTransactionIDs comprising
* PGPROC fields backendId and lxid. For prepared transactions, the
* LocalTransactionId is an ordinary XID. These are guaranteed unique over
* the short term, but will be reused after a database restart or XID
* wraparound; hence they should never be stored on disk.
* PGPROC fields backendId and lxid. For recovered prepared transactions, the
* LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never
* refers to that kind. These are guaranteed unique over the short term, but
* will be reused after a database restart or XID wraparound; hence they
* should never be stored on disk.
*
* Note that struct VirtualTransactionId can not be assumed to be atomically
* assignable as a whole. However, type LocalTransactionId is assumed to
@ -70,7 +71,7 @@ typedef struct
#define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId)
#define VirtualTransactionIdIsValid(vxid) \
(LocalTransactionIdIsValid((vxid).localTransactionId))
#define VirtualTransactionIdIsPreparedXact(vxid) \
#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \
((vxid).backendId == InvalidBackendId)
#define VirtualTransactionIdEquals(vxid1, vxid2) \
((vxid1).backendId == (vxid2).backendId && \

View File

@ -1340,6 +1340,96 @@ sub psql
}
}
=pod
=item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the
caller may use to send input to B<psql>. The process's stdin is sourced from
the $stdin scalar reference, and its stdout and stderr go to the $stdout
scalar reference. This allows the caller to act on other parts of the system
while idling this backend.
The specified timer object is attached to the harness, as well. It's caller's
responsibility to select the timeout length, and to restart the timer after
each command if the timeout is per-command.
psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
disabled. That may be overridden by passing extra psql parameters.
Dies on failure to invoke psql, or if psql fails to connect. Errors occurring
later are the caller's problem. psql runs with on_error_stop by default so
that it will stop running sql and return 3 if passed SQL results in an error.
Be sure to "finish" the harness when done with it.
=over
=item on_error_stop => 1
By default, the B<psql> method invokes the B<psql> program with ON_ERROR_STOP=1
set, so SQL execution is stopped at the first error and exit code 3 is
returned. Set B<on_error_stop> to 0 to ignore errors instead.
=item replication => B<value>
If set, add B<replication=value> to the conninfo string.
Passing the literal value C<database> results in a logical replication
connection.
=item extra_params => ['--single-transaction']
If given, it must be an array reference containing additional parameters to B<psql>.
=back
=cut
sub background_psql
{
my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_;
local $ENV{PGHOST} = $self->host;
local $ENV{PGPORT} = $self->port;
my $replication = $params{replication};
my @psql_params = (
'psql',
'-XAtq',
'-d',
$self->connstr($dbname)
. (defined $replication ? " replication=$replication" : ""),
'-f',
'-');
$params{on_error_stop} = 1 unless defined $params{on_error_stop};
push @psql_params, '-v', 'ON_ERROR_STOP=1' if $params{on_error_stop};
push @psql_params, @{ $params{extra_params} }
if defined $params{extra_params};
# Ensure there is no data waiting to be sent:
$$stdin = "" if ref($stdin);
# IPC::Run would otherwise append to existing contents:
$$stdout = "" if ref($stdout);
my $harness = IPC::Run::start \@psql_params,
'<', $stdin, '>', $stdout, $timer;
# Request some output, and pump until we see it. This means that psql
# connection failures are caught here, relieving callers of the need to
# handle those. (Right now, we have no particularly good handling for
# errors anyway, but that might be added later.)
my $banner = "background_psql: ready";
$$stdin = "\\echo $banner\n";
pump $harness until $$stdout =~ /$banner/ || $timer->is_expired;
die "psql startup timed out" if $timer->is_expired;
return $harness;
}
# Common sub of pgbench-invoking interfaces. Makes any requested script files
# and returns pgbench command-line options causing use of those files.
sub _pgbench_make_files