mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-10-02 19:36:52 +02:00
Fix some issues with WAL segment opening for pg_receivewal --compress
The logic handling the opening of new WAL segments was fuzzy when using
--compress if a partial, non-compressed, segment with the same base name
existed in the repository storing those files. In this case, using
--compress would cause the code to first check for the existence and the
size of a non-compressed segment, followed by the opening of a new
compressed, partial, segment. The code was accidentally working
correctly on most platforms as the buildfarm has proved, except
bowerbird where gzflush() could fail in this code path. It is wrong
anyway to take the code path used pre-padding when creating a new
partial, non-compressed, segment, so let's fix it.
Note that this issue exists when users mix successive runs of
pg_receivewal with or without compression, as discovered with the tests
introduced by ffc9dda
.
While on it, this refactors the code so as code paths that need to know
about the ".gz" suffix are down from four to one in walmethods.c, easing
a bit the introduction of new compression methods. This addresses a
second issue where log messages generated for an unexpected failure
would not show the compressed segment name involved, which was
confusing, printing instead the name of the non-compressed equivalent.
Reported-by: Georgios Kokolatos
Discussion: https://postgr.es/m/YPDLz2x3o1aX2wRh@paquier.xyz
Backpatch-through: 10
This commit is contained in:
parent
01c3adcdd8
commit
7fbe0c8c4d
@ -88,26 +88,29 @@ static bool
|
|||||||
open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
||||||
{
|
{
|
||||||
Walfile *f;
|
Walfile *f;
|
||||||
char fn[MAXPGPATH];
|
char *fn;
|
||||||
ssize_t size;
|
ssize_t size;
|
||||||
XLogSegNo segno;
|
XLogSegNo segno;
|
||||||
|
|
||||||
XLByteToSeg(startpoint, segno, WalSegSz);
|
XLByteToSeg(startpoint, segno, WalSegSz);
|
||||||
XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
|
XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
|
||||||
|
|
||||||
snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
|
/* Note that this considers the compression used if necessary */
|
||||||
stream->partial_suffix ? stream->partial_suffix : "");
|
fn = stream->walmethod->get_file_name(current_walfile_name,
|
||||||
|
stream->partial_suffix);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* When streaming to files, if an existing file exists we verify that it's
|
* When streaming to files, if an existing file exists we verify that it's
|
||||||
* either empty (just created), or a complete WalSegSz segment (in which
|
* either empty (just created), or a complete WalSegSz segment (in which
|
||||||
* case it has been created and padded). Anything else indicates a corrupt
|
* case it has been created and padded). Anything else indicates a corrupt
|
||||||
* file.
|
* file. Compressed files have no need for padding, so just ignore this
|
||||||
|
* case.
|
||||||
*
|
*
|
||||||
* When streaming to tar, no file with this name will exist before, so we
|
* When streaming to tar, no file with this name will exist before, so we
|
||||||
* never have to verify a size.
|
* never have to verify a size.
|
||||||
*/
|
*/
|
||||||
if (stream->walmethod->existsfile(fn))
|
if (stream->walmethod->compression() == 0 &&
|
||||||
|
stream->walmethod->existsfile(fn))
|
||||||
{
|
{
|
||||||
size = stream->walmethod->get_file_size(fn);
|
size = stream->walmethod->get_file_size(fn);
|
||||||
if (size < 0)
|
if (size < 0)
|
||||||
|
@ -68,20 +68,32 @@ dir_getlasterror(void)
|
|||||||
return strerror(errno);
|
return strerror(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static char *
|
||||||
|
dir_get_file_name(const char *pathname, const char *temp_suffix)
|
||||||
|
{
|
||||||
|
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
|
||||||
|
|
||||||
|
snprintf(filename, MAXPGPATH, "%s%s%s",
|
||||||
|
pathname, dir_data->compression > 0 ? ".gz" : "",
|
||||||
|
temp_suffix ? temp_suffix : "");
|
||||||
|
|
||||||
|
return filename;
|
||||||
|
}
|
||||||
|
|
||||||
static Walfile
|
static Walfile
|
||||||
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
|
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
|
||||||
{
|
{
|
||||||
static char tmppath[MAXPGPATH];
|
static char tmppath[MAXPGPATH];
|
||||||
|
char *filename;
|
||||||
int fd;
|
int fd;
|
||||||
DirectoryMethodFile *f;
|
DirectoryMethodFile *f;
|
||||||
#ifdef HAVE_LIBZ
|
#ifdef HAVE_LIBZ
|
||||||
gzFile gzfp = NULL;
|
gzFile gzfp = NULL;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
|
filename = dir_get_file_name(pathname, temp_suffix);
|
||||||
dir_data->basedir, pathname,
|
snprintf(tmppath, sizeof(tmppath), "%s/%s",
|
||||||
dir_data->compression > 0 ? ".gz" : "",
|
dir_data->basedir, filename);
|
||||||
temp_suffix ? temp_suffix : "");
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Open a file for non-compressed as well as compressed files. Tracking
|
* Open a file for non-compressed as well as compressed files. Tracking
|
||||||
@ -232,26 +244,31 @@ dir_close(Walfile f, WalCloseMethod method)
|
|||||||
/* Build path to the current version of the file */
|
/* Build path to the current version of the file */
|
||||||
if (method == CLOSE_NORMAL && df->temp_suffix)
|
if (method == CLOSE_NORMAL && df->temp_suffix)
|
||||||
{
|
{
|
||||||
|
char *filename;
|
||||||
|
char *filename2;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we have a temp prefix, normal operation is to rename the
|
* If we have a temp prefix, normal operation is to rename the
|
||||||
* file.
|
* file.
|
||||||
*/
|
*/
|
||||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
|
filename = dir_get_file_name(df->pathname, df->temp_suffix);
|
||||||
dir_data->basedir, df->pathname,
|
snprintf(tmppath, sizeof(tmppath), "%s/%s",
|
||||||
dir_data->compression > 0 ? ".gz" : "",
|
dir_data->basedir, filename);
|
||||||
df->temp_suffix);
|
|
||||||
snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
|
/* permanent name, so no need for the prefix */
|
||||||
dir_data->basedir, df->pathname,
|
filename2 = dir_get_file_name(df->pathname, NULL);
|
||||||
dir_data->compression > 0 ? ".gz" : "");
|
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
|
||||||
|
dir_data->basedir, filename2);
|
||||||
r = durable_rename(tmppath, tmppath2);
|
r = durable_rename(tmppath, tmppath2);
|
||||||
}
|
}
|
||||||
else if (method == CLOSE_UNLINK)
|
else if (method == CLOSE_UNLINK)
|
||||||
{
|
{
|
||||||
|
char *filename;
|
||||||
|
|
||||||
/* Unlink the file once it's closed */
|
/* Unlink the file once it's closed */
|
||||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
|
filename = dir_get_file_name(df->pathname, df->temp_suffix);
|
||||||
dir_data->basedir, df->pathname,
|
snprintf(tmppath, sizeof(tmppath), "%s/%s",
|
||||||
dir_data->compression > 0 ? ".gz" : "",
|
dir_data->basedir, filename);
|
||||||
df->temp_suffix ? df->temp_suffix : "");
|
|
||||||
r = unlink(tmppath);
|
r = unlink(tmppath);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -313,6 +330,12 @@ dir_get_file_size(const char *pathname)
|
|||||||
return statbuf.st_size;
|
return statbuf.st_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
dir_compression(void)
|
||||||
|
{
|
||||||
|
return dir_data->compression;
|
||||||
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
dir_existsfile(const char *pathname)
|
dir_existsfile(const char *pathname)
|
||||||
{
|
{
|
||||||
@ -355,6 +378,8 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
|
|||||||
method->write = dir_write;
|
method->write = dir_write;
|
||||||
method->get_current_pos = dir_get_current_pos;
|
method->get_current_pos = dir_get_current_pos;
|
||||||
method->get_file_size = dir_get_file_size;
|
method->get_file_size = dir_get_file_size;
|
||||||
|
method->get_file_name = dir_get_file_name;
|
||||||
|
method->compression = dir_compression;
|
||||||
method->close = dir_close;
|
method->close = dir_close;
|
||||||
method->sync = dir_sync;
|
method->sync = dir_sync;
|
||||||
method->existsfile = dir_existsfile;
|
method->existsfile = dir_existsfile;
|
||||||
@ -527,11 +552,22 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static char *
|
||||||
|
tar_get_file_name(const char *pathname, const char *temp_suffix)
|
||||||
|
{
|
||||||
|
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
|
||||||
|
|
||||||
|
snprintf(filename, MAXPGPATH, "%s%s",
|
||||||
|
pathname, temp_suffix ? temp_suffix : "");
|
||||||
|
|
||||||
|
return filename;
|
||||||
|
}
|
||||||
|
|
||||||
static Walfile
|
static Walfile
|
||||||
tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
|
tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
|
||||||
{
|
{
|
||||||
int save_errno;
|
int save_errno;
|
||||||
static char tmppath[MAXPGPATH];
|
char *tmppath;
|
||||||
|
|
||||||
tar_clear_error();
|
tar_clear_error();
|
||||||
|
|
||||||
@ -583,8 +619,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
|||||||
|
|
||||||
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
|
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
|
||||||
|
|
||||||
snprintf(tmppath, sizeof(tmppath), "%s%s",
|
tmppath = tar_get_file_name(pathname, temp_suffix);
|
||||||
pathname, temp_suffix ? temp_suffix : "");
|
|
||||||
|
|
||||||
/* Create a header with size set to 0 - we will fill out the size on close */
|
/* Create a header with size set to 0 - we will fill out the size on close */
|
||||||
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
|
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
|
||||||
@ -689,6 +724,12 @@ tar_get_file_size(const char *pathname)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
tar_compression(void)
|
||||||
|
{
|
||||||
|
return tar_data->compression;
|
||||||
|
}
|
||||||
|
|
||||||
static off_t
|
static off_t
|
||||||
tar_get_current_pos(Walfile f)
|
tar_get_current_pos(Walfile f)
|
||||||
{
|
{
|
||||||
@ -994,6 +1035,8 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync)
|
|||||||
method->write = tar_write;
|
method->write = tar_write;
|
||||||
method->get_current_pos = tar_get_current_pos;
|
method->get_current_pos = tar_get_current_pos;
|
||||||
method->get_file_size = tar_get_file_size;
|
method->get_file_size = tar_get_file_size;
|
||||||
|
method->get_file_name = tar_get_file_name;
|
||||||
|
method->compression = tar_compression;
|
||||||
method->close = tar_close;
|
method->close = tar_close;
|
||||||
method->sync = tar_sync;
|
method->sync = tar_sync;
|
||||||
method->existsfile = tar_existsfile;
|
method->existsfile = tar_existsfile;
|
||||||
|
@ -52,6 +52,15 @@ struct WalWriteMethod
|
|||||||
/* Return the size of a file, or -1 on failure. */
|
/* Return the size of a file, or -1 on failure. */
|
||||||
ssize_t (*get_file_size) (const char *pathname);
|
ssize_t (*get_file_size) (const char *pathname);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Return the name of the current file to work on, without the base
|
||||||
|
* directory. This is useful for logging.
|
||||||
|
*/
|
||||||
|
char *(*get_file_name) (const char *pathname, const char *temp_suffix);
|
||||||
|
|
||||||
|
/* Return the level of compression */
|
||||||
|
int (*compression) (void);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Write count number of bytes to the file, and return the number of bytes
|
* Write count number of bytes to the file, and return the number of bytes
|
||||||
* actually written or -1 for error.
|
* actually written or -1 for error.
|
||||||
|
Loading…
Reference in New Issue
Block a user