diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8b1772db69..3fb4caa803 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -379,6 +379,7 @@ retry: worker->relid = relid; worker->relstate = SUBREL_STATE_UNKNOWN; worker->relstate_lsn = InvalidXLogRecPtr; + worker->stream_fileset = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -648,8 +649,9 @@ logicalrep_worker_onexit(int code, Datum arg) logicalrep_worker_detach(); - /* Cleanup filesets used for streaming transactions. */ - logicalrep_worker_cleanupfileset(); + /* Cleanup fileset used for streaming transactions. */ + if (MyLogicalRepWorker->stream_fileset != NULL) + FileSetDeleteAll(MyLogicalRepWorker->stream_fileset); ApplyLauncherWakeup(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index bfb7d1a261..8d96c926b4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .ts = 0, }; -/* - * Stream xid hash entry. Whenever we see a new xid we create this entry in the - * xidhash and along with it create the streaming file and store the fileset handle. - * The subxact file is created iff there is any subxact info under this xid. This - * entry is used on the subsequent streams for the xid to get the corresponding - * fileset handles, so storing them in hash makes the search faster. - */ -typedef struct StreamXidHash -{ - TransactionId xid; /* xid is the hash key and must be first */ - FileSet *stream_fileset; /* file set for stream data */ - FileSet *subxact_fileset; /* file set for subxact info */ -} StreamXidHash; - static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; @@ -269,12 +255,6 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; -/* - * Hash table for storing the streaming xid information along with filesets - * for streaming and subxact files. - */ -static HTAB *xidhash = NULL; - /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -1118,7 +1098,6 @@ static void apply_handle_stream_start(StringInfo s) { bool first_segment; - HASHCTL hash_ctl; if (in_streamed_transaction) ereport(ERROR, @@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s) set_apply_error_context_xact(stream_xid, 0); /* - * Initialize the xidhash table if we haven't yet. This will be used for - * the entire duration of the apply worker so create it in permanent - * context. + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. */ - if (xidhash == NULL) + if (MyLogicalRepWorker->stream_fileset == NULL) { - hash_ctl.keysize = sizeof(TransactionId); - hash_ctl.entrysize = sizeof(StreamXidHash); - hash_ctl.hcxt = ApplyContext; - xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); } /* open the spool file for this transaction */ @@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s) BufFile *fd; bool found = false; char path[MAXPGPATH]; - StreamXidHash *ent; set_apply_error_context_xact(subxid, 0); @@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s) return; } - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, + O_RDWR, false); /* OK, truncate the file at the right offset */ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, @@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, + false); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2541,30 +2506,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } } -/* - * Cleanup filesets. - */ -void -logicalrep_worker_cleanupfileset(void) -{ - HASH_SEQ_STATUS status; - StreamXidHash *hentry; - - /* Remove all the pending stream and subxact filesets. */ - if (xidhash) - { - hash_seq_init(&status, xidhash); - while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) - { - FileSetDeleteAll(hentry->stream_fileset); - - /* Delete the subxact fileset iff it is created. */ - if (hentry->subxact_fileset) - FileSetDeleteAll(hentry->subxact_fileset); - } - } -} - /* * Apply main loop. */ @@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid) { char path[MAXPGPATH]; Size len; - StreamXidHash *ent; BufFile *fd; Assert(TransactionIdIsValid(xid)); - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - /* By this time we must have created the transaction entry */ - Assert(ent); + /* construct the subxact filename */ + subxact_filename(path, subid, xid); - /* - * If there is no subtransaction then nothing to do, but if already have - * subxact file then delete that. - */ + /* Delete the subxacts file, if exists. */ if (subxact_data.nsubxacts == 0) { - if (ent->subxact_fileset) - { - cleanup_subxact_info(); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + cleanup_subxact_info(); + BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); + return; } - subxact_filename(path, subid, xid); - /* * Create the subxact file if it not already created, otherwise open the * existing file. */ - if (ent->subxact_fileset == NULL) - { - MemoryContext oldctx; - - /* - * We need to maintain fileset across multiple stream start/stop - * calls. So, need to allocate it in a persistent context. - */ - oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(FileSet)); - FileSetInit(ent->subxact_fileset); - MemoryContextSwitchTo(oldctx); - - fd = BufFileCreateFileSet(ent->subxact_fileset, path); - } - else - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR, + true); + if (fd == NULL) + fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid) char path[MAXPGPATH]; Size len; BufFile *fd; - StreamXidHash *ent; MemoryContext oldctx; Assert(!subxact_data.subxacts); Assert(subxact_data.nsubxacts == 0); Assert(subxact_data.nsubxacts_max == 0); - /* Find the stream xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* - * If subxact_fileset is not valid that mean we don't have any subxact - * info + * If the subxact file doesn't exist that means we don't have any subxact + * info. */ - if (ent->subxact_fileset == NULL) - return; - subxact_filename(path, subid, xid); - - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, + true); + if (fd == NULL) + return; /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid) * Cleanup files for a subscription / toplevel transaction. * * Remove files with serialized changes and subxact info for a particular - * toplevel transaction. Each subscription has a separate set of files. + * toplevel transaction. Each subscription has a separate set of files + * for any toplevel transaction. */ static void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - StreamXidHash *ent; - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - /* Delete the change file and release the stream fileset memory */ + /* Delete the changes file. */ changes_filename(path, subid, xid); - FileSetDeleteAll(ent->stream_fileset); - pfree(ent->stream_fileset); - ent->stream_fileset = NULL; + BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false); - /* Delete the subxact file and release the memory, if it exist */ - if (ent->subxact_fileset) - { - subxact_filename(path, subid, xid); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } - - /* Remove the xid entry from the stream xid hash */ - hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); + /* Delete the subxact file, if it exists. */ + subxact_filename(path, subid, xid); + BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); } /* @@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the fileset and create the buffile, - * otherwise open the previously created file. + * changes for this transaction, create the buffile, otherwise open the + * previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. @@ -3322,20 +3201,13 @@ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; - bool found; MemoryContext oldcxt; - StreamXidHash *ent; Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); - /* create or find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_ENTER, - &found); changes_filename(path, subid, xid); elog(DEBUG1, "opening file \"%s\" for streamed changes", path); @@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); /* - * If this is the first streamed segment, the file must not exist, so make - * sure we're the ones creating it. Otherwise just open the file for - * writing, in append mode. + * If this is the first streamed segment, create the changes file. + * Otherwise, just open the file for writing, in append mode. */ if (first_segment) - { - MemoryContext savectx; - FileSet *fileset; - - if (found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - - /* - * We need to maintain fileset across multiple stream start/stop - * calls. So, need to allocate it in a persistent context. - */ - savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(FileSet)); - - FileSetInit(fileset); - MemoryContextSwitchTo(savectx); - - stream_fd = BufFileCreateFileSet(fileset, path); - - /* Remember the fileset for the next stream of the same transaction */ - ent->xid = xid; - ent->stream_fileset = fileset; - ent->subxact_fileset = NULL; - } + stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, + path); else { - if (!found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - /* * Open the file and seek to the end of the file because we always * append the changes file. */ - stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, + path, O_RDWR, false); BufFileSeek(stream_fd, 0, 0, SEEK_END); } diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 5e5409d84d..ff3aa67cde 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -278,10 +278,13 @@ BufFileCreateFileSet(FileSet *fileset, const char *name) * with BufFileCreateFileSet in the same FileSet using the same name. * The backend that created the file must have called BufFileClose() or * BufFileExportFileSet() to make sure that it is ready to be opened by other - * backends and render it read-only. + * backends and render it read-only. If missing_ok is true, which indicates + * that missing files can be safely ignored, then return NULL if the BufFile + * with the given name is not found, otherwise, throw an error. */ BufFile * -BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) +BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, + bool missing_ok) { BufFile *file; char segment_name[MAXPGPATH]; @@ -318,10 +321,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * name. */ if (nfiles == 0) + { + /* free the memory */ + pfree(files); + + if (missing_ok) + return NULL; + ereport(ERROR, (errcode_for_file_access(), errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", segment_name, name))); + } file = makeBufFileCommon(nfiles); file->files = files; @@ -341,10 +352,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode) * the FileSet to be cleaned up. * * Only one backend should attempt to delete a given name, and should know - * that it exists and has been exported or closed. + * that it exists and has been exported or closed otherwise missing_ok should + * be passed true. */ void -BufFileDeleteFileSet(FileSet *fileset, const char *name) +BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok) { char segment_name[MAXPGPATH]; int segment = 0; @@ -366,7 +378,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name) CHECK_FOR_INTERRUPTS(); } - if (!found) + if (!found && !missing_ok) elog(ERROR, "could not delete unknown BufFile \"%s\"", name); } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index f7994d771d..debf12e1b0 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = <s->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY); + file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false); filesize = BufFileSize(file); /* diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 504ef1c286..033088f9bc 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY); + BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, + false); } /* Seek and load the chunk header. */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a6c9d4e2a1..c00be2a2b6 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -50,6 +50,15 @@ typedef struct LogicalRepWorker XLogRecPtr relstate_lsn; slock_t relmutex; + /* + * Used to create the changes and subxact files for the streaming + * transactions. Upon the arrival of the first streaming transaction, the + * fileset will be initialized, and it will be deleted when the worker + * exits. Under this, separate buffiles would be created for each + * transaction which will be deleted after the transaction is finished. + */ + FileSet *stream_fileset; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -79,7 +88,6 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); -extern void logicalrep_worker_cleanupfileset(void); extern int logicalrep_sync_worker_count(Oid subid); diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 143eada85f..7ae5ea2dde 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name); extern void BufFileExportFileSet(BufFile *file); extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name, - int mode); -extern void BufFileDeleteFileSet(FileSet *fileset, const char *name); + int mode, bool missing_ok); +extern void BufFileDeleteFileSet(FileSet *fileset, const char *name, + bool missing_ok); extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */