diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 917033f3f6..22370a1393 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt CommandId combocid; /* just for debugging */ } ReorderBufferTupleCidEnt; +/* Virtual file descriptor with file offset tracking */ +typedef struct TXNEntryFile +{ + File vfd; /* -1 when the file is closed */ + off_t curOffset; /* offset for next write or read. Reset to 0 + * when vfd is opened. */ +} TXNEntryFile; + /* k-way in-order change iteration support structures */ typedef struct ReorderBufferIterTXNEntry { XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + TXNEntryFile file; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); * subtransactions * --------------------------------------- */ -static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); @@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + TXNEntryFile *file, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -945,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) /* * Allocate & initialize an iterator which iterates in lsn order over a * transaction and all its subtransactions. + * + * Note: The iterator state is returned through iter_state parameter rather + * than the function's return value. This is because the state gets cleaned up + * in a PG_CATCH block in the caller, so we want to make sure the caller gets + * back the state even if this function throws an exception. */ -static ReorderBufferIterTXNState * -ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +static void +ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) { Size nr_txns = 0; ReorderBufferIterTXNState *state; dlist_iter cur_txn_i; int32 off; + *iter_state = NULL; + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -988,7 +1005,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) for (off = 0; off < state->nr_txns; off++) { - state->entries[off].fd = -1; + state->entries[off].file.vfd = -1; state->entries[off].segno = 0; } @@ -997,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferIterCompare, state); + /* Now that the state fields are initialized, it is safe to return it. */ + *iter_state = state; + /* * Now insert items into the binary heap, in an unordered fashion. (We * will run a heap assembly step at the end; this is more efficient.) @@ -1013,7 +1033,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); - ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, + ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, &state->entries[off].segno); } @@ -1043,7 +1063,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, cur_txn); ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].fd, + &state->entries[off].file, &state->entries[off].segno); } cur_change = dlist_head_element(ReorderBufferChange, node, @@ -1059,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* assemble a valid binary heap */ binaryheap_build(state->heap); - - return state; } /* @@ -1124,7 +1142,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) dlist_delete(&change->node); dlist_push_tail(&state->old_change, &change->node); - if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, + if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { /* successfully restored changes from disk */ @@ -1163,8 +1181,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { - if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + if (state->entries[off].file.vfd != -1) + FileClose(state->entries[off].file.vfd); } /* free memory we might have "leaked" in the last *Next call */ @@ -1500,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); - iterstate = ReorderBufferIterTXNInit(rb, txn); + ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; @@ -2517,11 +2535,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + TXNEntryFile *file, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; dlist_mutable_iter cleanup_iter; + File *fd = &file->vfd; Assert(txn->first_lsn != InvalidXLogRecPtr); Assert(txn->final_lsn != InvalidXLogRecPtr); @@ -2562,7 +2581,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY); + + /* No harm in resetting the offset even in case of failure */ + file->curOffset = 0; + if (*fd < 0 && errno == ENOENT) { *fd = -1; @@ -2582,14 +2605,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file. */ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(file->vfd, rb->outbuf, + sizeof(ReorderBufferDiskChange), + file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); /* eof */ if (readBytes == 0) { - CloseTransientFile(*fd); + FileClose(*fd); *fd = -1; (*segno)++; continue; @@ -2605,16 +2628,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, readBytes, (uint32) sizeof(ReorderBufferDiskChange)))); + file->curOffset += readBytes; + ondisk = (ReorderBufferDiskChange *) rb->outbuf; ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(file->vfd, + rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange), + file->curOffset, + WAIT_EVENT_REORDER_BUFFER_READ); if (readBytes < 0) ereport(ERROR, @@ -2627,6 +2653,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, readBytes, (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); + file->curOffset += readBytes; + /* * ok, read a full change from disk, now restore it into proper * in-memory format diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index c23cc4dda7..05693df057 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 10; +use Test::More tests => 11; use Config; # Initialize master node @@ -135,5 +135,42 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'), is($node_master->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); +# Test to ensure that we don't run out of file descriptors even if there +# are more spill files than maxAllocatedDescs. + +# Set max_files_per_process to a small value to make it more likely to run out +# of max open file descriptors. +$node_master->safe_psql('postgres', + 'ALTER SYSTEM SET max_files_per_process = 26;'); +$node_master->restart; + +$node_master->safe_psql( + 'postgres', q{ +do $$ +BEGIN + FOR i IN 1..10 LOOP + BEGIN + INSERT INTO decoding_test(x) SELECT generate_series(1,5000); + EXCEPTION + when division_by_zero then perform 'dummy'; + END; + END LOOP; +END $$; +}); + +$result = $node_master->safe_psql('postgres', + qq[ +SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL) + WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1; +]); + +$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null}; +is($result, $expected, 'got expected output from spilling subxacts session'); + +# Reset back max_files_per_process +$node_master->safe_psql('postgres', + 'ALTER SYSTEM SET max_files_per_process = DEFAULT;'); +$node_master->restart; + # done with the node $node_master->stop;