diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 9da04daca7..332c1ff4f5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -109,7 +109,7 @@ typedef struct ReorderBufferIterTXNEntry XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + File fd; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -178,7 +178,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); * subtransactions * --------------------------------------- */ -static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); @@ -194,7 +195,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + File *fd, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -945,15 +946,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) /* * Allocate & initialize an iterator which iterates in lsn order over a * transaction and all its subtransactions. + * + * Note: The iterator state is returned through iter_state parameter rather + * than the function's return value. This is because the state gets cleaned up + * in a PG_CATCH block in the caller, so we want to make sure the caller gets + * back the state even if this function throws an exception. */ -static ReorderBufferIterTXNState * -ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +static void +ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) { Size nr_txns = 0; ReorderBufferIterTXNState *state; dlist_iter cur_txn_i; int32 off; + *iter_state = NULL; + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -997,6 +1006,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferIterCompare, state); + /* Now that the state fields are initialized, it is safe to return it. */ + *iter_state = state; + /* * Now insert items into the binary heap, in an unordered fashion. (We * will run a heap assembly step at the end; this is more efficient.) @@ -1059,8 +1071,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* assemble a valid binary heap */ binaryheap_build(state->heap); - - return state; } /* @@ -1164,7 +1174,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + FileClose(state->entries[off].fd); } /* free memory we might have "leaked" in the last *Next call */ @@ -1500,7 +1510,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); - iterstate = ReorderBufferIterTXNInit(rb, txn); + ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; @@ -2517,7 +2527,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + File *fd, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; @@ -2562,7 +2572,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY); if (*fd < 0 && errno == ENOENT) { *fd = -1; @@ -2582,14 +2592,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file. */ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange), + WAIT_EVENT_REORDER_BUFFER_READ); /* eof */ if (readBytes == 0) { - CloseTransientFile(*fd); + FileClose(*fd); *fd = -1; (*segno)++; continue; @@ -2611,10 +2620,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(*fd, + rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange), + WAIT_EVENT_REORDER_BUFFER_READ); if (readBytes < 0) ereport(ERROR, diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index c23cc4dda7..05693df057 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 10; +use Test::More tests => 11; use Config; # Initialize master node @@ -135,5 +135,42 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'), is($node_master->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); +# Test to ensure that we don't run out of file descriptors even if there +# are more spill files than maxAllocatedDescs. + +# Set max_files_per_process to a small value to make it more likely to run out +# of max open file descriptors. +$node_master->safe_psql('postgres', + 'ALTER SYSTEM SET max_files_per_process = 26;'); +$node_master->restart; + +$node_master->safe_psql( + 'postgres', q{ +do $$ +BEGIN + FOR i IN 1..10 LOOP + BEGIN + INSERT INTO decoding_test(x) SELECT generate_series(1,5000); + EXCEPTION + when division_by_zero then perform 'dummy'; + END; + END LOOP; +END $$; +}); + +$result = $node_master->safe_psql('postgres', + qq[ +SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL) + WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1; +]); + +$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null}; +is($result, $expected, 'got expected output from spilling subxacts session'); + +# Reset back max_files_per_process +$node_master->safe_psql('postgres', + 'ALTER SYSTEM SET max_files_per_process = DEFAULT;'); +$node_master->restart; + # done with the node $node_master->stop;