diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 2fa3cedfe9..98f0bc3cc3 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2739,17 +2739,23 @@ The commands accepted in replication mode are: option. If the value is an integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the form keyword or - keyword=value. Currently, the only supported - keyword is level, which sets the compression - level. + keyword=value. Currently, the supported keywords + are level and workers. + The level keyword sets the compression level. For gzip the compression level should be an integer between 1 and 9, for lz4 an integer between 1 and 12, and for zstd an integer between 1 and 22. + + + The workers keyword sets the number of threads + that should be used for parallel compression. Parallel compression + is supported only for zstd. + diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index d9233beb8e..82f5f60625 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -424,8 +424,8 @@ PostgreSQL documentation integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the form keyword or keyword=value. - Currently, the only supported keyword is level, - which sets the compression level. + Currently, the supported keywords are level + and workers. If no compression level is specified, the default compression level diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c index 5496eaa72b..f6876f4811 100644 --- a/src/backend/replication/basebackup_zstd.c +++ b/src/backend/replication/basebackup_zstd.c @@ -25,8 +25,8 @@ typedef struct bbsink_zstd /* Common information for all types of sink. */ bbsink base; - /* Compression level */ - int compresslevel; + /* Compression options */ + bc_specification *compress; ZSTD_CCtx *cctx; ZSTD_outBuffer zstd_outBuf; @@ -67,22 +67,13 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress) return NULL; /* keep compiler quiet */ #else bbsink_zstd *sink; - int compresslevel; Assert(next != NULL); - if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0) - compresslevel = 0; - else - { - compresslevel = compress->level; - Assert(compresslevel >= 1 && compresslevel <= 22); - } - sink = palloc0(sizeof(bbsink_zstd)); *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops; sink->base.bbs_next = next; - sink->compresslevel = compresslevel; + sink->compress = compress; return &sink->base; #endif @@ -99,16 +90,36 @@ bbsink_zstd_begin_backup(bbsink *sink) bbsink_zstd *mysink = (bbsink_zstd *) sink; size_t output_buffer_bound; size_t ret; + bc_specification *compress = mysink->compress; mysink->cctx = ZSTD_createCCtx(); if (!mysink->cctx) elog(ERROR, "could not create zstd compression context"); - ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel, - mysink->compresslevel); - if (ZSTD_isError(ret)) - elog(ERROR, "could not set zstd compression level to %d: %s", - mysink->compresslevel, ZSTD_getErrorName(ret)); + if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0) + { + ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel, + compress->level); + if (ZSTD_isError(ret)) + elog(ERROR, "could not set zstd compression level to %d: %s", + compress->level, ZSTD_getErrorName(ret)); + } + + if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0) + { + /* + * On older versions of libzstd, this option does not exist, and trying + * to set it will fail. Similarly for newer versions if they are + * compiled without threading support. + */ + ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers, + compress->workers); + if (ZSTD_isError(ret)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not set compression worker count to %d: %s", + compress->workers, ZSTD_getErrorName(ret))); + } /* * We need our own buffer, because we're going to pass different data to diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c index 7946b6350b..f94c5c041d 100644 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ b/src/bin/pg_basebackup/bbstreamer_zstd.c @@ -67,7 +67,6 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress) { #ifdef USE_ZSTD bbstreamer_zstd_frame *streamer; - int compresslevel; size_t ret; Assert(next != NULL); @@ -88,18 +87,35 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress) exit(1); } - /* Initialize stream compression preferences */ - if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0) - compresslevel = 0; - else - compresslevel = compress->level; - ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, - compresslevel); - if (ZSTD_isError(ret)) + /* Set compression level, if specified */ + if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0) { - pg_log_error("could not set zstd compression level to %d: %s", - compresslevel, ZSTD_getErrorName(ret)); - exit(1); + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, + compress->level); + if (ZSTD_isError(ret)) + { + pg_log_error("could not set zstd compression level to %d: %s", + compress->level, ZSTD_getErrorName(ret)); + exit(1); + } + } + + /* Set # of workers, if specified */ + if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0) + { + /* + * On older versions of libzstd, this option does not exist, and + * trying to set it will fail. Similarly for newer versions if they + * are compiled without threading support. + */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, + compress->workers); + if (ZSTD_isError(ret)) + { + pg_log_error("could not set compression worker count to %d: %s", + compress->workers, ZSTD_getErrorName(ret)); + exit(1); + } } /* Initialize the ZSTD output buffer. */ diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl index 47f3d00ac4..5ba84c2250 100644 --- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl +++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl @@ -130,6 +130,11 @@ my @compression_failure_tests = ( 'invalid compression specification: found empty string where a compression option was expected', 'failure on extra, empty compression option' ], + [ + 'gzip:workers=3', + 'invalid compression specification: compression algorithm "gzip" does not accept a worker count', + 'failure on worker count for gzip' + ], ); for my $cft (@compression_failure_tests) { diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl index 41a5b370cc..d6f11b9553 100644 --- a/src/bin/pg_verifybackup/t/009_extract.pl +++ b/src/bin/pg_verifybackup/t/009_extract.pl @@ -34,6 +34,12 @@ my @test_configuration = ( 'compression_method' => 'zstd', 'backup_flags' => ['--compress', 'server-zstd:5'], 'enabled' => check_pg_config("#define USE_ZSTD 1") + }, + { + 'compression_method' => 'parallel zstd', + 'backup_flags' => ['--compress', 'server-zstd:workers=3'], + 'enabled' => check_pg_config("#define USE_ZSTD 1"), + 'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/ } ); @@ -55,8 +61,27 @@ for my $tc (@test_configuration) my @verify = ('pg_verifybackup', '-e', $backup_path); # A backup with a valid compression method should work. - $primary->command_ok(\@backup, - "backup done, compression method \"$method\""); + my $backup_stdout = ''; + my $backup_stderr = ''; + my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout, + '2>', \$backup_stderr); + if ($backup_stdout ne '') + { + print "# standard output was:\n$backup_stdout"; + } + if ($backup_stderr ne '') + { + print "# standard error was:\n$backup_stderr"; + } + if (! $backup_result && $tc->{'possibly_unsupported'} && + $backup_stderr =~ /$tc->{'possibly_unsupported'}/) + { + skip "compression with $method not supported by this build", 2; + } + else + { + ok($backup_result, "backup done, compression $method"); + } # Make sure that it verifies OK. $primary->command_ok(\@verify, diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl index 488a6d1ede..c1cd12cb06 100644 --- a/src/bin/pg_verifybackup/t/010_client_untar.pl +++ b/src/bin/pg_verifybackup/t/010_client_untar.pl @@ -49,6 +49,15 @@ my @test_configuration = ( 'decompress_program' => $ENV{'ZSTD'}, 'decompress_flags' => [ '-d' ], 'enabled' => check_pg_config("#define USE_ZSTD 1") + }, + { + 'compression_method' => 'parallel zstd', + 'backup_flags' => ['--compress', 'client-zstd:workers=3'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define USE_ZSTD 1"), + 'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/ } ); @@ -69,9 +78,27 @@ for my $tc (@test_configuration) 'pg_basebackup', '-D', $backup_path, '-Xfetch', '--no-sync', '-cfast', '-Ft'); push @backup, @{$tc->{'backup_flags'}}; - $primary->command_ok(\@backup, - "client side backup, compression $method"); - + my $backup_stdout = ''; + my $backup_stderr = ''; + my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout, + '2>', \$backup_stderr); + if ($backup_stdout ne '') + { + print "# standard output was:\n$backup_stdout"; + } + if ($backup_stderr ne '') + { + print "# standard error was:\n$backup_stderr"; + } + if (! $backup_result && $tc->{'possibly_unsupported'} && + $backup_stderr =~ /$tc->{'possibly_unsupported'}/) + { + skip "compression with $method not supported by this build", 3; + } + else + { + ok($backup_result, "client side backup, compression $method"); + } # Verify that the we got the files we expected. my $backup_files = join(',', diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c index 0650f975c4..969e08cca2 100644 --- a/src/common/backup_compression.c +++ b/src/common/backup_compression.c @@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification, result->level = expect_integer_value(keyword, value, result); result->options |= BACKUP_COMPRESSION_OPTION_LEVEL; } + else if (strcmp(keyword, "workers") == 0) + { + result->workers = expect_integer_value(keyword, value, result); + result->options |= BACKUP_COMPRESSION_OPTION_WORKERS; + } else result->parse_error = psprintf(_("unknown compression option \"%s\""), keyword); @@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec) min_level, max_level); } + /* + * Of the compression algorithms that we currently support, only zstd + * allows parallel workers. + */ + if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 && + (spec->algorithm != BACKUP_COMPRESSION_ZSTD)) + { + return psprintf(_("compression algorithm \"%s\" does not accept a worker count"), + get_bc_algorithm_name(spec->algorithm)); + } + return NULL; } diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h index 0565cbc657..6a0ecaa99c 100644 --- a/src/include/common/backup_compression.h +++ b/src/include/common/backup_compression.h @@ -23,12 +23,14 @@ typedef enum bc_algorithm } bc_algorithm; #define BACKUP_COMPRESSION_OPTION_LEVEL (1 << 0) +#define BACKUP_COMPRESSION_OPTION_WORKERS (1 << 1) typedef struct bc_specification { bc_algorithm algorithm; unsigned options; /* OR of BACKUP_COMPRESSION_OPTION constants */ int level; + int workers; char *parse_error; /* NULL if parsing was OK, else message */ } bc_specification;