pg_rewind: Refactor the abstraction to fetch from local/libpq source.

This makes the abstraction of a "source" server more clear, by introducing
a common abstract class, borrowing the object-oriented programming term,
that represents all the operations that can be done on the source server.
There are two implementations of it, one for fetching via libpq, and
another to fetch from a local directory. This adds some code, but makes it
easier to understand what's going on.

The copy_executeFileMap() and libpq_executeFileMap() functions contained
basically the same logic, just calling different functions to fetch the
source files. Refactor so that the common logic is in one place, in a new
function called perform_rewind().

Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
This commit is contained in:
Heikki Linnakangas 2020-11-04 11:21:18 +02:00
parent f81e97d047
commit 37d2ff3803
12 changed files with 700 additions and 634 deletions

View File

@ -20,12 +20,11 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
OBJS = \ OBJS = \
$(WIN32RES) \ $(WIN32RES) \
copy_fetch.o \
datapagemap.o \ datapagemap.o \
fetch.o \
file_ops.o \ file_ops.o \
filemap.o \ filemap.o \
libpq_fetch.o \ libpq_source.o \
local_source.o \
parsexlog.o \ parsexlog.o \
pg_rewind.o \ pg_rewind.o \
timeline.o \ timeline.o \

View File

@ -1,266 +0,0 @@
/*-------------------------------------------------------------------------
*
* copy_fetch.c
* Functions for using a data directory as the source.
*
* Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>
#include "datapagemap.h"
#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
static void recurse_dir(const char *datadir, const char *path,
process_file_callback_t callback);
static void execute_pagemap(datapagemap_t *pagemap, const char *path);
/*
 * Traverse through all files in a data directory, calling 'callback'
 * for each file.
 *
 * Thin public wrapper around recurse_dir(); passing NULL as the parent
 * path starts the recursion at the top level of 'datadir'.
 */
void
traverse_datadir(const char *datadir, process_file_callback_t callback)
{
	recurse_dir(datadir, NULL, callback);
}
/*
 * recursive part of traverse_datadir
 *
 * parentpath is the current subdirectory's path relative to datadir,
 * or NULL at the top level.
 *
 * Calls 'callback' for every regular file, directory and symlink found,
 * and recurses into subdirectories (and into pg_tblspc/pg_wal symlinks).
 * Errors are fatal (pg_fatal does not return).
 */
static void
recurse_dir(const char *datadir, const char *parentpath,
			process_file_callback_t callback)
{
	DIR		   *xldir;
	struct dirent *xlde;
	char		fullparentpath[MAXPGPATH];

	if (parentpath)
		snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
	else
		snprintf(fullparentpath, MAXPGPATH, "%s", datadir);

	xldir = opendir(fullparentpath);
	if (xldir == NULL)
		pg_fatal("could not open directory \"%s\": %m",
				 fullparentpath);

	/* Reset errno each iteration so we can tell end-of-dir from error. */
	while (errno = 0, (xlde = readdir(xldir)) != NULL)
	{
		struct stat fst;
		char		fullpath[MAXPGPATH * 2];
		char		path[MAXPGPATH * 2];

		if (strcmp(xlde->d_name, ".") == 0 ||
			strcmp(xlde->d_name, "..") == 0)
			continue;

		snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);

		if (lstat(fullpath, &fst) < 0)
		{
			if (errno == ENOENT)
			{
				/*
				 * File doesn't exist anymore. This is ok, if the new primary
				 * is running and the file was just removed. If it was a data
				 * file, there should be a WAL record of the removal. If it
				 * was something else, it couldn't have been anyway.
				 *
				 * TODO: But complain if we're processing the target dir!
				 */

				/*
				 * Skip this entry: 'fst' was never filled in, so the
				 * S_ISREG/S_ISDIR/S_ISLNK checks below would read an
				 * uninitialized struct stat (undefined behavior).
				 */
				continue;
			}
			else
				pg_fatal("could not stat file \"%s\": %m",
						 fullpath);
		}

		/* Build the path relative to the data directory root. */
		if (parentpath)
			snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
		else
			snprintf(path, sizeof(path), "%s", xlde->d_name);

		if (S_ISREG(fst.st_mode))
			callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
		else if (S_ISDIR(fst.st_mode))
		{
			callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
			/* recurse to handle subdirectories */
			recurse_dir(datadir, path, callback);
		}
#ifndef WIN32
		else if (S_ISLNK(fst.st_mode))
#else
		else if (pgwin32_is_junction(fullpath))
#endif
		{
#if defined(HAVE_READLINK) || defined(WIN32)
			char		link_target[MAXPGPATH];
			int			len;

			len = readlink(fullpath, link_target, sizeof(link_target));
			if (len < 0)
				pg_fatal("could not read symbolic link \"%s\": %m",
						 fullpath);
			if (len >= sizeof(link_target))
				pg_fatal("symbolic link \"%s\" target is too long",
						 fullpath);
			link_target[len] = '\0';

			callback(path, FILE_TYPE_SYMLINK, 0, link_target);

			/*
			 * If it's a symlink within pg_tblspc, we need to recurse into it,
			 * to process all the tablespaces. We also follow a symlink if
			 * it's for pg_wal. Symlinks elsewhere are ignored.
			 */
			if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
				strcmp(path, "pg_wal") == 0)
				recurse_dir(datadir, path, callback);
#else
			pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
					 fullpath);
#endif							/* HAVE_READLINK */
		}
	}

	if (errno)
		pg_fatal("could not read directory \"%s\": %m",
				 fullparentpath);

	if (closedir(xldir))
		pg_fatal("could not close directory \"%s\": %m",
				 fullparentpath);
}
/*
 * Copy a file from source to target, between 'begin' and 'end' offsets.
 *
 * If 'trunc' is true, any existing file with the same name is truncated.
 * The data is read from the file under datadir_source and written to the
 * corresponding target file via open_target_file()/write_target_range().
 */
static void
rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
{
	PGAlignedBlock buf;
	char		srcpath[MAXPGPATH];
	int			srcfd;

	snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path);

	srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
	if (srcfd < 0)
		pg_fatal("could not open source file \"%s\": %m",
				 srcpath);

	if (lseek(srcfd, begin, SEEK_SET) == -1)
		pg_fatal("could not seek in source file: %m");

	open_target_file(path, trunc);

	/* Copy in chunks of at most one buffer-full at a time. */
	for (;;)
	{
		off_t		remaining = end - begin;
		int			chunklen;
		int			nread;

		if (remaining <= 0)
			break;

		chunklen = (remaining > (off_t) sizeof(buf)) ? (int) sizeof(buf) : (int) remaining;

		nread = read(srcfd, buf.data, chunklen);
		if (nread < 0)
			pg_fatal("could not read file \"%s\": %m",
					 srcpath);
		if (nread == 0)
			pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);

		write_target_range(buf.data, begin, nread);
		begin += nread;
	}

	if (close(srcfd) != 0)
		pg_fatal("could not close file \"%s\": %m", srcpath);
}
/*
 * Copy all relation data files from datadir_source to datadir_target, which
 * are marked in the given data page map.
 *
 * Walks every entry of the file map: first overwrites the individual blocks
 * recorded in the entry's page map, then carries out the per-file action
 * (copy, truncate, copy-tail, create, remove, or nothing).
 */
void
copy_executeFileMap(filemap_t *map)
{
	int			i;

	for (i = 0; i < map->nentries; i++)
	{
		file_entry_t *entry = map->entries[i];

		/* Overwrite any individual blocks marked in the page map first. */
		execute_pagemap(&entry->target_pages_to_overwrite, entry->path);

		switch (entry->action)
		{
			case FILE_ACTION_UNDECIDED:
				pg_fatal("no action decided for \"%s\"", entry->path);
				break;

			case FILE_ACTION_NONE:
				/* nothing to do for this file */
				break;

			case FILE_ACTION_CREATE:
				create_target(entry);
				break;

			case FILE_ACTION_REMOVE:
				remove_target(entry);
				break;

			case FILE_ACTION_TRUNCATE:
				truncate_target_file(entry->path, entry->source_size);
				break;

			case FILE_ACTION_COPY:
				/* copy the whole file, truncating any old contents */
				rewind_copy_file_range(entry->path, 0, entry->source_size, true);
				break;

			case FILE_ACTION_COPY_TAIL:
				/* the target file is a prefix; append the missing tail */
				rewind_copy_file_range(entry->path, entry->target_size,
									   entry->source_size, false);
				break;
		}
	}

	close_target_file();
}
/*
 * Overwrite, in the target, each block of 'path' that is marked in 'pagemap',
 * copying it from the source data directory.
 */
static void
execute_pagemap(datapagemap_t *pagemap, const char *path)
{
	datapagemap_iterator_t *iter;
	BlockNumber blkno;
	off_t		offset;

	iter = datapagemap_iterate(pagemap);
	while (datapagemap_next(iter, &blkno))
	{
		/*
		 * Widen to off_t before multiplying: BlockNumber is a 32-bit type,
		 * so 'blkno * BLCKSZ' would be computed in 32-bit arithmetic and
		 * could wrap around for offsets of 4 GB or more.
		 */
		offset = (off_t) blkno * BLCKSZ;
		rewind_copy_file_range(path, offset, offset + BLCKSZ, false);
		/* Ok, this block has now been copied from new data dir to old */
	}
	pg_free(iter);
}

View File

@ -1,60 +0,0 @@
/*-------------------------------------------------------------------------
*
* fetch.c
* Functions for fetching files from a local or remote data dir
*
* This file forms an abstraction of getting files from the "source".
* There are two implementations of this interface: one for copying files
* from a data directory via normal filesystem operations (copy_fetch.c),
* and another for fetching files from a remote server via a libpq
* connection (libpq_fetch.c)
*
*
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <unistd.h>
#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
void
fetchSourceFileList(void)
{
if (datadir_source)
traverse_datadir(datadir_source, &process_source_file);
else
libpqProcessFileList();
}
/*
 * Fetch all relation data files that are marked in the given data page map.
 *
 * Dispatches to the local-copy or libpq implementation, depending on
 * whether a local source directory was given.
 */
void
execute_file_actions(filemap_t *filemap)
{
	if (datadir_source == NULL)
		libpq_executeFileMap(filemap);
	else
		copy_executeFileMap(filemap);
}
/*
 * Fetch a single file into a malloc'd buffer. The file size is returned
 * in *filesize. The returned buffer is always zero-terminated, which is
 * handy for text files.
 *
 * Reads from the local source directory if one was given, otherwise
 * fetches the file over the libpq connection.
 */
char *
fetchFile(const char *filename, size_t *filesize)
{
	return (datadir_source != NULL)
		? slurpFile(datadir_source, filename, filesize)
		: libpqGetFile(filename, filesize);
}

View File

@ -1,44 +0,0 @@
/*-------------------------------------------------------------------------
*
* fetch.h
* Fetching data from a local or remote data directory.
*
* This file includes the prototypes for functions used to copy files from
* one data directory to another. The source to copy from can be a local
* directory (copy method), or a remote PostgreSQL server (libpq fetch
* method).
*
* Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef FETCH_H
#define FETCH_H

#include "access/xlogdefs.h"

#include "filemap.h"

/*
 * Common interface. Calls the copy or libpq method depending on global
 * config options.
 */
/* Fetch the list of files in the source (local dir or remote server). */
extern void fetchSourceFileList(void);
/* Fetch one file into a malloc'd, zero-terminated buffer. */
extern char *fetchFile(const char *filename, size_t *filesize);
/* Carry out all the copy/truncate/create/remove actions in 'filemap'. */
extern void execute_file_actions(filemap_t *filemap);

/* in libpq_fetch.c */
extern void libpqProcessFileList(void);
extern char *libpqGetFile(const char *filename, size_t *filesize);
extern void libpq_executeFileMap(filemap_t *map);

extern void libpqConnect(const char *connstr);
extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);

/* in copy_fetch.c */
extern void copy_executeFileMap(filemap_t *map);

/*
 * Callback invoked for each file found while traversing a data directory;
 * 'link_target' is non-NULL only for symlinks.
 */
typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
extern void traverse_datadir(const char *datadir, process_file_callback_t callback);

#endif							/* FETCH_H */

View File

@ -15,6 +15,7 @@
#include "postgres_fe.h" #include "postgres_fe.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h> #include <fcntl.h>
#include <unistd.h> #include <unistd.h>
@ -35,6 +36,9 @@ static void remove_target_dir(const char *path);
static void create_target_symlink(const char *path, const char *link); static void create_target_symlink(const char *path, const char *link);
static void remove_target_symlink(const char *path); static void remove_target_symlink(const char *path);
static void recurse_dir(const char *datadir, const char *parentpath,
process_file_callback_t callback);
/* /*
* Open a target file for writing. If 'trunc' is true and the file already * Open a target file for writing. If 'trunc' is true and the file already
* exists, it will be truncated. * exists, it will be truncated.
@ -83,7 +87,7 @@ close_target_file(void)
void void
write_target_range(char *buf, off_t begin, size_t size) write_target_range(char *buf, off_t begin, size_t size)
{ {
int writeleft; size_t writeleft;
char *p; char *p;
/* update progress report */ /* update progress report */
@ -101,7 +105,7 @@ write_target_range(char *buf, off_t begin, size_t size)
p = buf; p = buf;
while (writeleft > 0) while (writeleft > 0)
{ {
int writelen; ssize_t writelen;
errno = 0; errno = 0;
writelen = write(dstfd, p, writeleft); writelen = write(dstfd, p, writeleft);
@ -305,9 +309,6 @@ sync_target_dir(void)
* buffer is actually *filesize + 1. That's handy when reading a text file. * buffer is actually *filesize + 1. That's handy when reading a text file.
* This function can be used to read binary files as well, you can just * This function can be used to read binary files as well, you can just
* ignore the zero-terminator in that case. * ignore the zero-terminator in that case.
*
* This function is used to implement the fetchFile function in the "fetch"
* interface (see fetch.c), but is also called directly.
*/ */
char * char *
slurpFile(const char *datadir, const char *path, size_t *filesize) slurpFile(const char *datadir, const char *path, size_t *filesize)
@ -352,3 +353,125 @@ slurpFile(const char *datadir, const char *path, size_t *filesize)
*filesize = len; *filesize = len;
return buffer; return buffer;
} }
/*
* Traverse through all files in a data directory, calling 'callback'
* for each file.
*/
void
traverse_datadir(const char *datadir, process_file_callback_t callback)
{
recurse_dir(datadir, NULL, callback);
}
/*
* recursive part of traverse_datadir
*
* parentpath is the current subdirectory's path relative to datadir,
* or NULL at the top level.
*/
static void
recurse_dir(const char *datadir, const char *parentpath,
process_file_callback_t callback)
{
DIR *xldir;
struct dirent *xlde;
char fullparentpath[MAXPGPATH];
if (parentpath)
snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
else
snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
xldir = opendir(fullparentpath);
if (xldir == NULL)
pg_fatal("could not open directory \"%s\": %m",
fullparentpath);
while (errno = 0, (xlde = readdir(xldir)) != NULL)
{
struct stat fst;
char fullpath[MAXPGPATH * 2];
char path[MAXPGPATH * 2];
if (strcmp(xlde->d_name, ".") == 0 ||
strcmp(xlde->d_name, "..") == 0)
continue;
snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
if (lstat(fullpath, &fst) < 0)
{
if (errno == ENOENT)
{
/*
* File doesn't exist anymore. This is ok, if the new primary
* is running and the file was just removed. If it was a data
* file, there should be a WAL record of the removal. If it
* was something else, it couldn't have been anyway.
*
* TODO: But complain if we're processing the target dir!
*/
}
else
pg_fatal("could not stat file \"%s\": %m",
fullpath);
}
if (parentpath)
snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
else
snprintf(path, sizeof(path), "%s", xlde->d_name);
if (S_ISREG(fst.st_mode))
callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
else if (S_ISDIR(fst.st_mode))
{
callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
/* recurse to handle subdirectories */
recurse_dir(datadir, path, callback);
}
#ifndef WIN32
else if (S_ISLNK(fst.st_mode))
#else
else if (pgwin32_is_junction(fullpath))
#endif
{
#if defined(HAVE_READLINK) || defined(WIN32)
char link_target[MAXPGPATH];
int len;
len = readlink(fullpath, link_target, sizeof(link_target));
if (len < 0)
pg_fatal("could not read symbolic link \"%s\": %m",
fullpath);
if (len >= sizeof(link_target))
pg_fatal("symbolic link \"%s\" target is too long",
fullpath);
link_target[len] = '\0';
callback(path, FILE_TYPE_SYMLINK, 0, link_target);
/*
* If it's a symlink within pg_tblspc, we need to recurse into it,
* to process all the tablespaces. We also follow a symlink if
* it's for pg_wal. Symlinks elsewhere are ignored.
*/
if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
strcmp(path, "pg_wal") == 0)
recurse_dir(datadir, path, callback);
#else
pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
fullpath);
#endif /* HAVE_READLINK */
}
}
if (errno)
pg_fatal("could not read directory \"%s\": %m",
fullparentpath);
if (closedir(xldir))
pg_fatal("could not close directory \"%s\": %m",
fullparentpath);
}

View File

@ -23,4 +23,7 @@ extern void sync_target_dir(void);
extern char *slurpFile(const char *datadir, const char *path, size_t *filesize); extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
#endif /* FILE_OPS_H */ #endif /* FILE_OPS_H */

View File

@ -1,7 +1,7 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* libpq_fetch.c * libpq_source.c
* Functions for fetching files from a remote server. * Functions for fetching files from a remote server via libpq.
* *
* Copyright (c) 2013-2020, PostgreSQL Global Development Group * Copyright (c) 2013-2020, PostgreSQL Global Development Group
* *
@ -9,21 +9,14 @@
*/ */
#include "postgres_fe.h" #include "postgres_fe.h"
#include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>
#include "catalog/pg_type_d.h" #include "catalog/pg_type_d.h"
#include "common/connect.h" #include "common/connect.h"
#include "datapagemap.h" #include "datapagemap.h"
#include "fetch.h"
#include "file_ops.h" #include "file_ops.h"
#include "filemap.h" #include "filemap.h"
#include "pg_rewind.h" #include "pg_rewind.h"
#include "port/pg_bswap.h" #include "port/pg_bswap.h"
#include "rewind_source.h"
PGconn *conn = NULL;
/* /*
* Files are fetched max CHUNKSIZE bytes at a time. * Files are fetched max CHUNKSIZE bytes at a time.
@ -34,30 +27,71 @@ PGconn *conn = NULL;
*/ */
#define CHUNKSIZE 1000000 #define CHUNKSIZE 1000000
static void receiveFileChunks(const char *sql); typedef struct
static void execute_pagemap(datapagemap_t *pagemap, const char *path); {
static char *run_simple_query(const char *sql); rewind_source common; /* common interface functions */
static void run_simple_command(const char *sql);
PGconn *conn;
void bool copy_started;
libpqConnect(const char *connstr) } libpq_source;
static void init_libpq_conn(PGconn *conn);
static char *run_simple_query(PGconn *conn, const char *sql);
static void run_simple_command(PGconn *conn, const char *sql);
/* public interface functions */
static void libpq_traverse_files(rewind_source *source,
process_file_callback_t callback);
static void libpq_queue_fetch_range(rewind_source *source, const char *path,
off_t off, size_t len);
static void libpq_finish_fetch(rewind_source *source);
static char *libpq_fetch_file(rewind_source *source, const char *path,
size_t *filesize);
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
static void libpq_destroy(rewind_source *source);
/*
* Create a new libpq source.
*
* The caller has already established the connection, but should not try
* to use it while the source is active.
*/
rewind_source *
init_libpq_source(PGconn *conn)
{
libpq_source *src;
init_libpq_conn(conn);
src = pg_malloc0(sizeof(libpq_source));
src->common.traverse_files = libpq_traverse_files;
src->common.fetch_file = libpq_fetch_file;
src->common.queue_fetch_range = libpq_queue_fetch_range;
src->common.finish_fetch = libpq_finish_fetch;
src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
src->common.destroy = libpq_destroy;
src->conn = conn;
return &src->common;
}
/*
* Initialize a libpq connection for use.
*/
static void
init_libpq_conn(PGconn *conn)
{ {
char *str;
PGresult *res; PGresult *res;
char *str;
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
pg_fatal("could not connect to server: %s",
PQerrorMessage(conn));
if (showprogress)
pg_log_info("connected to server");
/* disable all types of timeouts */ /* disable all types of timeouts */
run_simple_command("SET statement_timeout = 0"); run_simple_command(conn, "SET statement_timeout = 0");
run_simple_command("SET lock_timeout = 0"); run_simple_command(conn, "SET lock_timeout = 0");
run_simple_command("SET idle_in_transaction_session_timeout = 0"); run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
/* secure search_path */
res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL); res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not clear search_path: %s", pg_fatal("could not clear search_path: %s",
@ -70,7 +104,7 @@ libpqConnect(const char *connstr)
* currently because we use a temporary table. Better to check for it * currently because we use a temporary table. Better to check for it
* explicitly than error out, for a better error message. * explicitly than error out, for a better error message.
*/ */
str = run_simple_query("SELECT pg_is_in_recovery()"); str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
if (strcmp(str, "f") != 0) if (strcmp(str, "f") != 0)
pg_fatal("source server must not be in recovery mode"); pg_fatal("source server must not be in recovery mode");
pg_free(str); pg_free(str);
@ -80,27 +114,19 @@ libpqConnect(const char *connstr)
* a page is modified while we read it with pg_read_binary_file(), and we * a page is modified while we read it with pg_read_binary_file(), and we
* rely on full page images to fix them. * rely on full page images to fix them.
*/ */
str = run_simple_query("SHOW full_page_writes"); str = run_simple_query(conn, "SHOW full_page_writes");
if (strcmp(str, "on") != 0) if (strcmp(str, "on") != 0)
pg_fatal("full_page_writes must be enabled in the source server"); pg_fatal("full_page_writes must be enabled in the source server");
pg_free(str); pg_free(str);
/*
* Although we don't do any "real" updates, we do work with a temporary
* table. We don't care about synchronous commit for that. It doesn't
* otherwise matter much, but if the server is using synchronous
* replication, and replication isn't working for some reason, we don't
* want to get stuck, waiting for it to start working again.
*/
run_simple_command("SET synchronous_commit = off");
} }
/* /*
* Runs a query that returns a single value. * Run a query that returns a single value.
*
* The result should be pg_free'd after use. * The result should be pg_free'd after use.
*/ */
static char * static char *
run_simple_query(const char *sql) run_simple_query(PGconn *conn, const char *sql)
{ {
PGresult *res; PGresult *res;
char *result; char *result;
@ -123,11 +149,12 @@ run_simple_query(const char *sql)
} }
/* /*
* Runs a command. * Run a command.
*
* In the event of a failure, exit immediately. * In the event of a failure, exit immediately.
*/ */
static void static void
run_simple_command(const char *sql) run_simple_command(PGconn *conn, const char *sql)
{ {
PGresult *res; PGresult *res;
@ -141,17 +168,18 @@ run_simple_command(const char *sql)
} }
/* /*
* Calls pg_current_wal_insert_lsn() function * Call the pg_current_wal_insert_lsn() function in the remote system.
*/ */
XLogRecPtr static XLogRecPtr
libpqGetCurrentXlogInsertLocation(void) libpq_get_current_wal_insert_lsn(rewind_source *source)
{ {
PGconn *conn = ((libpq_source *) source)->conn;
XLogRecPtr result; XLogRecPtr result;
uint32 hi; uint32 hi;
uint32 lo; uint32 lo;
char *val; char *val;
val = run_simple_query("SELECT pg_current_wal_insert_lsn()"); val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
if (sscanf(val, "%X/%X", &hi, &lo) != 2) if (sscanf(val, "%X/%X", &hi, &lo) != 2)
pg_fatal("unrecognized result \"%s\" for current WAL insert location", val); pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
@ -166,9 +194,10 @@ libpqGetCurrentXlogInsertLocation(void)
/* /*
* Get a list of all files in the data directory. * Get a list of all files in the data directory.
*/ */
void static void
libpqProcessFileList(void) libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
{ {
PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res; PGresult *res;
const char *sql; const char *sql;
int i; int i;
@ -246,30 +275,114 @@ libpqProcessFileList(void)
PQclear(res); PQclear(res);
} }
/*---- /*
* Runs a query, which returns pieces of files from the remote source data * Queue up a request to fetch a piece of a file from remote system.
* directory, and overwrites the corresponding parts of target files with
* the received parts. The result set is expected to be of format:
*
* path text -- path in the data directory, e.g "base/1/123"
* begin int8 -- offset within the file
* chunk bytea -- file content
*----
*/ */
static void static void
receiveFileChunks(const char *sql) libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
size_t len)
{ {
PGresult *res; libpq_source *src = (libpq_source *) source;
uint64 begin = off;
uint64 end = off + len;
if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1) /*
pg_fatal("could not send query: %s", PQerrorMessage(conn)); * On first call, create a temporary table, and start COPYing to it.
* We will load it with the list of blocks that we need to fetch.
*/
if (!src->copy_started)
{
PGresult *res;
run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
if (PQresultStatus(res) != PGRES_COPY_IN)
pg_fatal("could not send file list: %s",
PQresultErrorMessage(res));
PQclear(res);
src->copy_started = true;
}
/*
* Write the file range to a temporary table in the server.
*
* The range is sent to the server as a COPY formatted line, to be inserted
* into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
* the temporary table to actually fetch the data.
*/
/* Split the range into CHUNKSIZE chunks */
while (end - begin > 0)
{
char linebuf[MAXPGPATH + 23];
unsigned int len;
/* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
if (end - begin > CHUNKSIZE)
len = CHUNKSIZE;
else
len = (unsigned int) (end - begin);
snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
pg_fatal("could not send COPY data: %s",
PQerrorMessage(src->conn));
begin += len;
}
}
/*
* Receive all the queued chunks and write them to the target data directory.
*/
static void
libpq_finish_fetch(rewind_source *source)
{
libpq_source *src = (libpq_source *) source;
PGresult *res;
const char *sql;
if (PQputCopyEnd(src->conn, NULL) != 1)
pg_fatal("could not send end-of-COPY: %s",
PQerrorMessage(src->conn));
while ((res = PQgetResult(src->conn)) != NULL)
{
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("unexpected result while sending file list: %s",
PQresultErrorMessage(res));
PQclear(res);
}
/*
* We've now copied the list of file ranges that we need to fetch to the
* temporary table. Now, actually fetch all of those ranges.
*/
sql =
"SELECT path, begin,\n"
" pg_read_binary_file(path, begin, len, true) AS chunk\n"
"FROM fetchchunks\n";
if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
pg_log_debug("getting file chunks"); pg_log_debug("getting file chunks");
if (PQsetSingleRowMode(conn) != 1) if (PQsetSingleRowMode(src->conn) != 1)
pg_fatal("could not set libpq connection to single row mode"); pg_fatal("could not set libpq connection to single row mode");
while ((res = PQgetResult(conn)) != NULL) /*----
* The result set is of format:
*
* path text -- path in the data directory, e.g "base/1/123"
* begin int8 -- offset within the file
* chunk bytea -- file content
*----
*/
while ((res = PQgetResult(src->conn)) != NULL)
{ {
char *filename; char *filename;
int filenamelen; int filenamelen;
@ -349,8 +462,8 @@ receiveFileChunks(const char *sql)
continue; continue;
} }
pg_log_debug("received chunk for file \"%s\", offset %lld, size %d", pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
filename, (long long int) chunkoff, chunksize); filename, chunkoff, chunksize);
open_target_file(filename, false); open_target_file(filename, false);
@ -363,28 +476,29 @@ receiveFileChunks(const char *sql)
} }
/* /*
* Receive a single file as a malloc'd buffer. * Fetch a single file as a malloc'd buffer.
*/ */
char * static char *
libpqGetFile(const char *filename, size_t *filesize) libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
{ {
PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res; PGresult *res;
char *result; char *result;
int len; int len;
const char *paramValues[1]; const char *paramValues[1];
paramValues[0] = filename; paramValues[0] = path;
res = PQexecParams(conn, "SELECT pg_read_binary_file($1)", res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
1, NULL, paramValues, NULL, NULL, 1); 1, NULL, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not fetch remote file \"%s\": %s", pg_fatal("could not fetch remote file \"%s\": %s",
filename, PQresultErrorMessage(res)); path, PQresultErrorMessage(res));
/* sanity check the result set */ /* sanity check the result set */
if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0)) if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
pg_fatal("unexpected result set while fetching remote file \"%s\"", pg_fatal("unexpected result set while fetching remote file \"%s\"",
filename); path);
/* Read result to local variables */ /* Read result to local variables */
len = PQgetlength(res, 0, 0); len = PQgetlength(res, 0, 0);
@ -394,7 +508,7 @@ libpqGetFile(const char *filename, size_t *filesize)
PQclear(res); PQclear(res);
pg_log_debug("fetched file \"%s\", length %d", filename, len); pg_log_debug("fetched file \"%s\", length %d", path, len);
if (filesize) if (filesize)
*filesize = len; *filesize = len;
@ -402,142 +516,11 @@ libpqGetFile(const char *filename, size_t *filesize)
} }
/* /*
* Write a file range to a temporary table in the server. * Close a libpq source.
*
* The range is sent to the server as a COPY formatted line, to be inserted
* into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
* function to actually fetch the data.
*/ */
static void static void
fetch_file_range(const char *path, uint64 begin, uint64 end) libpq_destroy(rewind_source *source)
{ {
char linebuf[MAXPGPATH + 23]; pfree(source);
/* NOTE: we don't close the connection here, as it was not opened by us. */
/* Split the range into CHUNKSIZE chunks */
while (end - begin > 0)
{
unsigned int len;
/* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
if (end - begin > CHUNKSIZE)
len = CHUNKSIZE;
else
len = (unsigned int) (end - begin);
snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
pg_fatal("could not send COPY data: %s",
PQerrorMessage(conn));
begin += len;
}
}
/*
* Fetch all changed blocks from remote source data directory.
*/
void
libpq_executeFileMap(filemap_t *map)
{
file_entry_t *entry;
const char *sql;
PGresult *res;
int i;
/*
* First create a temporary table, and load it with the blocks that we
* need to fetch.
*/
sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
run_simple_command(sql);
sql = "COPY fetchchunks FROM STDIN";
res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_COPY_IN)
pg_fatal("could not send file list: %s",
PQresultErrorMessage(res));
PQclear(res);
for (i = 0; i < map->nentries; i++)
{
entry = map->entries[i];
/* If this is a relation file, copy the modified blocks */
execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
switch (entry->action)
{
case FILE_ACTION_NONE:
/* nothing else to do */
break;
case FILE_ACTION_COPY:
/* Truncate the old file out of the way, if any */
open_target_file(entry->path, true);
fetch_file_range(entry->path, 0, entry->source_size);
break;
case FILE_ACTION_TRUNCATE:
truncate_target_file(entry->path, entry->source_size);
break;
case FILE_ACTION_COPY_TAIL:
fetch_file_range(entry->path, entry->target_size, entry->source_size);
break;
case FILE_ACTION_REMOVE:
remove_target(entry);
break;
case FILE_ACTION_CREATE:
create_target(entry);
break;
case FILE_ACTION_UNDECIDED:
pg_fatal("no action decided for \"%s\"", entry->path);
break;
}
}
if (PQputCopyEnd(conn, NULL) != 1)
pg_fatal("could not send end-of-COPY: %s",
PQerrorMessage(conn));
while ((res = PQgetResult(conn)) != NULL)
{
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("unexpected result while sending file list: %s",
PQresultErrorMessage(res));
PQclear(res);
}
/*
* We've now copied the list of file ranges that we need to fetch to the
* temporary table. Now, actually fetch all of those ranges.
*/
sql =
"SELECT path, begin,\n"
" pg_read_binary_file(path, begin, len, true) AS chunk\n"
"FROM fetchchunks\n";
receiveFileChunks(sql);
}
/*
 * Queue up fetch requests for every block marked in 'pagemap' of the
 * given relation file.  Each modified block is requested from the source
 * as one BLCKSZ-sized range.
 */
static void
execute_pagemap(datapagemap_t *pagemap, const char *path)
{
	datapagemap_iterator_t *iter;
	BlockNumber blkno;
	off_t		offset;

	iter = datapagemap_iterate(pagemap);
	while (datapagemap_next(iter, &blkno))
	{
		/*
		 * Widen before multiplying: BlockNumber is a 32-bit type, so
		 * blkno * BLCKSZ could overflow in 32-bit arithmetic before being
		 * assigned to off_t.
		 */
		offset = (off_t) blkno * BLCKSZ;

		fetch_file_range(path, offset, offset + BLCKSZ);
	}
	pg_free(iter);
}

View File

@ -0,0 +1,131 @@
/*-------------------------------------------------------------------------
*
* local_source.c
* Functions for using a local data directory as the source.
*
* Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <fcntl.h>
#include <unistd.h>
#include "datapagemap.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
#include "rewind_source.h"
/*
 * Private state of the local-directory implementation of rewind_source.
 * Must begin with the common 'rewind_source' member so that a pointer to
 * it can be cast back and forth to rewind_source *.
 */
typedef struct
{
	rewind_source common;		/* common interface functions */

	const char *datadir;		/* path to the source data directory */
} local_source;

/* rewind_source callbacks implemented in this file */
static void local_traverse_files(rewind_source *source,
								 process_file_callback_t callback);
static char *local_fetch_file(rewind_source *source, const char *path,
							  size_t *filesize);
static void local_fetch_file_range(rewind_source *source, const char *path,
								   off_t off, size_t len);
static void local_finish_fetch(rewind_source *source);
static void local_destroy(rewind_source *source);
/*
 * Create a rewind_source that fetches everything directly from the data
 * directory at 'datadir'.
 */
rewind_source *
init_local_source(const char *datadir)
{
	local_source *self = pg_malloc0(sizeof(local_source));

	self->datadir = datadir;

	self->common.traverse_files = local_traverse_files;
	self->common.fetch_file = local_fetch_file;
	self->common.queue_fetch_range = local_fetch_file_range;
	self->common.finish_fetch = local_finish_fetch;
	/* a plain directory cannot report a WAL insert position */
	self->common.get_current_wal_insert_lsn = NULL;
	self->common.destroy = local_destroy;

	return &self->common;
}
/*
 * Traverse all files in the source data directory, calling 'callback'
 * on each file.
 */
static void
local_traverse_files(rewind_source *source, process_file_callback_t callback)
{
	/*
	 * Pass the caller's callback through.  The previous code ignored the
	 * 'callback' argument and hard-coded &process_source_file, which
	 * happened to work only because that is what the sole caller passes.
	 */
	traverse_datadir(((local_source *) source)->datadir, callback);
}
/*
 * Read an entire file from the source data directory into a malloc'd
 * buffer.  The file's size is returned in *filesize.
 */
static char *
local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
{
	local_source *self = (local_source *) source;

	return slurpFile(self->datadir, path, filesize);
}
/*
 * Copy a file from source to target, starting at 'off', for 'len' bytes.
 *
 * The range is read from the source file and written to the same offset
 * in the corresponding target file, one buffer-sized chunk at a time.
 * Errors (open/seek/read/close failures, or premature EOF) are fatal.
 */
static void
local_fetch_file_range(rewind_source *source, const char *path, off_t off,
					   size_t len)
{
	const char *datadir = ((local_source *) source)->datadir;
	PGAlignedBlock buf;
	char		srcpath[MAXPGPATH];
	int			srcfd;
	off_t		begin = off;
	off_t		end = off + len;

	snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);

	srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
	if (srcfd < 0)
		pg_fatal("could not open source file \"%s\": %m",
				 srcpath);
	if (lseek(srcfd, begin, SEEK_SET) == -1)
		pg_fatal("could not seek in source file: %m");

	open_target_file(path, false);

	while (end - begin > 0)
	{
		ssize_t		readlen;
		size_t		thislen;	/* renamed from 'len', which shadowed the
								 * function parameter */

		/* read at most one buffer-full per iteration */
		if (end - begin > sizeof(buf))
			thislen = sizeof(buf);
		else
			thislen = end - begin;

		readlen = read(srcfd, buf.data, thislen);

		if (readlen < 0)
			pg_fatal("could not read file \"%s\": %m", srcpath);
		else if (readlen == 0)
			pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);

		/* write to the same offset in the target file */
		write_target_range(buf.data, begin, readlen);
		begin += readlen;
	}

	if (close(srcfd) != 0)
		pg_fatal("could not close file \"%s\": %m", srcpath);
}
/*
 * Flush the fetch-request queue.
 *
 * This is a no-op for a local source: local_fetch_file_range() executes
 * each copy immediately instead of queuing it.
 */
static void
local_finish_fetch(rewind_source *source)
{
}
/*
 * Release the local source object allocated by init_local_source().
 */
static void
local_destroy(rewind_source *source)
{
	pfree(source);
}

View File

@ -23,20 +23,25 @@
#include "common/restricted_token.h" #include "common/restricted_token.h"
#include "common/string.h" #include "common/string.h"
#include "fe_utils/recovery_gen.h" #include "fe_utils/recovery_gen.h"
#include "fetch.h"
#include "file_ops.h" #include "file_ops.h"
#include "filemap.h" #include "filemap.h"
#include "getopt_long.h" #include "getopt_long.h"
#include "pg_rewind.h" #include "pg_rewind.h"
#include "rewind_source.h"
#include "storage/bufpage.h" #include "storage/bufpage.h"
static void usage(const char *progname); static void usage(const char *progname);
static void perform_rewind(filemap_t *filemap, rewind_source *source,
XLogRecPtr chkptrec,
TimeLineID chkpttli,
XLogRecPtr chkptredo);
static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli, static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
XLogRecPtr checkpointloc); XLogRecPtr checkpointloc);
static void digestControlFile(ControlFileData *ControlFile, char *source, static void digestControlFile(ControlFileData *ControlFile,
size_t size); const char *content, size_t size);
static void getRestoreCommand(const char *argv0); static void getRestoreCommand(const char *argv0);
static void sanityChecks(void); static void sanityChecks(void);
static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex); static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@ -69,6 +74,8 @@ int targetNentries;
uint64 fetch_size; uint64 fetch_size;
uint64 fetch_done; uint64 fetch_done;
static PGconn *conn;
static rewind_source *source;
static void static void
usage(const char *progname) usage(const char *progname)
@ -125,9 +132,6 @@ main(int argc, char **argv)
char *buffer; char *buffer;
bool no_ensure_shutdown = false; bool no_ensure_shutdown = false;
bool rewind_needed; bool rewind_needed;
XLogRecPtr endrec;
TimeLineID endtli;
ControlFileData ControlFile_new;
bool writerecoveryconf = false; bool writerecoveryconf = false;
filemap_t *filemap; filemap_t *filemap;
@ -269,19 +273,29 @@ main(int argc, char **argv)
atexit(disconnect_atexit); atexit(disconnect_atexit);
/* Connect to remote server */
if (connstr_source)
libpqConnect(connstr_source);
/* /*
* Ok, we have all the options and we're ready to start. Read in all the * Ok, we have all the options and we're ready to start. First, connect to
* information we need from both clusters. * remote server.
*/ */
buffer = slurpFile(datadir_target, "global/pg_control", &size); if (connstr_source)
digestControlFile(&ControlFile_target, buffer, size); {
pg_free(buffer); conn = PQconnectdb(connstr_source);
if (PQstatus(conn) == CONNECTION_BAD)
pg_fatal("could not connect to server: %s",
PQerrorMessage(conn));
if (showprogress)
pg_log_info("connected to server");
source = init_libpq_source(conn);
}
else
source = init_local_source(datadir_source);
/* /*
* Check the status of the target instance.
*
* If the target instance was not cleanly shut down, start and stop the * If the target instance was not cleanly shut down, start and stop the
* target cluster once in single-user mode to enforce recovery to finish, * target cluster once in single-user mode to enforce recovery to finish,
* ensuring that the cluster can be used by pg_rewind. Note that if * ensuring that the cluster can be used by pg_rewind. Note that if
@ -289,6 +303,10 @@ main(int argc, char **argv)
* need to make sure by themselves that the target cluster is in a clean * need to make sure by themselves that the target cluster is in a clean
* state. * state.
*/ */
buffer = slurpFile(datadir_target, "global/pg_control", &size);
digestControlFile(&ControlFile_target, buffer, size);
pg_free(buffer);
if (!no_ensure_shutdown && if (!no_ensure_shutdown &&
ControlFile_target.state != DB_SHUTDOWNED && ControlFile_target.state != DB_SHUTDOWNED &&
ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY) ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY)
@ -300,17 +318,20 @@ main(int argc, char **argv)
pg_free(buffer); pg_free(buffer);
} }
buffer = fetchFile("global/pg_control", &size); buffer = source->fetch_file(source, "global/pg_control", &size);
digestControlFile(&ControlFile_source, buffer, size); digestControlFile(&ControlFile_source, buffer, size);
pg_free(buffer); pg_free(buffer);
sanityChecks(); sanityChecks();
/* /*
* Find the common ancestor timeline between the clusters.
*
* If both clusters are already on the same timeline, there's nothing to * If both clusters are already on the same timeline, there's nothing to
* do. * do.
*/ */
if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID) if (ControlFile_target.checkPointCopy.ThisTimeLineID ==
ControlFile_source.checkPointCopy.ThisTimeLineID)
{ {
pg_log_info("source and target cluster are on the same timeline"); pg_log_info("source and target cluster are on the same timeline");
rewind_needed = false; rewind_needed = false;
@ -373,11 +394,11 @@ main(int argc, char **argv)
filehash_init(); filehash_init();
/* /*
* Collect information about all files in the target and source systems. * Collect information about all files in the both data directories.
*/ */
if (showprogress) if (showprogress)
pg_log_info("reading source file list"); pg_log_info("reading source file list");
fetchSourceFileList(); source->traverse_files(source, &process_source_file);
if (showprogress) if (showprogress)
pg_log_info("reading target file list"); pg_log_info("reading target file list");
@ -421,11 +442,124 @@ main(int argc, char **argv)
} }
/* /*
* This is the point of no return. Once we start copying things, we have * We have now collected all the information we need from both systems,
* modified the target directory and there is no turning back! * and we are ready to start modifying the target directory.
*
* This is the point of no return. Once we start copying things, there is
* no turning back!
*/ */
perform_rewind(filemap, source, chkptrec, chkpttli, chkptredo);
execute_file_actions(filemap); if (showprogress)
pg_log_info("syncing target data directory");
sync_target_dir();
/* Also update the standby configuration, if requested. */
if (writerecoveryconf && !dry_run)
WriteRecoveryConfig(conn, datadir_target,
GenerateRecoveryConfig(conn, NULL));
/* don't need the source connection anymore */
source->destroy(source);
if (conn)
{
PQfinish(conn);
conn = NULL;
}
pg_log_info("Done!");
return 0;
}
/*
* Perform the rewind.
*
* We have already collected all the information we need from the
* target and the source.
*/
static void
perform_rewind(filemap_t *filemap, rewind_source *source,
XLogRecPtr chkptrec,
TimeLineID chkpttli,
XLogRecPtr chkptredo)
{
XLogRecPtr endrec;
TimeLineID endtli;
ControlFileData ControlFile_new;
/*
* Execute the actions in the file map, fetching data from the source
* system as needed.
*/
for (int i = 0; i < filemap->nentries; i++)
{
file_entry_t *entry = filemap->entries[i];
/*
* If this is a relation file, copy the modified blocks.
*
* This is in addition to any other changes.
*/
if (entry->target_pages_to_overwrite.bitmapsize > 0)
{
datapagemap_iterator_t *iter;
BlockNumber blkno;
off_t offset;
iter = datapagemap_iterate(&entry->target_pages_to_overwrite);
while (datapagemap_next(iter, &blkno))
{
offset = blkno * BLCKSZ;
source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
}
pg_free(iter);
}
switch (entry->action)
{
case FILE_ACTION_NONE:
/* nothing else to do */
break;
case FILE_ACTION_COPY:
/* Truncate the old file out of the way, if any */
open_target_file(entry->path, true);
source->queue_fetch_range(source, entry->path,
0, entry->source_size);
break;
case FILE_ACTION_TRUNCATE:
truncate_target_file(entry->path, entry->source_size);
break;
case FILE_ACTION_COPY_TAIL:
source->queue_fetch_range(source, entry->path,
entry->target_size,
entry->source_size - entry->target_size);
break;
case FILE_ACTION_REMOVE:
remove_target(entry);
break;
case FILE_ACTION_CREATE:
create_target(entry);
break;
case FILE_ACTION_UNDECIDED:
pg_fatal("no action decided for \"%s\"", entry->path);
break;
}
}
/*
* We've now copied the list of file ranges that we need to fetch to the
* temporary table. Now, actually fetch all of those ranges.
*/
source->finish_fetch(source);
close_target_file();
progress_report(true); progress_report(true);
@ -437,15 +571,15 @@ main(int argc, char **argv)
* Update control file of target. Make it ready to perform archive * Update control file of target. Make it ready to perform archive
* recovery when restarting. * recovery when restarting.
* *
* minRecoveryPoint is set to the current WAL insert location in the * Like in an online backup, it's important that we replay all the WAL
* source server. Like in an online backup, it's important that we recover * that was generated while we copied the files over. To enforce that, set
* all the WAL that was generated while we copied the files over. * 'minRecoveryPoint' in the control file.
*/ */
memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData)); memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
if (connstr_source) if (connstr_source)
{ {
endrec = libpqGetCurrentXlogInsertLocation(); endrec = source->get_current_wal_insert_lsn(source);
endtli = ControlFile_source.checkPointCopy.ThisTimeLineID; endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
} }
else else
@ -458,18 +592,6 @@ main(int argc, char **argv)
ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY; ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
if (!dry_run) if (!dry_run)
update_controlfile(datadir_target, &ControlFile_new, do_sync); update_controlfile(datadir_target, &ControlFile_new, do_sync);
if (showprogress)
pg_log_info("syncing target data directory");
sync_target_dir();
if (writerecoveryconf && !dry_run)
WriteRecoveryConfig(conn, datadir_target,
GenerateRecoveryConfig(conn, NULL));
pg_log_info("Done!");
return 0;
} }
static void static void
@ -629,7 +751,7 @@ getTimelineHistory(ControlFileData *controlFile, int *nentries)
/* Get history file from appropriate source */ /* Get history file from appropriate source */
if (controlFile == &ControlFile_source) if (controlFile == &ControlFile_source)
histfile = fetchFile(path, NULL); histfile = source->fetch_file(source, path, NULL);
else if (controlFile == &ControlFile_target) else if (controlFile == &ControlFile_target)
histfile = slurpFile(datadir_target, path, NULL); histfile = slurpFile(datadir_target, path, NULL);
else else
@ -785,16 +907,18 @@ checkControlFile(ControlFileData *ControlFile)
} }
/* /*
* Verify control file contents in the buffer src, and copy it to *ControlFile. * Verify control file contents in the buffer 'content', and copy it to
* *ControlFile.
*/ */
static void static void
digestControlFile(ControlFileData *ControlFile, char *src, size_t size) digestControlFile(ControlFileData *ControlFile, const char *content,
size_t size)
{ {
if (size != PG_CONTROL_FILE_SIZE) if (size != PG_CONTROL_FILE_SIZE)
pg_fatal("unexpected control file size %d, expected %d", pg_fatal("unexpected control file size %d, expected %d",
(int) size, PG_CONTROL_FILE_SIZE); (int) size, PG_CONTROL_FILE_SIZE);
memcpy(ControlFile, src, sizeof(ControlFileData)); memcpy(ControlFile, content, sizeof(ControlFileData));
/* set and validate WalSegSz */ /* set and validate WalSegSz */
WalSegSz = ControlFile->xlog_seg_size; WalSegSz = ControlFile->xlog_seg_size;

View File

@ -20,8 +20,6 @@
/* Configuration options */ /* Configuration options */
extern char *datadir_target; extern char *datadir_target;
extern char *datadir_source;
extern char *connstr_source;
extern bool showprogress; extern bool showprogress;
extern bool dry_run; extern bool dry_run;
extern bool do_sync; extern bool do_sync;
@ -31,9 +29,6 @@ extern int WalSegSz;
extern TimeLineHistoryEntry *targetHistory; extern TimeLineHistoryEntry *targetHistory;
extern int targetNentries; extern int targetNentries;
/* general state */
extern PGconn *conn;
/* Progress counters */ /* Progress counters */
extern uint64 fetch_size; extern uint64 fetch_size;
extern uint64 fetch_done; extern uint64 fetch_done;

View File

@ -0,0 +1,73 @@
/*-------------------------------------------------------------------------
*
* rewind_source.h
* Abstraction for fetching from source server.
*
* The source server can be either a libpq connection to a live system,
* or a local data directory. The 'rewind_source' struct abstracts the
* operations to fetch data from the source system, so that the rest of
 * the code doesn't need to care what kind of a source it's dealing with.
*
* Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef REWIND_SOURCE_H
#define REWIND_SOURCE_H
#include "access/xlogdefs.h"
#include "file_ops.h"
#include "filemap.h"
#include "libpq-fe.h"
/*
 * Abstract interface for fetching data from the source system.  Each
 * implementation fills in these callbacks; callers invoke them through
 * the struct without knowing which kind of source they are talking to.
 */
typedef struct rewind_source
{
	/*
	 * Traverse all files in the source data directory, and call 'callback' on
	 * each file.
	 */
	void		(*traverse_files) (struct rewind_source *,
								   process_file_callback_t callback);

	/*
	 * Fetch a single file into a malloc'd buffer. The file size is returned
	 * in *filesize. The returned buffer is always zero-terminated, which is
	 * handy for text files.
	 */
	char	   *(*fetch_file) (struct rewind_source *, const char *path,
							   size_t *filesize);

	/*
	 * Request to fetch (part of) a file in the source system, specified by an
	 * offset and length, and write it to the same offset in the corresponding
	 * target file. The source implementation may queue up the request and
	 * execute it later when convenient. Call finish_fetch() to flush the
	 * queue and execute all requests.
	 */
	void		(*queue_fetch_range) (struct rewind_source *, const char *path,
									  off_t offset, size_t len);

	/*
	 * Execute all requests queued up with queue_fetch_range().
	 */
	void		(*finish_fetch) (struct rewind_source *);

	/*
	 * Get the current WAL insert position in the source system.
	 *
	 * May be NULL when the source cannot report one; the local-directory
	 * implementation sets this to NULL.
	 */
	XLogRecPtr	(*get_current_wal_insert_lsn) (struct rewind_source *);

	/*
	 * Free this rewind_source object.
	 */
	void		(*destroy) (struct rewind_source *);

} rewind_source;
/* in libpq_source.c */
extern rewind_source *init_libpq_source(PGconn *conn);
/* in local_source.c */
extern rewind_source *init_local_source(const char *datadir);
#endif							/* REWIND_SOURCE_H */

View File

@ -2956,9 +2956,11 @@ f_smgr
fd_set fd_set
fe_scram_state fe_scram_state
fe_scram_state_enum fe_scram_state_enum
fetch_range_request
file_action_t file_action_t
file_entry_t file_entry_t
file_type_t file_type_t
filehash_hash
filemap_t filemap_t
fill_string_relopt fill_string_relopt
finalize_primnode_context finalize_primnode_context
@ -3084,11 +3086,13 @@ lclContext
lclTocEntry lclTocEntry
leafSegmentInfo leafSegmentInfo
leaf_item leaf_item
libpq_source
line_t line_t
lineno_t lineno_t
list_sort_comparator list_sort_comparator
local_relopt local_relopt
local_relopts local_relopts
local_source
locale_t locale_t
locate_agg_of_level_context locate_agg_of_level_context
locate_var_of_level_context locate_var_of_level_context
@ -3312,6 +3316,7 @@ rendezvousHashEntry
replace_rte_variables_callback replace_rte_variables_callback
replace_rte_variables_context replace_rte_variables_context
ret_type ret_type
rewind_source
rewrite_event rewrite_event
rijndael_ctx rijndael_ctx
rm_detail_t rm_detail_t