diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e3b11daa89..8b1772db69 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -648,6 +648,9 @@ logicalrep_worker_onexit(int code, Datum arg) logicalrep_worker_detach(); + /* Cleanup filesets used for streaming transactions. */ + logicalrep_worker_cleanupfileset(); + ApplyLauncherWakeup(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 295b1e06de..bfb7d1a261 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -39,13 +39,13 @@ * BufFile infrastructure supports temporary files that exceed the OS file size * limit, (b) provides a way for automatic clean up on the error and (c) provides * a way to survive these files across local transactions and allow to open and - * close at stream start and close. We decided to use SharedFileSet + * close at stream start and close. We decided to use FileSet * infrastructure as without that it deletes the files on the closure of the * file and if we decide to keep stream files open across the start/stop stream * then it will consume a lot of memory (more than 8K for each BufFile and * there could be multiple such BufFiles as the subscriber could receive * multiple start/stop streams for different transactions before getting the - * commit). Moreover, if we don't use SharedFileSet then we also need to invent + * commit). Moreover, if we don't use FileSet then we also need to invent * a new way to pass filenames to BufFile APIs so that we are allowed to open * the file we desired across multiple stream-open calls for the same * transaction. @@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg = typedef struct StreamXidHash { TransactionId xid; /* xid is the hash key and must be first */ - SharedFileSet *stream_fileset; /* shared file set for stream data */ - SharedFileSet *subxact_fileset; /* shared file set for subxact info */ + FileSet *stream_fileset; /* file set for stream data */ + FileSet *subxact_fileset; /* file set for subxact info */ } StreamXidHash; static MemoryContext ApplyMessageContext = NULL; @@ -270,8 +270,8 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; /* - * Hash table for storing the streaming xid information along with shared file - * set for streaming and subxact files. + * Hash table for storing the streaming xid information along with filesets + * for streaming and subxact files. */ static HTAB *xidhash = NULL; @@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s) /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); /* OK, truncate the file at the right offset */ - BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno, - subxact_data.subxacts[subidx].offset); + BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, + subxact_data.subxacts[subidx].offset); BufFileClose(fd); /* discard the subxacts added later */ @@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) errmsg_internal("transaction %u not found in stream XID hash table", xid))); - fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } } +/* + * Cleanup filesets. + */ +void +logicalrep_worker_cleanupfileset(void) +{ + HASH_SEQ_STATUS status; + StreamXidHash *hentry; + + /* Remove all the pending stream and subxact filesets. */ + if (xidhash) + { + hash_seq_init(&status, xidhash); + while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) + { + FileSetDeleteAll(hentry->stream_fileset); + + /* Delete the subxact fileset iff it is created. */ + if (hentry->subxact_fileset) + FileSetDeleteAll(hentry->subxact_fileset); + } + } +} + /* * Apply main loop. */ @@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid) if (ent->subxact_fileset) { cleanup_subxact_info(); - SharedFileSetDeleteAll(ent->subxact_fileset); + FileSetDeleteAll(ent->subxact_fileset); pfree(ent->subxact_fileset); ent->subxact_fileset = NULL; } @@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid) MemoryContext oldctx; /* - * We need to maintain shared fileset across multiple stream - * start/stop calls. So, need to allocate it in a persistent context. + * We need to maintain fileset across multiple stream start/stop + * calls. So, need to allocate it in a persistent context. */ oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(SharedFileSet)); - SharedFileSetInit(ent->subxact_fileset, NULL); + ent->subxact_fileset = palloc(sizeof(FileSet)); + FileSetInit(ent->subxact_fileset); MemoryContextSwitchTo(oldctx); - fd = BufFileCreateShared(ent->subxact_fileset, path); + fd = BufFileCreateFileSet(ent->subxact_fileset, path); } else - fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid) subxact_filename(path, subid, xid); - fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3264,7 +3288,7 @@ stream_cleanup_files(Oid subid, TransactionId xid) /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); - SharedFileSetDeleteAll(ent->stream_fileset); + FileSetDeleteAll(ent->stream_fileset); pfree(ent->stream_fileset); ent->stream_fileset = NULL; @@ -3272,7 +3296,7 @@ stream_cleanup_files(Oid subid, TransactionId xid) if (ent->subxact_fileset) { subxact_filename(path, subid, xid); - SharedFileSetDeleteAll(ent->subxact_fileset); + FileSetDeleteAll(ent->subxact_fileset); pfree(ent->subxact_fileset); ent->subxact_fileset = NULL; } @@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the shared fileset and create the - * buffile, otherwise open the previously created file. + * changes for this transaction, initialize the fileset and create the buffile, + * otherwise open the previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. @@ -3330,7 +3354,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) if (first_segment) { MemoryContext savectx; - SharedFileSet *fileset; + FileSet *fileset; if (found) ereport(ERROR, @@ -3338,16 +3362,16 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); /* - * We need to maintain shared fileset across multiple stream - * start/stop calls. So, need to allocate it in a persistent context. + * We need to maintain fileset across multiple stream start/stop + * calls. So, need to allocate it in a persistent context. */ savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(SharedFileSet)); + fileset = palloc(sizeof(FileSet)); - SharedFileSetInit(fileset, NULL); + FileSetInit(fileset); MemoryContextSwitchTo(savectx); - stream_fd = BufFileCreateShared(fileset, path); + stream_fd = BufFileCreateFileSet(fileset, path); /* Remember the fileset for the next stream of the same transaction */ ent->xid = xid; @@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) * Open the file and seek to the end of the file because we always * append the changes file. */ - stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); BufFileSeek(stream_fd, 0, 0, SEEK_END); } diff --git a/src/backend/storage/file/Makefile b/src/backend/storage/file/Makefile index 5e1291bf2d..660ac51807 100644 --- a/src/backend/storage/file/Makefile +++ b/src/backend/storage/file/Makefile @@ -16,6 +16,7 @@ OBJS = \ buffile.o \ copydir.o \ fd.o \ + fileset.o \ reinit.o \ sharedfileset.o diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index a4be5fe513..5e5409d84d 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -39,7 +39,7 @@ * BufFile also supports temporary files that can be used by the single backend * when the corresponding files need to be survived across the transaction and * need to be opened and closed multiple times. Such files need to be created - * as a member of a SharedFileSet. + * as a member of a FileSet. *------------------------------------------------------------------------- */ @@ -77,8 +77,8 @@ struct BufFile bool dirty; /* does buffer need to be written? */ bool readOnly; /* has the file been set to read only? */ - SharedFileSet *fileset; /* space for segment files if shared */ - const char *name; /* name of this BufFile if shared */ + FileSet *fileset; /* space for fileset based segment files */ + const char *name; /* name of fileset based BufFile */ /* * resowner is the ResourceOwner to use for underlying temp files. (We @@ -104,7 +104,7 @@ static void extendBufFile(BufFile *file); static void BufFileLoadBuffer(BufFile *file); static void BufFileDumpBuffer(BufFile *file); static void BufFileFlush(BufFile *file); -static File MakeNewSharedSegment(BufFile *file, int segment); +static File MakeNewFileSetSegment(BufFile *file, int segment); /* * Create BufFile and perform the common initialization. @@ -160,7 +160,7 @@ extendBufFile(BufFile *file) if (file->fileset == NULL) pfile = OpenTemporaryFile(file->isInterXact); else - pfile = MakeNewSharedSegment(file, file->numFiles); + pfile = MakeNewFileSetSegment(file, file->numFiles); Assert(pfile >= 0); @@ -214,34 +214,34 @@ BufFileCreateTemp(bool interXact) * Build the name for a given segment of a given BufFile. */ static void -SharedSegmentName(char *name, const char *buffile_name, int segment) +FileSetSegmentName(char *name, const char *buffile_name, int segment) { snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment); } /* - * Create a new segment file backing a shared BufFile. + * Create a new segment file backing a fileset based BufFile. */ static File -MakeNewSharedSegment(BufFile *buffile, int segment) +MakeNewFileSetSegment(BufFile *buffile, int segment) { char name[MAXPGPATH]; File file; /* * It is possible that there are files left over from before a crash - * restart with the same name. In order for BufFileOpenShared() not to + * restart with the same name. In order for BufFileOpenFileSet() not to * get confused about how many segments there are, we'll unlink the next * segment number if it already exists. */ - SharedSegmentName(name, buffile->name, segment + 1); - SharedFileSetDelete(buffile->fileset, name, true); + FileSetSegmentName(name, buffile->name, segment + 1); + FileSetDelete(buffile->fileset, name, true); /* Create the new segment. */ - SharedSegmentName(name, buffile->name, segment); - file = SharedFileSetCreate(buffile->fileset, name); + FileSetSegmentName(name, buffile->name, segment); + file = FileSetCreate(buffile->fileset, name); - /* SharedFileSetCreate would've errored out */ + /* FileSetCreate would've errored out */ Assert(file > 0); return file; @@ -251,15 +251,15 @@ MakeNewSharedSegment(BufFile *buffile, int segment) * Create a BufFile that can be discovered and opened read-only by other * backends that are attached to the same SharedFileSet using the same name. * - * The naming scheme for shared BufFiles is left up to the calling code. The - * name will appear as part of one or more filenames on disk, and might + * The naming scheme for fileset based BufFiles is left up to the calling code. + * The name will appear as part of one or more filenames on disk, and might * provide clues to administrators about which subsystem is generating * temporary file data. Since each SharedFileSet object is backed by one or * more uniquely named temporary directory, names don't conflict with * unrelated SharedFileSet objects. */ BufFile * -BufFileCreateShared(SharedFileSet *fileset, const char *name) +BufFileCreateFileSet(FileSet *fileset, const char *name) { BufFile *file; @@ -267,7 +267,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) file->fileset = fileset; file->name = pstrdup(name); file->files = (File *) palloc(sizeof(File)); - file->files[0] = MakeNewSharedSegment(file, 0); + file->files[0] = MakeNewFileSetSegment(file, 0); file->readOnly = false; return file; @@ -275,13 +275,13 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) /* * Open a file that was previously created in another backend (or this one) - * with BufFileCreateShared in the same SharedFileSet using the same name. + * with BufFileCreateFileSet in the same FileSet using the same name. * The backend that created the file must have called BufFileClose() or - * BufFileExportShared() to make sure that it is ready to be opened by other + * BufFileExportFileSet() to make sure that it is ready to be opened by other * backends and render it read-only. */ BufFile * -BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode) +BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) { BufFile *file; char segment_name[MAXPGPATH]; @@ -304,8 +304,8 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode) files = repalloc(files, sizeof(File) * capacity); } /* Try to load a segment. */ - SharedSegmentName(segment_name, name, nfiles); - files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode); + FileSetSegmentName(segment_name, name, nfiles); + files[nfiles] = FileSetOpen(fileset, segment_name, mode); if (files[nfiles] <= 0) break; ++nfiles; @@ -333,18 +333,18 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode) } /* - * Delete a BufFile that was created by BufFileCreateShared in the given - * SharedFileSet using the given name. + * Delete a BufFile that was created by BufFileCreateFileSet in the given + * FileSet using the given name. * * It is not necessary to delete files explicitly with this function. It is * provided only as a way to delete files proactively, rather than waiting for - * the SharedFileSet to be cleaned up. + * the FileSet to be cleaned up. * * Only one backend should attempt to delete a given name, and should know * that it exists and has been exported or closed. */ void -BufFileDeleteShared(SharedFileSet *fileset, const char *name) +BufFileDeleteFileSet(FileSet *fileset, const char *name) { char segment_name[MAXPGPATH]; int segment = 0; @@ -357,8 +357,8 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name) */ for (;;) { - SharedSegmentName(segment_name, name, segment); - if (!SharedFileSetDelete(fileset, segment_name, true)) + FileSetSegmentName(segment_name, name, segment); + if (!FileSetDelete(fileset, segment_name, true)) break; found = true; ++segment; @@ -367,16 +367,16 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name) } if (!found) - elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name); + elog(ERROR, "could not delete unknown BufFile \"%s\"", name); } /* - * BufFileExportShared --- flush and make read-only, in preparation for sharing. + * BufFileExportFileSet --- flush and make read-only, in preparation for sharing. */ void -BufFileExportShared(BufFile *file) +BufFileExportFileSet(BufFile *file) { - /* Must be a file belonging to a SharedFileSet. */ + /* Must be a file belonging to a FileSet. */ Assert(file->fileset != NULL); /* It's probably a bug if someone calls this twice. */ @@ -785,7 +785,7 @@ BufFileTellBlock(BufFile *file) #endif /* - * Return the current shared BufFile size. + * Return the current fileset based BufFile size. * * Counts any holes left behind by BufFileAppend as part of the size. * ereport()s on failure. @@ -811,8 +811,8 @@ BufFileSize(BufFile *file) } /* - * Append the contents of source file (managed within shared fileset) to - * end of target file (managed within same shared fileset). + * Append the contents of source file (managed within fileset) to + * end of target file (managed within same fileset). * * Note that operation subsumes ownership of underlying resources from * "source". Caller should never call BufFileClose against source having @@ -854,11 +854,11 @@ BufFileAppend(BufFile *target, BufFile *source) } /* - * Truncate a BufFile created by BufFileCreateShared up to the given fileno and - * the offset. + * Truncate a BufFile created by BufFileCreateFileSet up to the given fileno + * and the offset. */ void -BufFileTruncateShared(BufFile *file, int fileno, off_t offset) +BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset) { int numFiles = file->numFiles; int newFile = fileno; @@ -876,12 +876,12 @@ BufFileTruncateShared(BufFile *file, int fileno, off_t offset) { if ((i != fileno || offset == 0) && i != 0) { - SharedSegmentName(segment_name, file->name, i); + FileSetSegmentName(segment_name, file->name, i); FileClose(file->files[i]); - if (!SharedFileSetDelete(file->fileset, segment_name, true)) + if (!FileSetDelete(file->fileset, segment_name, true)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not delete shared fileset \"%s\": %m", + errmsg("could not delete fileset \"%s\": %m", segment_name))); numFiles--; newOffset = MAX_PHYSICAL_FILESIZE; diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index b58b399834..433e2832a5 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -1921,7 +1921,7 @@ PathNameDeleteTemporaryFile(const char *path, bool error_on_failure) /* * Unlike FileClose's automatic file deletion code, we tolerate - * non-existence to support BufFileDeleteShared which doesn't know how + * non-existence to support BufFileDeleteFileSet which doesn't know how * many segments it has to delete until it runs out. */ if (stat_errno == ENOENT) diff --git a/src/backend/storage/file/fileset.c b/src/backend/storage/file/fileset.c new file mode 100644 index 0000000000..282ff12b85 --- /dev/null +++ b/src/backend/storage/file/fileset.c @@ -0,0 +1,205 @@ +/*------------------------------------------------------------------------- + * + * fileset.c + * Management of named temporary files. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/file/fileset.c + * + * FileSets provide a temporary namespace (think directory) so that files can + * be discovered by name. + * + * FileSets can be used by backends when the temporary files need to be + * opened/closed multiple times and the underlying files need to survive across + * transactions. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include + +#include "catalog/pg_tablespace.h" +#include "commands/tablespace.h" +#include "common/hashfn.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/fileset.h" +#include "utils/builtins.h" + +static void FileSetPath(char *path, FileSet *fileset, Oid tablespace); +static void FilePath(char *path, FileSet *fileset, const char *name); +static Oid ChooseTablespace(const FileSet *fileset, const char *name); + +/* + * Initialize a space for temporary files. This API can be used by shared + * fileset as well as if the temporary files are used only by single backend + * but the files need to be opened and closed multiple times and also the + * underlying files need to survive across transactions. + * + * The callers are expected to explicitly remove such files by using + * FileSetDelete/FileSetDeleteAll. + * + * Files will be distributed over the tablespaces configured in + * temp_tablespaces. + * + * Under the covers the set is one or more directories which will eventually + * be deleted. + */ +void +FileSetInit(FileSet *fileset) +{ + static uint32 counter = 0; + + fileset->creator_pid = MyProcPid; + fileset->number = counter; + counter = (counter + 1) % INT_MAX; + + /* Capture the tablespace OIDs so that all backends agree on them. */ + PrepareTempTablespaces(); + fileset->ntablespaces = + GetTempTablespaces(&fileset->tablespaces[0], + lengthof(fileset->tablespaces)); + if (fileset->ntablespaces == 0) + { + /* If the GUC is empty, use current database's default tablespace */ + fileset->tablespaces[0] = MyDatabaseTableSpace; + fileset->ntablespaces = 1; + } + else + { + int i; + + /* + * An entry of InvalidOid means use the default tablespace for the + * current database. Replace that now, to be sure that all users of + * the FileSet agree on what to do. + */ + for (i = 0; i < fileset->ntablespaces; i++) + { + if (fileset->tablespaces[i] == InvalidOid) + fileset->tablespaces[i] = MyDatabaseTableSpace; + } + } +} + +/* + * Create a new file in the given set. + */ +File +FileSetCreate(FileSet *fileset, const char *name) +{ + char path[MAXPGPATH]; + File file; + + FilePath(path, fileset, name); + file = PathNameCreateTemporaryFile(path, false); + + /* If we failed, see if we need to create the directory on demand. */ + if (file <= 0) + { + char tempdirpath[MAXPGPATH]; + char filesetpath[MAXPGPATH]; + Oid tablespace = ChooseTablespace(fileset, name); + + TempTablespacePath(tempdirpath, tablespace); + FileSetPath(filesetpath, fileset, tablespace); + PathNameCreateTemporaryDir(tempdirpath, filesetpath); + file = PathNameCreateTemporaryFile(path, true); + } + + return file; +} + +/* + * Open a file that was created with FileSetCreate() */ +File +FileSetOpen(FileSet *fileset, const char *name, int mode) +{ + char path[MAXPGPATH]; + File file; + + FilePath(path, fileset, name); + file = PathNameOpenTemporaryFile(path, mode); + + return file; +} + +/* + * Delete a file that was created with FileSetCreate(). + * + * Return true if the file existed, false if didn't. + */ +bool +FileSetDelete(FileSet *fileset, const char *name, + bool error_on_failure) +{ + char path[MAXPGPATH]; + + FilePath(path, fileset, name); + + return PathNameDeleteTemporaryFile(path, error_on_failure); +} + +/* + * Delete all files in the set. + */ +void +FileSetDeleteAll(FileSet *fileset) +{ + char dirpath[MAXPGPATH]; + int i; + + /* + * Delete the directory we created in each tablespace. Doesn't fail + * because we use this in error cleanup paths, but can generate LOG + * message on IO error. + */ + for (i = 0; i < fileset->ntablespaces; ++i) + { + FileSetPath(dirpath, fileset, fileset->tablespaces[i]); + PathNameDeleteTemporaryDir(dirpath); + } +} + +/* + * Build the path for the directory holding the files backing a FileSet in a + * given tablespace. + */ +static void +FileSetPath(char *path, FileSet *fileset, Oid tablespace) +{ + char tempdirpath[MAXPGPATH]; + + TempTablespacePath(tempdirpath, tablespace); + snprintf(path, MAXPGPATH, "%s/%s%lu.%u.fileset", + tempdirpath, PG_TEMP_FILE_PREFIX, + (unsigned long) fileset->creator_pid, fileset->number); +} + +/* + * Sorting has to determine which tablespace a given temporary file belongs in. + */ +static Oid +ChooseTablespace(const FileSet *fileset, const char *name) +{ + uint32 hash = hash_any((const unsigned char *) name, strlen(name)); + + return fileset->tablespaces[hash % fileset->ntablespaces]; +} + +/* + * Compute the full path of a file in a FileSet. + */ +static void +FilePath(char *path, FileSet *fileset, const char *name) +{ + char dirpath[MAXPGPATH]; + + FileSetPath(dirpath, fileset, ChooseTablespace(fileset, name)); + snprintf(path, MAXPGPATH, "%s/%s", dirpath, name); +} diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c index ed37c940ad..6a33fac435 100644 --- a/src/backend/storage/file/sharedfileset.c +++ b/src/backend/storage/file/sharedfileset.c @@ -13,10 +13,6 @@ * files can be discovered by name, and a shared ownership semantics so that * shared files survive until the last user detaches. * - * SharedFileSets can be used by backends when the temporary files need to be - * opened/closed multiple times and the underlying files need to survive across - * transactions. - * *------------------------------------------------------------------------- */ @@ -33,13 +29,7 @@ #include "storage/sharedfileset.h" #include "utils/builtins.h" -static List *filesetlist = NIL; - static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum); -static void SharedFileSetDeleteOnProcExit(int status, Datum arg); -static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace); -static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name); -static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name); /* * Initialize a space for temporary files that can be opened by other backends. @@ -47,77 +37,22 @@ static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name); * SharedFileSet with 'seg'. Any contained files will be deleted when the * last backend detaches. * - * We can also use this interface if the temporary files are used only by - * single backend but the files need to be opened and closed multiple times - * and also the underlying files need to survive across transactions. For - * such cases, dsm segment 'seg' should be passed as NULL. Callers are - * expected to explicitly remove such files by using SharedFileSetDelete/ - * SharedFileSetDeleteAll or we remove such files on proc exit. - * - * Files will be distributed over the tablespaces configured in - * temp_tablespaces. - * * Under the covers the set is one or more directories which will eventually * be deleted. */ void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg) { - static uint32 counter = 0; - + /* Initialize the shared fileset specific members. */ SpinLockInit(&fileset->mutex); fileset->refcnt = 1; - fileset->creator_pid = MyProcPid; - fileset->number = counter; - counter = (counter + 1) % INT_MAX; - /* Capture the tablespace OIDs so that all backends agree on them. */ - PrepareTempTablespaces(); - fileset->ntablespaces = - GetTempTablespaces(&fileset->tablespaces[0], - lengthof(fileset->tablespaces)); - if (fileset->ntablespaces == 0) - { - /* If the GUC is empty, use current database's default tablespace */ - fileset->tablespaces[0] = MyDatabaseTableSpace; - fileset->ntablespaces = 1; - } - else - { - int i; - - /* - * An entry of InvalidOid means use the default tablespace for the - * current database. Replace that now, to be sure that all users of - * the SharedFileSet agree on what to do. - */ - for (i = 0; i < fileset->ntablespaces; i++) - { - if (fileset->tablespaces[i] == InvalidOid) - fileset->tablespaces[i] = MyDatabaseTableSpace; - } - } + /* Initialize the fileset. */ + FileSetInit(&fileset->fs); /* Register our cleanup callback. */ if (seg) on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); - else - { - static bool registered_cleanup = false; - - if (!registered_cleanup) - { - /* - * We must not have registered any fileset before registering the - * fileset clean up. - */ - Assert(filesetlist == NIL); - on_proc_exit(SharedFileSetDeleteOnProcExit, 0); - registered_cleanup = true; - } - - filesetlist = lcons((void *) fileset, filesetlist); - } } /* @@ -147,87 +82,13 @@ SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg) on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); } -/* - * Create a new file in the given set. - */ -File -SharedFileSetCreate(SharedFileSet *fileset, const char *name) -{ - char path[MAXPGPATH]; - File file; - - SharedFilePath(path, fileset, name); - file = PathNameCreateTemporaryFile(path, false); - - /* If we failed, see if we need to create the directory on demand. */ - if (file <= 0) - { - char tempdirpath[MAXPGPATH]; - char filesetpath[MAXPGPATH]; - Oid tablespace = ChooseTablespace(fileset, name); - - TempTablespacePath(tempdirpath, tablespace); - SharedFileSetPath(filesetpath, fileset, tablespace); - PathNameCreateTemporaryDir(tempdirpath, filesetpath); - file = PathNameCreateTemporaryFile(path, true); - } - - return file; -} - -/* - * Open a file that was created with SharedFileSetCreate(), possibly in - * another backend. - */ -File -SharedFileSetOpen(SharedFileSet *fileset, const char *name, int mode) -{ - char path[MAXPGPATH]; - File file; - - SharedFilePath(path, fileset, name); - file = PathNameOpenTemporaryFile(path, mode); - - return file; -} - -/* - * Delete a file that was created with SharedFileSetCreate(). - * Return true if the file existed, false if didn't. - */ -bool -SharedFileSetDelete(SharedFileSet *fileset, const char *name, - bool error_on_failure) -{ - char path[MAXPGPATH]; - - SharedFilePath(path, fileset, name); - - return PathNameDeleteTemporaryFile(path, error_on_failure); -} - /* * Delete all files in the set. */ void SharedFileSetDeleteAll(SharedFileSet *fileset) { - char dirpath[MAXPGPATH]; - int i; - - /* - * Delete the directory we created in each tablespace. Doesn't fail - * because we use this in error cleanup paths, but can generate LOG - * message on IO error. - */ - for (i = 0; i < fileset->ntablespaces; ++i) - { - SharedFileSetPath(dirpath, fileset, fileset->tablespaces[i]); - PathNameDeleteTemporaryDir(dirpath); - } - - /* Unregister the shared fileset */ - SharedFileSetUnregister(fileset); + FileSetDeleteAll(&fileset->fs); } /* @@ -255,100 +116,5 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum) * this function so we can safely access its data. */ if (unlink_all) - SharedFileSetDeleteAll(fileset); -} - -/* - * Callback function that will be invoked on the process exit. This will - * process the list of all the registered sharedfilesets and delete the - * underlying files. - */ -static void -SharedFileSetDeleteOnProcExit(int status, Datum arg) -{ - /* - * Remove all the pending shared fileset entries. We don't use foreach() - * here because SharedFileSetDeleteAll will remove the current element in - * filesetlist. Though we have used foreach_delete_current() to remove the - * element from filesetlist it could only fix up the state of one of the - * loops, see SharedFileSetUnregister. - */ - while (list_length(filesetlist) > 0) - { - SharedFileSet *fileset = (SharedFileSet *) linitial(filesetlist); - - SharedFileSetDeleteAll(fileset); - } - - filesetlist = NIL; -} - -/* - * Unregister the shared fileset entry registered for cleanup on proc exit. - */ -void -SharedFileSetUnregister(SharedFileSet *input_fileset) -{ - ListCell *l; - - /* - * If the caller is following the dsm based cleanup then we don't maintain - * the filesetlist so return. - */ - if (filesetlist == NIL) - return; - - foreach(l, filesetlist) - { - SharedFileSet *fileset = (SharedFileSet *) lfirst(l); - - /* Remove the entry from the list */ - if (input_fileset == fileset) - { - filesetlist = foreach_delete_current(filesetlist, l); - return; - } - } - - /* Should have found a match */ - Assert(false); -} - -/* - * Build the path for the directory holding the files backing a SharedFileSet - * in a given tablespace. - */ -static void -SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace) -{ - char tempdirpath[MAXPGPATH]; - - TempTablespacePath(tempdirpath, tablespace); - snprintf(path, MAXPGPATH, "%s/%s%lu.%u.sharedfileset", - tempdirpath, PG_TEMP_FILE_PREFIX, - (unsigned long) fileset->creator_pid, fileset->number); -} - -/* - * Sorting hat to determine which tablespace a given shared temporary file - * belongs in. - */ -static Oid -ChooseTablespace(const SharedFileSet *fileset, const char *name) -{ - uint32 hash = hash_any((const unsigned char *) name, strlen(name)); - - return fileset->tablespaces[hash % fileset->ntablespaces]; -} - -/* - * Compute the full path of a file in a SharedFileSet. - */ -static void -SharedFilePath(char *path, SharedFileSet *fileset, const char *name) -{ - char dirpath[MAXPGPATH]; - - SharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name)); - snprintf(path, MAXPGPATH, "%s/%s", dirpath, name); + FileSetDeleteAll(&fileset->fs); } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index cafc087254..f7994d771d 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = <s->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenShared(fileset, filename, O_RDONLY); + file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY); filesize = BufFileSize(file); /* @@ -610,7 +610,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, * offset). * * The only thing that currently prevents writing to the leader tape from - * working is the fact that BufFiles opened using BufFileOpenShared() are + * working is the fact that BufFiles opened using BufFileOpenFileSet() are * read-only by definition, but that could be changed if it seemed * worthwhile. For now, writing to the leader tape will raise a "Bad file * descriptor" error, so tuplesort must avoid writing to the leader tape @@ -722,7 +722,7 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared, char filename[MAXPGPATH]; pg_itoa(worker, filename); - lts->pfile = BufFileCreateShared(fileset, filename); + lts->pfile = BufFileCreateFileSet(&fileset->fs, filename); } else lts->pfile = BufFileCreateTemp(false); @@ -1096,7 +1096,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share) /* Handle extra steps when caller is to share its tapeset */ if (share) { - BufFileExportShared(lts->pfile); + BufFileExportFileSet(lts->pfile); share->firstblocknumber = lt->firstBlockNumber; } } diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 57e35db4f8..504ef1c286 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -310,7 +310,8 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, /* Create one. Only this backend will write into it. */ sts_filename(name, accessor, accessor->participant); - accessor->write_file = BufFileCreateShared(accessor->fileset, name); + accessor->write_file = + BufFileCreateFileSet(&accessor->fileset->fs, name); /* Set up the shared state for this backend's file. */ participant = &accessor->sts->participants[accessor->participant]; @@ -559,7 +560,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenShared(accessor->fileset, name, O_RDONLY); + BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY); } /* Seek and load the chunk header. */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 41c7487393..a6c9d4e2a1 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -79,6 +79,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_worker_cleanupfileset(void); extern int logicalrep_sync_worker_count(Oid subid); diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 566523de1f..143eada85f 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -26,7 +26,7 @@ #ifndef BUFFILE_H #define BUFFILE_H -#include "storage/sharedfileset.h" +#include "storage/fileset.h" /* BufFile is an opaque type whose details are not known outside buffile.c. */ @@ -46,11 +46,11 @@ extern int BufFileSeekBlock(BufFile *file, long blknum); extern int64 BufFileSize(BufFile *file); extern long BufFileAppend(BufFile *target, BufFile *source); -extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name); -extern void BufFileExportShared(BufFile *file); -extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name, - int mode); -extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name); -extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset); +extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); +extern void BufFileExportFileSet(BufFile *file); +extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, + int mode); +extern void BufFileDeleteFileSet(FileSet *fileset, const char *name); +extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */ diff --git a/src/include/storage/fileset.h b/src/include/storage/fileset.h new file mode 100644 index 0000000000..be0e097834 --- /dev/null +++ b/src/include/storage/fileset.h @@ -0,0 +1,40 @@ +/*------------------------------------------------------------------------- + * + * fileset.h + * Management of named temporary files. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/fileset.h + * + *------------------------------------------------------------------------- + */ + +#ifndef FILESET_H +#define FILESET_H + +#include "storage/fd.h" + +/* + * A set of temporary files. + */ +typedef struct FileSet +{ + pid_t creator_pid; /* PID of the creating process */ + uint32 number; /* per-PID identifier */ + int ntablespaces; /* number of tablespaces to use */ + Oid tablespaces[8]; /* OIDs of tablespaces to use. Assumes that + * it's rare that there more than temp + * tablespaces. */ +} FileSet; + +extern void FileSetInit(FileSet *fileset); +extern File FileSetCreate(FileSet *fileset, const char *name); +extern File FileSetOpen(FileSet *fileset, const char *name, + int mode); +extern bool FileSetDelete(FileSet *fileset, const char *name, + bool error_on_failure); +extern void FileSetDeleteAll(FileSet *fileset); + +#endif diff --git a/src/include/storage/sharedfileset.h b/src/include/storage/sharedfileset.h index 09ba121aaf..59becfbef8 100644 --- a/src/include/storage/sharedfileset.h +++ b/src/include/storage/sharedfileset.h @@ -17,6 +17,7 @@ #include "storage/dsm.h" #include "storage/fd.h" +#include "storage/fileset.h" #include "storage/spin.h" /* @@ -24,24 +25,13 @@ */ typedef struct SharedFileSet { - pid_t creator_pid; /* PID of the creating process */ - uint32 number; /* per-PID identifier */ + FileSet fs; slock_t mutex; /* mutex protecting the reference count */ int refcnt; /* number of attached backends */ - int ntablespaces; /* number of tablespaces to use */ - Oid tablespaces[8]; /* OIDs of tablespaces to use. Assumes that - * it's rare that there more than temp - * tablespaces. */ } SharedFileSet; extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg); extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg); -extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name); -extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name, - int mode); -extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name, - bool error_on_failure); extern void SharedFileSetDeleteAll(SharedFileSet *fileset); -extern void SharedFileSetUnregister(SharedFileSet *input_fileset); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 621d0cb4da..f31a1e4e1e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -709,6 +709,7 @@ File FileFdwExecutionState FileFdwPlanState FileNameMap +FileSet FileTag FinalPathExtraData FindColsContext