Introduce 'bbstreamer' abstraction to modularize pg_basebackup.

pg_basebackup knows how to do quite a few things with a backup that it
gets from the server, like just write out the files, or compress them
first, or even parse the tar format and inject a modified
postgresql.auto.conf file into the archive generated by the server.
Unforatunely, this makes pg_basebackup.c a very large source file, and
also somewhat difficult to enhance, because for example the knowledge
that the server is sending us a 'tar' file rather than some other sort
of archive is spread all over the place rather than centralized.

In an effort to improve this situation, this commit invents a new
'bbstreamer' abstraction. Each archive received from the server is
fed to a bbstreamer which may choose to dispose of it or pass it
along to some other bbstreamer. Chunks may also be "labelled"
according to whether they are part of the payload data of a file
in the archive or part of the archive metadata.

So, for example, if we want to take a tar file, modify the
postgresql.auto.conf file it contains, and the gzip the result
and write it out, we can use a bbstreamer_tar_parser to parse the
tar file received from the server, a bbstreamer_recovery_injector
to modify the contents of postgresql.auto.conf, a
bbstreamer_tar_archiver to replace the tar headers for the file
modified in the previous step with newly-built ones that are
correct for the modified file, and a bbstreamer_gzip_writer to
gzip and write the resulting data. Only the objects with "tar"
in the name know anything about the tar archive format, and in
theory we could re-archive using some other format rather than
"tar" if somebody wanted to write the code.

These chances do add a substantial amount of code, but I think the
result is a lot more maintainable and extensible. pg_basebackup.c
itself shrinks by roughly a third, with a lot of the complexity
previously contained there moving into the newly-added files.

Patch by me. The larger patch series of which this is a part has been
reviewed and tested at various times by Andres Freund, Sumanta
Mukherjee, Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja,
Mark Dilger, Sergei Kornilov, and Jeevan Ladhe.

Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com
Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
This commit is contained in:
Robert Haas 2021-11-05 10:22:07 -04:00
parent 00a354a135
commit 23a1c6578c
6 changed files with 1714 additions and 754 deletions

View File

@ -35,10 +35,16 @@ OBJS = \
streamutil.o \
walmethods.o
BBOBJS = \
pg_basebackup.o \
bbstreamer_file.o \
bbstreamer_inject.o \
bbstreamer_tar.o
all: pg_basebackup pg_receivewal pg_recvlogical
pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@ -61,7 +67,7 @@ uninstall:
clean distclean maintainer-clean:
rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
pg_basebackup.o pg_receivewal.o pg_recvlogical.o \
$(BBOBJS) pg_receivewal.o pg_recvlogical.o \
$(OBJS)
rm -rf tmp_check

View File

@ -0,0 +1,217 @@
/*-------------------------------------------------------------------------
*
* bbstreamer.h
*
* Each tar archive returned by the server is passed to one or more
* bbstreamer objects for further processing. The bbstreamer may do
* something simple, like write the archive to a file, perhaps after
* compressing it, but it can also do more complicated things, like
* annotating the byte stream to indicate which parts of the data
* correspond to tar headers or trailing padding, vs. which parts are
* payload data. A subsequent bbstreamer may use this information to
* make further decisions about how to process the data; for example,
* it might choose to modify the archive contents.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer.h
*-------------------------------------------------------------------------
*/
#ifndef BBSTREAMER_H
#define BBSTREAMER_H
#include "lib/stringinfo.h"
#include "pqexpbuffer.h"
struct bbstreamer;
struct bbstreamer_ops;
typedef struct bbstreamer bbstreamer;
typedef struct bbstreamer_ops bbstreamer_ops;
/*
* Each chunk of archive data passed to a bbstreamer is classified into one
* of these categories. When data is first received from the remote server,
* each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will
* be of whatever size the remote server chose to send.
*
* If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all
* chunks should be labelled as one of the other types listed here. In
* addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and
* exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if
* that means a zero-length call. There can be any number of
* BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There
* should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
* last BBSTREAMER_MEMBER_TRAILER chunk.
*
* In theory, we could need other classifications here, such as a way of
* indicating an archive header, but the "tar" format doesn't need anything
* else, so for the time being there's no point.
*/
typedef enum
{
BBSTREAMER_UNKNOWN,
BBSTREAMER_MEMBER_HEADER,
BBSTREAMER_MEMBER_CONTENTS,
BBSTREAMER_MEMBER_TRAILER,
BBSTREAMER_ARCHIVE_TRAILER
} bbstreamer_archive_context;
/*
* Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER,
* BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also
* pass a pointer to an instance of this struct. The details are expected
* to be present in the archive header and used to fill the struct, after
* which all subsequent calls for the same archive member are expected to
* pass the same details.
*/
typedef struct
{
char pathname[MAXPGPATH];
pgoff_t size;
mode_t mode;
uid_t uid;
gid_t gid;
bool is_directory;
bool is_link;
char linktarget[MAXPGPATH];
} bbstreamer_member;
/*
* Generally, each type of bbstreamer will define its own struct, but the
* first element should be 'bbstreamer base'. A bbstreamer that does not
* require any additional private data could use this structure directly.
*
* bbs_ops is a pointer to the bbstreamer_ops object which contains the
* function pointers appropriate to this type of bbstreamer.
*
* bbs_next is a pointer to the successor bbstreamer, for those types of
* bbstreamer which forward data to a successor. It need not be used and
* should be set to NULL when not relevant.
*
* bbs_buffer is a buffer for accumulating data for temporary storage. Each
* type of bbstreamer makes its own decisions about whether and how to use
* this buffer.
*/
struct bbstreamer
{
const bbstreamer_ops *bbs_ops;
bbstreamer *bbs_next;
StringInfoData bbs_buffer;
};
/*
* There are three callbacks for a bbstreamer. The 'content' callback is
* called repeatedly, as described in the bbstreamer_archive_context comments.
* Then, the 'finalize' callback is called once at the end, to give the
* bbstreamer a chance to perform cleanup such as closing files. Finally,
* because this code is running in a frontend environment where, as of this
* writing, there are no memory contexts, the 'free' callback is called to
* release memory. These callbacks should always be invoked using the static
* inline functions defined below.
*/
struct bbstreamer_ops
{
void (*content) (bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
void (*finalize) (bbstreamer *streamer);
void (*free) (bbstreamer *streamer);
};
/* Send some content to a bbstreamer. */
static inline void
bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
Assert(streamer != NULL);
streamer->bbs_ops->content(streamer, member, data, len, context);
}
/* Finalize a bbstreamer. */
static inline void
bbstreamer_finalize(bbstreamer *streamer)
{
Assert(streamer != NULL);
streamer->bbs_ops->finalize(streamer);
}
/* Free a bbstreamer. */
static inline void
bbstreamer_free(bbstreamer *streamer)
{
Assert(streamer != NULL);
streamer->bbs_ops->free(streamer);
}
/*
* This is a convenience method for use when implementing a bbstreamer; it is
* not for use by outside callers. It adds the amount of data specified by
* 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data'
* accordingly.
*/
static inline void
bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len,
int nbytes)
{
Assert(nbytes <= *len);
appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
*len -= nbytes;
*data += nbytes;
}
/*
* This is a convenence method for use when implementing a bbstreamer; it is
* not for use by outsider callers. It attempts to add enough data to the
* bbstreamer's buffer to reach a length of target_bytes and adjusts '*len'
* and '*data' accordingly. It returns true if the target length has been
* reached and false otherwise.
*/
static inline bool
bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len,
int target_bytes)
{
int buflen = streamer->bbs_buffer.len;
if (buflen >= target_bytes)
{
/* Target length already reached; nothing to do. */
return true;
}
if (buflen + *len < target_bytes)
{
/* Not enough data to reach target length; buffer all of it. */
bbstreamer_buffer_bytes(streamer, data, len, *len);
return false;
}
/* Buffer just enough to reach the target length. */
bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
return true;
}
/*
* Functions for creating bbstreamer objects of various types. See the header
* comments for each of these functions for details.
*/
extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file);
extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file,
int compresslevel);
extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
const char *(*link_map) (const char *),
void (*report_output_file) (const char *));
extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next,
bool is_recovery_guc_supported,
PQExpBuffer recoveryconfcontents);
extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname,
char *data, int len);
#endif

View File

@ -0,0 +1,579 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_file.c
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_file.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#include <unistd.h>
#include "bbstreamer.h"
#include "common/logging.h"
#include "common/file_perm.h"
#include "common/string.h"
typedef struct bbstreamer_plain_writer
{
bbstreamer base;
char *pathname;
FILE *file;
bool should_close_file;
} bbstreamer_plain_writer;
#ifdef HAVE_LIBZ
typedef struct bbstreamer_gzip_writer
{
bbstreamer base;
char *pathname;
gzFile gzfile;
} bbstreamer_gzip_writer;
#endif
typedef struct bbstreamer_extractor
{
bbstreamer base;
char *basepath;
const char *(*link_map) (const char *);
void (*report_output_file) (const char *);
char filename[MAXPGPATH];
FILE *file;
} bbstreamer_extractor;
static void bbstreamer_plain_writer_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_plain_writer_finalize(bbstreamer *streamer);
static void bbstreamer_plain_writer_free(bbstreamer *streamer);
const bbstreamer_ops bbstreamer_plain_writer_ops = {
.content = bbstreamer_plain_writer_content,
.finalize = bbstreamer_plain_writer_finalize,
.free = bbstreamer_plain_writer_free
};
#ifdef HAVE_LIBZ
static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
static const char *get_gz_error(gzFile gzf);
const bbstreamer_ops bbstreamer_gzip_writer_ops = {
.content = bbstreamer_gzip_writer_content,
.finalize = bbstreamer_gzip_writer_finalize,
.free = bbstreamer_gzip_writer_free
};
#endif
static void bbstreamer_extractor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_extractor_finalize(bbstreamer *streamer);
static void bbstreamer_extractor_free(bbstreamer *streamer);
static void extract_directory(const char *filename, mode_t mode);
static void extract_link(const char *filename, const char *linktarget);
static FILE *create_file_for_extract(const char *filename, mode_t mode);
const bbstreamer_ops bbstreamer_extractor_ops = {
.content = bbstreamer_extractor_content,
.finalize = bbstreamer_extractor_finalize,
.free = bbstreamer_extractor_free
};
/*
* Create a bbstreamer that just writes data to a file.
*
* The caller must specify a pathname and may specify a file. The pathname is
* used for error-reporting purposes either way. If file is NULL, the pathname
* also identifies the file to which the data should be written: it is opened
* for writing and closed when done. If file is not NULL, the data is written
* there.
*/
bbstreamer *
bbstreamer_plain_writer_new(char *pathname, FILE *file)
{
bbstreamer_plain_writer *streamer;
streamer = palloc0(sizeof(bbstreamer_plain_writer));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_plain_writer_ops;
streamer->pathname = pstrdup(pathname);
streamer->file = file;
if (file == NULL)
{
streamer->file = fopen(pathname, "wb");
if (streamer->file == NULL)
{
pg_log_error("could not create file \"%s\": %m", pathname);
exit(1);
}
streamer->should_close_file = true;
}
return &streamer->base;
}
/*
* Write archive content to file.
*/
static void
bbstreamer_plain_writer_content(bbstreamer *streamer,
bbstreamer_member *member, const char *data,
int len, bbstreamer_archive_context context)
{
bbstreamer_plain_writer *mystreamer;
mystreamer = (bbstreamer_plain_writer *) streamer;
if (len == 0)
return;
errno = 0;
if (fwrite(data, len, 1, mystreamer->file) != 1)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
pg_log_error("could not write to file \"%s\": %m",
mystreamer->pathname);
exit(1);
}
}
/*
* End-of-archive processing when writing to a plain file consists of closing
* the file if we opened it, but not if the caller provided it.
*/
static void
bbstreamer_plain_writer_finalize(bbstreamer *streamer)
{
bbstreamer_plain_writer *mystreamer;
mystreamer = (bbstreamer_plain_writer *) streamer;
if (mystreamer->should_close_file && fclose(mystreamer->file) != 0)
{
pg_log_error("could not close file \"%s\": %m",
mystreamer->pathname);
exit(1);
}
mystreamer->file = NULL;
mystreamer->should_close_file = false;
}
/*
* Free memory associated with this bbstreamer.
*/
static void
bbstreamer_plain_writer_free(bbstreamer *streamer)
{
bbstreamer_plain_writer *mystreamer;
mystreamer = (bbstreamer_plain_writer *) streamer;
Assert(!mystreamer->should_close_file);
Assert(mystreamer->base.bbs_next == NULL);
pfree(mystreamer->pathname);
pfree(mystreamer);
}
/*
* Create a bbstreamer that just compresses data using gzip, and then writes
* it to a file.
*
* As in the case of bbstreamer_plain_writer_new, pathname is always used
* for error reporting purposes; if file is NULL, it is also the opened and
* closed so that the data may be written there.
*/
bbstreamer *
bbstreamer_gzip_writer_new(char *pathname, FILE *file, int compresslevel)
{
#ifdef HAVE_LIBZ
bbstreamer_gzip_writer *streamer;
streamer = palloc0(sizeof(bbstreamer_gzip_writer));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_gzip_writer_ops;
streamer->pathname = pstrdup(pathname);
if (file == NULL)
{
streamer->gzfile = gzopen(pathname, "wb");
if (streamer->gzfile == NULL)
{
pg_log_error("could not create compressed file \"%s\": %m",
pathname);
exit(1);
}
}
else
{
int fd = dup(fileno(file));
if (fd < 0)
{
pg_log_error("could not duplicate stdout: %m");
exit(1);
}
streamer->gzfile = gzdopen(fd, "wb");
if (streamer->gzfile == NULL)
{
pg_log_error("could not open output file: %m");
exit(1);
}
}
if (gzsetparams(streamer->gzfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_log_error("could not set compression level %d: %s",
compresslevel, get_gz_error(streamer->gzfile));
exit(1);
}
return &streamer->base;
#else
pg_log_error("this build does not support compression");
exit(1);
#endif
}
#ifdef HAVE_LIBZ
/*
* Write archive content to gzip file.
*/
static void
bbstreamer_gzip_writer_content(bbstreamer *streamer,
bbstreamer_member *member, const char *data,
int len, bbstreamer_archive_context context)
{
bbstreamer_gzip_writer *mystreamer;
mystreamer = (bbstreamer_gzip_writer *) streamer;
if (len == 0)
return;
errno = 0;
if (gzwrite(mystreamer->gzfile, data, len) != len)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
pg_log_error("could not write to compressed file \"%s\": %s",
mystreamer->pathname, get_gz_error(mystreamer->gzfile));
exit(1);
}
}
/*
* End-of-archive processing when writing to a gzip file consists of just
* calling gzclose.
*
* It makes no difference whether we opened the file or the caller did it,
* because libz provides no way of avoiding a close on the underling file
* handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
* work around this issue, so that the behavior from the caller's viewpoint
* is the same as for bbstreamer_plain_writer.
*/
static void
bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
{
bbstreamer_gzip_writer *mystreamer;
mystreamer = (bbstreamer_gzip_writer *) streamer;
if (gzclose(mystreamer->gzfile) != 0)
{
pg_log_error("could not close compressed file \"%s\": %s",
mystreamer->pathname,
get_gz_error(mystreamer->gzfile));
exit(1);
}
mystreamer->gzfile = NULL;
}
/*
* Free memory associated with this bbstreamer.
*/
static void
bbstreamer_gzip_writer_free(bbstreamer *streamer)
{
bbstreamer_gzip_writer *mystreamer;
mystreamer = (bbstreamer_gzip_writer *) streamer;
Assert(mystreamer->base.bbs_next == NULL);
Assert(mystreamer->gzfile == NULL);
pfree(mystreamer->pathname);
pfree(mystreamer);
}
/*
* Helper function for libz error reporting.
*/
static const char *
get_gz_error(gzFile gzf)
{
int errnum;
const char *errmsg;
errmsg = gzerror(gzf, &errnum);
if (errnum == Z_ERRNO)
return strerror(errno);
else
return errmsg;
}
#endif
/*
* Create a bbstreamer that extracts an archive.
*
* All pathnames in the archive are interpreted relative to basepath.
*
* Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here
* with untyped chunks; we need typed chunks which follow the rules described
* in bbstreamer.h. Assuming we have that, we don't need to worry about the
* original archive format; it's enough to just look at the member information
* provided and write to the corresponding file.
*
* 'link_map' is a function that will be applied to the target of any
* symbolic link, and which should return a replacement pathname to be used
* in its place. If NULL, the symbolic link target is used without
* modification.
*
* 'report_output_file' is a function that will be called each time we open a
* new output file. The pathname to that file is passed as an argument. If
* NULL, the call is skipped.
*/
bbstreamer *
bbstreamer_extractor_new(const char *basepath,
const char *(*link_map) (const char *),
void (*report_output_file) (const char *))
{
bbstreamer_extractor *streamer;
streamer = palloc0(sizeof(bbstreamer_extractor));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_extractor_ops;
streamer->basepath = pstrdup(basepath);
streamer->link_map = link_map;
streamer->report_output_file = report_output_file;
return &streamer->base;
}
/*
* Extract archive contents to the filesystem.
*/
static void
bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
int fnamelen;
Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
Assert(context != BBSTREAMER_UNKNOWN);
switch (context)
{
case BBSTREAMER_MEMBER_HEADER:
Assert(mystreamer->file == NULL);
/* Prepend basepath. */
snprintf(mystreamer->filename, sizeof(mystreamer->filename),
"%s/%s", mystreamer->basepath, member->pathname);
/* Remove any trailing slash. */
fnamelen = strlen(mystreamer->filename);
if (mystreamer->filename[fnamelen - 1] == '/')
mystreamer->filename[fnamelen - 1] = '\0';
/* Dispatch based on file type. */
if (member->is_directory)
extract_directory(mystreamer->filename, member->mode);
else if (member->is_link)
{
const char *linktarget = member->linktarget;
if (mystreamer->link_map)
linktarget = mystreamer->link_map(linktarget);
extract_link(mystreamer->filename, linktarget);
}
else
mystreamer->file =
create_file_for_extract(mystreamer->filename,
member->mode);
/* Report output file change. */
if (mystreamer->report_output_file)
mystreamer->report_output_file(mystreamer->filename);
break;
case BBSTREAMER_MEMBER_CONTENTS:
if (mystreamer->file == NULL)
break;
errno = 0;
if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
pg_log_error("could not write to file \"%s\": %m",
mystreamer->filename);
exit(1);
}
break;
case BBSTREAMER_MEMBER_TRAILER:
if (mystreamer->file == NULL)
break;
fclose(mystreamer->file);
mystreamer->file = NULL;
break;
case BBSTREAMER_ARCHIVE_TRAILER:
break;
default:
/* Shouldn't happen. */
pg_log_error("unexpected state while extracting archive");
exit(1);
}
}
/*
* Create a directory.
*/
static void
extract_directory(const char *filename, mode_t mode)
{
if (mkdir(filename, pg_dir_create_mode) != 0)
{
/*
* When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will
* have been created by the wal receiver process. Also, when the WAL
* directory location was specified, pg_wal (or pg_xlog) has already
* been created as a symbolic link before starting the actual backup.
* So just ignore creation failures on related directories.
*/
if (!((pg_str_endswith(filename, "/pg_wal") ||
pg_str_endswith(filename, "/pg_xlog") ||
pg_str_endswith(filename, "/archive_status")) &&
errno == EEXIST))
{
pg_log_error("could not create directory \"%s\": %m",
filename);
exit(1);
}
}
#ifndef WIN32
if (chmod(filename, mode))
{
pg_log_error("could not set permissions on directory \"%s\": %m",
filename);
exit(1);
}
#endif
}
/*
* Create a symbolic link.
*
* It's most likely a link in pg_tblspc directory, to the location of a
* tablespace. Apply any tablespace mapping given on the command line
* (--tablespace-mapping). (We blindly apply the mapping without checking that
* the link really is inside pg_tblspc. We don't expect there to be other
* symlinks in a data directory, but if there are, you can call it an
* undocumented feature that you can map them too.)
*/
static void
extract_link(const char *filename, const char *linktarget)
{
if (symlink(linktarget, filename) != 0)
{
pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
filename, linktarget);
exit(1);
}
}
/*
* Create a regular file.
*
* Return the resulting handle so we can write the content to the file.
*/
static FILE *
create_file_for_extract(const char *filename, mode_t mode)
{
FILE *file;
file = fopen(filename, "wb");
if (file == NULL)
{
pg_log_error("could not create file \"%s\": %m", filename);
exit(1);
}
#ifndef WIN32
if (chmod(filename, mode))
{
pg_log_error("could not set permissions on file \"%s\": %m",
filename);
exit(1);
}
#endif
return file;
}
/*
* End-of-stream processing for extracting an archive.
*
* There's nothing to do here but sanity checking.
*/
static void
bbstreamer_extractor_finalize(bbstreamer *streamer)
{
bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
Assert(mystreamer->file == NULL);
}
/*
* Free memory.
*/
static void
bbstreamer_extractor_free(bbstreamer *streamer)
{
bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
pfree(mystreamer->basepath);
pfree(mystreamer);
}

View File

@ -0,0 +1,250 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_inject.c
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_inject.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "bbstreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
typedef struct bbstreamer_recovery_injector
{
bbstreamer base;
bool skip_file;
bool is_recovery_guc_supported;
bool is_postgresql_auto_conf;
bool found_postgresql_auto_conf;
PQExpBuffer recoveryconfcontents;
bbstreamer_member member;
} bbstreamer_recovery_injector;
static void bbstreamer_recovery_injector_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer);
static void bbstreamer_recovery_injector_free(bbstreamer *streamer);
const bbstreamer_ops bbstreamer_recovery_injector_ops = {
.content = bbstreamer_recovery_injector_content,
.finalize = bbstreamer_recovery_injector_finalize,
.free = bbstreamer_recovery_injector_free
};
/*
* Create a bbstreamer that can edit recoverydata into an archive stream.
*
* The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as
* per the conventions described in bbstreamer.h; the chunks forwarded to
* the next bbstreamer will be similarly typed, but the
* BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
* edited the archive stream.
*
* Our goal is to do one of the following three things with the content passed
* via recoveryconfcontents: (1) if is_recovery_guc_supported is false, then
* put the content into recovery.conf, replacing any existing archive member
* by that name; (2) if is_recovery_guc_supported is true and
* postgresql.auto.conf exists in the archive, then append the content
* provided to the existing file; and (3) if is_recovery_guc_supported is
* true but postgresql.auto.conf does not exist in the archive, then create
* it with the specified content.
*
* In addition, if is_recovery_guc_supported is true, then we create a
* zero-length standby.signal file, dropping any file with that name from
* the archive.
*/
extern bbstreamer *
bbstreamer_recovery_injector_new(bbstreamer *next,
bool is_recovery_guc_supported,
PQExpBuffer recoveryconfcontents)
{
bbstreamer_recovery_injector *streamer;
streamer = palloc0(sizeof(bbstreamer_recovery_injector));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_recovery_injector_ops;
streamer->base.bbs_next = next;
streamer->is_recovery_guc_supported = is_recovery_guc_supported;
streamer->recoveryconfcontents = recoveryconfcontents;
return &streamer->base;
}
/*
* Handle each chunk of tar content while injecting recovery configuration.
*/
static void
bbstreamer_recovery_injector_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
bbstreamer_recovery_injector *mystreamer;
mystreamer = (bbstreamer_recovery_injector *) streamer;
Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
switch (context)
{
case BBSTREAMER_MEMBER_HEADER:
/* Must copy provided data so we have the option to modify it. */
memcpy(&mystreamer->member, member, sizeof(bbstreamer_member));
/*
* On v12+, skip standby.signal and edit postgresql.auto.conf; on
* older versions, skip recovery.conf.
*/
if (mystreamer->is_recovery_guc_supported)
{
mystreamer->skip_file =
(strcmp(member->pathname, "standby.signal") == 0);
mystreamer->is_postgresql_auto_conf =
(strcmp(member->pathname, "postgresql.auto.conf") == 0);
if (mystreamer->is_postgresql_auto_conf)
{
/* Remember we saw it so we don't add it again. */
mystreamer->found_postgresql_auto_conf = true;
/* Increment length by data to be injected. */
mystreamer->member.size +=
mystreamer->recoveryconfcontents->len;
/*
* Zap data and len because the archive header is no
* longer valid; some subsequent bbstreamer must
* regenerate it if it's necessary.
*/
data = NULL;
len = 0;
}
}
else
mystreamer->skip_file =
(strcmp(member->pathname, "recovery.conf") == 0);
/* Do not forward if the file is to be skipped. */
if (mystreamer->skip_file)
return;
break;
case BBSTREAMER_MEMBER_CONTENTS:
/* Do not forward if the file is to be skipped. */
if (mystreamer->skip_file)
return;
break;
case BBSTREAMER_MEMBER_TRAILER:
/* Do not forward it the file is to be skipped. */
if (mystreamer->skip_file)
return;
/* Append provided content to whatever we already sent. */
if (mystreamer->is_postgresql_auto_conf)
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len,
BBSTREAMER_MEMBER_CONTENTS);
break;
case BBSTREAMER_ARCHIVE_TRAILER:
if (mystreamer->is_recovery_guc_supported)
{
/*
* If we didn't already find (and thus modify)
* postgresql.auto.conf, inject it as an additional archive
* member now.
*/
if (!mystreamer->found_postgresql_auto_conf)
bbstreamer_inject_file(mystreamer->base.bbs_next,
"postgresql.auto.conf",
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len);
/* Inject empty standby.signal file. */
bbstreamer_inject_file(mystreamer->base.bbs_next,
"standby.signal", "", 0);
}
else
{
/* Inject recovery.conf file with specified contents. */
bbstreamer_inject_file(mystreamer->base.bbs_next,
"recovery.conf",
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len);
}
/* Nothing to do here. */
break;
default:
/* Shouldn't happen. */
pg_log_error("unexpected state while injecting recovery settings");
exit(1);
}
bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
data, len, context);
}
/*
* End-of-stream processing for this bbstreamer.
*/
static void
bbstreamer_recovery_injector_finalize(bbstreamer *streamer)
{
bbstreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with this bbstreamer.
*/
static void
bbstreamer_recovery_injector_free(bbstreamer *streamer)
{
bbstreamer_free(streamer->bbs_next);
pfree(streamer);
}
/*
* Inject a member into the archive with specified contents.
*/
void
bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
int len)
{
bbstreamer_member member;
strlcpy(member.pathname, pathname, MAXPGPATH);
member.size = len;
member.mode = pg_file_create_mode;
member.is_directory = false;
member.is_link = false;
member.linktarget[0] = '\0';
/*
* There seems to be no principled argument for these values, but they are
* what PostgreSQL has historically used.
*/
member.uid = 04000;
member.gid = 02000;
/*
* We don't know here how to generate valid member headers and trailers
* for the archiving format in use, so if those are needed, some successor
* bbstreamer will have to generate them using the data from 'member'.
*/
bbstreamer_content(streamer, &member, NULL, 0,
BBSTREAMER_MEMBER_HEADER);
bbstreamer_content(streamer, &member, data, len,
BBSTREAMER_MEMBER_CONTENTS);
bbstreamer_content(streamer, &member, NULL, 0,
BBSTREAMER_MEMBER_TRAILER);
}

View File

@ -0,0 +1,444 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_tar.c
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_tar.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <time.h>
#include "bbstreamer.h"
#include "common/logging.h"
#include "pgtar.h"
typedef struct bbstreamer_tar_parser
{
bbstreamer base;
bbstreamer_archive_context next_context;
bbstreamer_member member;
size_t file_bytes_sent;
size_t pad_bytes_expected;
} bbstreamer_tar_parser;
typedef struct bbstreamer_tar_archiver
{
bbstreamer base;
bool rearchive_member;
} bbstreamer_tar_archiver;
static void bbstreamer_tar_parser_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_tar_parser_finalize(bbstreamer *streamer);
static void bbstreamer_tar_parser_free(bbstreamer *streamer);
static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer);
const bbstreamer_ops bbstreamer_tar_parser_ops = {
.content = bbstreamer_tar_parser_content,
.finalize = bbstreamer_tar_parser_finalize,
.free = bbstreamer_tar_parser_free
};
static void bbstreamer_tar_archiver_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer);
static void bbstreamer_tar_archiver_free(bbstreamer *streamer);
const bbstreamer_ops bbstreamer_tar_archiver_ops = {
.content = bbstreamer_tar_archiver_content,
.finalize = bbstreamer_tar_archiver_finalize,
.free = bbstreamer_tar_archiver_free
};
/*
* Create a bbstreamer that can parse a stream of content as tar data.
*
* The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer
* specified by 'next' will receive a series of typed chunks, as per the
* conventions described in bbstreamer.h.
*/
extern bbstreamer *
bbstreamer_tar_parser_new(bbstreamer *next)
{
bbstreamer_tar_parser *streamer;
streamer = palloc0(sizeof(bbstreamer_tar_parser));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_tar_parser_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
streamer->next_context = BBSTREAMER_MEMBER_HEADER;
return &streamer->base;
}
/*
* Parse unknown content as tar data.
*/
static void
bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
size_t nbytes;
/* Expect unparsed input. */
Assert(member == NULL);
Assert(context == BBSTREAMER_UNKNOWN);
while (len > 0)
{
switch (mystreamer->next_context)
{
case BBSTREAMER_MEMBER_HEADER:
/*
* If we're expecting an archive member header, accumulate a
* full block of data before doing anything further.
*/
if (!bbstreamer_buffer_until(streamer, &data, &len,
TAR_BLOCK_SIZE))
return;
/*
* Now we can process the header and get ready to process the
* file contents; however, we might find out that what we
* thought was the next file header is actually the start of
* the archive trailer. Switch modes accordingly.
*/
if (bbstreamer_tar_header(mystreamer))
{
if (mystreamer->member.size == 0)
{
/* No content; trailer is zero-length. */
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
NULL, 0,
BBSTREAMER_MEMBER_TRAILER);
/* Expect next header. */
mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
}
else
{
/* Expect contents. */
mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS;
}
mystreamer->base.bbs_buffer.len = 0;
mystreamer->file_bytes_sent = 0;
}
else
mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER;
break;
case BBSTREAMER_MEMBER_CONTENTS:
/*
* Send as much content as we have, but not more than the
* remaining file length.
*/
Assert(mystreamer->file_bytes_sent < mystreamer->member.size);
nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
nbytes = Min(nbytes, len);
Assert(nbytes > 0);
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
data, nbytes,
BBSTREAMER_MEMBER_CONTENTS);
mystreamer->file_bytes_sent += nbytes;
data += nbytes;
len -= nbytes;
/*
* If we've not yet sent the whole file, then there's more
* content to come; otherwise, it's time to expect the file
* trailer.
*/
Assert(mystreamer->file_bytes_sent <= mystreamer->member.size);
if (mystreamer->file_bytes_sent == mystreamer->member.size)
{
if (mystreamer->pad_bytes_expected == 0)
{
/* Trailer is zero-length. */
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
NULL, 0,
BBSTREAMER_MEMBER_TRAILER);
/* Expect next header. */
mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
}
else
{
/* Trailer is not zero-length. */
mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER;
}
mystreamer->base.bbs_buffer.len = 0;
}
break;
case BBSTREAMER_MEMBER_TRAILER:
/*
* If we're expecting an archive member trailer, accumulate
* the expected number of padding bytes before sending
* anything onward.
*/
if (!bbstreamer_buffer_until(streamer, &data, &len,
mystreamer->pad_bytes_expected))
return;
/* OK, now we can send it. */
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
data, mystreamer->pad_bytes_expected,
BBSTREAMER_MEMBER_TRAILER);
/* Expect next file header. */
mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
mystreamer->base.bbs_buffer.len = 0;
break;
case BBSTREAMER_ARCHIVE_TRAILER:
/*
* We've seen an end-of-archive indicator, so anything more is
* buffered and sent as part of the archive trailer. But we
* don't expect more than 2 blocks.
*/
bbstreamer_buffer_bytes(streamer, &data, &len, len);
if (len > 2 * TAR_BLOCK_SIZE)
{
pg_log_error("tar file trailer exceeds 2 blocks");
exit(1);
}
return;
default:
/* Shouldn't happen. */
pg_log_error("unexpected state while parsing tar archive");
exit(1);
}
}
}
/*
* Parse a file header within a tar stream.
*
* The return value is true if we found a file header and passed it on to the
* next bbstreamer; it is false if we have reached the archive trailer.
*/
static bool
bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
{
bool has_nonzero_byte = false;
int i;
bbstreamer_member *member = &mystreamer->member;
char *buffer = mystreamer->base.bbs_buffer.data;
Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
/* Check whether we've got a block of all zero bytes. */
for (i = 0; i < TAR_BLOCK_SIZE; ++i)
{
if (buffer[i] != '\0')
{
has_nonzero_byte = true;
break;
}
}
/*
* If the entire block was zeros, this is the end of the archive, not the
* start of the next file.
*/
if (!has_nonzero_byte)
return false;
/*
* Parse key fields out of the header.
*
* FIXME: It's terrible that we use hard-coded values here instead of some
* more principled approach. It's been like this for a long time, but we
* ought to do better.
*/
strlcpy(member->pathname, &buffer[0], MAXPGPATH);
if (member->pathname[0] == '\0')
{
pg_log_error("tar member has empty name");
exit(1);
}
member->size = read_tar_number(&buffer[124], 12);
member->mode = read_tar_number(&buffer[100], 8);
member->uid = read_tar_number(&buffer[108], 8);
member->gid = read_tar_number(&buffer[116], 8);
member->is_directory = (buffer[156] == '5');
member->is_link = (buffer[156] == '2');
if (member->is_link)
strlcpy(member->linktarget, &buffer[157], 100);
/* Compute number of padding bytes. */
mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
/* Forward the entire header to the next bbstreamer. */
bbstreamer_content(mystreamer->base.bbs_next, member,
buffer, TAR_BLOCK_SIZE,
BBSTREAMER_MEMBER_HEADER);
return true;
}
/*
* End-of-stream processing for a tar parser.
*/
static void
bbstreamer_tar_parser_finalize(bbstreamer *streamer)
{
bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER &&
(mystreamer->next_context != BBSTREAMER_MEMBER_HEADER ||
mystreamer->base.bbs_buffer.len > 0))
{
pg_log_error("COPY stream ended before last file was finished");
exit(1);
}
/* Send the archive trailer, even if empty. */
bbstreamer_content(streamer->bbs_next, NULL,
streamer->bbs_buffer.data, streamer->bbs_buffer.len,
BBSTREAMER_ARCHIVE_TRAILER);
/* Now finalize successor. */
bbstreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar parser.
*/
static void
bbstreamer_tar_parser_free(bbstreamer *streamer)
{
pfree(streamer->bbs_buffer.data);
bbstreamer_free(streamer->bbs_next);
}
/*
* Create an bbstreamer that can generate a tar archive.
*
* This is intended to be usable either for generating a brand-new tar archive
* or for modifying one on the fly. The input should be a series of typed
* chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for
* bbstreamer_tar_parser_content.
*/
extern bbstreamer *
bbstreamer_tar_archiver_new(bbstreamer *next)
{
bbstreamer_tar_archiver *streamer;
streamer = palloc0(sizeof(bbstreamer_tar_archiver));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_tar_archiver_ops;
streamer->base.bbs_next = next;
return &streamer->base;
}
/*
* Fix up the stream of input chunks to create a valid tar file.
*
* If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
* newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
* passed through without change. Any other size is a fatal error (and
* indicates a bug).
*
* Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the
* corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from
* scratch. Specifically, we construct a block of zero bytes sufficient to
* pad out to a block boundary, as required by the tar format. Other
* BBSTREAMER_MEMBER_TRAILER chunks are passed through without change.
*
* Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change.
*
* The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two
* blocks of zero bytes. Not all tar programs require this, but apparently
* some do. The server does not supply this trailer. If no archive trailer is
* present, one will be added by bbstreamer_tar_parser_finalize.
*/
static void
bbstreamer_tar_archiver_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer;
char buffer[2 * TAR_BLOCK_SIZE];
Assert(context != BBSTREAMER_UNKNOWN);
if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
{
Assert(len == 0);
/* Replace zero-length tar header with a newly constructed one. */
tarCreateHeader(buffer, member->pathname, NULL,
member->size, member->mode, member->uid, member->gid,
time(NULL));
data = buffer;
len = TAR_BLOCK_SIZE;
/* Also make a note to replace padding, in case size changed. */
mystreamer->rearchive_member = true;
}
else if (context == BBSTREAMER_MEMBER_TRAILER &&
mystreamer->rearchive_member)
{
int pad_bytes = tarPaddingBytesRequired(member->size);
/* Also replace padding, if we regenerated the header. */
memset(buffer, 0, pad_bytes);
data = buffer;
len = pad_bytes;
/* Don't do this agian unless we replace another header. */
mystreamer->rearchive_member = false;
}
else if (context == BBSTREAMER_ARCHIVE_TRAILER)
{
/* Trailer should always be two blocks of zero bytes. */
memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
data = buffer;
len = 2 * TAR_BLOCK_SIZE;
}
bbstreamer_content(streamer->bbs_next, member, data, len, context);
}
/*
* End-of-stream processing for a tar archiver.
*/
static void
bbstreamer_tar_archiver_finalize(bbstreamer *streamer)
{
bbstreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar archiver.
*/
static void
bbstreamer_tar_archiver_free(bbstreamer *streamer)
{
bbstreamer_free(streamer->bbs_next);
pfree(streamer);
}

File diff suppressed because it is too large Load Diff