pg_dump: Add support for zstd compression

Allow pg_dump to use the zstd compression, in addition to gzip/lz4. Bulk
of the new compression method is implemented in compress_zstd.{c,h},
covering the pg_dump compression APIs. The rest of the patch adds test
and makes various places aware of the new compression method.

The zstd library (which this patch relies on) supports multithreaded
compression since version 1.5. We however disallow that feature for now,
as it might interfere with parallel backups on platforms that rely on
threads (e.g. Windows). This can be improved / relaxed in the future.

This also fixes a minor issue in InitDiscoverCompressFileHandle(), which
was not updated to check if the file already has the .lz4 extension.

Adding zstd compression was originally proposed in 2020 (see the second
thread), but then was reworked to use the new compression API introduced
in e9960732a9. I've considered both threads when compiling the list of
reviewers.

Author: Justin Pryzby
Reviewed-by: Tomas Vondra, Jacob Champion, Andreas Karlsson
Discussion: https://postgr.es/m/20230224191840.GD1653@telsasoft.com
Discussion: https://postgr.es/m/20201221194924.GI30237@telsasoft.com
This commit is contained in:
Tomas Vondra 2023-04-05 21:38:04 +02:00
parent 794f259447
commit 84adc8e20f
12 changed files with 713 additions and 54 deletions

View File

@ -330,8 +330,9 @@ PostgreSQL documentation
machine-readable format that <application>pg_restore</application>
can read. A directory format archive can be manipulated with
standard Unix tools; for example, files in an uncompressed archive
can be compressed with the <application>gzip</application> or
<application>lz4</application> tools.
can be compressed with the <application>gzip</application>,
<application>lz4</application>, or
<application>zstd</application> tools.
This format is compressed by default using <literal>gzip</literal>
and also supports parallel dumps.
</para>
@ -655,7 +656,8 @@ PostgreSQL documentation
<para>
Specify the compression method and/or the compression level to use.
The compression method can be set to <literal>gzip</literal>,
<literal>lz4</literal>, or <literal>none</literal> for no compression.
<literal>lz4</literal>, <literal>zstd</literal>,
or <literal>none</literal> for no compression.
A compression detail string can optionally be specified. If the
detail string is an integer, it specifies the compression level.
Otherwise, it should be a comma-separated list of items, each of the
@ -676,8 +678,9 @@ PostgreSQL documentation
individual table-data segments, and the default is to compress using
<literal>gzip</literal> at a moderate level. For plain text output,
setting a nonzero compression level causes the entire output file to be compressed,
as though it had been fed through <application>gzip</application> or
<application>lz4</application>; but the default is not to compress.
as though it had been fed through <application>gzip</application>,
<application>lz4</application>, or <application>zstd</application>;
but the default is not to compress.
</para>
<para>
The tar archive format currently does not support compression at all.

View File

@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global
export GZIP_PROGRAM=$(GZIP)
export LZ4
export ZSTD
export with_icu
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@ -29,6 +30,7 @@ OBJS = \
compress_io.o \
compress_lz4.o \
compress_none.o \
compress_zstd.o \
dumputils.o \
parallel.o \
pg_backup_archiver.o \

View File

@ -52,8 +52,8 @@
*
* InitDiscoverCompressFileHandle tries to infer the compression by the
* filename suffix. If the suffix is not yet known then it tries to simply
* open the file and if it fails, it tries to open the same file with the .gz
* suffix, and then again with the .lz4 suffix.
* open the file and if it fails, it tries to open the same file with
* compressed suffixes (.gz, .lz4 and .zst, in this order).
*
* IDENTIFICATION
* src/bin/pg_dump/compress_io.c
@ -69,6 +69,7 @@
#include "compress_io.h"
#include "compress_lz4.h"
#include "compress_none.h"
#include "compress_zstd.h"
#include "pg_backup_utils.h"
/*----------------------
@ -77,7 +78,8 @@
*/
/*
* Checks whether a compression algorithm is supported.
* Checks whether support for a compression algorithm is implemented in
* pg_dump/restore.
*
* On success returns NULL, otherwise returns a malloc'ed string which can be
* used by the caller in an error message.
@ -98,6 +100,10 @@ supports_compression(const pg_compress_specification compression_spec)
if (algorithm == PG_COMPRESSION_LZ4)
supported = true;
#endif
#ifdef USE_ZSTD
if (algorithm == PG_COMPRESSION_ZSTD)
supported = true;
#endif
if (!supported)
return psprintf("this build does not support compression with %s",
@ -130,6 +136,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
InitCompressorGzip(cs, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
InitCompressorLZ4(cs, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
InitCompressorZstd(cs, compression_spec);
return cs;
}
@ -196,20 +204,36 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
InitCompressFileHandleGzip(CFH, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
InitCompressFileHandleLZ4(CFH, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
InitCompressFileHandleZstd(CFH, compression_spec);
return CFH;
}
/*
* Checks if a compressed file (with the specified extension) exists.
*
* The filename of the tested file is stored to fname buffer (the existing
* buffer is freed, new buffer is allocated and returned through the pointer).
*/
static bool
check_compressed_file(const char *path, char **fname, char *ext)
{
free_keep_errno(*fname);
*fname = psprintf("%s.%s", path, ext);
return (access(*fname, F_OK) == 0);
}
/*
* Open a file for reading. 'path' is the file to open, and 'mode' should
* be either "r" or "rb".
*
* If the file at 'path' contains the suffix of a supported compression method,
* currently this includes ".gz" and ".lz4", then this compression will be used
* currently this includes ".gz", ".lz4" and ".zst", then this compression will be used
* throughout. Otherwise the compression will be inferred by iteratively trying
* to open the file at 'path', first as is, then by appending known compression
* suffixes. So if you pass "foo" as 'path', this will open either "foo" or
* "foo.gz" or "foo.lz4", trying in that order.
* "foo.{gz,lz4,zst}", trying in that order.
*
* On failure, return NULL with an error code in errno.
*/
@ -229,36 +253,20 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
if (hasSuffix(fname, ".gz"))
compression_spec.algorithm = PG_COMPRESSION_GZIP;
else if (hasSuffix(fname, ".lz4"))
compression_spec.algorithm = PG_COMPRESSION_LZ4;
else if (hasSuffix(fname, ".zst"))
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
else
{
bool exists;
exists = (stat(path, &st) == 0);
/* avoid unused warning if it is not built with compression */
if (exists)
if (stat(path, &st) == 0)
compression_spec.algorithm = PG_COMPRESSION_NONE;
#ifdef HAVE_LIBZ
if (!exists)
{
free_keep_errno(fname);
fname = psprintf("%s.gz", path);
exists = (stat(fname, &st) == 0);
if (exists)
compression_spec.algorithm = PG_COMPRESSION_GZIP;
}
#endif
#ifdef USE_LZ4
if (!exists)
{
free_keep_errno(fname);
fname = psprintf("%s.lz4", path);
exists = (stat(fname, &st) == 0);
if (exists)
compression_spec.algorithm = PG_COMPRESSION_LZ4;
}
#endif
else if (check_compressed_file(path, &fname, "gz"))
compression_spec.algorithm = PG_COMPRESSION_GZIP;
else if (check_compressed_file(path, &fname, "lz4"))
compression_spec.algorithm = PG_COMPRESSION_LZ4;
else if (check_compressed_file(path, &fname, "zst"))
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
}
CFH = InitCompressFileHandle(compression_spec);

View File

@ -0,0 +1,537 @@
/*-------------------------------------------------------------------------
*
* compress_zstd.c
* Routines for archivers to write a Zstd compressed data stream.
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/bin/pg_dump/compress_zstd.c
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "pg_backup_utils.h"
#include "compress_zstd.h"
#ifndef USE_ZSTD
void
InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
{
pg_fatal("this build does not support compression with %s", "ZSTD");
}
void
InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
{
pg_fatal("this build does not support compression with %s", "ZSTD");
}
#else
#include <zstd.h>
typedef struct ZstdCompressorState
{
/* This is a normal file to which we read/write compressed data */
FILE *fp;
ZSTD_CStream *cstream;
ZSTD_DStream *dstream;
ZSTD_outBuffer output;
ZSTD_inBuffer input;
/* pointer to a static string like from strerror(), for Zstd_write() */
const char *zstderror;
} ZstdCompressorState;
static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen);
static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
static void
_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
ZSTD_cParameter param, int value, char *paramname)
{
size_t res;
res = ZSTD_CCtx_setParameter(cstream, param, value);
if (ZSTD_isError(res))
pg_fatal("could not set compression parameter: \"%s\": %s",
paramname, ZSTD_getErrorName(res));
}
/* Return a compression stream with parameters set per argument */
static ZSTD_CStream *
_ZstdCStreamParams(pg_compress_specification compress)
{
ZSTD_CStream *cstream;
cstream = ZSTD_createCStream();
if (cstream == NULL)
pg_fatal("could not initialize compression library");
_Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
compress.level, "level");
return cstream;
}
/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
static void
_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
ZSTD_inBuffer *input = &zstdcs->input;
ZSTD_outBuffer *output = &zstdcs->output;
/* Loop while there's any input or until flushed */
while (input->pos != input->size || flush)
{
size_t res;
output->pos = 0;
res = ZSTD_compressStream2(zstdcs->cstream, output,
input, flush ? ZSTD_e_end : ZSTD_e_continue);
if (ZSTD_isError(res))
pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
/*
* Extra paranoia: avoid zero-length chunks, since a zero length chunk
* is the EOF marker in the custom format. This should never happen
* but...
*/
if (output->pos > 0)
cs->writeF(AH, output->dst, output->pos);
if (res == 0)
break; /* End of frame or all input consumed */
}
}
static void
EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
if (cs->readF != NULL)
{
Assert(zstdcs->cstream == NULL);
ZSTD_freeDStream(zstdcs->dstream);
pg_free(unconstify(void *, zstdcs->input.src));
}
else if (cs->writeF != NULL)
{
Assert(zstdcs->dstream == NULL);
_ZstdWriteCommon(AH, cs, true);
ZSTD_freeCStream(zstdcs->cstream);
pg_free(zstdcs->output.dst);
}
pg_free(zstdcs);
}
static void
WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
zstdcs->input.src = data;
zstdcs->input.size = dLen;
zstdcs->input.pos = 0;
_ZstdWriteCommon(AH, cs, false);
}
static void
ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
ZSTD_outBuffer *output = &zstdcs->output;
ZSTD_inBuffer *input = &zstdcs->input;
size_t input_allocated_size = ZSTD_DStreamInSize();
size_t res;
for (;;)
{
size_t cnt;
/*
* Read compressed data. Note that readF can resize the buffer; the
* new size is tracked and used for future loops.
*/
input->size = input_allocated_size;
cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
/* ensure that readF didn't *shrink* the buffer */
Assert(input->size >= input_allocated_size);
input_allocated_size = input->size;
input->size = cnt;
input->pos = 0;
if (cnt == 0)
break;
/* Now decompress */
while (input->pos < input->size)
{
output->pos = 0;
res = ZSTD_decompressStream(zstdcs->dstream, output, input);
if (ZSTD_isError(res))
pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
/*
* then write the decompressed data to the output handle
*/
((char *) output->dst)[output->pos] = '\0';
ahwrite(output->dst, 1, output->pos, AH);
if (res == 0)
break; /* End of frame */
}
}
}
/* Public routine that supports Zstd compressed data I/O */
void
InitCompressorZstd(CompressorState *cs,
const pg_compress_specification compression_spec)
{
ZstdCompressorState *zstdcs;
cs->readData = ReadDataFromArchiveZstd;
cs->writeData = WriteDataToArchiveZstd;
cs->end = EndCompressorZstd;
cs->compression_spec = compression_spec;
zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
cs->private_data = zstdcs;
/* We expect that exactly one of readF/writeF is specified */
Assert((cs->readF == NULL) != (cs->writeF == NULL));
if (cs->readF != NULL)
{
zstdcs->dstream = ZSTD_createDStream();
if (zstdcs->dstream == NULL)
pg_fatal("could not initialize compression library");
zstdcs->input.size = ZSTD_DStreamInSize();
zstdcs->input.src = pg_malloc(zstdcs->input.size);
/*
* output.size is the buffer size we tell zstd it can output to.
* Allocate an additional byte such that ReadDataFromArchiveZstd() can
* call ahwrite() with a null-terminated string, which is an optimized
* case in ExecuteSqlCommandBuf().
*/
zstdcs->output.size = ZSTD_DStreamOutSize();
zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
}
else if (cs->writeF != NULL)
{
zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
zstdcs->output.size = ZSTD_CStreamOutSize();
zstdcs->output.dst = pg_malloc(zstdcs->output.size);
zstdcs->output.pos = 0;
}
}
/*
* Compressed stream API
*/
static bool
Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
ZSTD_inBuffer *input = &zstdcs->input;
ZSTD_outBuffer *output = &zstdcs->output;
size_t input_allocated_size = ZSTD_DStreamInSize();
size_t res,
cnt;
output->size = size;
output->dst = ptr;
output->pos = 0;
for (;;)
{
Assert(input->pos <= input->size);
Assert(input->size <= input_allocated_size);
/*
* If the input is completely consumed, start back at the beginning
*/
if (input->pos == input->size)
{
/* input->size is size produced by "fread" */
input->size = 0;
/* input->pos is position consumed by decompress */
input->pos = 0;
}
/* read compressed data if we must produce more input */
if (input->pos == input->size)
{
cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
input->size = cnt;
Assert(cnt <= input_allocated_size);
/* If we have no more input to consume, we're done */
if (cnt == 0)
break;
}
while (input->pos < input->size)
{
/* now decompress */
res = ZSTD_decompressStream(zstdcs->dstream, output, input);
if (ZSTD_isError(res))
pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
if (output->pos == output->size)
break; /* No more room for output */
if (res == 0)
break; /* End of frame */
}
if (output->pos == output->size)
break; /* We read all the data that fits */
}
if (rdsize != NULL)
*rdsize = output->pos;
return true;
}
static bool
Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
ZSTD_inBuffer *input = &zstdcs->input;
ZSTD_outBuffer *output = &zstdcs->output;
size_t res,
cnt;
input->src = ptr;
input->size = size;
input->pos = 0;
/* Consume all input, to be flushed later */
while (input->pos != input->size)
{
output->pos = 0;
res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
if (ZSTD_isError(res))
{
zstdcs->zstderror = ZSTD_getErrorName(res);
return false;
}
cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
if (cnt != output->pos)
{
zstdcs->zstderror = strerror(errno);
return false;
}
}
return size;
}
static int
Zstd_getc(CompressFileHandle *CFH)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
int ret;
if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
{
if (feof(zstdcs->fp))
pg_fatal("could not read from input file: end of file");
else
pg_fatal("could not read from input file: %m");
}
return ret;
}
static char *
Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
{
int i;
Assert(len > 0);
/*
* Read one byte at a time until newline or EOF. This is only used to read
* the list of LOs, and the I/O is buffered anyway.
*/
for (i = 0; i < len - 1; ++i)
{
size_t readsz;
if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
break;
if (readsz != 1)
break;
if (buf[i] == '\n')
{
++i;
break;
}
}
buf[i] = '\0';
return i > 0 ? buf : NULL;
}
static bool
Zstd_close(CompressFileHandle *CFH)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
if (zstdcs->cstream)
{
size_t res,
cnt;
ZSTD_inBuffer *input = &zstdcs->input;
ZSTD_outBuffer *output = &zstdcs->output;
/* Loop until the compression buffers are fully consumed */
for (;;)
{
output->pos = 0;
res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
if (ZSTD_isError(res))
{
zstdcs->zstderror = ZSTD_getErrorName(res);
return false;
}
cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
if (cnt != output->pos)
{
zstdcs->zstderror = strerror(errno);
return false;
}
if (res == 0)
break; /* End of frame */
}
ZSTD_freeCStream(zstdcs->cstream);
pg_free(zstdcs->output.dst);
}
if (zstdcs->dstream)
{
ZSTD_freeDStream(zstdcs->dstream);
pg_free(unconstify(void *, zstdcs->input.src));
}
if (fclose(zstdcs->fp) != 0)
return false;
pg_free(zstdcs);
return true;
}
static bool
Zstd_eof(CompressFileHandle *CFH)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
return feof(zstdcs->fp);
}
static bool
Zstd_open(const char *path, int fd, const char *mode,
CompressFileHandle *CFH)
{
FILE *fp;
ZstdCompressorState *zstdcs;
if (fd >= 0)
fp = fdopen(fd, mode);
else
fp = fopen(path, mode);
if (fp == NULL)
return false;
zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
CFH->private_data = zstdcs;
zstdcs->fp = fp;
if (mode[0] == 'r')
{
zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
zstdcs->dstream = ZSTD_createDStream();
if (zstdcs->dstream == NULL)
pg_fatal("could not initialize compression library");
}
else if (mode[0] == 'w' || mode[0] == 'a')
{
zstdcs->output.size = ZSTD_CStreamOutSize();
zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
if (zstdcs->cstream == NULL)
pg_fatal("could not initialize compression library");
}
else
pg_fatal("unhandled mode");
return true;
}
static bool
Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
{
char fname[MAXPGPATH];
sprintf(fname, "%s.zst", path);
return CFH->open_func(fname, -1, mode, CFH);
}
static const char *
Zstd_get_error(CompressFileHandle *CFH)
{
ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
return zstdcs->zstderror;
}
void
InitCompressFileHandleZstd(CompressFileHandle *CFH,
const pg_compress_specification compression_spec)
{
CFH->open_func = Zstd_open;
CFH->open_write_func = Zstd_open_write;
CFH->read_func = Zstd_read;
CFH->write_func = Zstd_write;
CFH->gets_func = Zstd_gets;
CFH->getc_func = Zstd_getc;
CFH->close_func = Zstd_close;
CFH->eof_func = Zstd_eof;
CFH->get_error_func = Zstd_get_error;
CFH->compression_spec = compression_spec;
CFH->private_data = NULL;
}
#endif /* USE_ZSTD */

View File

@ -0,0 +1,25 @@
/*-------------------------------------------------------------------------
*
* compress_zstd.h
* Zstd interface to compress_io.c routines
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/bin/pg_dump/compress_zstd.h
*
*-------------------------------------------------------------------------
*/
#ifndef COMPRESS_ZSTD_H
#define COMPRESS_ZSTD_H
#include "compress_io.h"
extern void InitCompressorZstd(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
const pg_compress_specification compression_spec);
#endif /* COMPRESS_ZSTD_H */

View File

@ -5,6 +5,7 @@ pg_dump_common_sources = files(
'compress_io.c',
'compress_lz4.c',
'compress_none.c',
'compress_zstd.c',
'dumputils.c',
'parallel.c',
'pg_backup_archiver.c',
@ -19,7 +20,7 @@ pg_dump_common_sources = files(
pg_dump_common = static_library('libpgdump_common',
pg_dump_common_sources,
c_pch: pch_postgres_fe_h,
dependencies: [frontend_code, libpq, lz4, zlib],
dependencies: [frontend_code, libpq, lz4, zlib, zstd],
kwargs: internal_lib_args,
)
@ -90,6 +91,7 @@ tests += {
'env': {
'GZIP_PROGRAM': gzip.path(),
'LZ4': program_lz4.found() ? program_lz4.path() : '',
'ZSTD': program_zstd.found() ? program_zstd.path() : '',
'with_icu': icu.found() ? 'yes' : 'no',
},
'tests': [

View File

@ -2120,7 +2120,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
/*
* Check if the specified archive is a directory. If so, check if
* there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
* there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
*/
if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
{
@ -2134,6 +2134,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
#ifdef USE_LZ4
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
return AH->format;
#endif
#ifdef USE_ZSTD
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
return AH->format;
#endif
pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
AH->fSpec);

View File

@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH)
strlcat(fname, ".gz", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
strlcat(fname, ".lz4", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
strlcat(fname, ".zst", sizeof(fname));
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;

View File

@ -56,6 +56,7 @@
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "common/relpath.h"
#include "compress_io.h"
#include "dumputils.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/string_utils.h"
@ -735,18 +736,18 @@ main(int argc, char **argv)
pg_fatal("invalid compression specification: %s",
error_detail);
switch (compression_algorithm)
{
case PG_COMPRESSION_NONE:
/* fallthrough */
case PG_COMPRESSION_GZIP:
/* fallthrough */
case PG_COMPRESSION_LZ4:
break;
case PG_COMPRESSION_ZSTD:
pg_fatal("compression with %s is not yet supported", "ZSTD");
break;
}
error_detail = supports_compression(compression_spec);
if (error_detail != NULL)
pg_fatal("%s", error_detail);
/*
* Disable support for zstd workers for now - these are based on threading,
* and it's unclear how it interacts with parallel dumps on platforms where
* that relies on threads too (e.g. Windows).
*/
if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
pg_log_warning("compression option \"%s\" is not currently supported by pg_dump",
"workers");
/*
* Custom and directory formats are compressed by default with gzip when

View File

@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir;
# those lines) to validate that part of the process.
my $supports_icu = ($ENV{with_icu} eq 'yes');
my $supports_lz4 = check_pg_config("#define USE_LZ4 1");
my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1");
my $supports_lz4 = check_pg_config("#define USE_LZ4 1");
my $supports_zstd = check_pg_config("#define USE_ZSTD 1");
my %pgdump_runs = (
binary_upgrade => {
@ -213,6 +214,77 @@ my %pgdump_runs = (
},
},
compression_zstd_custom => {
test_key => 'compression',
compile_option => 'zstd',
dump_cmd => [
'pg_dump', '--format=custom',
'--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump",
'postgres',
],
restore_cmd => [
'pg_restore',
"--file=$tempdir/compression_zstd_custom.sql",
"$tempdir/compression_zstd_custom.dump",
],
command_like => {
command => [
'pg_restore',
'-l', "$tempdir/compression_zstd_custom.dump",
],
expected => qr/Compression: zstd/,
name => 'data content is zstd compressed'
},
},
compression_zstd_dir => {
test_key => 'compression',
compile_option => 'zstd',
dump_cmd => [
'pg_dump', '--jobs=2',
'--format=directory', '--compress=zstd:1',
"--file=$tempdir/compression_zstd_dir", 'postgres',
],
# Give coverage for manually compressed blob.toc files during
# restore.
compress_cmd => {
program => $ENV{'ZSTD'},
args => [
'-z', '-f', '--rm',
"$tempdir/compression_zstd_dir/blobs.toc",
"-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
],
},
# Verify that data files were compressed
glob_patterns => [
"$tempdir/compression_zstd_dir/toc.dat",
"$tempdir/compression_zstd_dir/*.dat.zst",
],
restore_cmd => [
'pg_restore', '--jobs=2',
"--file=$tempdir/compression_zstd_dir.sql",
"$tempdir/compression_zstd_dir",
],
},
compression_zstd_plain => {
test_key => 'compression',
compile_option => 'zstd',
dump_cmd => [
'pg_dump', '--format=plain', '--compress=zstd',
"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
],
# Decompress the generated file to run through the tests.
compress_cmd => {
program => $ENV{'ZSTD'},
args => [
'-d', '-f',
"$tempdir/compression_zstd_plain.sql.zst",
"-o", "$tempdir/compression_zstd_plain.sql",
],
},
},
clean => {
dump_cmd => [
'pg_dump',
@ -4648,10 +4720,11 @@ foreach my $run (sort keys %pgdump_runs)
my $test_key = $run;
my $run_db = 'postgres';
# Skip command-level tests for gzip/lz4 if there is no support for it.
# Skip command-level tests for gzip/lz4/zstd if the tool is not supported
if ($pgdump_runs{$run}->{compile_option} &&
(($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) ||
($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd)))
{
note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
next;

View File

@ -154,6 +154,7 @@ do
test "$f" = src/bin/pg_dump/compress_io.h && continue
test "$f" = src/bin/pg_dump/compress_lz4.h && continue
test "$f" = src/bin/pg_dump/compress_none.h && continue
test "$f" = src/bin/pg_dump/compress_zstd.h && continue
test "$f" = src/bin/pg_dump/parallel.h && continue
test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
test "$f" = src/bin/pg_dump/pg_dump.h && continue

View File

@ -3937,3 +3937,4 @@ yyscan_t
z_stream
z_streamp
zic_t
ZSTD_CStream