diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c72a611a39..7612cf5f04 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -196,6 +196,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferCleanupSerializedTXNs(const char *slotname); +static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, + TransactionId xid, XLogSegNo segno); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -214,7 +217,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t /* - * Allocate a new ReorderBuffer + * Allocate a new ReorderBuffer and clean out any old serialized state from + * prior ReorderBuffer instances for the same slot. */ ReorderBuffer * ReorderBufferAllocate(void) @@ -223,6 +227,8 @@ ReorderBufferAllocate(void) HASHCTL hash_ctl; MemoryContext new_ctx; + Assert(MyReplicationSlot != NULL); + /* allocate memory in own context, to have better accountability */ new_ctx = AllocSetContextCreate(CurrentMemoryContext, "ReorderBuffer", @@ -269,6 +275,13 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); + /* + * Ensure there's no stale data from prior uses of this slot, in case some + * prior exit avoided calling ReorderBufferFree. Failure to do this can + * produce duplicated txns, and it's very cheap if there's nothing there. + */ + ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); + return buffer; } @@ -285,6 +298,9 @@ ReorderBufferFree(ReorderBuffer *rb) * memory context. */ MemoryContextDelete(context); + + /* Free disk space used by unconsumed reorder buffers */ + ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); } /* @@ -2030,7 +2046,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) int fd = -1; XLogSegNo curOpenSegNo = 0; Size spilled = 0; - char path[MAXPGPATH]; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -2058,21 +2073,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size)) { - XLogRecPtr recptr; + char path[MAXPGPATH]; if (fd != -1) CloseTransientFile(fd); XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size); - XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr, wal_segment_size); /* * No need to care about TLIs here, only used during a single run, * so each LSN only maps to a specific WAL record. */ - sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), txn->xid, - (uint32) (recptr >> 32), (uint32) recptr); + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, + curOpenSegNo); /* open segment, create it if necessary */ fd = OpenTransientFile(path, @@ -2081,8 +2094,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (fd < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); + errmsg("could not open file \"%s\": %m", path))); } ReorderBufferSerializeChange(rb, txn, fd, change); @@ -2300,25 +2312,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, if (*fd == -1) { - XLogRecPtr recptr; char path[MAXPGPATH]; /* first time in */ if (*segno == 0) - { XLByteToSeg(txn->first_lsn, *segno, wal_segment_size); - } Assert(*segno != 0 || dlist_is_empty(&txn->changes)); - XLogSegNoOffsetToRecPtr(*segno, 0, recptr, wal_segment_size); /* * No need to care about TLIs here, only used during a single run, * so each LSN only maps to a specific WAL record. */ - sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), txn->xid, - (uint32) (recptr >> 32), (uint32) recptr); + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, + *segno); *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); if (*fd < 0 && errno == ENOENT) @@ -2332,7 +2339,6 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); - } /* @@ -2554,13 +2560,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) for (cur = first; cur <= last; cur++) { char path[MAXPGPATH]; - XLogRecPtr recptr; - XLogSegNoOffsetToRecPtr(cur, 0, recptr, wal_segment_size); - - sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), txn->xid, - (uint32) (recptr >> 32), (uint32) recptr); + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur); if (unlink(path) != 0 && errno != ENOENT) ereport(ERROR, (errcode_for_file_access(), @@ -2568,6 +2569,63 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) } } +/* + * Remove any leftover serialized reorder buffers from a slot directory after a + * prior crash or decoding session exit. + */ +static void +ReorderBufferCleanupSerializedTXNs(const char *slotname) +{ + DIR *spill_dir; + struct dirent *spill_de; + struct stat statbuf; + char path[MAXPGPATH * 2 + 12]; + + sprintf(path, "pg_replslot/%s", slotname); + + /* we're only handling directories here, skip if it's not ours */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + return; + + spill_dir = AllocateDir(path); + while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL) + { + /* only look at names that can be ours */ + if (strncmp(spill_de->d_name, "xid", 3) == 0) + { + snprintf(path, sizeof(path), + "pg_replslot/%s/%s", slotname, + spill_de->d_name); + + if (unlink(path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m", + path, slotname))); + } + } + FreeDir(spill_dir); +} + +/* + * Given a replication slot, transaction ID and segment number, fill in the + * corresponding spill file into 'path', which is a caller-owned buffer of size + * at least MAXPGPATH. + */ +static void +ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, + XLogSegNo segno) +{ + XLogRecPtr recptr; + + XLogSegNoOffsetToRecPtr(segno, 0, recptr, wal_segment_size); + + snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", + NameStr(MyReplicationSlot->data.name), + xid, + (uint32) (recptr >> 32), (uint32) recptr); +} + /* * Delete all data spilled to disk after we've restarted/crashed. It will be * recreated when the respective slots are reused. @@ -2578,15 +2636,9 @@ StartupReorderBuffer(void) DIR *logical_dir; struct dirent *logical_de; - DIR *spill_dir; - struct dirent *spill_de; - logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { - struct stat statbuf; - char path[MAXPGPATH * 2 + 12]; - if (strcmp(logical_de->d_name, ".") == 0 || strcmp(logical_de->d_name, "..") == 0) continue; @@ -2599,33 +2651,7 @@ StartupReorderBuffer(void) * ok, has to be a surviving logical slot, iterate and delete * everything starting with xid-* */ - sprintf(path, "pg_replslot/%s", logical_de->d_name); - - /* we're only creating directories here, skip if it's not our's */ - if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) - continue; - - spill_dir = AllocateDir(path); - while ((spill_de = ReadDir(spill_dir, path)) != NULL) - { - if (strcmp(spill_de->d_name, ".") == 0 || - strcmp(spill_de->d_name, "..") == 0) - continue; - - /* only look at names that can be ours */ - if (strncmp(spill_de->d_name, "xid", 3) == 0) - { - sprintf(path, "pg_replslot/%s/%s", logical_de->d_name, - spill_de->d_name); - - if (unlink(path) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", - path))); - } - } - FreeDir(spill_dir); + ReorderBufferCleanupSerializedTXNs(logical_de->d_name); } FreeDir(logical_dir); }