Provide vectored variants of smgrread() and smgrwrite().

smgrreadv() and smgrwritev() and their md.c implementations call
FileReadV() and FileWriteV().  A range of disk blocks beginning at
'blocknum' and extending for 'nblocks' can be scattered to or gathered
from multiple buffers with a single system call.  The traditional
smgrread() and smgrwrite() functions are implemented in terms of the new
functions.

Later commits will introduce calls with nblocks > 1, but the following
behavioral changes can be seen already:

* After a short transfer we'll now retry until a read eventually returns
  0 bytes (= EOF) or the call fails with an error such as ENOSPC, EDQUOT
  or EFBIG, where previously we would infer the reason from the short
  transfer itself.  Retrying is consistent with xlog.c's treatment of
  large WAL writes, and arguably also xlog.c and fd.c's treatment of
  EINTR.  Arbitrary short returns for larger transfers have been observed
  on several OSes, and might in theory also happen for transient reasons
  with our own pg_p*v() fallback code.

* After an unexpected EOF or a -1 return, the error thrown now refers to
  a block range even in the single-block case, e.g. "blocks 42..42".

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com
This commit is contained in:
Thomas Munro 2023-12-18 13:08:49 +13:00
parent b7412e293b
commit 4908c58720
5 changed files with 290 additions and 132 deletions

View File

@ -6868,7 +6868,7 @@ FROM pg_stat_get_backend_idset() AS backendid;
arg5 is the ID of the backend which created the temporary relation for a
local buffer, or <symbol>InvalidBackendId</symbol> (-1) for a shared buffer.
arg6 is the number of bytes actually read, while arg7 is the number
requested (if these are different it indicates trouble).</entry>
requested (if these are different it indicates a short read).</entry>
</row>
<row>
<entry><literal>smgr-md-write-start</literal></entry>
@ -6890,7 +6890,7 @@ FROM pg_stat_get_backend_idset() AS backendid;
arg5 is the ID of the backend which created the temporary relation for a
local buffer, or <symbol>InvalidBackendId</symbol> (-1) for a shared buffer.
arg6 is the number of bytes actually written, while arg7 is the number
requested (if these are different it indicates trouble).</entry>
requested (if these are different it indicates a short write).</entry>
</row>
<row>
<entry><literal>sort-start</literal></entry>

View File

@ -28,6 +28,7 @@
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "commands/tablespace.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
@ -754,138 +755,274 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
}
/*
* mdread() -- Read the specified block from a relation.
* Convert an array of buffer addresses into an array of iovec objects, and
* return the number that were required. 'iov' must have enough space for up
* to 'nblocks' elements, but the number used may be less depending on
* merging. In the case of a run of fully contiguous buffers, a single iovec
* will be populated that can be handled as a plain non-vectored I/O.
*/
static int
buffers_to_iovec(struct iovec *iov, void **buffers, int nblocks)
{
	struct iovec *cur = iov;	/* iovec currently being grown */
	int			used = 1;		/* number of iovecs filled in so far */

	Assert(nblocks >= 1);

	/*
	 * On builds with direct I/O support, every buffer is expected to be I/O
	 * aligned.  The check is assertion-only, so it costs nothing otherwise.
	 */
	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
	{
		for (int blk = 0; blk < nblocks; ++blk)
			Assert((uintptr_t) buffers[blk] ==
				   TYPEALIGN(PG_IO_ALIGN_SIZE, buffers[blk]));
	}

	/* The first buffer always opens the first iovec. */
	cur->iov_base = buffers[0];
	cur->iov_len = BLCKSZ;

	/* Append each later buffer, extending the current iovec when possible. */
	for (int blk = 1; blk < nblocks; ++blk)
	{
		char	   *next = buffers[blk];
		char	   *run_end = (char *) cur->iov_base + cur->iov_len;

		if (run_end != next)
		{
			/* Not adjacent to the current run: open the next iovec slot. */
			cur = &iov[used++];
			cur->iov_base = next;
			cur->iov_len = 0;
		}

		/* Either way, this buffer adds one block to the current iovec. */
		cur->iov_len += BLCKSZ;
	}

	return used;
}
/*
* mdreadv() -- Read the specified blocks from a relation.
*
* Reads 'nblocks' blocks starting at 'blocknum' into the pages that the
* entries of 'buffers' point to. The request is processed in chunks: each
* chunk stops at the next RELSEG_SIZE boundary and is also limited to the
* number of entries in the local iovec array. Within a chunk, a short read
* is retried until the chunk is complete or a read returns zero bytes.
*/
void
mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer)
mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
{
off_t seekpos;
int nbytes;
MdfdVec *v;
/* If this build supports direct I/O, the buffer must be I/O aligned. */
if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend);
v = _mdfd_getseg(reln, forknum, blocknum, false,
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend,
nbytes,
BLCKSZ);
if (nbytes != BLCKSZ)
while (nblocks > 0)
{
if (nbytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read block %u in file \"%s\": %m",
blocknum, FilePathName(v->mdfd_vfd))));
struct iovec iov[PG_IOV_MAX];
int iovcnt;
off_t seekpos;
int nbytes;
MdfdVec *v;
BlockNumber nblocks_this_segment;
size_t transferred_this_segment;
size_t size_this_segment;
v = _mdfd_getseg(reln, forknum, blocknum, false,
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
/*
* Clamp this chunk first to the blocks left before the next RELSEG_SIZE
* boundary, then to the number of iovec slots available locally.
*/
nblocks_this_segment =
Min(nblocks,
RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
nblocks_this_segment = Min(nblocks_this_segment, lengthof(iov));
iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment);
size_this_segment = nblocks_this_segment * BLCKSZ;
transferred_this_segment = 0;
/*
* Short read: we are at or past EOF, or we read a partial block at
* EOF. Normally this is an error; upper levels should never try to
* read a nonexistent block. However, if zero_damaged_pages is ON or
* we are InRecovery, we should instead return zeroes without
* complaining. This allows, for example, the case of trying to
* update a block that was later truncated away.
* Inner loop to continue after a short read. We'll keep going until
* we hit EOF rather than assuming that a short read means we hit the
* end.
*/
if (zero_damaged_pages || InRecovery)
MemSet(buffer, 0, BLCKSZ);
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read block %u in file \"%s\": read only %d of %d bytes",
blocknum, FilePathName(v->mdfd_vfd),
nbytes, BLCKSZ)));
for (;;)
{
TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend);
nbytes = FileReadV(v->mdfd_vfd, iov, iovcnt, seekpos,
WAIT_EVENT_DATA_FILE_READ);
TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend,
nbytes,
size_this_segment - transferred_this_segment);
/*
* Testing aid: with SIMULATE_SHORT_READ defined, each result is capped
* at 4096 bytes so that the retry path below is exercised. The cap is
* applied after the DONE probe has already reported the real count.
*/
#ifdef SIMULATE_SHORT_READ
nbytes = Min(nbytes, 4096);
#endif
if (nbytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read blocks %u..%u in file \"%s\": %m",
blocknum,
blocknum + nblocks_this_segment - 1,
FilePathName(v->mdfd_vfd))));
if (nbytes == 0)
{
/*
* We are at or past EOF, or we read a partial block at EOF.
* Normally this is an error; upper levels should never try to
* read a nonexistent block. However, if zero_damaged_pages
* is ON or we are InRecovery, we should instead return zeroes
* without complaining. This allows, for example, the case of
* trying to update a block that was later truncated away.
*/
if (zero_damaged_pages || InRecovery)
{
/*
* Start at the first block that was not read in full. The
* division rounds down, so a partially read block is zeroed
* as a whole.
*/
for (BlockNumber i = transferred_this_segment / BLCKSZ;
i < nblocks_this_segment;
++i)
memset(buffers[i], 0, BLCKSZ);
break;
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
blocknum,
blocknum + nblocks_this_segment - 1,
FilePathName(v->mdfd_vfd),
transferred_this_segment,
size_this_segment)));
}
/* One loop should usually be enough. */
transferred_this_segment += nbytes;
Assert(transferred_this_segment <= size_this_segment);
if (transferred_this_segment == size_this_segment)
break;
/* Adjust position and vectors after a short read. */
seekpos += nbytes;
/*
* NOTE(review): compute_remaining_iovec() is not defined in this file;
* presumably it drops the nbytes already transferred from the front of
* iov (source and destination are the same array here) -- confirm.
*/
iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
}
/* This chunk is done; move on to whatever part of the request is left. */
nblocks -= nblocks_this_segment;
buffers += nblocks_this_segment;
blocknum += nblocks_this_segment;
}
}
/*
* mdwrite() -- Write the supplied block at the appropriate location.
* mdwritev() -- Write the supplied blocks at the appropriate location.
*
* This is to be used only for updating already-existing blocks of a
* relation (ie, those before the current EOF). To extend a relation,
* use mdextend().
*/
void
mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
const void *buffer, bool skipFsync)
mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, bool skipFsync)
{
off_t seekpos;
int nbytes;
MdfdVec *v;
/* If this build supports direct I/O, the buffer must be I/O aligned. */
if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
/* This assert is too expensive to have on normally ... */
#ifdef CHECK_WRITE_VS_EXTEND
Assert(blocknum < mdnblocks(reln, forknum));
#endif
TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend);
v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend,
nbytes,
BLCKSZ);
if (nbytes != BLCKSZ)
while (nblocks > 0)
{
if (nbytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write block %u in file \"%s\": %m",
blocknum, FilePathName(v->mdfd_vfd))));
/* short write: complain appropriately */
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not write block %u in file \"%s\": wrote only %d of %d bytes",
blocknum,
FilePathName(v->mdfd_vfd),
nbytes, BLCKSZ),
errhint("Check free disk space.")));
}
struct iovec iov[PG_IOV_MAX];
int iovcnt;
off_t seekpos;
int nbytes;
MdfdVec *v;
BlockNumber nblocks_this_segment;
size_t transferred_this_segment;
size_t size_this_segment;
if (!skipFsync && !SmgrIsTemp(reln))
register_dirty_segment(reln, forknum, v);
v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
/*
* Clamp this chunk first to the blocks left before the next RELSEG_SIZE
* boundary, then to the number of iovec slots available locally.
*/
nblocks_this_segment =
Min(nblocks,
RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
nblocks_this_segment = Min(nblocks_this_segment, lengthof(iov));
iovcnt = buffers_to_iovec(iov, (void **) buffers, nblocks_this_segment);
size_this_segment = nblocks_this_segment * BLCKSZ;
transferred_this_segment = 0;
/*
* Inner loop to continue after a short write. If the reason is that
* we're out of disk space, a future attempt should get an ENOSPC
* error from the kernel.
*/
for (;;)
{
TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend);
nbytes = FileWriteV(v->mdfd_vfd, iov, iovcnt, seekpos,
WAIT_EVENT_DATA_FILE_WRITE);
TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
reln->smgr_rlocator.locator.spcOid,
reln->smgr_rlocator.locator.dbOid,
reln->smgr_rlocator.locator.relNumber,
reln->smgr_rlocator.backend,
nbytes,
size_this_segment - transferred_this_segment);
/*
* Testing aid: with SIMULATE_SHORT_WRITE defined, each result is capped
* at 4096 bytes so that the retry path below is exercised. The cap is
* applied after the DONE probe has already reported the real count.
*/
#ifdef SIMULATE_SHORT_WRITE
nbytes = Min(nbytes, 4096);
#endif
if (nbytes < 0)
{
/*
* The ENOSPC test is evaluated once, before any ereport argument runs;
* presumably this protects the hint decision from errno being changed
* while the report is built -- confirm.
*/
bool enospc = errno == ENOSPC;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write blocks %u..%u in file \"%s\": %m",
blocknum,
blocknum + nblocks_this_segment - 1,
FilePathName(v->mdfd_vfd)),
enospc ? errhint("Check free disk space.") : 0));
}
/* One loop should usually be enough. */
transferred_this_segment += nbytes;
Assert(transferred_this_segment <= size_this_segment);
if (transferred_this_segment == size_this_segment)
break;
/* Adjust position and iovecs after a short write. */
seekpos += nbytes;
/*
* NOTE(review): compute_remaining_iovec() is not defined in this file;
* presumably it drops the nbytes already transferred from the front of
* iov (source and destination are the same array here) -- confirm.
*/
iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
}
/*
* Once the chunk has been written in full, pass its segment to
* register_dirty_segment(), unless the caller asked to skip fsync or the
* relation is temporary.
*/
if (!skipFsync && !SmgrIsTemp(reln))
register_dirty_segment(reln, forknum, v);
/* This chunk is done; move on to whatever part of the request is left. */
nblocks -= nblocks_this_segment;
buffers += nblocks_this_segment;
blocknum += nblocks_this_segment;
}
}
/*
* mdwriteback() -- Tell the kernel to write pages back to storage.
*

View File

@ -55,10 +55,13 @@ typedef struct f_smgr
BlockNumber blocknum, int nblocks, bool skipFsync);
bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
void (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, void *buffer);
void (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
void (*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
const void **buffers, BlockNumber nblocks,
bool skipFsync);
void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@ -80,8 +83,8 @@ static const f_smgr smgrsw[] = {
.smgr_extend = mdextend,
.smgr_zeroextend = mdzeroextend,
.smgr_prefetch = mdprefetch,
.smgr_read = mdread,
.smgr_write = mdwrite,
.smgr_readv = mdreadv,
.smgr_writev = mdwritev,
.smgr_writeback = mdwriteback,
.smgr_nblocks = mdnblocks,
.smgr_truncate = mdtruncate,
@ -554,22 +557,23 @@ smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
}
/*
* smgrread() -- read a particular block from a relation into the supplied
* buffer.
* smgrreadv() -- read a particular block range from a relation into the
* supplied buffers.
*
* This routine is called from the buffer manager in order to
* instantiate pages in the shared buffer cache. All storage managers
* return pages in the format that POSTGRES expects.
*/
void
smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer)
smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
{
/* Forward the call to the smgrsw[] entry selected by reln->smgr_which. */
smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer);
smgrsw[reln->smgr_which].smgr_readv(reln, forknum, blocknum, buffers,
nblocks);
}
/*
* smgrwrite() -- Write the supplied buffer out.
* smgrwritev() -- Write the supplied buffers out.
*
* This is to be used only for updating already-existing blocks of a
* relation (ie, those before the current EOF). To extend a relation,
@ -584,14 +588,13 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
* do not require fsync.
*/
void
smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
const void *buffer, bool skipFsync)
smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, bool skipFsync)
{
/*
* Forward the call, including skipFsync, to the smgrsw[] entry selected
* by reln->smgr_which.
*/
smgrsw[reln->smgr_which].smgr_write(reln, forknum, blocknum,
buffer, skipFsync);
smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
buffers, nblocks, skipFsync);
}
/*
* smgrwriteback() -- Trigger kernel writeback for the supplied range of
* blocks.

View File

@ -32,10 +32,11 @@ extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks, bool skipFsync);
extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer);
extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, bool skipFsync);
extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);

View File

@ -96,10 +96,13 @@ extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks, bool skipFsync);
extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
extern void smgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, void *buffer);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
/*
 * Vectored block I/O.  'buffers' is an array of page pointers, one per
 * block, covering 'nblocks' blocks starting at 'blocknum'.
 */
extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
					  BlockNumber blocknum,
					  void **buffers, BlockNumber nblocks);
extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
					   BlockNumber blocknum,
					   const void **buffers, BlockNumber nblocks,
					   bool skipFsync);
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@ -110,4 +113,18 @@ extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void AtEOXact_SMgr(void);
extern bool ProcessBarrierSmgrRelease(void);
/*
 * smgrread() -- read a single block.
 *
 * Convenience wrapper that calls smgrreadv() with a one-element buffer
 * array.
 */
static inline void
smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
		 void *buffer)
{
	void	  **one_page = &buffer;

	smgrreadv(reln, forknum, blocknum, one_page, 1);
}
/*
 * smgrwrite() -- write a single block.
 *
 * Convenience wrapper that calls smgrwritev() with a one-element buffer
 * array, passing skipFsync through unchanged.
 */
static inline void
smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
		  const void *buffer, bool skipFsync)
{
	const void **one_page = &buffer;

	smgrwritev(reln, forknum, blocknum, one_page, 1, skipFsync);
}
#endif /* SMGR_H */