From bf9aa490db24b2334b3595ee33653bf2fe39208c Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 2 Dec 2010 21:39:03 +0200 Subject: [PATCH] Refactor the pg_dump zlib code from pg_backup_custom.c to a separate file, to make it easier to reuse that code. There is no user-visible changes. This is in preparation for the patch to add a new archive format, a directory, to perform a custom-like dump but with each table being dumped to a separate file (that in turn is a prerequisite for parallel pg_dump). This also makes it easier to add new compression methods in the future, and makes the pg_backup_custom.c code easier to read, when the compression-related code is factored out. Joachim Wieland, with heavy editorialization by me. --- src/bin/pg_dump/Makefile | 2 +- src/bin/pg_dump/compress_io.c | 403 +++++++++++++++++++++++++++ src/bin/pg_dump/compress_io.h | 68 +++++ src/bin/pg_dump/pg_backup_archiver.h | 7 +- src/bin/pg_dump/pg_backup_custom.c | 365 +++++------------------- 5 files changed, 552 insertions(+), 293 deletions(-) create mode 100644 src/bin/pg_dump/compress_io.c create mode 100644 src/bin/pg_dump/compress_io.h diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index 0367466f84..efb031a989 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) OBJS= pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \ pg_backup_files.o pg_backup_null.o pg_backup_tar.o \ - dumputils.o $(WIN32RES) + dumputils.o compress_io.o $(WIN32RES) KEYWRDOBJS = keywords.o kwlookup.o diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c new file mode 100644 index 0000000000..a02908276d --- /dev/null +++ b/src/bin/pg_dump/compress_io.c @@ -0,0 +1,403 @@ +/*------------------------------------------------------------------------- + * + * compress_io.c + * Routines for archivers to write an uncompressed or compressed data + * stream. + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * The interface for writing to an archive consists of three functions: + * AllocateCompressor, WriteDataToArchive and EndCompressor. First you call + * AllocateCompressor, then write all the data by calling WriteDataToArchive + * as many times as needed, and finally EndCompressor. WriteDataToArchive + * and EndCompressor will call the WriteFunc that was provided to + * AllocateCompressor for each chunk of compressed data. + * + * The interface for reading an archive consists of just one function: + * ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input + * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the + * compressed data chunk at a time, and ReadDataFromArchive decompresses it + * and passes the decompressed data to ahwrite(), until ReadFunc returns 0 + * to signal EOF. + * + * The interface is the same for compressed and uncompressed streams. + * + * + * IDENTIFICATION + * src/bin/pg_dump/compress_io.c + * + *------------------------------------------------------------------------- + */ + +#include "compress_io.h" + +static const char *modulename = gettext_noop("compress_io"); + +static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, + int *level); + +/* Routines that support zlib compressed data I/O */ +#ifdef HAVE_LIBZ +static void InitCompressorZlib(CompressorState *cs, int level); +static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, + bool flush); +static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF); +static size_t WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen); +static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); + +#endif + +/* Routines that support uncompressed data I/O */ +static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); +static size_t WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen); + +/* + * Interprets a numeric 'compression' value. The algorithm implied by the + * value (zlib or none at the moment), is returned in *alg, and the + * zlib compression level in *level. + */ +static void +ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level) +{ + if (compression == Z_DEFAULT_COMPRESSION || + (compression > 0 && compression <= 9)) + *alg = COMPR_ALG_LIBZ; + else if (compression == 0) + *alg = COMPR_ALG_NONE; + else + die_horribly(NULL, modulename, "Invalid compression code: %d\n", + compression); + + /* The level is just the passed-in value. */ + if (level) + *level = compression; +} + +/* Public interface routines */ + +/* Allocate a new compressor */ +CompressorState * +AllocateCompressor(int compression, WriteFunc writeF) +{ + CompressorState *cs; + CompressionAlgorithm alg; + int level; + + ParseCompressionOption(compression, &alg, &level); + +#ifndef HAVE_LIBZ + if (alg == COMPR_ALG_LIBZ) + die_horribly(NULL, modulename, "not built with zlib support\n"); +#endif + + cs = (CompressorState *) calloc(1, sizeof(CompressorState)); + if (cs == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + cs->writeF = writeF; + cs->comprAlg = alg; + + /* + * Perform compression algorithm specific initialization. + */ +#ifdef HAVE_LIBZ + if (alg == COMPR_ALG_LIBZ) + InitCompressorZlib(cs, level); +#endif + + return cs; +} + +/* + * Read all compressed data from the input stream (via readF) and print it + * out with ahwrite(). + */ +void +ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF) +{ + CompressionAlgorithm alg; + + ParseCompressionOption(compression, &alg, NULL); + + if (alg == COMPR_ALG_NONE) + ReadDataFromArchiveNone(AH, readF); + if (alg == COMPR_ALG_LIBZ) + { +#ifdef HAVE_LIBZ + ReadDataFromArchiveZlib(AH, readF); +#else + die_horribly(NULL, modulename, "not built with zlib support\n"); +#endif + } +} + +/* + * Compress and write data to the output stream (via writeF). + */ +size_t +WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + switch(cs->comprAlg) + { + case COMPR_ALG_LIBZ: +#ifdef HAVE_LIBZ + return WriteDataToArchiveZlib(AH, cs, data, dLen); +#else + die_horribly(NULL, modulename, "not built with zlib support\n"); +#endif + case COMPR_ALG_NONE: + return WriteDataToArchiveNone(AH, cs, data, dLen); + } + return 0; /* keep compiler quiet */ +} + +/* + * Terminate compression library context and flush its buffers. + */ +void +EndCompressor(ArchiveHandle *AH, CompressorState *cs) +{ +#ifdef HAVE_LIBZ + if (cs->comprAlg == COMPR_ALG_LIBZ) + EndCompressorZlib(AH, cs); +#endif + free(cs); +} + +/* Private routines, specific to each compression method. */ + +#ifdef HAVE_LIBZ +/* + * Functions for zlib compressed output. + */ + +static void +InitCompressorZlib(CompressorState *cs, int level) +{ + z_streamp zp; + + zp = cs->zp = (z_streamp) malloc(sizeof(z_stream)); + if (cs->zp == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + /* + * zlibOutSize is the buffer size we tell zlib it can output + * to. We actually allocate one extra byte because some routines + * want to append a trailing zero byte to the zlib output. + */ + cs->zlibOut = (char *) malloc(ZLIB_OUT_SIZE + 1); + cs->zlibOutSize = ZLIB_OUT_SIZE; + + if (cs->zlibOut == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + + if (deflateInit(zp, level) != Z_OK) + die_horribly(NULL, modulename, + "could not initialize compression library: %s\n", + zp->msg); + + /* Just be paranoid - maybe End is called after Start, with no Write */ + zp->next_out = (void *) cs->zlibOut; + zp->avail_out = cs->zlibOutSize; +} + +static void +EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) +{ + z_streamp zp = cs->zp; + + zp->next_in = NULL; + zp->avail_in = 0; + + /* Flush any remaining data from zlib buffer */ + DeflateCompressorZlib(AH, cs, true); + + if (deflateEnd(zp) != Z_OK) + die_horribly(AH, modulename, + "could not close compression stream: %s\n", zp->msg); + + free(cs->zlibOut); + free(cs->zp); +} + +static void +DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) +{ + z_streamp zp = cs->zp; + char *out = cs->zlibOut; + int res = Z_OK; + + while (cs->zp->avail_in != 0 || flush) + { + res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); + if (res == Z_STREAM_ERROR) + die_horribly(AH, modulename, + "could not compress data: %s\n", zp->msg); + if ((flush && (zp->avail_out < cs->zlibOutSize)) + || (zp->avail_out == 0) + || (zp->avail_in != 0) + ) + { + /* + * Extra paranoia: avoid zero-length chunks, since a zero length + * chunk is the EOF marker in the custom format. This should never + * happen but... + */ + if (zp->avail_out < cs->zlibOutSize) + { + /* + * Any write function shoud do its own error checking but + * to make sure we do a check here as well... + */ + size_t len = cs->zlibOutSize - zp->avail_out; + if (cs->writeF(AH, out, len) != len) + die_horribly(AH, modulename, + "could not write to output file: %s\n", + strerror(errno)); + } + zp->next_out = (void *) out; + zp->avail_out = cs->zlibOutSize; + } + + if (res == Z_STREAM_END) + break; + } +} + +static size_t +WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen) +{ + cs->zp->next_in = (void *) data; + cs->zp->avail_in = dLen; + DeflateCompressorZlib(AH, cs, false); + /* we have either succeeded in writing dLen bytes or we have called + * die_horribly() */ + return dLen; +} + +static void +ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF) +{ + z_streamp zp; + char *out; + int res = Z_OK; + size_t cnt; + char *buf; + size_t buflen; + + zp = (z_streamp) malloc(sizeof(z_stream)); + if (zp == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + buf = malloc(ZLIB_IN_SIZE); + if (buf == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + buflen = ZLIB_IN_SIZE; + + out = malloc(ZLIB_OUT_SIZE + 1); + if (out == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + + if (inflateInit(zp) != Z_OK) + die_horribly(NULL, modulename, + "could not initialize compression library: %s\n", + zp->msg); + + /* no minimal chunk size for zlib */ + while ((cnt = readF(AH, &buf, &buflen))) + { + zp->next_in = (void *) buf; + zp->avail_in = cnt; + + while (zp->avail_in > 0) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + die_horribly(AH, modulename, + "could not uncompress data: %s\n", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + } + + zp->next_in = NULL; + zp->avail_in = 0; + while (res != Z_STREAM_END) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + die_horribly(AH, modulename, + "could not uncompress data: %s\n", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + + if (inflateEnd(zp) != Z_OK) + die_horribly(AH, modulename, + "could not close compression library: %s\n", zp->msg); + + free(buf); + free(out); + free(zp); +} + +#endif /* HAVE_LIBZ */ + + +/* + * Functions for uncompressed output. + */ + +static void +ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF) +{ + size_t cnt; + char *buf; + size_t buflen; + + buf = malloc(ZLIB_OUT_SIZE); + if (buf == NULL) + die_horribly(NULL, modulename, "out of memory\n"); + buflen = ZLIB_OUT_SIZE; + + while ((cnt = readF(AH, &buf, &buflen))) + { + ahwrite(buf, 1, cnt, AH); + } + + free(buf); +} + +static size_t +WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen) +{ + /* + * Any write function should do its own error checking but to make + * sure we do a check here as well... + */ + if (cs->writeF(AH, data, dLen) != dLen) + die_horribly(AH, modulename, + "could not write to output file: %s\n", + strerror(errno)); + return dLen; +} + + diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h new file mode 100644 index 0000000000..8edf2b1ea6 --- /dev/null +++ b/src/bin/pg_dump/compress_io.h @@ -0,0 +1,68 @@ +/*------------------------------------------------------------------------- + * + * compress_io.h + * Interface to compress_io.c routines + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_io.h + * + *------------------------------------------------------------------------- + */ + +#ifndef __COMPRESS_IO__ +#define __COMPRESS_IO__ + +#include "postgres_fe.h" +#include "pg_backup_archiver.h" + +/* Initial buffer sizes used in zlib compression. */ +#define ZLIB_OUT_SIZE 4096 +#define ZLIB_IN_SIZE 4096 + +struct _CompressorState; + +typedef enum +{ + COMPR_ALG_NONE, + COMPR_ALG_LIBZ +} CompressionAlgorithm; + +/* Prototype for callback function to WriteDataToArchive() */ +typedef size_t (*WriteFunc)(ArchiveHandle *AH, const char *buf, size_t len); + +/* + * Prototype for callback function to ReadDataFromArchive() + * + * ReadDataFromArchive will call the read function repeatedly, until it + * returns 0 to signal EOF. ReadDataFromArchive passes a buffer to read the + * data into in *buf, of length *buflen. If that's not big enough for the + * callback function, it can free() it and malloc() a new one, returning the + * new buffer and its size in *buf and *buflen. + * + * Returns the number of bytes read into *buf, or 0 on EOF. + */ +typedef size_t (*ReadFunc)(ArchiveHandle *AH, char **buf, size_t *buflen); + +typedef struct _CompressorState +{ + CompressionAlgorithm comprAlg; + WriteFunc writeF; + +#ifdef HAVE_LIBZ + z_streamp zp; + char *zlibOut; + size_t zlibOutSize; +#endif +} CompressorState; + +extern CompressorState *AllocateCompressor(int compression, WriteFunc writeF); +extern void ReadDataFromArchive(ArchiveHandle *AH, int compression, + ReadFunc readF); +extern size_t WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); +extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); + +#endif diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index ae0c6e0963..a285a69b97 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -49,6 +49,7 @@ #define GZCLOSE(fh) fclose(fh) #define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s)) #define GZREAD(p, s, n, fh) fread(p, s, n, fh) +/* this is just the redefinition of a libz constant */ #define Z_DEFAULT_COMPRESSION (-1) typedef struct _z_stream @@ -266,7 +267,11 @@ typedef struct _archiveHandle DumpId maxDumpId; /* largest DumpId among all TOC entries */ struct _tocEntry *currToc; /* Used when dumping data */ - int compression; /* Compression requested on open */ + int compression; /* Compression requested on open + * Possible values for compression: + * -1 Z_DEFAULT_COMPRESSION + * 0 COMPRESSION_NONE + * 1-9 levels for gzip compression */ ArchiveMode mode; /* File mode - r or w */ void *formatData; /* Header data specific to file format */ diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 2bc7e8f13c..f2969e2fac 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -25,6 +25,7 @@ */ #include "pg_backup_archiver.h" +#include "compress_io.h" /*-------- * Routines in the format interface @@ -58,20 +59,9 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); -/*------------ - * Buffers used in zlib compression and extra data stored in archive and - * in TOC entries. - *------------ - */ -#define zlibOutSize 4096 -#define zlibInSize 4096 - typedef struct { - z_streamp zp; - char *zlibOut; - char *zlibIn; - size_t inSize; + CompressorState *cs; int hasSeek; pgoff_t filePos; pgoff_t dataStart; @@ -89,10 +79,10 @@ typedef struct *------ */ static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id); -static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te); -static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te); static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx); -static int _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush); + +static size_t _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len); +static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen); static const char *modulename = gettext_noop("custom archiver"); @@ -136,39 +126,20 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; - /* - * Set up some special context used in compressing data. - */ + /* Set up a private area. */ ctx = (lclContext *) calloc(1, sizeof(lclContext)); if (ctx == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->formatData = (void *) ctx; - ctx->zp = (z_streamp) malloc(sizeof(z_stream)); - if (ctx->zp == NULL) - die_horribly(AH, modulename, "out of memory\n"); - /* Initialize LO buffering */ AH->lo_buf_size = LOBBUFSIZE; AH->lo_buf = (void *) malloc(LOBBUFSIZE); if (AH->lo_buf == NULL) die_horribly(AH, modulename, "out of memory\n"); - /* - * zlibOutSize is the buffer size we tell zlib it can output to. We - * actually allocate one extra byte because some routines want to append a - * trailing zero byte to the zlib output. The input buffer is expansible - * and is always of size ctx->inSize; zlibInSize is just the initial - * default size for it. - */ - ctx->zlibOut = (char *) malloc(zlibOutSize + 1); - ctx->zlibIn = (char *) malloc(zlibInSize); - ctx->inSize = zlibInSize; ctx->filePos = 0; - if (ctx->zlibOut == NULL || ctx->zlibIn == NULL) - die_horribly(AH, modulename, "out of memory\n"); - /* * Now open the file */ @@ -324,7 +295,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - _StartDataCompressor(AH, te); + ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); } /* @@ -340,17 +311,12 @@ static size_t _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) { lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; + CompressorState *cs = ctx->cs; - zp->next_in = (void *) data; - zp->avail_in = dLen; + if (dLen == 0) + return 0; - while (zp->avail_in != 0) - { - /* printf("Deflating %lu bytes\n", (unsigned long) dLen); */ - _DoDeflate(AH, ctx, 0); - } - return dLen; + return WriteDataToArchive(AH, cs, data, dLen); } /* @@ -363,10 +329,11 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) static void _EndData(ArchiveHandle *AH, TocEntry *te) { -/* lclContext *ctx = (lclContext *) AH->formatData; */ -/* lclTocEntry *tctx = (lclTocEntry *) te->formatData; */ + lclContext *ctx = (lclContext *) AH->formatData; - _EndDataCompressor(AH, te); + EndCompressor(AH, ctx->cs); + /* Send the end marker */ + WriteInt(AH, 0); } /* @@ -401,11 +368,14 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te) static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) { + lclContext *ctx = (lclContext *) AH->formatData; + if (oid == 0) die_horribly(AH, modulename, "invalid OID for large object\n"); WriteInt(AH, oid); - _StartDataCompressor(AH, te); + + ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); } /* @@ -416,7 +386,11 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) { - _EndDataCompressor(AH, te); + lclContext *ctx = (lclContext *) AH->formatData; + + EndCompressor(AH, ctx->cs); + /* Send the end marker */ + WriteInt(AH, 0); } /* @@ -532,108 +506,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) static void _PrintData(ArchiveHandle *AH) { - lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; - size_t blkLen; - char *in = ctx->zlibIn; - size_t cnt; - -#ifdef HAVE_LIBZ - int res; - char *out = ctx->zlibOut; -#endif - -#ifdef HAVE_LIBZ - - res = Z_OK; - - if (AH->compression != 0) - { - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - if (inflateInit(zp) != Z_OK) - die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg); - } -#endif - - blkLen = ReadInt(AH); - while (blkLen != 0) - { - if (blkLen + 1 > ctx->inSize) - { - free(ctx->zlibIn); - ctx->zlibIn = NULL; - ctx->zlibIn = (char *) malloc(blkLen + 1); - if (!ctx->zlibIn) - die_horribly(AH, modulename, "out of memory\n"); - - ctx->inSize = blkLen + 1; - in = ctx->zlibIn; - } - - cnt = fread(in, 1, blkLen, AH->FH); - if (cnt != blkLen) - { - if (feof(AH->FH)) - die_horribly(AH, modulename, - "could not read from input file: end of file\n"); - else - die_horribly(AH, modulename, - "could not read from input file: %s\n", strerror(errno)); - } - - ctx->filePos += blkLen; - - zp->next_in = (void *) in; - zp->avail_in = blkLen; - -#ifdef HAVE_LIBZ - if (AH->compression != 0) - { - while (zp->avail_in != 0) - { - zp->next_out = (void *) out; - zp->avail_out = zlibOutSize; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg); - - out[zlibOutSize - zp->avail_out] = '\0'; - ahwrite(out, 1, zlibOutSize - zp->avail_out, AH); - } - } - else -#endif - { - in[zp->avail_in] = '\0'; - ahwrite(in, 1, zp->avail_in, AH); - zp->avail_in = 0; - } - blkLen = ReadInt(AH); - } - -#ifdef HAVE_LIBZ - if (AH->compression != 0) - { - zp->next_in = NULL; - zp->avail_in = 0; - while (res != Z_STREAM_END) - { - zp->next_out = (void *) out; - zp->avail_out = zlibOutSize; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg); - - out[zlibOutSize - zp->avail_out] = '\0'; - ahwrite(out, 1, zlibOutSize - zp->avail_out, AH); - } - if (inflateEnd(zp) != Z_OK) - die_horribly(AH, modulename, "could not close compression library: %s\n", zp->msg); - } -#endif + ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); } static void @@ -684,20 +557,21 @@ _skipData(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; size_t blkLen; - char *in = ctx->zlibIn; + char *buf = NULL; + int buflen = 0; size_t cnt; blkLen = ReadInt(AH); while (blkLen != 0) { - if (blkLen > ctx->inSize) + if (blkLen > buflen) { - free(ctx->zlibIn); - ctx->zlibIn = (char *) malloc(blkLen); - ctx->inSize = blkLen; - in = ctx->zlibIn; + if (buf) + free(buf); + buf = (char *) malloc(blkLen); + buflen = blkLen; } - cnt = fread(in, 1, blkLen, AH->FH); + cnt = fread(buf, 1, blkLen, AH->FH); if (cnt != blkLen) { if (feof(AH->FH)) @@ -712,6 +586,9 @@ _skipData(ArchiveHandle *AH) blkLen = ReadInt(AH); } + + if (buf) + free(buf); } /* @@ -961,145 +838,58 @@ _readBlockHeader(ArchiveHandle *AH, int *type, int *id) } /* - * If zlib is available, then startit up. This is called from - * StartData & StartBlob. The buffers are setup in the Init routine. + * Callback function for WriteDataToArchive. Writes one block of (compressed) + * data to the archive. */ -static void -_StartDataCompressor(ArchiveHandle *AH, TocEntry *te) +static size_t +_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len) { - lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; + /* never write 0-byte blocks (this should not happen) */ + if (len == 0) + return 0; -#ifdef HAVE_LIBZ - - if (AH->compression < 0 || AH->compression > 9) - AH->compression = Z_DEFAULT_COMPRESSION; - - if (AH->compression != 0) - { - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - if (deflateInit(zp, AH->compression) != Z_OK) - die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg); - } -#else - - AH->compression = 0; -#endif - - /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) ctx->zlibOut; - zp->avail_out = zlibOutSize; + WriteInt(AH, len); + return _WriteBuf(AH, buf, len); } /* - * Send compressed data to the output stream (via ahwrite). - * Each data chunk is preceded by it's length. - * In the case of Z0, or no zlib, just write the raw data. - * + * Callback function for ReadDataFromArchive. To keep things simple, we + * always read one compressed block at a time. */ -static int -_DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush) +static size_t +_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen) { - z_streamp zp = ctx->zp; + size_t blkLen; + size_t cnt; -#ifdef HAVE_LIBZ - char *out = ctx->zlibOut; - int res = Z_OK; + /* Read length */ + blkLen = ReadInt(AH); + if (blkLen == 0) + return 0; - if (AH->compression != 0) + /* If the caller's buffer is not large enough, allocate a bigger one */ + if (blkLen > *buflen) { - res = deflate(zp, flush); - if (res == Z_STREAM_ERROR) - die_horribly(AH, modulename, "could not compress data: %s\n", zp->msg); - - if (((flush == Z_FINISH) && (zp->avail_out < zlibOutSize)) - || (zp->avail_out == 0) - || (zp->avail_in != 0) - ) - { - /* - * Extra paranoia: avoid zero-length chunks since a zero length - * chunk is the EOF marker. This should never happen but... - */ - if (zp->avail_out < zlibOutSize) - { - /* - * printf("Wrote %lu byte deflated chunk\n", (unsigned long) - * (zlibOutSize - zp->avail_out)); - */ - WriteInt(AH, zlibOutSize - zp->avail_out); - if (fwrite(out, 1, zlibOutSize - zp->avail_out, AH->FH) != (zlibOutSize - zp->avail_out)) - die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno)); - ctx->filePos += zlibOutSize - zp->avail_out; - } - zp->next_out = (void *) out; - zp->avail_out = zlibOutSize; - } + free(*buf); + *buf = (char *) malloc(blkLen); + if (!(*buf)) + die_horribly(AH, modulename, "out of memory\n"); + *buflen = blkLen; } - else -#endif + + cnt = _ReadBuf(AH, *buf, blkLen); + if (cnt != blkLen) { - if (zp->avail_in > 0) - { - WriteInt(AH, zp->avail_in); - if (fwrite(zp->next_in, 1, zp->avail_in, AH->FH) != zp->avail_in) - die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno)); - ctx->filePos += zp->avail_in; - zp->avail_in = 0; - } + if (feof(AH->FH)) + die_horribly(AH, modulename, + "could not read from input file: end of file\n"); else - { -#ifdef HAVE_LIBZ - if (flush == Z_FINISH) - res = Z_STREAM_END; -#endif - } + die_horribly(AH, modulename, + "could not read from input file: %s\n", strerror(errno)); } - -#ifdef HAVE_LIBZ - return res; -#else - return 1; -#endif + return cnt; } -/* - * Terminate zlib context and flush it's buffers. If no zlib - * then just return. - */ -static void -_EndDataCompressor(ArchiveHandle *AH, TocEntry *te) -{ - -#ifdef HAVE_LIBZ - lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; - int res; - - if (AH->compression != 0) - { - zp->next_in = NULL; - zp->avail_in = 0; - - do - { - /* printf("Ending data output\n"); */ - res = _DoDeflate(AH, ctx, Z_FINISH); - } while (res != Z_STREAM_END); - - if (deflateEnd(zp) != Z_OK) - die_horribly(AH, modulename, "could not close compression stream: %s\n", zp->msg); - } -#endif - - /* Send the end marker */ - WriteInt(AH, 0); -} - - /* * Clone format-specific fields during parallel restoration. */ @@ -1114,12 +904,9 @@ _Clone(ArchiveHandle *AH) memcpy(AH->formatData, ctx, sizeof(lclContext)); ctx = (lclContext *) AH->formatData; - ctx->zp = (z_streamp) malloc(sizeof(z_stream)); - ctx->zlibOut = (char *) malloc(zlibOutSize + 1); - ctx->zlibIn = (char *) malloc(ctx->inSize); - - if (ctx->zp == NULL || ctx->zlibOut == NULL || ctx->zlibIn == NULL) - die_horribly(AH, modulename, "out of memory\n"); + /* sanity check, shouldn't happen */ + if (ctx->cs != NULL) + die_horribly(AH, modulename, "compressor active\n"); /* * Note: we do not make a local lo_buf because we expect at most one BLOBS @@ -1133,9 +920,5 @@ static void _DeClone(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; - - free(ctx->zlibOut); - free(ctx->zlibIn); - free(ctx->zp); free(ctx); }