From 84adc8e20f54e93a003cd316fa1eb9b03e393288 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 5 Apr 2023 21:38:04 +0200 Subject: [PATCH] pg_dump: Add support for zstd compression Allow pg_dump to use the zstd compression, in addition to gzip/lz4. Bulk of the new compression method is implemented in compress_zstd.{c,h}, covering the pg_dump compression APIs. The rest of the patch adds test and makes various places aware of the new compression method. The zstd library (which this patch relies on) supports multithreaded compression since version 1.5. We however disallow that feature for now, as it might interfere with parallel backups on platforms that rely on threads (e.g. Windows). This can be improved / relaxed in the future. This also fixes a minor issue in InitDiscoverCompressFileHandle(), which was not updated to check if the file already has the .lz4 extension. Adding zstd compression was originally proposed in 2020 (see the second thread), but then was reworked to use the new compression API introduced in e9960732a9. I've considered both threads when compiling the list of reviewers. Author: Justin Pryzby Reviewed-by: Tomas Vondra, Jacob Champion, Andreas Karlsson Discussion: https://postgr.es/m/20230224191840.GD1653@telsasoft.com Discussion: https://postgr.es/m/20201221194924.GI30237@telsasoft.com --- doc/src/sgml/ref/pg_dump.sgml | 13 +- src/bin/pg_dump/Makefile | 2 + src/bin/pg_dump/compress_io.c | 72 ++-- src/bin/pg_dump/compress_zstd.c | 537 ++++++++++++++++++++++++++ src/bin/pg_dump/compress_zstd.h | 25 ++ src/bin/pg_dump/meson.build | 4 +- src/bin/pg_dump/pg_backup_archiver.c | 6 +- src/bin/pg_dump/pg_backup_directory.c | 2 + src/bin/pg_dump/pg_dump.c | 25 +- src/bin/pg_dump/t/002_pg_dump.pl | 79 +++- src/tools/pginclude/cpluspluscheck | 1 + src/tools/pgindent/typedefs.list | 1 + 12 files changed, 713 insertions(+), 54 deletions(-) create mode 100644 src/bin/pg_dump/compress_zstd.c create mode 100644 src/bin/pg_dump/compress_zstd.h diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index 77299878e0..8de38e0fd0 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -330,8 +330,9 @@ PostgreSQL documentation machine-readable format that pg_restore can read. A directory format archive can be manipulated with standard Unix tools; for example, files in an uncompressed archive - can be compressed with the gzip or - lz4 tools. + can be compressed with the gzip, + lz4, or + zstd tools. This format is compressed by default using gzip and also supports parallel dumps. @@ -655,7 +656,8 @@ PostgreSQL documentation Specify the compression method and/or the compression level to use. The compression method can be set to gzip, - lz4, or none for no compression. + lz4, zstd, + or none for no compression. A compression detail string can optionally be specified. If the detail string is an integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the @@ -676,8 +678,9 @@ PostgreSQL documentation individual table-data segments, and the default is to compress using gzip at a moderate level. For plain text output, setting a nonzero compression level causes the entire output file to be compressed, - as though it had been fed through gzip or - lz4; but the default is not to compress. + as though it had been fed through gzip, + lz4, or zstd; + but the default is not to compress. The tar archive format currently does not support compression at all. diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index eb8f59459a..24de7593a6 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global export GZIP_PROGRAM=$(GZIP) export LZ4 +export ZSTD export with_icu override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) @@ -29,6 +30,7 @@ OBJS = \ compress_io.o \ compress_lz4.o \ compress_none.o \ + compress_zstd.o \ dumputils.o \ parallel.o \ pg_backup_archiver.o \ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 0972a4f934..db19058354 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -52,8 +52,8 @@ * * InitDiscoverCompressFileHandle tries to infer the compression by the * filename suffix. If the suffix is not yet known then it tries to simply - * open the file and if it fails, it tries to open the same file with the .gz - * suffix, and then again with the .lz4 suffix. + * open the file and if it fails, it tries to open the same file with + * compressed suffixes (.gz, .lz4 and .zst, in this order). * * IDENTIFICATION * src/bin/pg_dump/compress_io.c @@ -69,6 +69,7 @@ #include "compress_io.h" #include "compress_lz4.h" #include "compress_none.h" +#include "compress_zstd.h" #include "pg_backup_utils.h" /*---------------------- @@ -77,7 +78,8 @@ */ /* - * Checks whether a compression algorithm is supported. + * Checks whether support for a compression algorithm is implemented in + * pg_dump/restore. * * On success returns NULL, otherwise returns a malloc'ed string which can be * used by the caller in an error message. @@ -98,6 +100,10 @@ supports_compression(const pg_compress_specification compression_spec) if (algorithm == PG_COMPRESSION_LZ4) supported = true; #endif +#ifdef USE_ZSTD + if (algorithm == PG_COMPRESSION_ZSTD) + supported = true; +#endif if (!supported) return psprintf("this build does not support compression with %s", @@ -130,6 +136,8 @@ AllocateCompressor(const pg_compress_specification compression_spec, InitCompressorGzip(cs, compression_spec); else if (compression_spec.algorithm == PG_COMPRESSION_LZ4) InitCompressorLZ4(cs, compression_spec); + else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD) + InitCompressorZstd(cs, compression_spec); return cs; } @@ -196,20 +204,36 @@ InitCompressFileHandle(const pg_compress_specification compression_spec) InitCompressFileHandleGzip(CFH, compression_spec); else if (compression_spec.algorithm == PG_COMPRESSION_LZ4) InitCompressFileHandleLZ4(CFH, compression_spec); + else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD) + InitCompressFileHandleZstd(CFH, compression_spec); return CFH; } +/* + * Checks if a compressed file (with the specified extension) exists. + * + * The filename of the tested file is stored to fname buffer (the existing + * buffer is freed, new buffer is allocated and returned through the pointer). + */ +static bool +check_compressed_file(const char *path, char **fname, char *ext) +{ + free_keep_errno(*fname); + *fname = psprintf("%s.%s", path, ext); + return (access(*fname, F_OK) == 0); +} + /* * Open a file for reading. 'path' is the file to open, and 'mode' should * be either "r" or "rb". * * If the file at 'path' contains the suffix of a supported compression method, - * currently this includes ".gz" and ".lz4", then this compression will be used + * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used * throughout. Otherwise the compression will be inferred by iteratively trying * to open the file at 'path', first as is, then by appending known compression * suffixes. So if you pass "foo" as 'path', this will open either "foo" or - * "foo.gz" or "foo.lz4", trying in that order. + * "foo.{gz,lz4,zst}", trying in that order. * * On failure, return NULL with an error code in errno. */ @@ -229,36 +253,20 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode) if (hasSuffix(fname, ".gz")) compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (hasSuffix(fname, ".lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (hasSuffix(fname, ".zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; else { - bool exists; - - exists = (stat(path, &st) == 0); - /* avoid unused warning if it is not built with compression */ - if (exists) + if (stat(path, &st) == 0) compression_spec.algorithm = PG_COMPRESSION_NONE; -#ifdef HAVE_LIBZ - if (!exists) - { - free_keep_errno(fname); - fname = psprintf("%s.gz", path); - exists = (stat(fname, &st) == 0); - - if (exists) - compression_spec.algorithm = PG_COMPRESSION_GZIP; - } -#endif -#ifdef USE_LZ4 - if (!exists) - { - free_keep_errno(fname); - fname = psprintf("%s.lz4", path); - exists = (stat(fname, &st) == 0); - - if (exists) - compression_spec.algorithm = PG_COMPRESSION_LZ4; - } -#endif + else if (check_compressed_file(path, &fname, "gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (check_compressed_file(path, &fname, "lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (check_compressed_file(path, &fname, "zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; } CFH = InitCompressFileHandle(compression_spec); diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c new file mode 100644 index 0000000000..aa16822dff --- /dev/null +++ b/src/bin/pg_dump/compress_zstd.c @@ -0,0 +1,537 @@ +/*------------------------------------------------------------------------- + * + * compress_zstd.c + * Routines for archivers to write a Zstd compressed data stream. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_zstd.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include "pg_backup_utils.h" +#include "compress_zstd.h" + +#ifndef USE_ZSTD + +void +InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec) +{ + pg_fatal("this build does not support compression with %s", "ZSTD"); +} + +void +InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +{ + pg_fatal("this build does not support compression with %s", "ZSTD"); +} + +#else + +#include + +typedef struct ZstdCompressorState +{ + /* This is a normal file to which we read/write compressed data */ + FILE *fp; + + ZSTD_CStream *cstream; + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + + /* pointer to a static string like from strerror(), for Zstd_write() */ + const char *zstderror; +} ZstdCompressorState; + +static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress); +static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs); +static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); +static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs); + +static void +_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream, + ZSTD_cParameter param, int value, char *paramname) +{ + size_t res; + + res = ZSTD_CCtx_setParameter(cstream, param, value); + if (ZSTD_isError(res)) + pg_fatal("could not set compression parameter: \"%s\": %s", + paramname, ZSTD_getErrorName(res)); +} + +/* Return a compression stream with parameters set per argument */ +static ZSTD_CStream * +_ZstdCStreamParams(pg_compress_specification compress) +{ + ZSTD_CStream *cstream; + + cstream = ZSTD_createCStream(); + if (cstream == NULL) + pg_fatal("could not initialize compression library"); + + _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel, + compress.level, "level"); + + return cstream; +} + +/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */ +static void +_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + + /* Loop while there's any input or until flushed */ + while (input->pos != input->size || flush) + { + size_t res; + + output->pos = 0; + res = ZSTD_compressStream2(zstdcs->cstream, output, + input, flush ? ZSTD_e_end : ZSTD_e_continue); + + if (ZSTD_isError(res)) + pg_fatal("could not compress data: %s", ZSTD_getErrorName(res)); + + /* + * Extra paranoia: avoid zero-length chunks, since a zero length chunk + * is the EOF marker in the custom format. This should never happen + * but... + */ + if (output->pos > 0) + cs->writeF(AH, output->dst, output->pos); + + if (res == 0) + break; /* End of frame or all input consumed */ + } +} + +static void +EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + + if (cs->readF != NULL) + { + Assert(zstdcs->cstream == NULL); + ZSTD_freeDStream(zstdcs->dstream); + pg_free(unconstify(void *, zstdcs->input.src)); + } + else if (cs->writeF != NULL) + { + Assert(zstdcs->dstream == NULL); + _ZstdWriteCommon(AH, cs, true); + ZSTD_freeCStream(zstdcs->cstream); + pg_free(zstdcs->output.dst); + } + + pg_free(zstdcs); +} + +static void +WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + + zstdcs->input.src = data; + zstdcs->input.size = dLen; + zstdcs->input.pos = 0; + + _ZstdWriteCommon(AH, cs, false); +} + +static void +ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + ZSTD_outBuffer *output = &zstdcs->output; + ZSTD_inBuffer *input = &zstdcs->input; + size_t input_allocated_size = ZSTD_DStreamInSize(); + size_t res; + + for (;;) + { + size_t cnt; + + /* + * Read compressed data. Note that readF can resize the buffer; the + * new size is tracked and used for future loops. + */ + input->size = input_allocated_size; + cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size); + + /* ensure that readF didn't *shrink* the buffer */ + Assert(input->size >= input_allocated_size); + input_allocated_size = input->size; + input->size = cnt; + input->pos = 0; + + if (cnt == 0) + break; + + /* Now decompress */ + while (input->pos < input->size) + { + output->pos = 0; + res = ZSTD_decompressStream(zstdcs->dstream, output, input); + if (ZSTD_isError(res)) + pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + + /* + * then write the decompressed data to the output handle + */ + ((char *) output->dst)[output->pos] = '\0'; + ahwrite(output->dst, 1, output->pos, AH); + + if (res == 0) + break; /* End of frame */ + } + } +} + +/* Public routine that supports Zstd compressed data I/O */ +void +InitCompressorZstd(CompressorState *cs, + const pg_compress_specification compression_spec) +{ + ZstdCompressorState *zstdcs; + + cs->readData = ReadDataFromArchiveZstd; + cs->writeData = WriteDataToArchiveZstd; + cs->end = EndCompressorZstd; + + cs->compression_spec = compression_spec; + + zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs)); + cs->private_data = zstdcs; + + /* We expect that exactly one of readF/writeF is specified */ + Assert((cs->readF == NULL) != (cs->writeF == NULL)); + + if (cs->readF != NULL) + { + zstdcs->dstream = ZSTD_createDStream(); + if (zstdcs->dstream == NULL) + pg_fatal("could not initialize compression library"); + + zstdcs->input.size = ZSTD_DStreamInSize(); + zstdcs->input.src = pg_malloc(zstdcs->input.size); + + /* + * output.size is the buffer size we tell zstd it can output to. + * Allocate an additional byte such that ReadDataFromArchiveZstd() can + * call ahwrite() with a null-terminated string, which is an optimized + * case in ExecuteSqlCommandBuf(). + */ + zstdcs->output.size = ZSTD_DStreamOutSize(); + zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1); + } + else if (cs->writeF != NULL) + { + zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec); + + zstdcs->output.size = ZSTD_CStreamOutSize(); + zstdcs->output.dst = pg_malloc(zstdcs->output.size); + zstdcs->output.pos = 0; + } +} + +/* + * Compressed stream API + */ + +static bool +Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + size_t input_allocated_size = ZSTD_DStreamInSize(); + size_t res, + cnt; + + output->size = size; + output->dst = ptr; + output->pos = 0; + + for (;;) + { + Assert(input->pos <= input->size); + Assert(input->size <= input_allocated_size); + + /* + * If the input is completely consumed, start back at the beginning + */ + if (input->pos == input->size) + { + /* input->size is size produced by "fread" */ + input->size = 0; + /* input->pos is position consumed by decompress */ + input->pos = 0; + } + + /* read compressed data if we must produce more input */ + if (input->pos == input->size) + { + cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp); + input->size = cnt; + + Assert(cnt <= input_allocated_size); + + /* If we have no more input to consume, we're done */ + if (cnt == 0) + break; + } + + while (input->pos < input->size) + { + /* now decompress */ + res = ZSTD_decompressStream(zstdcs->dstream, output, input); + + if (ZSTD_isError(res)) + pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + + if (output->pos == output->size) + break; /* No more room for output */ + + if (res == 0) + break; /* End of frame */ + } + + if (output->pos == output->size) + break; /* We read all the data that fits */ + } + + if (rdsize != NULL) + *rdsize = output->pos; + + return true; +} + +static bool +Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + size_t res, + cnt; + + input->src = ptr; + input->size = size; + input->pos = 0; + + /* Consume all input, to be flushed later */ + while (input->pos != input->size) + { + output->pos = 0; + res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue); + if (ZSTD_isError(res)) + { + zstdcs->zstderror = ZSTD_getErrorName(res); + return false; + } + + cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp); + if (cnt != output->pos) + { + zstdcs->zstderror = strerror(errno); + return false; + } + } + + return size; +} + +static int +Zstd_getc(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + int ret; + + if (CFH->read_func(&ret, 1, NULL, CFH) != 1) + { + if (feof(zstdcs->fp)) + pg_fatal("could not read from input file: end of file"); + else + pg_fatal("could not read from input file: %m"); + } + return ret; +} + +static char * +Zstd_gets(char *buf, int len, CompressFileHandle *CFH) +{ + int i; + + Assert(len > 0); + + /* + * Read one byte at a time until newline or EOF. This is only used to read + * the list of LOs, and the I/O is buffered anyway. + */ + for (i = 0; i < len - 1; ++i) + { + size_t readsz; + + if (!CFH->read_func(&buf[i], 1, &readsz, CFH)) + break; + if (readsz != 1) + break; + if (buf[i] == '\n') + { + ++i; + break; + } + } + buf[i] = '\0'; + return i > 0 ? buf : NULL; +} + +static bool +Zstd_close(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + + if (zstdcs->cstream) + { + size_t res, + cnt; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + + /* Loop until the compression buffers are fully consumed */ + for (;;) + { + output->pos = 0; + res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end); + if (ZSTD_isError(res)) + { + zstdcs->zstderror = ZSTD_getErrorName(res); + return false; + } + + cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp); + if (cnt != output->pos) + { + zstdcs->zstderror = strerror(errno); + return false; + } + + if (res == 0) + break; /* End of frame */ + } + + ZSTD_freeCStream(zstdcs->cstream); + pg_free(zstdcs->output.dst); + } + + if (zstdcs->dstream) + { + ZSTD_freeDStream(zstdcs->dstream); + pg_free(unconstify(void *, zstdcs->input.src)); + } + + if (fclose(zstdcs->fp) != 0) + return false; + + pg_free(zstdcs); + return true; +} + +static bool +Zstd_eof(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + + return feof(zstdcs->fp); +} + +static bool +Zstd_open(const char *path, int fd, const char *mode, + CompressFileHandle *CFH) +{ + FILE *fp; + ZstdCompressorState *zstdcs; + + if (fd >= 0) + fp = fdopen(fd, mode); + else + fp = fopen(path, mode); + + if (fp == NULL) + return false; + + zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs)); + CFH->private_data = zstdcs; + zstdcs->fp = fp; + + if (mode[0] == 'r') + { + zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize()); + zstdcs->dstream = ZSTD_createDStream(); + if (zstdcs->dstream == NULL) + pg_fatal("could not initialize compression library"); + } + else if (mode[0] == 'w' || mode[0] == 'a') + { + zstdcs->output.size = ZSTD_CStreamOutSize(); + zstdcs->output.dst = pg_malloc0(zstdcs->output.size); + zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec); + if (zstdcs->cstream == NULL) + pg_fatal("could not initialize compression library"); + } + else + pg_fatal("unhandled mode"); + + return true; +} + +static bool +Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH) +{ + char fname[MAXPGPATH]; + + sprintf(fname, "%s.zst", path); + return CFH->open_func(fname, -1, mode, CFH); +} + +static const char * +Zstd_get_error(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + + return zstdcs->zstderror; +} + +void +InitCompressFileHandleZstd(CompressFileHandle *CFH, + const pg_compress_specification compression_spec) +{ + CFH->open_func = Zstd_open; + CFH->open_write_func = Zstd_open_write; + CFH->read_func = Zstd_read; + CFH->write_func = Zstd_write; + CFH->gets_func = Zstd_gets; + CFH->getc_func = Zstd_getc; + CFH->close_func = Zstd_close; + CFH->eof_func = Zstd_eof; + CFH->get_error_func = Zstd_get_error; + + CFH->compression_spec = compression_spec; + + CFH->private_data = NULL; +} + +#endif /* USE_ZSTD */ diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h new file mode 100644 index 0000000000..2aaa6b100b --- /dev/null +++ b/src/bin/pg_dump/compress_zstd.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * compress_zstd.h + * Zstd interface to compress_io.c routines + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_zstd.h + * + *------------------------------------------------------------------------- + */ + +#ifndef COMPRESS_ZSTD_H +#define COMPRESS_ZSTD_H + +#include "compress_io.h" + +extern void InitCompressorZstd(CompressorState *cs, + const pg_compress_specification compression_spec); +extern void InitCompressFileHandleZstd(CompressFileHandle *CFH, + const pg_compress_specification compression_spec); + +#endif /* COMPRESS_ZSTD_H */ diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build index b2fb7ac77f..9d59a106f3 100644 --- a/src/bin/pg_dump/meson.build +++ b/src/bin/pg_dump/meson.build @@ -5,6 +5,7 @@ pg_dump_common_sources = files( 'compress_io.c', 'compress_lz4.c', 'compress_none.c', + 'compress_zstd.c', 'dumputils.c', 'parallel.c', 'pg_backup_archiver.c', @@ -19,7 +20,7 @@ pg_dump_common_sources = files( pg_dump_common = static_library('libpgdump_common', pg_dump_common_sources, c_pch: pch_postgres_fe_h, - dependencies: [frontend_code, libpq, lz4, zlib], + dependencies: [frontend_code, libpq, lz4, zlib, zstd], kwargs: internal_lib_args, ) @@ -90,6 +91,7 @@ tests += { 'env': { 'GZIP_PROGRAM': gzip.path(), 'LZ4': program_lz4.found() ? program_lz4.path() : '', + 'ZSTD': program_zstd.found() ? program_zstd.path() : '', 'with_icu': icu.found() ? 'yes' : 'no', }, 'tests': [ diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index ab77e373e9..d518349e10 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2120,7 +2120,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) /* * Check if the specified archive is a directory. If so, check if - * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it. + * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it. */ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) { @@ -2134,6 +2134,10 @@ _discoverArchiveFormat(ArchiveHandle *AH) #ifdef USE_LZ4 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4")) return AH->format; +#endif +#ifdef USE_ZSTD + if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst")) + return AH->format; #endif pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", AH->fSpec); diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index abaaa3b10e..2177d5ff42 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH) strlcat(fname, ".gz", sizeof(fname)); else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4) strlcat(fname, ".lz4", sizeof(fname)); + else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD) + strlcat(fname, ".zst", sizeof(fname)); if (stat(fname, &st) == 0) te->dataLength = st.st_size; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6abbcff683..7a504dfe25 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -56,6 +56,7 @@ #include "catalog/pg_type_d.h" #include "common/connect.h" #include "common/relpath.h" +#include "compress_io.h" #include "dumputils.h" #include "fe_utils/option_utils.h" #include "fe_utils/string_utils.h" @@ -735,18 +736,18 @@ main(int argc, char **argv) pg_fatal("invalid compression specification: %s", error_detail); - switch (compression_algorithm) - { - case PG_COMPRESSION_NONE: - /* fallthrough */ - case PG_COMPRESSION_GZIP: - /* fallthrough */ - case PG_COMPRESSION_LZ4: - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } + error_detail = supports_compression(compression_spec); + if (error_detail != NULL) + pg_fatal("%s", error_detail); + + /* + * Disable support for zstd workers for now - these are based on threading, + * and it's unclear how it interacts with parallel dumps on platforms where + * that relies on threads too (e.g. Windows). + */ + if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS) + pg_log_warning("compression option \"%s\" is not currently supported by pg_dump", + "workers"); /* * Custom and directory formats are compressed by default with gzip when diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index df26ba42d6..b5c97694e3 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir; # those lines) to validate that part of the process. my $supports_icu = ($ENV{with_icu} eq 'yes'); -my $supports_lz4 = check_pg_config("#define USE_LZ4 1"); my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1"); +my $supports_lz4 = check_pg_config("#define USE_LZ4 1"); +my $supports_zstd = check_pg_config("#define USE_ZSTD 1"); my %pgdump_runs = ( binary_upgrade => { @@ -213,6 +214,77 @@ my %pgdump_runs = ( }, }, + compression_zstd_custom => { + test_key => 'compression', + compile_option => 'zstd', + dump_cmd => [ + 'pg_dump', '--format=custom', + '--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump", + 'postgres', + ], + restore_cmd => [ + 'pg_restore', + "--file=$tempdir/compression_zstd_custom.sql", + "$tempdir/compression_zstd_custom.dump", + ], + command_like => { + command => [ + 'pg_restore', + '-l', "$tempdir/compression_zstd_custom.dump", + ], + expected => qr/Compression: zstd/, + name => 'data content is zstd compressed' + }, + }, + + compression_zstd_dir => { + test_key => 'compression', + compile_option => 'zstd', + dump_cmd => [ + 'pg_dump', '--jobs=2', + '--format=directory', '--compress=zstd:1', + "--file=$tempdir/compression_zstd_dir", 'postgres', + ], + # Give coverage for manually compressed blob.toc files during + # restore. + compress_cmd => { + program => $ENV{'ZSTD'}, + args => [ + '-z', '-f', '--rm', + "$tempdir/compression_zstd_dir/blobs.toc", + "-o", "$tempdir/compression_zstd_dir/blobs.toc.zst", + ], + }, + # Verify that data files were compressed + glob_patterns => [ + "$tempdir/compression_zstd_dir/toc.dat", + "$tempdir/compression_zstd_dir/*.dat.zst", + ], + restore_cmd => [ + 'pg_restore', '--jobs=2', + "--file=$tempdir/compression_zstd_dir.sql", + "$tempdir/compression_zstd_dir", + ], + }, + + compression_zstd_plain => { + test_key => 'compression', + compile_option => 'zstd', + dump_cmd => [ + 'pg_dump', '--format=plain', '--compress=zstd', + "--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres', + ], + # Decompress the generated file to run through the tests. + compress_cmd => { + program => $ENV{'ZSTD'}, + args => [ + '-d', '-f', + "$tempdir/compression_zstd_plain.sql.zst", + "-o", "$tempdir/compression_zstd_plain.sql", + ], + }, + }, + clean => { dump_cmd => [ 'pg_dump', @@ -4648,10 +4720,11 @@ foreach my $run (sort keys %pgdump_runs) my $test_key = $run; my $run_db = 'postgres'; - # Skip command-level tests for gzip/lz4 if there is no support for it. + # Skip command-level tests for gzip/lz4/zstd if the tool is not supported if ($pgdump_runs{$run}->{compile_option} && (($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) || - ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))) + ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) || + ($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd))) { note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support"; next; diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck index b0e9aa99a2..4e09c4686b 100755 --- a/src/tools/pginclude/cpluspluscheck +++ b/src/tools/pginclude/cpluspluscheck @@ -154,6 +154,7 @@ do test "$f" = src/bin/pg_dump/compress_io.h && continue test "$f" = src/bin/pg_dump/compress_lz4.h && continue test "$f" = src/bin/pg_dump/compress_none.h && continue + test "$f" = src/bin/pg_dump/compress_zstd.h && continue test "$f" = src/bin/pg_dump/parallel.h && continue test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue test "$f" = src/bin/pg_dump/pg_dump.h && continue diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 5c0410869f..065acb6f50 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3937,3 +3937,4 @@ yyscan_t z_stream z_streamp zic_t +ZSTD_CStream