diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml index ca55a5b196..188d2ed92e 100644 --- a/doc/src/sgml/ref/set_transaction.sgml +++ b/doc/src/sgml/ref/set_transaction.sgml @@ -222,8 +222,8 @@ SET SESSION CHARACTERISTICS AS TRANSACTION transa BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; SELECT pg_export_snapshot(); pg_export_snapshot --------------------- - 000003A1-1 +--------------------- + 00000003-0000001B-1 (1 row) @@ -233,7 +233,7 @@ SELECT pg_export_snapshot(); BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; -SET TRANSACTION SNAPSHOT '000003A1-1'; +SET TRANSACTION SNAPSHOT '00000003-0000001B-1'; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 8848f5b4ec..e06aa0992a 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -262,7 +262,7 @@ static bool ExportInProgress = false; static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ -static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid); +static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); static void SnapBuildFreeSnapshot(Snapshot snap); @@ -463,7 +463,7 @@ SnapBuildSnapDecRefcount(Snapshot snap) * and ->subxip/subxcnt values. */ static Snapshot -SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid) +SnapBuildBuildSnapshot(SnapBuild *builder) { Snapshot snapshot; Size ssize; @@ -562,7 +562,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) if (TransactionIdIsValid(MyPgXact->xmin)) elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid"); - snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId()); + snap = SnapBuildBuildSnapshot(builder); /* * We know that snap->xmin is alive, enforced by the logical xmin @@ -679,7 +679,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { - builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + builder->snapshot = SnapBuildBuildSnapshot(builder); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } @@ -743,7 +743,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { - builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + builder->snapshot = SnapBuildBuildSnapshot(builder); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } @@ -1061,7 +1061,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, if (builder->snapshot) SnapBuildSnapDecRefcount(builder->snapshot); - builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + builder->snapshot = SnapBuildBuildSnapshot(builder); /* we might need to execute invalidations, add snapshot */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) @@ -1831,7 +1831,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) { SnapBuildSnapDecRefcount(builder->snapshot); } - builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId); + builder->snapshot = SnapBuildBuildSnapshot(builder); SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferSetRestartPoint(builder->reorder, lsn); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 8a71536791..dfddfc4002 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1793,14 +1793,15 @@ GetSnapshotData(Snapshot snapshot) * Returns TRUE if successful, FALSE if source xact is no longer running. */ bool -ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) +ProcArrayInstallImportedXmin(TransactionId xmin, + VirtualTransactionId *sourcevxid) { bool result = false; ProcArrayStruct *arrayP = procArray; int index; Assert(TransactionIdIsNormal(xmin)); - if (!TransactionIdIsNormal(sourcexid)) + if (!sourcevxid) return false; /* Get lock so source xact can't end while we're doing this */ @@ -1817,8 +1818,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) if (pgxact->vacuumFlags & PROC_IN_VACUUM) continue; - xid = pgxact->xid; /* fetch just once */ - if (xid != sourcexid) + /* We are only interested in the specific virtual transaction. */ + if (proc->backendId != sourcevxid->backendId) + continue; + if (proc->lxid != sourcevxid->localTransactionId) continue; /* diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 27c4af91cb..bce505a3fa 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -148,7 +148,7 @@ * predicate lock maintenance * GetSerializableTransactionSnapshot(Snapshot snapshot) * SetSerializableTransactionSnapshot(Snapshot snapshot, - * TransactionId sourcexid) + * VirtualTransactionId *sourcevxid) * RegisterPredicateLockingXid(void) * PredicateLockRelation(Relation relation, Snapshot snapshot) * PredicateLockPage(Relation relation, BlockNumber blkno, @@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); static Snapshot GetSafeSnapshot(Snapshot snapshot); static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid); + VirtualTransactionId *sourcevxid, + int sourcepid); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); @@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot) * one passed to it, but we avoid assuming that here. */ snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, - InvalidTransactionId); + NULL, InvalidPid); if (MySerializableXact == InvalidSerializableXact) return snapshot; /* no concurrent r/w xacts; it's safe */ @@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) return GetSafeSnapshot(snapshot); return GetSerializableTransactionSnapshotInt(snapshot, - InvalidTransactionId); + NULL, InvalidPid); } /* @@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) */ void SetSerializableTransactionSnapshot(Snapshot snapshot, - TransactionId sourcexid) + VirtualTransactionId *sourcevxid, + int sourcepid) { Assert(IsolationIsSerializable()); @@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); - (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid); + (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid, + sourcepid); } /* @@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, */ static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid) + VirtualTransactionId *sourcevxid, + int sourcepid) { PGPROC *proc; VirtualTransactionId vxid; @@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, } while (!sxact); /* Get the snapshot, or check that it's safe to use */ - if (!TransactionIdIsValid(sourcexid)) + if (!sourcevxid) snapshot = GetSnapshotData(snapshot); - else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid)) + else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid)) { ReleasePredXact(sxact); LWLockRelease(SerializableXactHashLock); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), - errdetail("The source transaction %u is not running anymore.", - sourcexid))); + errdetail("The source process with pid %d is not running anymore.", + sourcepid))); } /* diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index b3d4fe3ae2..2b6fca9241 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -58,6 +58,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinval.h" +#include "storage/sinvaladt.h" #include "storage/spin.h" #include "utils/builtins.h" #include "utils/memutils.h" @@ -211,11 +212,15 @@ static Snapshot FirstXactSnapshot = NULL; /* Define pathname of exported-snapshot files */ #define SNAPSHOT_EXPORT_DIR "pg_snapshots" -#define XactExportFilePath(path, xid, num, suffix) \ - snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \ - xid, num, suffix) -/* Current xact's exported snapshots (a list of Snapshot structs) */ +/* Structure holding info about exported snapshot. */ +typedef struct ExportedSnapshot +{ + char *snapfile; + Snapshot snapshot; +} ExportedSnapshot; + +/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */ static List *exportedSnapshots = NIL; /* Prototypes for local functions */ @@ -558,8 +563,8 @@ SnapshotSetCommandId(CommandId curcid) * in GetTransactionSnapshot. */ static void -SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, - PGPROC *sourceproc) +SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, + int sourcepid, PGPROC *sourceproc) { /* Caller should have checked this already */ Assert(!FirstSnapshotSet); @@ -617,12 +622,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, errmsg("could not import the requested snapshot"), errdetail("The source transaction is not running anymore."))); } - else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), - errdetail("The source transaction %u is not running anymore.", - sourcexid))); + errdetail("The source process with pid %d is not running anymore.", + sourcepid))); /* * In transaction-snapshot mode, the first snapshot must live until end of @@ -632,7 +637,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, if (IsolationUsesXactSnapshot()) { if (IsolationIsSerializable()) - SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid); + SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid, + sourcepid); /* Make a saved copy */ CurrentSnapshot = CopySnapshot(CurrentSnapshot); FirstXactSnapshot = CurrentSnapshot; @@ -1075,33 +1081,29 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin) */ if (exportedSnapshots != NIL) { - TransactionId myxid = GetTopTransactionId(); - int i; - char buf[MAXPGPATH]; ListCell *lc; /* * Get rid of the files. Unlink failure is only a WARNING because (1) * it's too late to abort the transaction, and (2) leaving a leaked * file around has little real consequence anyway. - */ - for (i = 1; i <= list_length(exportedSnapshots); i++) - { - XactExportFilePath(buf, myxid, i, ""); - if (unlink(buf)) - elog(WARNING, "could not unlink file \"%s\": %m", buf); - } - - /* - * As with the FirstXactSnapshot, we needn't spend any effort on - * cleaning up the per-snapshot data structures, but we do need to - * remove them from RegisteredSnapshots to prevent a warning below. + * + * We also also need to remove the snapshots from RegisteredSnapshots + * to prevent a warning below. + * + * As with the FirstXactSnapshot, we don't need to free resources of + * the snapshot iself as it will go away with the memory context. */ foreach(lc, exportedSnapshots) { - Snapshot snap = (Snapshot) lfirst(lc); + ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc); - pairingheap_remove(&RegisteredSnapshots, &snap->ph_node); + if (unlink(esnap->snapfile)) + elog(WARNING, "could not unlink file \"%s\": %m", + esnap->snapfile); + + pairingheap_remove(&RegisteredSnapshots, + &esnap->snapshot->ph_node); } exportedSnapshots = NIL; @@ -1159,6 +1161,7 @@ ExportSnapshot(Snapshot snapshot) { TransactionId topXid; TransactionId *children; + ExportedSnapshot *esnap; int nchildren; int addTopXid; StringInfoData buf; @@ -1183,9 +1186,9 @@ ExportSnapshot(Snapshot snapshot) */ /* - * This will assign a transaction ID if we do not yet have one. + * Get our transaction ID if there is one, to include in the snapshot. */ - topXid = GetTopTransactionId(); + topXid = GetTopTransactionIdIfAny(); /* * We cannot export a snapshot from a subtransaction because there's no @@ -1204,6 +1207,13 @@ ExportSnapshot(Snapshot snapshot) */ nchildren = xactGetCommittedChildren(&children); + /* + * Generate file path for the snapshot. We start numbering of snapshots + * inside the transaction from 1. + */ + snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d", + MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1); + /* * Copy the snapshot into TopTransactionContext, add it to the * exportedSnapshots list, and mark it pseudo-registered. We do this to @@ -1213,7 +1223,10 @@ ExportSnapshot(Snapshot snapshot) snapshot = CopySnapshot(snapshot); oldcxt = MemoryContextSwitchTo(TopTransactionContext); - exportedSnapshots = lappend(exportedSnapshots, snapshot); + esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot)); + esnap->snapfile = pstrdup(path); + esnap->snapshot = snapshot; + exportedSnapshots = lappend(exportedSnapshots, esnap); MemoryContextSwitchTo(oldcxt); snapshot->regd_count++; @@ -1226,7 +1239,8 @@ ExportSnapshot(Snapshot snapshot) */ initStringInfo(&buf); - appendStringInfo(&buf, "xid:%u\n", topXid); + appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid); + appendStringInfo(&buf, "pid:%d\n", MyProcPid); appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId); appendStringInfo(&buf, "iso:%d\n", XactIsoLevel); appendStringInfo(&buf, "ro:%d\n", XactReadOnly); @@ -1245,7 +1259,8 @@ ExportSnapshot(Snapshot snapshot) * xmax. (We need not make the same check for subxip[] members, see * snapshot.h.) */ - addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0; + addTopXid = (TransactionIdIsValid(topXid) && + TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0; appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid); for (i = 0; i < snapshot->xcnt; i++) appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]); @@ -1276,7 +1291,7 @@ ExportSnapshot(Snapshot snapshot) * ensures that no other backend can read an incomplete file * (ImportSnapshot won't allow it because of its valid-characters check). */ - XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp"); + snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path); if (!(f = AllocateFile(pathtmp, PG_BINARY_W))) ereport(ERROR, (errcode_for_file_access(), @@ -1298,8 +1313,6 @@ ExportSnapshot(Snapshot snapshot) * Now that we have written everything into a .tmp file, rename the file * to remove the .tmp suffix. */ - XactExportFilePath(path, topXid, list_length(exportedSnapshots), ""); - if (rename(pathtmp, path) < 0) ereport(ERROR, (errcode_for_file_access(), @@ -1384,6 +1397,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename) return val; } +static void +parseVxidFromText(const char *prefix, char **s, const char *filename, + VirtualTransactionId *vxid) +{ + char *ptr = *s; + int prefixlen = strlen(prefix); + + if (strncmp(ptr, prefix, prefixlen) != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr += prefixlen; + if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr = strchr(ptr, '\n'); + if (!ptr) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + *s = ptr + 1; +} + /* * ImportSnapshot * Import a previously exported snapshot. The argument should be a @@ -1399,7 +1436,8 @@ ImportSnapshot(const char *idstr) char *filebuf; int xcnt; int i; - TransactionId src_xid; + VirtualTransactionId src_vxid; + int src_pid; Oid src_dbid; int src_isolevel; bool src_readonly; @@ -1463,7 +1501,8 @@ ImportSnapshot(const char *idstr) */ memset(&snapshot, 0, sizeof(snapshot)); - src_xid = parseXidFromText("xid:", &filebuf, path); + parseVxidFromText("vxid:", &filebuf, path, &src_vxid); + src_pid = parseIntFromText("pid:", &filebuf, path); /* we abuse parseXidFromText a bit here ... */ src_dbid = parseXidFromText("dbid:", &filebuf, path); src_isolevel = parseIntFromText("iso:", &filebuf, path); @@ -1513,7 +1552,7 @@ ImportSnapshot(const char *idstr) * don't trouble to check the array elements, just the most critical * fields. */ - if (!TransactionIdIsNormal(src_xid) || + if (!VirtualTransactionIdIsValid(src_vxid) || !OidIsValid(src_dbid) || !TransactionIdIsNormal(snapshot.xmin) || !TransactionIdIsNormal(snapshot.xmax)) @@ -1554,7 +1593,7 @@ ImportSnapshot(const char *idstr) errmsg("cannot import a snapshot from a different database"))); /* OK, install the snapshot */ - SetTransactionSnapshot(&snapshot, src_xid, NULL); + SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL); } /* @@ -2141,5 +2180,5 @@ RestoreSnapshot(char *start_address) void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) { - SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); + SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc); } diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index 8f9ea29917..941ba7119e 100644 --- a/src/include/storage/predicate.h +++ b/src/include/storage/predicate.h @@ -14,6 +14,7 @@ #ifndef PREDICATE_H #define PREDICATE_H +#include "storage/lock.h" #include "utils/relcache.h" #include "utils/snapshot.h" @@ -46,7 +47,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno); /* predicate lock maintenance */ extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot); extern void SetSerializableTransactionSnapshot(Snapshot snapshot, - TransactionId sourcexid); + VirtualTransactionId *sourcevxid, + int sourcepid); extern void RegisterPredicateLockingXid(TransactionId xid); extern void PredicateLockRelation(Relation relation, Snapshot snapshot); extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 22955a79dd..5cf8ff7538 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -82,7 +82,7 @@ extern int GetMaxSnapshotSubxidCount(void); extern Snapshot GetSnapshotData(Snapshot snapshot); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, - TransactionId sourcexid); + VirtualTransactionId *sourcevxid); extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc); extern RunningTransactions GetRunningTransactionData(void);