From ebfb814f7ce0d5ab6f47f0b86db51a1b8f3342f4 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 19 Sep 2022 12:53:46 -0400 Subject: [PATCH] walmethods.c/h: Make WalWriteMethod more object-oriented. Normally when we use object-oriented programming techniques, we provide a pointer to an object and then some way of looking up the associated table of callbacks, but walmethods.c/h took the alternative approach of providing only a pointer to the table of callbacks and thus imposed the artificial restriction that there could only ever be one object of each type, so that the callbacks could find it via a global variable. That doesn't seem like the right idea, so revise the approach. Each callback which does not already have an argument of type Walfile * now takes a pointer to the relevant WalWriteMethod * so that these callbacks need not rely on there being only one object of each type. Freeing a WalWriteMethod is now performed via a callback provided for that purpose rather than requiring the caller to know which WAL method they want to free. Discussion: http://postgr.es/m/CA+TgmoZS0Kw98fOoAcGz8B9iDhdqB4Be4e=vDZaJZ5A-xMYBqA@mail.gmail.com --- src/bin/pg_basebackup/pg_basebackup.c | 8 +- src/bin/pg_basebackup/pg_receivewal.c | 6 +- src/bin/pg_basebackup/receivelog.c | 89 ++--- src/bin/pg_basebackup/walmethods.c | 504 ++++++++++++++------------ src/bin/pg_basebackup/walmethods.h | 60 +-- 5 files changed, 354 insertions(+), 313 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index a830b199f5..876d20611b 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -570,7 +570,7 @@ LogStreamerMain(logstreamer_param *param) return 1; } - if (!stream.walmethod->finish()) + if (!stream.walmethod->ops->finish(stream.walmethod)) { pg_log_error("could not finish writing WAL files: %m"); #ifdef WIN32 @@ -581,11 +581,7 @@ LogStreamerMain(logstreamer_param *param) PQfinish(param->bgconn); - if (format == 'p') - FreeWalDirectoryMethod(); - else - FreeWalTarMethod(); - pg_free(stream.walmethod); + stream.walmethod->ops->free(stream.walmethod); return 0; } diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 5c22c914bc..a7180e2955 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -658,7 +658,7 @@ StreamLog(void) ReceiveXlogStream(conn, &stream); - if (!stream.walmethod->finish()) + if (!stream.walmethod->ops->finish(stream.walmethod)) { pg_log_info("could not finish writing WAL files: %m"); return; @@ -667,9 +667,7 @@ StreamLog(void) PQfinish(conn); conn = NULL; - FreeWalDirectoryMethod(); - pg_free(stream.walmethod); - pg_free(stream.sysidentifier); + stream.walmethod->ops->free(stream.walmethod); } /* diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index a619176511..9c71323d70 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -59,18 +59,19 @@ mark_file_as_archived(StreamCtl *stream, const char *fname) snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done", fname); - f = stream->walmethod->open_for_write(tmppath, NULL, 0); + f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath, + NULL, 0); if (f == NULL) { pg_log_error("could not create archive status file \"%s\": %s", - tmppath, stream->walmethod->getlasterror()); + tmppath, GetLastWalMethodError(stream->walmethod)); return false; } - if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) + if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0) { pg_log_error("could not close archive status file \"%s\": %s", - tmppath, stream->walmethod->getlasterror()); + tmppath, GetLastWalMethodError(stream->walmethod)); return false; } @@ -98,8 +99,9 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) XLogFileName(walfile_name, stream->timeline, segno, WalSegSz); /* Note that this considers the compression used if necessary */ - fn = stream->walmethod->get_file_name(walfile_name, - stream->partial_suffix); + fn = stream->walmethod->ops->get_file_name(stream->walmethod, + walfile_name, + stream->partial_suffix); /* * When streaming to files, if an existing file exists we verify that it's @@ -111,35 +113,35 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) * When streaming to tar, no file with this name will exist before, so we * never have to verify a size. */ - if (stream->walmethod->compression_algorithm() == PG_COMPRESSION_NONE && - stream->walmethod->existsfile(fn)) + if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE && + stream->walmethod->ops->existsfile(stream->walmethod, fn)) { - size = stream->walmethod->get_file_size(fn); + size = stream->walmethod->ops->get_file_size(stream->walmethod, fn); if (size < 0) { pg_log_error("could not get size of write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; } if (size == WalSegSz) { /* Already padded file. Open it for use */ - f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0); + f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0); if (f == NULL) { pg_log_error("could not open existing write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; } /* fsync file in case of a previous crash */ - if (stream->walmethod->sync(f) != 0) + if (stream->walmethod->ops->sync(f) != 0) { pg_log_error("could not fsync existing write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); - stream->walmethod->close(f, CLOSE_UNLINK); + fn, GetLastWalMethodError(stream->walmethod)); + stream->walmethod->ops->close(f, CLOSE_UNLINK); exit(1); } @@ -164,12 +166,14 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) /* No file existed, so create one */ - f = stream->walmethod->open_for_write(walfile_name, - stream->partial_suffix, WalSegSz); + f = stream->walmethod->ops->open_for_write(stream->walmethod, + walfile_name, + stream->partial_suffix, + WalSegSz); if (f == NULL) { pg_log_error("could not open write-ahead log file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; } @@ -199,28 +203,29 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) currpos = walfile->currpos; /* Note that this considers the compression used if necessary */ - fn = stream->walmethod->get_file_name(walfile_name, - stream->partial_suffix); + fn = stream->walmethod->ops->get_file_name(stream->walmethod, + walfile_name, + stream->partial_suffix); if (stream->partial_suffix) { if (currpos == WalSegSz) - r = stream->walmethod->close(walfile, CLOSE_NORMAL); + r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL); else { pg_log_info("not renaming \"%s\", segment is not complete", fn); - r = stream->walmethod->close(walfile, CLOSE_NO_RENAME); + r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME); } } else - r = stream->walmethod->close(walfile, CLOSE_NORMAL); + r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL); walfile = NULL; if (r != 0) { pg_log_error("could not close file \"%s\": %s", - fn, stream->walmethod->getlasterror()); + fn, GetLastWalMethodError(stream->walmethod)); pg_free(fn); return false; @@ -263,7 +268,7 @@ existsTimeLineHistoryFile(StreamCtl *stream) TLHistoryFileName(histfname, stream->timeline); - return stream->walmethod->existsfile(histfname); + return stream->walmethod->ops->existsfile(stream->walmethod, histfname); } static bool @@ -285,31 +290,32 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) return false; } - f = stream->walmethod->open_for_write(histfname, ".tmp", 0); + f = stream->walmethod->ops->open_for_write(stream->walmethod, + histfname, ".tmp", 0); if (f == NULL) { pg_log_error("could not create timeline history file \"%s\": %s", - histfname, stream->walmethod->getlasterror()); + histfname, GetLastWalMethodError(stream->walmethod)); return false; } - if ((int) stream->walmethod->write(f, content, size) != size) + if ((int) stream->walmethod->ops->write(f, content, size) != size) { pg_log_error("could not write timeline history file \"%s\": %s", - histfname, stream->walmethod->getlasterror()); + histfname, GetLastWalMethodError(stream->walmethod)); /* * If we fail to make the file, delete it to release disk space */ - stream->walmethod->close(f, CLOSE_UNLINK); + stream->walmethod->ops->close(f, CLOSE_UNLINK); return false; } - if (stream->walmethod->close(f, CLOSE_NORMAL) != 0) + if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0) { pg_log_error("could not close file \"%s\": %s", - histfname, stream->walmethod->getlasterror()); + histfname, GetLastWalMethodError(stream->walmethod)); return false; } @@ -678,9 +684,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) } error: - if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0) + if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0) pg_log_error("could not close file \"%s\": %s", - walfile->pathname, stream->walmethod->getlasterror()); + walfile->pathname, GetLastWalMethodError(stream->walmethod)); walfile = NULL; return false; } @@ -765,9 +771,9 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, */ if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL) { - if (stream->walmethod->sync(walfile) != 0) + if (stream->walmethod->ops->sync(walfile) != 0) pg_fatal("could not fsync file \"%s\": %s", - walfile->pathname, stream->walmethod->getlasterror()); + walfile->pathname, GetLastWalMethodError(stream->walmethod)); lastFlushPosition = blockpos; /* @@ -1012,9 +1018,9 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * data has been successfully replicated or not, at the normal * shutdown of the server. */ - if (stream->walmethod->sync(walfile) != 0) + if (stream->walmethod->ops->sync(walfile) != 0) pg_fatal("could not fsync file \"%s\": %s", - walfile->pathname, stream->walmethod->getlasterror()); + walfile->pathname, GetLastWalMethodError(stream->walmethod)); lastFlushPosition = blockpos; } @@ -1115,12 +1121,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } } - if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) + if (stream->walmethod->ops->write(walfile, + copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) { pg_log_error("could not write %d bytes to WAL file \"%s\": %s", bytes_to_write, walfile->pathname, - stream->walmethod->getlasterror()); + GetLastWalMethodError(stream->walmethod)); return false; } diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index d98a2681b9..bc2e83d02b 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -2,9 +2,6 @@ * * walmethods.c - implementations of different ways to write received wal * - * NOTE! The caller must ensure that only one method is instantiated in - * any given program, and that it's only instantiated once! - * * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group * * IDENTIFICATION @@ -43,19 +40,41 @@ *------------------------------------------------------------------------- */ +static Walfile *dir_open_for_write(WalWriteMethod *wwmethod, + const char *pathname, + const char *temp_suffix, + size_t pad_to_size); +static int dir_close(Walfile *f, WalCloseMethod method); +static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname); +static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, + const char *pathname); +static char *dir_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix); +static ssize_t dir_write(Walfile *f, const void *buf, size_t count); +static int dir_sync(Walfile *f); +static bool dir_finish(WalWriteMethod *wwmethod); +static void dir_free(WalWriteMethod *wwmethod); + +const WalWriteMethodOps WalDirectoryMethodOps = { + .open_for_write = dir_open_for_write, + .close = dir_close, + .existsfile = dir_existsfile, + .get_file_size = dir_get_file_size, + .get_file_name = dir_get_file_name, + .write = dir_write, + .sync = dir_sync, + .finish = dir_finish, + .free = dir_free +}; + /* * Global static data for this method */ typedef struct DirectoryMethodData { + WalWriteMethod base; char *basedir; - pg_compress_algorithm compression_algorithm; - int compression_level; - bool sync; - const char *lasterrstring; /* if set, takes precedence over lasterrno */ - int lasterrno; } DirectoryMethodData; -static DirectoryMethodData *dir_data = NULL; /* * Local file handle @@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile #endif } DirectoryMethodFile; -#define dir_clear_error() \ - (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0) -#define dir_set_error(msg) \ - (dir_data->lasterrstring = _(msg)) - -static const char * -dir_getlasterror(void) -{ - if (dir_data->lasterrstring) - return dir_data->lasterrstring; - return strerror(dir_data->lasterrno); -} +#define clear_error(wwmethod) \ + ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0) static char * -dir_get_file_name(const char *pathname, const char *temp_suffix) +dir_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix) { char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); snprintf(filename, MAXPGPATH, "%s%s%s", pathname, - dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" : - dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "", + wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" : + wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "", temp_suffix ? temp_suffix : ""); return filename; } static Walfile * -dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix, size_t pad_to_size) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; char tmppath[MAXPGPATH]; char *filename; int fd; @@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ void *lz4buf = NULL; #endif - dir_clear_error(); + clear_error(wwmethod); - filename = dir_get_file_name(pathname, temp_suffix); + filename = dir_get_file_name(wwmethod, pathname, temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode); if (fd < 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { gzfp = gzdopen(fd, "wb"); if (gzfp == NULL) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; close(fd); return NULL; } - if (gzsetparams(gzfp, dir_data->compression_level, + if (gzsetparams(gzfp, wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; gzclose(gzfp); return NULL; } } #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t ctx_out; size_t header_size; @@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); if (LZ4F_isError(ctx_out)) { - dir_data->lasterrstring = LZ4F_getErrorName(ctx_out); + wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out); close(fd); return NULL; } @@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ /* assign the compression level, default is 0 */ memset(&prefs, 0, sizeof(prefs)); - prefs.compressionLevel = dir_data->compression_level; + prefs.compressionLevel = wwmethod->compression_level; /* add the header */ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs); if (LZ4F_isError(header_size)) { - dir_data->lasterrstring = LZ4F_getErrorName(header_size); + wwmethod->lasterrstring = LZ4F_getErrorName(header_size); (void) LZ4F_freeCompressionContext(ctx); pg_free(lz4buf); close(fd); @@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (write(fd, lz4buf, header_size) != header_size) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; (void) LZ4F_freeCompressionContext(ctx); pg_free(lz4buf); close(fd); @@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #endif /* Do pre-padding on non-compressed files */ - if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE) + if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { PGAlignedXLogBlock zerobuf; int bytes; @@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; close(fd); return NULL; } @@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (lseek(fd, 0, SEEK_SET) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; close(fd); return NULL; } @@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * important when using synchronous mode, where the file is modified and * fsynced in-place, without a directory fsync. */ - if (dir_data->sync) + if (wwmethod->sync) { if (fsync_fname(tmppath, false) != 0 || fsync_parent_path(tmppath) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) gzclose(gzfp); else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); (void) LZ4F_freeCompressionContext(ctx); @@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ f = pg_malloc0(sizeof(DirectoryMethodFile)); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) f->gzfp = gzfp; #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { f->ctx = ctx; f->lz4buf = lz4buf; @@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } #endif + f->base.wwmethod = wwmethod; f->base.currpos = 0; f->base.pathname = pg_strdup(pathname); f->fd = fd; @@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count) DirectoryMethodFile *df = (DirectoryMethodFile *) f; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { errno = 0; r = (ssize_t) gzwrite(df->gzfp, buf, count); if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; } } else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t chunk; size_t remaining; @@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } @@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count) if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; } } if (r > 0) @@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method) { int r; DirectoryMethodFile *df = (DirectoryMethodFile *) f; + DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod; char tmppath[MAXPGPATH]; char tmppath2[MAXPGPATH]; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { errno = 0; /* in case gzclose() doesn't set it */ r = gzclose(df->gzfp); @@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method) else #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { size_t compressed; @@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method) if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } @@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method) * If we have a temp prefix, normal operation is to rename the * file. */ - filename = dir_get_file_name(df->base.pathname, df->temp_suffix); + filename = dir_get_file_name(f->wwmethod, df->base.pathname, + df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); /* permanent name, so no need for the prefix */ - filename2 = dir_get_file_name(df->base.pathname, NULL); + filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL); snprintf(tmppath2, sizeof(tmppath2), "%s/%s", dir_data->basedir, filename2); pg_free(filename2); - if (dir_data->sync) + if (f->wwmethod->sync) r = durable_rename(tmppath, tmppath2); else { @@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method) char *filename; /* Unlink the file once it's closed */ - filename = dir_get_file_name(df->base.pathname, df->temp_suffix); + filename = dir_get_file_name(f->wwmethod, df->base.pathname, + df->temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, filename); pg_free(filename); @@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method) * CLOSE_NO_RENAME. In this case, fsync the file and containing * directory if sync mode is requested. */ - if (dir_data->sync) + if (f->wwmethod->sync) { r = fsync_fname(df->fullpath, false); if (r == 0) @@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method) } if (r != 0) - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; #ifdef USE_LZ4 pg_free(df->lz4buf); @@ -501,23 +517,23 @@ dir_sync(Walfile *f) int r; Assert(f != NULL); - dir_clear_error(); + clear_error(f->wwmethod); - if (!dir_data->sync) + if (!f->wwmethod->sync) return 0; #ifdef HAVE_LIBZ - if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK) { - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } } #endif #ifdef USE_LZ4 - if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4) { DirectoryMethodFile *df = (DirectoryMethodFile *) f; size_t compressed; @@ -526,7 +542,7 @@ dir_sync(Walfile *f) compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL); if (LZ4F_isError(compressed)) { - dir_data->lasterrstring = LZ4F_getErrorName(compressed); + f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed); return -1; } @@ -534,7 +550,7 @@ dir_sync(Walfile *f) if (write(df->fd, df->lz4buf, compressed) != compressed) { /* If write didn't set errno, assume problem is no disk space */ - dir_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } } @@ -542,13 +558,14 @@ dir_sync(Walfile *f) r = fsync(((DirectoryMethodFile *) f)->fd); if (r < 0) - dir_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return r; } static ssize_t -dir_get_file_size(const char *pathname) +dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; struct stat statbuf; char tmppath[MAXPGPATH]; @@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname) if (stat(tmppath, &statbuf) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return -1; } return statbuf.st_size; } -static pg_compress_algorithm -dir_compression_algorithm(void) -{ - return dir_data->compression_algorithm; -} - static bool -dir_existsfile(const char *pathname) +dir_existsfile(WalWriteMethod *wwmethod, const char *pathname) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; char tmppath[MAXPGPATH]; int fd; - dir_clear_error(); + clear_error(wwmethod); snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, pathname); @@ -589,60 +601,54 @@ dir_existsfile(const char *pathname) } static bool -dir_finish(void) +dir_finish(WalWriteMethod *wwmethod) { - dir_clear_error(); + clear_error(wwmethod); - if (dir_data->sync) + if (wwmethod->sync) { + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; + /* * Files are fsynced when they are closed, but we need to fsync the * directory entry here as well. */ if (fsync_fname(dir_data->basedir, true) != 0) { - dir_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } return true; } +static void +dir_free(WalWriteMethod *wwmethod) +{ + DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod; + + pg_free(dir_data->basedir); + pg_free(wwmethod); +} + WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync) { - WalWriteMethod *method; + DirectoryMethodData *wwmethod; - method = pg_malloc0(sizeof(WalWriteMethod)); - method->open_for_write = dir_open_for_write; - method->write = dir_write; - method->get_file_size = dir_get_file_size; - method->get_file_name = dir_get_file_name; - method->compression_algorithm = dir_compression_algorithm; - method->close = dir_close; - method->sync = dir_sync; - method->existsfile = dir_existsfile; - method->finish = dir_finish; - method->getlasterror = dir_getlasterror; + wwmethod = pg_malloc0(sizeof(DirectoryMethodData)); + *((const WalWriteMethodOps **) &wwmethod->base.ops) = + &WalDirectoryMethodOps; + wwmethod->base.compression_algorithm = compression_algorithm; + wwmethod->base.compression_level = compression_level; + wwmethod->base.sync = sync; + clear_error(&wwmethod->base); + wwmethod->basedir = pg_strdup(basedir); - dir_data = pg_malloc0(sizeof(DirectoryMethodData)); - dir_data->compression_algorithm = compression_algorithm; - dir_data->compression_level = compression_level; - dir_data->basedir = pg_strdup(basedir); - dir_data->sync = sync; - - return method; -} - -void -FreeWalDirectoryMethod(void) -{ - pg_free(dir_data->basedir); - pg_free(dir_data); - dir_data = NULL; + return &wwmethod->base; } @@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void) *------------------------------------------------------------------------- */ +static Walfile *tar_open_for_write(WalWriteMethod *wwmethod, + const char *pathname, + const char *temp_suffix, + size_t pad_to_size); +static int tar_close(Walfile *f, WalCloseMethod method); +static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname); +static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, + const char *pathname); +static char *tar_get_file_name(WalWriteMethod *wwmethod, + const char *pathname, const char *temp_suffix); +static ssize_t tar_write(Walfile *f, const void *buf, size_t count); +static int tar_sync(Walfile *f); +static bool tar_finish(WalWriteMethod *wwmethod); +static void tar_free(WalWriteMethod *wwmethod); + +const WalWriteMethodOps WalTarMethodOps = { + .open_for_write = tar_open_for_write, + .close = tar_close, + .existsfile = tar_existsfile, + .get_file_size = tar_get_file_size, + .get_file_name = tar_get_file_name, + .write = tar_write, + .sync = tar_sync, + .finish = tar_finish, + .free = tar_free +}; + typedef struct TarMethodFile { Walfile base; @@ -661,37 +694,20 @@ typedef struct TarMethodFile typedef struct TarMethodData { + WalWriteMethod base; char *tarfilename; int fd; - pg_compress_algorithm compression_algorithm; - int compression_level; - bool sync; TarMethodFile *currentfile; - const char *lasterrstring; /* if set, takes precedence over lasterrno */ - int lasterrno; #ifdef HAVE_LIBZ z_streamp zp; void *zlibOut; #endif } TarMethodData; -static TarMethodData *tar_data = NULL; - -#define tar_clear_error() \ - (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0) -#define tar_set_error(msg) \ - (tar_data->lasterrstring = _(msg)) - -static const char * -tar_getlasterror(void) -{ - if (tar_data->lasterrstring) - return tar_data->lasterrstring; - return strerror(tar_data->lasterrno); -} #ifdef HAVE_LIBZ static bool -tar_write_compressed_data(void *buf, size_t count, bool flush) +tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count, + bool flush) { tar_data->zp->next_in = buf; tar_data->zp->avail_in = count; @@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH); if (r == Z_STREAM_ERROR) { - tar_set_error("could not compress data"); + tar_data->base.lasterrstring = "could not compress data"; return false; } @@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) if (write(tar_data->fd, tar_data->zlibOut, len) != len) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + tar_data->base.lasterrno = errno ? errno : ENOSPC; return false; } @@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) /* Reset the stream for writing */ if (deflateReset(tar_data->zp) != Z_OK) { - tar_set_error("could not reset compression stream"); + tar_data->base.lasterrstring = "could not reset compression stream"; return false; } } @@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush) static ssize_t tar_write(Walfile *f, const void *buf, size_t count) { + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; ssize_t r; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); /* Tarfile will always be positioned at the end */ - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; r = write(tar_data->fd, buf, count); if (r != count) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } f->currpos += r; return r; } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { - if (!tar_write_compressed_data(unconstify(void *, buf), count, false)) + if (!tar_write_compressed_data(tar_data, unconstify(void *, buf), + count, false)) return -1; f->currpos += count; return count; @@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count) else { /* Can't happen - compression enabled with no method set */ - tar_data->lasterrno = ENOSYS; + f->wwmethod->lasterrno = ENOSYS; return -1; } } @@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes) } static char * -tar_get_file_name(const char *pathname, const char *temp_suffix) +tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix) { char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); @@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix) } static Walfile * -tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, + const char *temp_suffix, size_t pad_to_size) { + TarMethodData *tar_data = (TarMethodData *) wwmethod; char *tmppath; - tar_clear_error(); + clear_error(wwmethod); if (tar_data->fd < 0) { @@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ pg_file_create_mode); if (tar_data->fd < 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream)); tar_data->zp->zalloc = Z_NULL; @@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * default 15 for the windowBits parameter makes the output be * gzip instead of zlib. */ - if (deflateInit2(tar_data->zp, tar_data->compression_level, + if (deflateInit2(tar_data->zp, wwmethod->compression_level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { pg_free(tar_data->zp); tar_data->zp = NULL; - tar_set_error("could not initialize compression library"); + wwmethod->lasterrstring = + "could not initialize compression library"; return NULL; } } @@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (tar_data->currentfile != NULL) { - tar_set_error("implementation error: tar files can't have more than one open file"); + wwmethod->lasterrstring = + "implementation error: tar files can't have more than one open file"; return NULL; } tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile)); + tar_data->currentfile->base.wwmethod = wwmethod; - tmppath = tar_get_file_name(pathname, temp_suffix); + tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix); /* Create a header with size set to 0 - we will fill out the size on close */ if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK) @@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ pg_free(tar_data->currentfile); pg_free(tmppath); tar_data->currentfile = NULL; - tar_set_error("could not create tar header"); + wwmethod->lasterrstring = "could not create tar header"; return NULL; } pg_free(tmppath); #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Flush existing data */ - if (!tar_write_compressed_data(NULL, 0, true)) + if (!tar_write_compressed_data(tar_data, NULL, 0, true)) return NULL; /* Turn off compression for header */ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + wwmethod->lasterrstring = + "could not change compression parameters"; return NULL; } } @@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR); if (tar_data->currentfile->ofs_start == -1) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; pg_free(tar_data->currentfile); tar_data->currentfile = NULL; return NULL; } tar_data->currentfile->base.currpos = 0; - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, tar_data->currentfile->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; pg_free(tar_data->currentfile); tar_data->currentfile = NULL; return NULL; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Write header through the zlib APIs but with no compression */ - if (!tar_write_compressed_data(tar_data->currentfile->header, + if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header, TAR_BLOCK_SIZE, true)) return NULL; /* Re-enable compression for the rest of the file */ - if (deflateParams(tar_data->zp, tar_data->compression_level, + if (deflateParams(tar_data->zp, wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + wwmethod->lasterrstring = "could not change compression parameters"; return NULL; } } @@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (pad_to_size) { tar_data->currentfile->pad_to_size = pad_to_size; - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { /* Uncompressed, so pad now */ if (!tar_write_padding_data(tar_data->currentfile, pad_to_size)) @@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE, SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return NULL; } @@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } static ssize_t -tar_get_file_size(const char *pathname) +tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname) { - tar_clear_error(); + clear_error(wwmethod); /* Currently not used, so not supported */ - tar_data->lasterrno = ENOSYS; + wwmethod->lasterrno = ENOSYS; return -1; } -static pg_compress_algorithm -tar_compression_algorithm(void) -{ - return tar_data->compression_algorithm; -} - static int tar_sync(Walfile *f) { + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; int r; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); - if (!tar_data->sync) + if (!f->wwmethod->sync) return 0; /* * Always sync the whole tarfile, because that's all we can do. This makes * no sense on compressed files, so just ignore those. */ - if (tar_data->compression_algorithm != PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE) return 0; r = fsync(tar_data->fd); if (r < 0) - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return r; } @@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method) { ssize_t filesize; int padding; + TarMethodData *tar_data = (TarMethodData *) f->wwmethod; TarMethodFile *tf = (TarMethodFile *) f; Assert(f != NULL); - tar_clear_error(); + clear_error(f->wwmethod); if (method == CLOSE_UNLINK) { - if (tar_data->compression_algorithm != PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE) { - tar_set_error("unlink not supported with compression"); + f->wwmethod->lasterrstring = "unlink not supported with compression"; return -1; } @@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method) */ if (ftruncate(tar_data->fd, tf->ofs_start) != 0) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } @@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method) */ if (tf->pad_to_size) { - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* * A compressed tarfile is padded on close since we cannot know @@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method) #ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Flush the current buffer */ - if (!tar_write_compressed_data(NULL, 0, true)) + if (!tar_write_compressed_data(tar_data, NULL, 0, true)) return -1; } #endif @@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method) print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header)); if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + f->wwmethod->lasterrno = errno ? errno : ENOSPC; return -1; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { /* Turn off compression */ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + f->wwmethod->lasterrstring = "could not change compression parameters"; return -1; } /* Overwrite the header, assuming the size will be the same */ - if (!tar_write_compressed_data(tar_data->currentfile->header, + if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header, TAR_BLOCK_SIZE, true)) return -1; /* Turn compression back on */ - if (deflateParams(tar_data->zp, tar_data->compression_level, + if (deflateParams(tar_data->zp, f->wwmethod->compression_level, Z_DEFAULT_STRATEGY) != Z_OK) { - tar_set_error("could not change compression parameters"); + f->wwmethod->lasterrstring = "could not change compression parameters"; return -1; } } @@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method) /* Move file pointer back down to end, so we can write the next file */ if (lseek(tar_data->fd, 0, SEEK_END) < 0) { - tar_data->lasterrno = errno; + f->wwmethod->lasterrno = errno; return -1; } @@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method) { /* XXX this seems pretty bogus; why is only this case fatal? */ pg_fatal("could not fsync file \"%s\": %s", - tf->base.pathname, tar_getlasterror()); + tf->base.pathname, GetLastWalMethodError(f->wwmethod)); } /* Clean up and done */ @@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method) } static bool -tar_existsfile(const char *pathname) +tar_existsfile(WalWriteMethod *wwmethod, const char *pathname) { - tar_clear_error(); + clear_error(wwmethod); /* We only deal with new tarfiles, so nothing externally created exists */ return false; } static bool -tar_finish(void) +tar_finish(WalWriteMethod *wwmethod) { + TarMethodData *tar_data = (TarMethodData *) wwmethod; char zerobuf[1024] = {0}; - tar_clear_error(); + clear_error(wwmethod); if (tar_data->currentfile) { @@ -1212,20 +1234,21 @@ tar_finish(void) } /* A tarfile always ends with two empty blocks */ - if (tar_data->compression_algorithm == PG_COMPRESSION_NONE) + if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE) { errno = 0; if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf)) { /* If write didn't set errno, assume problem is no disk space */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; return false; } } #ifdef HAVE_LIBZ - else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) + else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) { - if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false)) + if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf), + false)) return false; /* Also flush all data to make sure the gzip stream is finished */ @@ -1239,7 +1262,7 @@ tar_finish(void) if (r == Z_STREAM_ERROR) { - tar_set_error("could not compress data"); + wwmethod->lasterrstring = "could not compress data"; return false; } if (tar_data->zp->avail_out < ZLIB_OUT_SIZE) @@ -1253,7 +1276,7 @@ tar_finish(void) * If write didn't set errno, assume problem is no disk * space. */ - tar_data->lasterrno = errno ? errno : ENOSPC; + wwmethod->lasterrno = errno ? errno : ENOSPC; return false; } } @@ -1263,7 +1286,7 @@ tar_finish(void) if (deflateEnd(tar_data->zp) != Z_OK) { - tar_set_error("could not close compression stream"); + wwmethod->lasterrstring = "could not close compression stream"; return false; } } @@ -1275,29 +1298,29 @@ tar_finish(void) } /* sync the empty blocks as well, since they're after the last file */ - if (tar_data->sync) + if (wwmethod->sync) { if (fsync(tar_data->fd) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } if (close(tar_data->fd) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } tar_data->fd = -1; - if (tar_data->sync) + if (wwmethod->sync) { if (fsync_fname(tar_data->tarfilename, false) != 0 || fsync_parent_path(tar_data->tarfilename) != 0) { - tar_data->lasterrno = errno; + wwmethod->lasterrno = errno; return false; } } @@ -1305,6 +1328,19 @@ tar_finish(void) return true; } +static void +tar_free(WalWriteMethod *wwmethod) +{ + TarMethodData *tar_data = (TarMethodData *) wwmethod; + + pg_free(tar_data->tarfilename); +#ifdef HAVE_LIBZ + if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP) + pg_free(tar_data->zlibOut); +#endif + pg_free(wwmethod); +} + /* * The argument compression_algorithm is currently ignored. It is in place for * symmetry with CreateWalDirectoryMethod which uses it for distinguishing @@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync) { - WalWriteMethod *method; + TarMethodData *wwmethod; const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ? ".tar.gz" : ".tar"; - method = pg_malloc0(sizeof(WalWriteMethod)); - method->open_for_write = tar_open_for_write; - method->write = tar_write; - method->get_file_size = tar_get_file_size; - method->get_file_name = tar_get_file_name; - method->compression_algorithm = tar_compression_algorithm; - method->close = tar_close; - method->sync = tar_sync; - method->existsfile = tar_existsfile; - method->finish = tar_finish; - method->getlasterror = tar_getlasterror; + wwmethod = pg_malloc0(sizeof(TarMethodData)); + *((const WalWriteMethodOps **) &wwmethod->base.ops) = + &WalTarMethodOps; + wwmethod->base.compression_algorithm = compression_algorithm; + wwmethod->base.compression_level = compression_level; + wwmethod->base.sync = sync; + clear_error(&wwmethod->base); - tar_data = pg_malloc0(sizeof(TarMethodData)); - tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); - sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix); - tar_data->fd = -1; - tar_data->compression_algorithm = compression_algorithm; - tar_data->compression_level = compression_level; - tar_data->sync = sync; + wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); + sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix); + wwmethod->fd = -1; #ifdef HAVE_LIBZ if (compression_algorithm == PG_COMPRESSION_GZIP) - tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); + wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); #endif - return method; + return &wwmethod->base; } -void -FreeWalTarMethod(void) +const char * +GetLastWalMethodError(WalWriteMethod *wwmethod) { - pg_free(tar_data->tarfilename); -#ifdef HAVE_LIBZ - if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP) - pg_free(tar_data->zlibOut); -#endif - pg_free(tar_data); - tar_data = NULL; + if (wwmethod->lasterrstring) + return wwmethod->lasterrstring; + return strerror(wwmethod->lasterrno); } diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index cf5ed87fbe..0d7728d086 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -16,6 +16,7 @@ typedef struct WalWriteMethod WalWriteMethod; typedef struct { + WalWriteMethod *wwmethod; off_t currpos; char *pathname; /* @@ -34,16 +35,9 @@ typedef enum } WalCloseMethod; /* - * A WalWriteMethod structure represents the different methods used - * to write the streaming WAL as it's received. - * - * All methods that have a failure return indicator will set state - * allowing the getlasterror() method to return a suitable message. - * Commonly, errno is this state (or part of it); so callers must take - * care not to clobber errno between a failed method call and use of - * getlasterror() to retrieve the message. + * Table of callbacks for a WalWriteMethod. */ -struct WalWriteMethod +typedef struct WalWriteMethodOps { /* * Open a target file. Returns Walfile, or NULL if open failed. If a temp @@ -51,7 +45,7 @@ struct WalWriteMethod * automatically renamed in close(). If pad_to_size is specified, the file * will be padded with NUL up to that size, if supported by the Walmethod. */ - Walfile *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size); + Walfile *(*open_for_write) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size); /* * Close an open Walfile, using one or more methods for handling automatic @@ -60,19 +54,16 @@ struct WalWriteMethod int (*close) (Walfile *f, WalCloseMethod method); /* Check if a file exist */ - bool (*existsfile) (const char *pathname); + bool (*existsfile) (WalWriteMethod *wwmethod, const char *pathname); /* Return the size of a file, or -1 on failure. */ - ssize_t (*get_file_size) (const char *pathname); + ssize_t (*get_file_size) (WalWriteMethod *wwmethod, const char *pathname); /* * Return the name of the current file to work on in pg_malloc()'d string, * without the base directory. This is useful for logging. */ - char *(*get_file_name) (const char *pathname, const char *temp_suffix); - - /* Returns the compression method */ - pg_compress_algorithm (*compression_algorithm) (void); + char *(*get_file_name) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix); /* * Write count number of bytes to the file, and return the number of bytes @@ -91,10 +82,37 @@ struct WalWriteMethod * close/write/sync of shared resources succeeded, otherwise returns false * (but the resources are still closed). */ - bool (*finish) (void); + bool (*finish) (WalWriteMethod *wwmethod); - /* Return a text for the last error in this Walfile */ - const char *(*getlasterror) (void); + /* + * Free subsidiary data associated with the WalWriteMethod, and the + * WalWriteMethod itself. + */ + void (*free) (WalWriteMethod *wwmethod); +} WalWriteMethodOps; + +/* + * A WalWriteMethod structure represents a way of writing streaming WAL as + * it's received. + * + * All methods that have a failure return indicator will set lasterrstring + * or lasterrno (the former takes precedence) so that the caller can signal + * a suitable error. + */ +struct WalWriteMethod +{ + const WalWriteMethodOps *ops; + pg_compress_algorithm compression_algorithm; + int compression_level; + bool sync; + const char *lasterrstring; /* if set, takes precedence over lasterrno */ + int lasterrno; + /* + * MORE DATA FOLLOWS AT END OF STRUCT + * + * Each WalWriteMethod is expected to embed this as the first member of + * a larger struct with method-specific fields following. + */ }; /* @@ -111,6 +129,4 @@ WalWriteMethod *CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algo, int compression, bool sync); -/* Cleanup routines for previously-created methods */ -void FreeWalDirectoryMethod(void); -void FreeWalTarMethod(void); +const char *GetLastWalMethodError(WalWriteMethod *wwmethod);