diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index a830b199f5..876d20611b 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -570,7 +570,7 @@ LogStreamerMain(logstreamer_param *param) return 1; } - if (!stream.walmethod->finish()) + if (!stream.walmethod->ops->finish(stream.walmethod)) { pg_log_error("could not finish writing WAL files: %m"); #ifdef WIN32 @@ -581,11 +581,7 @@ LogStreamerMain(logstreamer_param *param) PQfinish(param->bgconn); - if (format == 'p') - FreeWalDirectoryMethod(); - else - FreeWalTarMethod(); - pg_free(stream.walmethod); + stream.walmethod->ops->free(stream.walmethod); return 0; } diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 5c22c914bc..a7180e2955 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -658,7 +658,7 @@ StreamLog(void) ReceiveXlogStream(conn, &stream); - if (!stream.walmethod->finish()) + if (!stream.walmethod->ops->finish(stream.walmethod)) { pg_log_info("could not finish writing WAL files: %m"); return; @@ -667,9 +667,7 @@ StreamLog(void) PQfinish(conn); conn = NULL; - FreeWalDirectoryMethod(); - pg_free(stream.walmethod); - pg_free(stream.sysidentifier); + stream.walmethod->ops->free(stream.walmethod); } /* diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index a619176511..9c71323d70 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -59,18 +59,19 @@ mark_file_as_archived(StreamCtl *stream, const char *fname) snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done", fname); - f = stream->walmethod->open_for_write(tmppath, NULL, 0); + f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath, + NULL, 0); if (f == NULL) { pg_log_error("could not create archive status file \"%s\": %s", - tmppath, stream->walmethod->getlasterror()); + tmppath, GetLastWalMethodError(stream->walmethod)); return false; } - if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) + if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0) { pg_log_error("could not close archive status file \"%s\": %s", - tmppath, stream->walmethod->getlasterror()); + tmppath, GetLastWalMethodError(stream->walmethod)); return false; } @@ -98,8 +99,9 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) XLogFileName(walfile_name, stream->timeline, segno, WalSegSz); /* Note that this considers the compression used if necessary */ - fn = stream->walmethod->get_file_name(walfile_name, - stream->partial_suffix); + fn = stream->walmethod->ops->get_file_name(stream->walmethod, + walfile_name, + stream->partial_suffix); /* * When streaming to files, if an existing file exists we verify that it's @@ -111,35 +113,35 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) * When streaming to tar, no file with this name will exist before, so we * never have to verify a size. */ - if (stream->walmethod->compression_algorithm() == PG_COMPRESSION_NONE && - stream->walmethod->existsfile(fn)) + if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE && + stream->walmethod->ops->existsfile(stream->walmethod, fn)) { - size = stream->walmethod->get_file_size(fn); + size = stream->walmethod->ops->get_file_size(stream->walmethod, fn); if (size < 0) { pg_log_error("could not get size of write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; } if (size == WalSegSz) { /* Already padded file. Open it for use */ - f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0); + f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0); if (f == NULL) { pg_log_error("could not open existing write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; } /* fsync file in case of a previous crash */ - if (stream->walmethod->sync(f) != 0) + if (stream->walmethod->ops->sync(f) != 0) { pg_log_error("could not fsync existing write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); - stream->walmethod->close(f, CLOSE_UNLINK); + fn, GetLastWalMethodError(stream->walmethod)); + stream->walmethod->ops->close(f, CLOSE_UNLINK); exit(1); } @@ -164,12 +166,14 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) /* No file existed, so create one */ - f = stream->walmethod->open_for_write(walfile_name, - stream->partial_suffix, WalSegSz); + f = stream->walmethod->ops->open_for_write(stream->walmethod, + walfile_name, + stream->partial_suffix, + WalSegSz); if (f == NULL) { pg_log_error("could not open write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; } @@ -199,28 +203,29 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) currpos = walfile->currpos; /* Note that this considers the compression used if necessary */ - fn = stream->walmethod->get_file_name(walfile_name, - stream->partial_suffix); + fn = stream->walmethod->ops->get_file_name(stream->walmethod, + walfile_name, + stream->partial_suffix); if (stream->partial_suffix) { if (currpos == WalSegSz) - r = stream->walmethod->close(walfile, CLOSE_NORMAL); + r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL); else { pg_log_info("not renaming \"%s\", segment is not complete", fn); - r = stream->walmethod->close(walfile, CLOSE_NO_RENAME); + r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME); } } else - r = stream->walmethod->close(walfile, CLOSE_NORMAL); + r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL); walfile = NULL; if (r != 0) { pg_log_error("could not close file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; @@ -263,7 +268,7 @@ existsTimeLineHistoryFile(StreamCtl *stream) TLHistoryFileName(histfname, stream->timeline); - return stream->walmethod->existsfile(histfname); + return stream->walmethod->ops->existsfile(stream->walmethod, histfname); } static bool @@ -285,31 +290,32 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) return false; } - f = stream->walmethod->open_for_write(histfname, ".tmp", 0); + f = stream->walmethod->ops->open_for_write(stream->walmethod, + histfname, ".tmp", 0); if (f == NULL) { pg_log_error("could not create timeline history file \"%s\": %s", - histfname, stream->walmethod->getlasterror()); + histfname, GetLastWalMethodError(stream->walmethod)); return false; } - if ((int) stream->walmethod->write(f, content, size) != size) + if ((int) stream->walmethod->ops->write(f, content, size) != size) { pg_log_error("could not write timeline history file \"%s\": %s", - histfname, stream->walmethod->getlasterror()); + histfname, GetLastWalMethodError(stream->walmethod)); /* * If we fail to make the file, delete it to release disk space */ - stream->walmethod->close(f, CLOSE_UNLINK); + stream->walmethod->ops->close(f, CLOSE_UNLINK); return false; } - if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) + if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0) { pg_log_error("could not close file \"%s\": %s", - histfname, stream->walmethod->getlasterror()); + histfname, GetLastWalMethodError(stream->walmethod)); return false; } @@ -678,9 +684,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) } error: - if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0) + if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0) pg_log_error("could not close file \"%s\": %s", - walfile->pathname, stream->walmethod->getlasterror()); + walfile->pathname, GetLastWalMethodError(stream->walmethod)); walfile = NULL; return false; } @@ -765,9 +771,9 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, */ if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL) { - if (stream->walmethod->sync(walfile) != 0) + if (stream->walmethod->ops->sync(walfile) != 0) pg_fatal("could not fsync file \"%s\": %s", - walfile->pathname, stream->walmethod->getlasterror()); + walfile->pathname, GetLastWalMethodError(stream->walmethod)); lastFlushPosition = blockpos; /* @@ -1012,9 +1018,9 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * data has been successfully replicated or not, at the normal * shutdown of the server. */ - if (stream->walmethod->sync(walfile) != 0) + if (stream->walmethod->ops->sync(walfile) != 0) pg_fatal("could not fsync file \"%s\": %s", - walfile->pathname, stream->walmethod->getlasterror()); + walfile->pathname, GetLastWalMethodError(stream->walmethod)); lastFlushPosition = blockpos; } @@ -1115,12 +1121,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } } - if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) + if (stream->walmethod->ops->write(walfile, + copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) { pg_log_error("could not write %d bytes to WAL file \"%s\": %s", bytes_to_write, walfile->pathname, - stream->walmethod->getlasterror()); + GetLastWalMethodError(stream->walmethod)); return false; } diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index d98a2681b9..bc2e83d02b 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -2,9 +2,6 @@ * * walmethods.c - implementations of different ways to write received wal * - * NOTE! The caller must ensure that only one method is instantiated in - * any given program, and that it's only instantiated once! - * * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group * * IDENTIFICATION @@ -43,19 +40,41 @@ *------------------------------------------------------------------------- */ +static Walfile *dir_open_for_write(WalWriteMethod *wwmethod, + const char *pathname, + const char *temp_suffix, + size_t pad_to_size); +static int dir_close(Walfile *f, WalCloseMethod method); +static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname); +static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, + const char *pathname); +static char *dir_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix); +static ssize_t dir_write(Walfile *f, const void *buf, size_t count); +static int dir_sync(Walfile *f); +static bool dir_finish(WalWriteMethod *wwmethod); +static void dir_free(WalWriteMethod *wwmethod); + +const WalWriteMethodOps WalDirectoryMethodOps = { + .open_for_write = dir_open_for_write, + .close = dir_close, + .existsfile = dir_existsfile, + .get_file_size = dir_get_file_size, + .get_file_name = dir_get_file_name, + .write = dir_write, + .sync = dir_sync, + .finish = dir_finish, + .free = dir_free +}; + /* * Global static data for this method */ typedef struct DirectoryMethodData { + WalWriteMethod base; char *basedir; - pg_compress_algorithm compression_algorithm; - int compression_level; - bool sync; - const char *lasterrstring; /* if set, takes precedence over lasterrno */ - int lasterrno; } DirectoryMethodData; -static DirectoryMethodData *dir_data = NULL; /* * Local file handle @@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile #endif } DirectoryMethodFile; -#define dir_clear_error() \ - (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0) -#define dir_set_error(msg) \ - (dir_data->lasterrstring = _(msg)) - -static const char * -dir_getlasterror(void) -{ - if (dir_data->lasterrstring) - return dir_data->lasterrstring; - return strerror(dir_data->lasterrno); -} +#define clear_error(wwmethod) \ + ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0) static char * -dir_get_file_name(const char *pathname, const char *temp_suffix) +dir_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix) { char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); snprintf(filename, MAXPGPATH, "%s%s%s", pathname, - dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" : - dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "", + wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" : + wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "", temp_suffix ? temp_suffix : ""); return filename; } static Walfile * -dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix, size_t pad_to_size) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; char tmppath[MAXPGPATH]; char *filename; int fd; @@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ void *lz4buf = NULL; #endif - dir_clear_error(); + clear_error(wwmethod); - filename = dir_get_file_name(pathname, temp_suffix); + filename = dir_get_file_name(wwmethod, pathname, temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode); if (fd < 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { gzfp = gzdopen(fd, "wb"); if (gzfp == NULL) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; close(fd); return NULL; } - if (gzsetparams(gzfp, dir_data->compression_level, + if (gzsetparams(gzfp, wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; gzclose(gzfp); return NULL; } } #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t ctx_out; size_t header_size; @@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); if (LZ4F_isError(ctx_out)) { - dir_data->lasterrstring = LZ4F_getErrorName(ctx_out); + wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out); close(fd); return NULL; } @@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ /* assign the compression level, default is 0 */ memset(&prefs, 0, sizeof(prefs)); - prefs.compressionLevel = dir_data->compression_level; + prefs.compressionLevel = wwmethod->compression_level; /* add the header */ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs); if (LZ4F_isError(header_size)) { - dir_data->lasterrstring = LZ4F_getErrorName(header_size); + wwmethod->lasterrstring = LZ4F_getErrorName(header_size); (void) LZ4F_freeCompressionContext(ctx); pg_free(lz4buf); close(fd); @@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (write(fd, lz4buf, header_size) != header_size) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; (void) LZ4F_freeCompressionContext(ctx); pg_free(lz4buf); close(fd); @@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #endif /* Do pre-padding on non-compressed files */ - if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE) + if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { PGAlignedXLogBlock zerobuf; int bytes; @@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; close(fd); return NULL; } @@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (lseek(fd, 0, SEEK_SET) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; close(fd); return NULL; } @@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * important when using synchronous mode, where the file is modified and * fsynced in-place, without a directory fsync. */ - if (dir_data->sync) + if (wwmethod->sync) { if (fsync_fname(tmppath, false) != 0 || fsync_parent_path(tmppath) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) gzclose(gzfp); else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); (void) LZ4F_freeCompressionContext(ctx); @@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ f = pg_malloc0(sizeof(DirectoryMethodFile)); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) f->gzfp = gzfp; #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { f->ctx = ctx; f->lz4buf = lz4buf; @@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } #endif + f->base.wwmethod = wwmethod; f->base.currpos = 0; f->base.pathname = pg_strdup(pathname); f->fd = fd; @@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count) DirectoryMethodFile *df = (DirectoryMethodFile *) f; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { errno = 0; r = (ssize_t) gzwrite(df->gzfp, buf, count); if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; } } else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t chunk; size_t remaining; @@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } @@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; } } if (r > 0) @@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method) { int r; DirectoryMethodFile *df = (DirectoryMethodFile *) f; + DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod; char tmppath[MAXPGPATH]; char tmppath2[MAXPGPATH]; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { errno = 0; /* in case gzclose() doesn't set it */ r = gzclose(df->gzfp); @@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method) else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t compressed; @@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method) if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } @@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method) * If we have a temp prefix, normal operation is to rename the * file. */ - filename = dir_get_file_name(df->base.pathname, df->temp_suffix); + filename = dir_get_file_name(f->wwmethod, df->base.pathname, + df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); /* permanent name, so no need for the prefix */ - filename2 = dir_get_file_name(df->base.pathname, NULL); + filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL); snprintf(tmppath2, sizeof(tmppath2), "%s/%s", dir_data->basedir, filename2); pg_free(filename2); - if (dir_data->sync) + if (f->wwmethod->sync) r = durable_rename(tmppath, tmppath2); else { @@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method) char *filename; /* Unlink the file once it's closed */ - filename = dir_get_file_name(df->base.pathname, df->temp_suffix); + filename = dir_get_file_name(f->wwmethod, df->base.pathname, + df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method) * CLOSE_NO_RENAME. In this case, fsync the file and containing * directory if sync mode is requested. */ - if (dir_data->sync) + if (f->wwmethod->sync) { r = fsync_fname(df->fullpath, false); if (r == 0) @@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method) } if (r != 0) - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; #ifdef USE_LZ4 pg_free(df->lz4buf); @@ -501,23 +517,23 @@ dir_sync(Walfile *f) int r; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); - if (!dir_data->sync) + if (!f->wwmethod->sync) return 0; #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK) { - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } } #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { DirectoryMethodFile *df = (DirectoryMethodFile *) f; size_t compressed; @@ -526,7 +542,7 @@ dir_sync(Walfile *f) compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL); if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -534,7 +550,7 @@ dir_sync(Walfile *f) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } } @@ -542,13 +558,14 @@ dir_sync(Walfile *f) r = fsync(((DirectoryMethodFile *) f)->fd); if (r < 0) - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return r; } static ssize_t -dir_get_file_size(const char *pathname) +dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; struct stat statbuf; char tmppath[MAXPGPATH]; @@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname) if (stat(tmppath, &statbuf) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return -1; } return statbuf.st_size; } -static pg_compress_algorithm -dir_compression_algorithm(void) -{ - return dir_data->compression_algorithm; -} - static bool -dir_existsfile(const char *pathname) +dir_existsfile(WalWriteMethod *wwmethod, const char *pathname) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; char tmppath[MAXPGPATH]; int fd; - dir_clear_error(); + clear_error(wwmethod); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, pathname); @@ -589,60 +601,54 @@ dir_existsfile(const char *pathname) } static bool -dir_finish(void) +dir_finish(WalWriteMethod *wwmethod) { - dir_clear_error(); + clear_error(wwmethod); - if (dir_data->sync) + if (wwmethod->sync) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; + /* * Files are fsynced when they are closed, but we need to fsync the * directory entry here as well. */ if (fsync_fname(dir_data->basedir, true) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } return true; } +static void +dir_free(WalWriteMethod *wwmethod) +{ + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; + + pg_free(dir_data->basedir); + pg_free(wwmethod); +} + WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync) { - WalWriteMethod *method; + DirectoryMethodData *wwmethod; - method = pg_malloc0(sizeof(WalWriteMethod)); - method->open_for_write = dir_open_for_write; - method->write = dir_write; - method->get_file_size = dir_get_file_size; - method->get_file_name = dir_get_file_name; - method->compression_algorithm = dir_compression_algorithm; - method->close = dir_close; - method->sync = dir_sync; - method->existsfile = dir_existsfile; - method->finish = dir_finish; - method->getlasterror = dir_getlasterror; + wwmethod = pg_malloc0(sizeof(DirectoryMethodData)); + *((const WalWriteMethodOps **) &wwmethod->base.ops) = + &WalDirectoryMethodOps; + wwmethod->base.compression_algorithm = compression_algorithm; + wwmethod->base.compression_level = compression_level; + wwmethod->base.sync = sync; + clear_error(&wwmethod->base); + wwmethod->basedir = pg_strdup(basedir); - dir_data = pg_malloc0(sizeof(DirectoryMethodData)); - dir_data->compression_algorithm = compression_algorithm; - dir_data->compression_level = compression_level; - dir_data->basedir = pg_strdup(basedir); - dir_data->sync = sync; - - return method; -} - -void -FreeWalDirectoryMethod(void) -{ - pg_free(dir_data->basedir); - pg_free(dir_data); - dir_data = NULL; + return &wwmethod->base; } @@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void) *------------------------------------------------------------------------- */ +static Walfile *tar_open_for_write(WalWriteMethod *wwmethod, + const char *pathname, + const char *temp_suffix, + size_t pad_to_size); +static int tar_close(Walfile *f, WalCloseMethod method); +static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname); +static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, + const char *pathname); +static char *tar_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix); +static ssize_t tar_write(Walfile *f, const void *buf, size_t count); +static int tar_sync(Walfile *f); +static bool tar_finish(WalWriteMethod *wwmethod); +static void tar_free(WalWriteMethod *wwmethod); + +const WalWriteMethodOps WalTarMethodOps = { + .open_for_write = tar_open_for_write, + .close = tar_close, + .existsfile = tar_existsfile, + .get_file_size = tar_get_file_size, + .get_file_name = tar_get_file_name, + .write = tar_write, + .sync = tar_sync, + .finish = tar_finish, + .free = tar_free +}; + typedef struct TarMethodFile { Walfile base; @@ -661,37 +694,20 @@ typedef struct TarMethodFile typedef struct TarMethodData { + WalWriteMethod base; char *tarfilename; int fd; - pg_compress_algorithm compression_algorithm; - int compression_level; - bool sync; TarMethodFile *currentfile; - const char *lasterrstring; /* if set, takes precedence over lasterrno */ - int lasterrno; #ifdef HAVE_LIBZ z_streamp zp; void *zlibOut; #endif } TarMethodData; -static TarMethodData *tar_data = NULL; - -#define tar_clear_error() \ - (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0) -#define tar_set_error(msg) \ - (tar_data->lasterrstring = _(msg)) - -static const char * -tar_getlasterror(void) -{ - if (tar_data->lasterrstring) - return tar_data->lasterrstring; - return strerror(tar_data->lasterrno); -} #ifdef HAVE_LIBZ static bool -tar_write_compressed_data(void *buf, size_t count, bool flush) +tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count, + bool flush) { tar_data->zp->next_in = buf; tar_data->zp->avail_in = count; @@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH); if (r == Z_STREAM_ERROR) { - tar_set_error("could not compress data"); + tar_data->base.lasterrstring = "could not compress data"; return false; } @@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) if (write(tar_data->fd, tar_data->zlibOut, len) != len) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + tar_data->base.lasterrno = errno ? errno : ENOSPC; return false; } @@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) /* Reset the stream for writing */ if (deflateReset(tar_data->zp) != Z_OK) { - tar_set_error("could not reset compression stream"); + tar_data->base.lasterrstring = "could not reset compression stream"; return false; } } @@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) static ssize_t tar_write(Walfile *f, const void *buf, size_t count) { + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; ssize_t r; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); /* Tarfile will always be positioned at the end */ - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; r = write(tar_data->fd, buf, count); if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } f->currpos += r; return r; } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { - if (!tar_write_compressed_data(unconstify(void *, buf), count, false)) + if (!tar_write_compressed_data(tar_data, unconstify(void *, buf), + count, false)) return -1; f->currpos += count; return count; @@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count) else { /* Can't happen - compression enabled with no method set */ - tar_data->lasterrno = ENOSYS; + f->wwmethod->lasterrno = ENOSYS; return -1; } } @@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes) } static char * -tar_get_file_name(const char *pathname, const char *temp_suffix) +tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix) { char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); @@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix) } static Walfile * -tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix, size_t pad_to_size) { + TarMethodData *tar_data = (TarMethodData *) wwmethod; char *tmppath; - tar_clear_error(); + clear_error(wwmethod); if (tar_data->fd < 0) { @@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ pg_file_create_mode); if (tar_data->fd < 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream)); tar_data->zp->zalloc = Z_NULL; @@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * default 15 for the windowBits parameter makes the output be * gzip instead of zlib. */ - if (deflateInit2(tar_data->zp, tar_data->compression_level, + if (deflateInit2(tar_data->zp, wwmethod->compression_level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { pg_free(tar_data->zp); tar_data->zp = NULL; - tar_set_error("could not initialize compression library"); + wwmethod->lasterrstring = + "could not initialize compression library"; return NULL; } } @@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (tar_data->currentfile != NULL) { - tar_set_error("implementation error: tar files can't have more than one open file"); + wwmethod->lasterrstring = + "implementation error: tar files can't have more than one open file"; return NULL; } tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile)); + tar_data->currentfile->base.wwmethod = wwmethod; - tmppath = tar_get_file_name(pathname, temp_suffix); + tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix); /* Create a header with size set to 0 - we will fill out the size on close */ if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK) @@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ pg_free(tar_data->currentfile); pg_free(tmppath); tar_data->currentfile = NULL; - tar_set_error("could not create tar header"); + wwmethod->lasterrstring = "could not create tar header"; return NULL; } pg_free(tmppath); #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Flush existing data */ - if (!tar_write_compressed_data(NULL, 0, true)) + if (!tar_write_compressed_data(tar_data, NULL, 0, true)) return NULL; /* Turn off compression for header */ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + wwmethod->lasterrstring = + "could not change compression parameters"; return NULL; } } @@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR); if (tar_data->currentfile->ofs_start == -1) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; pg_free(tar_data->currentfile); tar_data->currentfile = NULL; return NULL; } tar_data->currentfile->base.currpos = 0; - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, tar_data->currentfile->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; pg_free(tar_data->currentfile); tar_data->currentfile = NULL; return NULL; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Write header through the zlib APIs but with no compression */ - if (!tar_write_compressed_data(tar_data->currentfile->header, + if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header, TAR_BLOCK_SIZE, true)) return NULL; /* Re-enable compression for the rest of the file */ - if (deflateParams(tar_data->zp, tar_data->compression_level, + if (deflateParams(tar_data->zp, wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + wwmethod->lasterrstring = "could not change compression parameters"; return NULL; } } @@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (pad_to_size) { tar_data->currentfile->pad_to_size = pad_to_size; - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { /* Uncompressed, so pad now */ if (!tar_write_padding_data(tar_data->currentfile, pad_to_size)) @@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE, SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } @@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } static ssize_t -tar_get_file_size(const char *pathname) +tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname) { - tar_clear_error(); + clear_error(wwmethod); /* Currently not used, so not supported */ - tar_data->lasterrno = ENOSYS; + wwmethod->lasterrno = ENOSYS; return -1; } -static pg_compress_algorithm -tar_compression_algorithm(void) -{ - return tar_data->compression_algorithm; -} - static int tar_sync(Walfile *f) { + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; int r; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); - if (!tar_data->sync) + if (!f->wwmethod->sync) return 0; /* * Always sync the whole tarfile, because that's all we can do. This makes * no sense on compressed files, so just ignore those. */ - if (tar_data->compression_algorithm != PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE) return 0; r = fsync(tar_data->fd); if (r < 0) - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return r; } @@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method) { ssize_t filesize; int padding; + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; TarMethodFile *tf = (TarMethodFile *) f; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); if (method == CLOSE_UNLINK) { - if (tar_data->compression_algorithm != PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE) { - tar_set_error("unlink not supported with compression"); + f->wwmethod->lasterrstring = "unlink not supported with compression"; return -1; } @@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method) */ if (ftruncate(tar_data->fd, tf->ofs_start) != 0) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } @@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method) */ if (tf->pad_to_size) { - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* * A compressed tarfile is padded on close since we cannot know @@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method) #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Flush the current buffer */ - if (!tar_write_compressed_data(NULL, 0, true)) + if (!tar_write_compressed_data(tar_data, NULL, 0, true)) return -1; } #endif @@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method) print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header)); if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Turn off compression */ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + f->wwmethod->lasterrstring = "could not change compression parameters"; return -1; } /* Overwrite the header, assuming the size will be the same */ - if (!tar_write_compressed_data(tar_data->currentfile->header, + if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header, TAR_BLOCK_SIZE, true)) return -1; /* Turn compression back on */ - if (deflateParams(tar_data->zp, tar_data->compression_level, + if (deflateParams(tar_data->zp, f->wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + f->wwmethod->lasterrstring = "could not change compression parameters"; return -1; } } @@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method) /* Move file pointer back down to end, so we can write the next file */ if (lseek(tar_data->fd, 0, SEEK_END) < 0) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } @@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method) { /* XXX this seems pretty bogus; why is only this case fatal? */ pg_fatal("could not fsync file \"%s\": %s", - tf->base.pathname, tar_getlasterror()); + tf->base.pathname, GetLastWalMethodError(f->wwmethod)); } /* Clean up and done */ @@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method) } static bool -tar_existsfile(const char *pathname) +tar_existsfile(WalWriteMethod *wwmethod, const char *pathname) { - tar_clear_error(); + clear_error(wwmethod); /* We only deal with new tarfiles, so nothing externally created exists */ return false; } static bool -tar_finish(void) +tar_finish(WalWriteMethod *wwmethod) { + TarMethodData *tar_data = (TarMethodData *) wwmethod; char zerobuf[1024] = {0}; - tar_clear_error(); + clear_error(wwmethod); if (tar_data->currentfile) { @@ -1212,20 +1234,21 @@ tar_finish(void) } /* A tarfile always ends with two empty blocks */ - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf)) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; return false; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { - if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false)) + if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf), + false)) return false; /* Also flush all data to make sure the gzip stream is finished */ @@ -1239,7 +1262,7 @@ tar_finish(void) if (r == Z_STREAM_ERROR) { - tar_set_error("could not compress data"); + wwmethod->lasterrstring = "could not compress data"; return false; } if (tar_data->zp->avail_out < ZLIB_OUT_SIZE) @@ -1253,7 +1276,7 @@ tar_finish(void) * If write didn't set errno, assume problem is no disk * space. */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; return false; } } @@ -1263,7 +1286,7 @@ tar_finish(void) if (deflateEnd(tar_data->zp) != Z_OK) { - tar_set_error("could not close compression stream"); + wwmethod->lasterrstring = "could not close compression stream"; return false; } } @@ -1275,29 +1298,29 @@ tar_finish(void) } /* sync the empty blocks as well, since they're after the last file */ - if (tar_data->sync) + if (wwmethod->sync) { if (fsync(tar_data->fd) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } if (close(tar_data->fd) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } tar_data->fd = -1; - if (tar_data->sync) + if (wwmethod->sync) { if (fsync_fname(tar_data->tarfilename, false) != 0 || fsync_parent_path(tar_data->tarfilename) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } @@ -1305,6 +1328,19 @@ tar_finish(void) return true; } +static void +tar_free(WalWriteMethod *wwmethod) +{ + TarMethodData *tar_data = (TarMethodData *) wwmethod; + + pg_free(tar_data->tarfilename); +#ifdef HAVE_LIBZ + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) + pg_free(tar_data->zlibOut); +#endif + pg_free(wwmethod); +} + /* * The argument compression_algorithm is currently ignored. It is in place for * symmetry with CreateWalDirectoryMethod which uses it for distinguishing @@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync) { - WalWriteMethod *method; + TarMethodData *wwmethod; const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ? ".tar.gz" : ".tar"; - method = pg_malloc0(sizeof(WalWriteMethod)); - method->open_for_write = tar_open_for_write; - method->write = tar_write; - method->get_file_size = tar_get_file_size; - method->get_file_name = tar_get_file_name; - method->compression_algorithm = tar_compression_algorithm; - method->close = tar_close; - method->sync = tar_sync; - method->existsfile = tar_existsfile; - method->finish = tar_finish; - method->getlasterror = tar_getlasterror; + wwmethod = pg_malloc0(sizeof(TarMethodData)); + *((const WalWriteMethodOps **) &wwmethod->base.ops) = + &WalTarMethodOps; + wwmethod->base.compression_algorithm = compression_algorithm; + wwmethod->base.compression_level = compression_level; + wwmethod->base.sync = sync; + clear_error(&wwmethod->base); - tar_data = pg_malloc0(sizeof(TarMethodData)); - tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); - sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix); - tar_data->fd = -1; - tar_data->compression_algorithm = compression_algorithm; - tar_data->compression_level = compression_level; - tar_data->sync = sync; + wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); + sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix); + wwmethod->fd = -1; #ifdef HAVE_LIBZ if (compression_algorithm == PG_COMPRESSION_GZIP) - tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); + wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); #endif - return method; + return &wwmethod->base; } -void -FreeWalTarMethod(void) +const char * +GetLastWalMethodError(WalWriteMethod *wwmethod) { - pg_free(tar_data->tarfilename); -#ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) - pg_free(tar_data->zlibOut); -#endif - pg_free(tar_data); - tar_data = NULL; + if (wwmethod->lasterrstring) + return wwmethod->lasterrstring; + return strerror(wwmethod->lasterrno); } diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index cf5ed87fbe..0d7728d086 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -16,6 +16,7 @@ typedef struct WalWriteMethod WalWriteMethod; typedef struct { + WalWriteMethod *wwmethod; off_t currpos; char *pathname; /* @@ -34,16 +35,9 @@ typedef enum } WalCloseMethod; /* - * A WalWriteMethod structure represents the different methods used - * to write the streaming WAL as it's received. - * - * All methods that have a failure return indicator will set state - * allowing the getlasterror() method to return a suitable message. - * Commonly, errno is this state (or part of it); so callers must take - * care not to clobber errno between a failed method call and use of - * getlasterror() to retrieve the message. + * Table of callbacks for a WalWriteMethod. */ -struct WalWriteMethod +typedef struct WalWriteMethodOps { /* * Open a target file. Returns Walfile, or NULL if open failed. If a temp @@ -51,7 +45,7 @@ struct WalWriteMethod * automatically renamed in close(). If pad_to_size is specified, the file * will be padded with NUL up to that size, if supported by the Walmethod. */ - Walfile *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size); + Walfile *(*open_for_write) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size); /* * Close an open Walfile, using one or more methods for handling automatic @@ -60,19 +54,16 @@ struct WalWriteMethod int (*close) (Walfile *f, WalCloseMethod method); /* Check if a file exist */ - bool (*existsfile) (const char *pathname); + bool (*existsfile) (WalWriteMethod *wwmethod, const char *pathname); /* Return the size of a file, or -1 on failure. */ - ssize_t (*get_file_size) (const char *pathname); + ssize_t (*get_file_size) (WalWriteMethod *wwmethod, const char *pathname); /* * Return the name of the current file to work on in pg_malloc()'d string, * without the base directory. This is useful for logging. */ - char *(*get_file_name) (const char *pathname, const char *temp_suffix); - - /* Returns the compression method */ - pg_compress_algorithm (*compression_algorithm) (void); + char *(*get_file_name) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix); /* * Write count number of bytes to the file, and return the number of bytes @@ -91,10 +82,37 @@ struct WalWriteMethod * close/write/sync of shared resources succeeded, otherwise returns false * (but the resources are still closed). */ - bool (*finish) (void); + bool (*finish) (WalWriteMethod *wwmethod); - /* Return a text for the last error in this Walfile */ - const char *(*getlasterror) (void); + /* + * Free subsidiary data associated with the WalWriteMethod, and the + * WalWriteMethod itself. + */ + void (*free) (WalWriteMethod *wwmethod); +} WalWriteMethodOps; + +/* + * A WalWriteMethod structure represents a way of writing streaming WAL as + * it's received. + * + * All methods that have a failure return indicator will set lasterrstring + * or lasterrno (the former takes precedence) so that the caller can signal + * a suitable error. + */ +struct WalWriteMethod +{ + const WalWriteMethodOps *ops; + pg_compress_algorithm compression_algorithm; + int compression_level; + bool sync; + const char *lasterrstring; /* if set, takes precedence over lasterrno */ + int lasterrno; + /* + * MORE DATA FOLLOWS AT END OF STRUCT + * + * Each WalWriteMethod is expected to embed this as the first member of + * a larger struct with method-specific fields following. + */ }; /* @@ -111,6 +129,4 @@ WalWriteMethod *CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algo, int compression, bool sync); -/* Cleanup routines for previously-created methods */ -void FreeWalDirectoryMethod(void); -void FreeWalTarMethod(void); +const char *GetLastWalMethodError(WalWriteMethod *wwmethod);