Don't force-assign transaction id when exporting a snapshot.

Previously we required every exported transaction to have an xid
assigned. That was used to check that the exporting transaction is
still running, which in turn is needed to guarantee that that
necessary rows haven't been removed in between exporting and importing
the snapshot.

The exported xid caused unnecessary problems with logical decoding,
because slot creation has to wait for all concurrent xid to finish,
which in turn serializes concurrent slot creation.   It also
prohibited snapshots to be exported on hot-standby replicas.

Instead export the virtual transactionid, which avoids the unnecessary
serialization and the inability to export snapshots on standbys. This
changes the file name of the exported snapshot, but since we never
documented what that one means, that seems ok.

Author: Petr Jelinek, slightly editorialized by me
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/f598b4b8-8cd7-0d54-0939-adda763d8c34@2ndquadrant.com
This commit is contained in:
Andres Freund 2017-06-14 11:57:21 -07:00
parent b6966d4627
commit 6c2003f8a1
7 changed files with 115 additions and 67 deletions

View File

@ -222,8 +222,8 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT pg_export_snapshot();
pg_export_snapshot
--------------------
000003A1-1
---------------------
00000003-0000001B-1
(1 row)
</programlisting>
@ -233,7 +233,7 @@ SELECT pg_export_snapshot();
<programlisting>
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '000003A1-1';
SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
</programlisting></para>
</refsect1>

View File

@ -262,7 +262,7 @@ static bool ExportInProgress = false;
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
static void SnapBuildFreeSnapshot(Snapshot snap);
@ -463,7 +463,7 @@ SnapBuildSnapDecRefcount(Snapshot snap)
* and ->subxip/subxcnt values.
*/
static Snapshot
SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
SnapBuildBuildSnapshot(SnapBuild *builder)
{
Snapshot snapshot;
Size ssize;
@ -562,7 +562,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
if (TransactionIdIsValid(MyPgXact->xmin))
elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
snap = SnapBuildBuildSnapshot(builder);
/*
* We know that snap->xmin is alive, enforced by the logical xmin
@ -679,7 +679,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
{
builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot);
}
@ -743,7 +743,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
{
builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot);
}
@ -1061,7 +1061,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
if (builder->snapshot)
SnapBuildSnapDecRefcount(builder->snapshot);
builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
builder->snapshot = SnapBuildBuildSnapshot(builder);
/* we might need to execute invalidations, add snapshot */
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@ -1831,7 +1831,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
{
SnapBuildSnapDecRefcount(builder->snapshot);
}
builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
builder->snapshot = SnapBuildBuildSnapshot(builder);
SnapBuildSnapIncRefcount(builder->snapshot);
ReorderBufferSetRestartPoint(builder->reorder, lsn);

View File

@ -1793,14 +1793,15 @@ GetSnapshotData(Snapshot snapshot)
* Returns TRUE if successful, FALSE if source xact is no longer running.
*/
bool
ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
ProcArrayInstallImportedXmin(TransactionId xmin,
VirtualTransactionId *sourcevxid)
{
bool result = false;
ProcArrayStruct *arrayP = procArray;
int index;
Assert(TransactionIdIsNormal(xmin));
if (!TransactionIdIsNormal(sourcexid))
if (!sourcevxid)
return false;
/* Get lock so source xact can't end while we're doing this */
@ -1817,8 +1818,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;
xid = pgxact->xid; /* fetch just once */
if (xid != sourcexid)
/* We are only interested in the specific virtual transaction. */
if (proc->backendId != sourcevxid->backendId)
continue;
if (proc->lxid != sourcevxid->localTransactionId)
continue;
/*

View File

@ -148,7 +148,7 @@
* predicate lock maintenance
* GetSerializableTransactionSnapshot(Snapshot snapshot)
* SetSerializableTransactionSnapshot(Snapshot snapshot,
* TransactionId sourcexid)
* VirtualTransactionId *sourcevxid)
* RegisterPredicateLockingXid(void)
* PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno,
@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize);
static void SummarizeOldestCommittedSxact(void);
static Snapshot GetSafeSnapshot(Snapshot snapshot);
static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
TransactionId sourcexid);
VirtualTransactionId *sourcevxid,
int sourcepid);
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
PREDICATELOCKTARGETTAG *parent);
@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
* one passed to it, but we avoid assuming that here.
*/
snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
InvalidTransactionId);
NULL, InvalidPid);
if (MySerializableXact == InvalidSerializableXact)
return snapshot; /* no concurrent r/w xacts; it's safe */
@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
return GetSafeSnapshot(snapshot);
return GetSerializableTransactionSnapshotInt(snapshot,
InvalidTransactionId);
NULL, InvalidPid);
}
/*
@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
*/
void
SetSerializableTransactionSnapshot(Snapshot snapshot,
TransactionId sourcexid)
VirtualTransactionId *sourcevxid,
int sourcepid)
{
Assert(IsolationIsSerializable());
@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
sourcepid);
}
/*
@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
*/
static Snapshot
GetSerializableTransactionSnapshotInt(Snapshot snapshot,
TransactionId sourcexid)
VirtualTransactionId *sourcevxid,
int sourcepid)
{
PGPROC *proc;
VirtualTransactionId vxid;
@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
} while (!sxact);
/* Get the snapshot, or check that it's safe to use */
if (!TransactionIdIsValid(sourcexid))
if (!sourcevxid)
snapshot = GetSnapshotData(snapshot);
else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid))
{
ReleasePredXact(sxact);
LWLockRelease(SerializableXactHashLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
errdetail("The source transaction %u is not running anymore.",
sourcexid)));
errdetail("The source process with pid %d is not running anymore.",
sourcepid)));
}
/*

View File

@ -58,6 +58,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@ -211,11 +212,15 @@ static Snapshot FirstXactSnapshot = NULL;
/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
#define XactExportFilePath(path, xid, num, suffix) \
snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
xid, num, suffix)
/* Current xact's exported snapshots (a list of Snapshot structs) */
/* Structure holding info about exported snapshot. */
typedef struct ExportedSnapshot
{
char *snapfile;
Snapshot snapshot;
} ExportedSnapshot;
/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
@ -558,8 +563,8 @@ SnapshotSetCommandId(CommandId curcid)
* in GetTransactionSnapshot.
*/
static void
SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
PGPROC *sourceproc)
SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
int sourcepid, PGPROC *sourceproc)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
@ -617,12 +622,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
errmsg("could not import the requested snapshot"),
errdetail("The source transaction is not running anymore.")));
}
else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
errdetail("The source transaction %u is not running anymore.",
sourcexid)));
errdetail("The source process with pid %d is not running anymore.",
sourcepid)));
/*
* In transaction-snapshot mode, the first snapshot must live until end of
@ -632,7 +637,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
if (IsolationUsesXactSnapshot())
{
if (IsolationIsSerializable())
SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
sourcepid);
/* Make a saved copy */
CurrentSnapshot = CopySnapshot(CurrentSnapshot);
FirstXactSnapshot = CurrentSnapshot;
@ -1075,33 +1081,29 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin)
*/
if (exportedSnapshots != NIL)
{
TransactionId myxid = GetTopTransactionId();
int i;
char buf[MAXPGPATH];
ListCell *lc;
/*
* Get rid of the files. Unlink failure is only a WARNING because (1)
* it's too late to abort the transaction, and (2) leaving a leaked
* file around has little real consequence anyway.
*/
for (i = 1; i <= list_length(exportedSnapshots); i++)
{
XactExportFilePath(buf, myxid, i, "");
if (unlink(buf))
elog(WARNING, "could not unlink file \"%s\": %m", buf);
}
/*
* As with the FirstXactSnapshot, we needn't spend any effort on
* cleaning up the per-snapshot data structures, but we do need to
* remove them from RegisteredSnapshots to prevent a warning below.
*
* We also also need to remove the snapshots from RegisteredSnapshots
* to prevent a warning below.
*
* As with the FirstXactSnapshot, we don't need to free resources of
* the snapshot iself as it will go away with the memory context.
*/
foreach(lc, exportedSnapshots)
{
Snapshot snap = (Snapshot) lfirst(lc);
ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
pairingheap_remove(&RegisteredSnapshots, &snap->ph_node);
if (unlink(esnap->snapfile))
elog(WARNING, "could not unlink file \"%s\": %m",
esnap->snapfile);
pairingheap_remove(&RegisteredSnapshots,
&esnap->snapshot->ph_node);
}
exportedSnapshots = NIL;
@ -1159,6 +1161,7 @@ ExportSnapshot(Snapshot snapshot)
{
TransactionId topXid;
TransactionId *children;
ExportedSnapshot *esnap;
int nchildren;
int addTopXid;
StringInfoData buf;
@ -1183,9 +1186,9 @@ ExportSnapshot(Snapshot snapshot)
*/
/*
* This will assign a transaction ID if we do not yet have one.
* Get our transaction ID if there is one, to include in the snapshot.
*/
topXid = GetTopTransactionId();
topXid = GetTopTransactionIdIfAny();
/*
* We cannot export a snapshot from a subtransaction because there's no
@ -1204,6 +1207,13 @@ ExportSnapshot(Snapshot snapshot)
*/
nchildren = xactGetCommittedChildren(&children);
/*
* Generate file path for the snapshot. We start numbering of snapshots
* inside the transaction from 1.
*/
snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
/*
* Copy the snapshot into TopTransactionContext, add it to the
* exportedSnapshots list, and mark it pseudo-registered. We do this to
@ -1213,7 +1223,10 @@ ExportSnapshot(Snapshot snapshot)
snapshot = CopySnapshot(snapshot);
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
exportedSnapshots = lappend(exportedSnapshots, snapshot);
esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
esnap->snapfile = pstrdup(path);
esnap->snapshot = snapshot;
exportedSnapshots = lappend(exportedSnapshots, esnap);
MemoryContextSwitchTo(oldcxt);
snapshot->regd_count++;
@ -1226,7 +1239,8 @@ ExportSnapshot(Snapshot snapshot)
*/
initStringInfo(&buf);
appendStringInfo(&buf, "xid:%u\n", topXid);
appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
appendStringInfo(&buf, "pid:%d\n", MyProcPid);
appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
@ -1245,7 +1259,8 @@ ExportSnapshot(Snapshot snapshot)
* xmax. (We need not make the same check for subxip[] members, see
* snapshot.h.)
*/
addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
addTopXid = (TransactionIdIsValid(topXid) &&
TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
for (i = 0; i < snapshot->xcnt; i++)
appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
@ -1276,7 +1291,7 @@ ExportSnapshot(Snapshot snapshot)
* ensures that no other backend can read an incomplete file
* (ImportSnapshot won't allow it because of its valid-characters check).
*/
XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
ereport(ERROR,
(errcode_for_file_access(),
@ -1298,8 +1313,6 @@ ExportSnapshot(Snapshot snapshot)
* Now that we have written everything into a .tmp file, rename the file
* to remove the .tmp suffix.
*/
XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
if (rename(pathtmp, path) < 0)
ereport(ERROR,
(errcode_for_file_access(),
@ -1384,6 +1397,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename)
return val;
}
static void
parseVxidFromText(const char *prefix, char **s, const char *filename,
VirtualTransactionId *vxid)
{
char *ptr = *s;
int prefixlen = strlen(prefix);
if (strncmp(ptr, prefix, prefixlen) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", filename)));
ptr += prefixlen;
if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", filename)));
ptr = strchr(ptr, '\n');
if (!ptr)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", filename)));
*s = ptr + 1;
}
/*
* ImportSnapshot
* Import a previously exported snapshot. The argument should be a
@ -1399,7 +1436,8 @@ ImportSnapshot(const char *idstr)
char *filebuf;
int xcnt;
int i;
TransactionId src_xid;
VirtualTransactionId src_vxid;
int src_pid;
Oid src_dbid;
int src_isolevel;
bool src_readonly;
@ -1463,7 +1501,8 @@ ImportSnapshot(const char *idstr)
*/
memset(&snapshot, 0, sizeof(snapshot));
src_xid = parseXidFromText("xid:", &filebuf, path);
parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
src_pid = parseIntFromText("pid:", &filebuf, path);
/* we abuse parseXidFromText a bit here ... */
src_dbid = parseXidFromText("dbid:", &filebuf, path);
src_isolevel = parseIntFromText("iso:", &filebuf, path);
@ -1513,7 +1552,7 @@ ImportSnapshot(const char *idstr)
* don't trouble to check the array elements, just the most critical
* fields.
*/
if (!TransactionIdIsNormal(src_xid) ||
if (!VirtualTransactionIdIsValid(src_vxid) ||
!OidIsValid(src_dbid) ||
!TransactionIdIsNormal(snapshot.xmin) ||
!TransactionIdIsNormal(snapshot.xmax))
@ -1554,7 +1593,7 @@ ImportSnapshot(const char *idstr)
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
SetTransactionSnapshot(&snapshot, src_xid, NULL);
SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
}
/*
@ -2141,5 +2180,5 @@ RestoreSnapshot(char *start_address)
void
RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
{
SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
}

View File

@ -14,6 +14,7 @@
#ifndef PREDICATE_H
#define PREDICATE_H
#include "storage/lock.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
@ -46,7 +47,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
/* predicate lock maintenance */
extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot);
extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
TransactionId sourcexid);
VirtualTransactionId *sourcevxid,
int sourcepid);
extern void RegisterPredicateLockingXid(TransactionId xid);
extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);

View File

@ -82,7 +82,7 @@ extern int GetMaxSnapshotSubxidCount(void);
extern Snapshot GetSnapshotData(Snapshot snapshot);
extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
TransactionId sourcexid);
VirtualTransactionId *sourcevxid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
extern RunningTransactions GetRunningTransactionData(void);