diff --git a/src/bin/pg_rewind/Makefile b/src/bin/pg_rewind/Makefile index f398c3d848..9bfde5c087 100644 --- a/src/bin/pg_rewind/Makefile +++ b/src/bin/pg_rewind/Makefile @@ -20,12 +20,11 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport) OBJS = \ $(WIN32RES) \ - copy_fetch.o \ datapagemap.o \ - fetch.o \ file_ops.o \ filemap.o \ - libpq_fetch.o \ + libpq_source.o \ + local_source.o \ parsexlog.o \ pg_rewind.o \ timeline.o \ diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c deleted file mode 100644 index 1cd4449314..0000000000 --- a/src/bin/pg_rewind/copy_fetch.c +++ /dev/null @@ -1,266 +0,0 @@ -/*------------------------------------------------------------------------- - * - * copy_fetch.c - * Functions for using a data directory as the source. - * - * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group - * - *------------------------------------------------------------------------- - */ -#include "postgres_fe.h" - -#include -#include -#include -#include - -#include "datapagemap.h" -#include "fetch.h" -#include "file_ops.h" -#include "filemap.h" -#include "pg_rewind.h" - -static void recurse_dir(const char *datadir, const char *path, - process_file_callback_t callback); - -static void execute_pagemap(datapagemap_t *pagemap, const char *path); - -/* - * Traverse through all files in a data directory, calling 'callback' - * for each file. - */ -void -traverse_datadir(const char *datadir, process_file_callback_t callback) -{ - recurse_dir(datadir, NULL, callback); -} - -/* - * recursive part of traverse_datadir - * - * parentpath is the current subdirectory's path relative to datadir, - * or NULL at the top level. - */ -static void -recurse_dir(const char *datadir, const char *parentpath, - process_file_callback_t callback) -{ - DIR *xldir; - struct dirent *xlde; - char fullparentpath[MAXPGPATH]; - - if (parentpath) - snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath); - else - snprintf(fullparentpath, MAXPGPATH, "%s", datadir); - - xldir = opendir(fullparentpath); - if (xldir == NULL) - pg_fatal("could not open directory \"%s\": %m", - fullparentpath); - - while (errno = 0, (xlde = readdir(xldir)) != NULL) - { - struct stat fst; - char fullpath[MAXPGPATH * 2]; - char path[MAXPGPATH * 2]; - - if (strcmp(xlde->d_name, ".") == 0 || - strcmp(xlde->d_name, "..") == 0) - continue; - - snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name); - - if (lstat(fullpath, &fst) < 0) - { - if (errno == ENOENT) - { - /* - * File doesn't exist anymore. This is ok, if the new primary - * is running and the file was just removed. If it was a data - * file, there should be a WAL record of the removal. If it - * was something else, it couldn't have been anyway. - * - * TODO: But complain if we're processing the target dir! - */ - } - else - pg_fatal("could not stat file \"%s\": %m", - fullpath); - } - - if (parentpath) - snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name); - else - snprintf(path, sizeof(path), "%s", xlde->d_name); - - if (S_ISREG(fst.st_mode)) - callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL); - else if (S_ISDIR(fst.st_mode)) - { - callback(path, FILE_TYPE_DIRECTORY, 0, NULL); - /* recurse to handle subdirectories */ - recurse_dir(datadir, path, callback); - } -#ifndef WIN32 - else if (S_ISLNK(fst.st_mode)) -#else - else if (pgwin32_is_junction(fullpath)) -#endif - { -#if defined(HAVE_READLINK) || defined(WIN32) - char link_target[MAXPGPATH]; - int len; - - len = readlink(fullpath, link_target, sizeof(link_target)); - if (len < 0) - pg_fatal("could not read symbolic link \"%s\": %m", - fullpath); - if (len >= sizeof(link_target)) - pg_fatal("symbolic link \"%s\" target is too long", - fullpath); - link_target[len] = '\0'; - - callback(path, FILE_TYPE_SYMLINK, 0, link_target); - - /* - * If it's a symlink within pg_tblspc, we need to recurse into it, - * to process all the tablespaces. We also follow a symlink if - * it's for pg_wal. Symlinks elsewhere are ignored. - */ - if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) || - strcmp(path, "pg_wal") == 0) - recurse_dir(datadir, path, callback); -#else - pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform", - fullpath); -#endif /* HAVE_READLINK */ - } - } - - if (errno) - pg_fatal("could not read directory \"%s\": %m", - fullparentpath); - - if (closedir(xldir)) - pg_fatal("could not close directory \"%s\": %m", - fullparentpath); -} - -/* - * Copy a file from source to target, between 'begin' and 'end' offsets. - * - * If 'trunc' is true, any existing file with the same name is truncated. - */ -static void -rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc) -{ - PGAlignedBlock buf; - char srcpath[MAXPGPATH]; - int srcfd; - - snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path); - - srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0); - if (srcfd < 0) - pg_fatal("could not open source file \"%s\": %m", - srcpath); - - if (lseek(srcfd, begin, SEEK_SET) == -1) - pg_fatal("could not seek in source file: %m"); - - open_target_file(path, trunc); - - while (end - begin > 0) - { - int readlen; - int len; - - if (end - begin > sizeof(buf)) - len = sizeof(buf); - else - len = end - begin; - - readlen = read(srcfd, buf.data, len); - - if (readlen < 0) - pg_fatal("could not read file \"%s\": %m", - srcpath); - else if (readlen == 0) - pg_fatal("unexpected EOF while reading file \"%s\"", srcpath); - - write_target_range(buf.data, begin, readlen); - begin += readlen; - } - - if (close(srcfd) != 0) - pg_fatal("could not close file \"%s\": %m", srcpath); -} - -/* - * Copy all relation data files from datadir_source to datadir_target, which - * are marked in the given data page map. - */ -void -copy_executeFileMap(filemap_t *map) -{ - file_entry_t *entry; - int i; - - for (i = 0; i < map->nentries; i++) - { - entry = map->entries[i]; - execute_pagemap(&entry->target_pages_to_overwrite, entry->path); - - switch (entry->action) - { - case FILE_ACTION_NONE: - /* ok, do nothing.. */ - break; - - case FILE_ACTION_COPY: - rewind_copy_file_range(entry->path, 0, entry->source_size, true); - break; - - case FILE_ACTION_TRUNCATE: - truncate_target_file(entry->path, entry->source_size); - break; - - case FILE_ACTION_COPY_TAIL: - rewind_copy_file_range(entry->path, entry->target_size, - entry->source_size, false); - break; - - case FILE_ACTION_CREATE: - create_target(entry); - break; - - case FILE_ACTION_REMOVE: - remove_target(entry); - break; - - case FILE_ACTION_UNDECIDED: - pg_fatal("no action decided for \"%s\"", entry->path); - break; - } - } - - close_target_file(); -} - -static void -execute_pagemap(datapagemap_t *pagemap, const char *path) -{ - datapagemap_iterator_t *iter; - BlockNumber blkno; - off_t offset; - - iter = datapagemap_iterate(pagemap); - while (datapagemap_next(iter, &blkno)) - { - offset = blkno * BLCKSZ; - rewind_copy_file_range(path, offset, offset + BLCKSZ, false); - /* Ok, this block has now been copied from new data dir to old */ - } - pg_free(iter); -} diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c deleted file mode 100644 index f41d0f295e..0000000000 --- a/src/bin/pg_rewind/fetch.c +++ /dev/null @@ -1,60 +0,0 @@ -/*------------------------------------------------------------------------- - * - * fetch.c - * Functions for fetching files from a local or remote data dir - * - * This file forms an abstraction of getting files from the "source". - * There are two implementations of this interface: one for copying files - * from a data directory via normal filesystem operations (copy_fetch.c), - * and another for fetching files from a remote server via a libpq - * connection (libpq_fetch.c) - * - * - * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group - * - *------------------------------------------------------------------------- - */ -#include "postgres_fe.h" - -#include -#include - -#include "fetch.h" -#include "file_ops.h" -#include "filemap.h" -#include "pg_rewind.h" - -void -fetchSourceFileList(void) -{ - if (datadir_source) - traverse_datadir(datadir_source, &process_source_file); - else - libpqProcessFileList(); -} - -/* - * Fetch all relation data files that are marked in the given data page map. - */ -void -execute_file_actions(filemap_t *filemap) -{ - if (datadir_source) - copy_executeFileMap(filemap); - else - libpq_executeFileMap(filemap); -} - -/* - * Fetch a single file into a malloc'd buffer. The file size is returned - * in *filesize. The returned buffer is always zero-terminated, which is - * handy for text files. - */ -char * -fetchFile(const char *filename, size_t *filesize) -{ - if (datadir_source) - return slurpFile(datadir_source, filename, filesize); - else - return libpqGetFile(filename, filesize); -} diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h deleted file mode 100644 index b20df8b153..0000000000 --- a/src/bin/pg_rewind/fetch.h +++ /dev/null @@ -1,44 +0,0 @@ -/*------------------------------------------------------------------------- - * - * fetch.h - * Fetching data from a local or remote data directory. - * - * This file includes the prototypes for functions used to copy files from - * one data directory to another. The source to copy from can be a local - * directory (copy method), or a remote PostgreSQL server (libpq fetch - * method). - * - * Copyright (c) 2013-2020, PostgreSQL Global Development Group - * - *------------------------------------------------------------------------- - */ -#ifndef FETCH_H -#define FETCH_H - -#include "access/xlogdefs.h" - -#include "filemap.h" - -/* - * Common interface. Calls the copy or libpq method depending on global - * config options. - */ -extern void fetchSourceFileList(void); -extern char *fetchFile(const char *filename, size_t *filesize); -extern void execute_file_actions(filemap_t *filemap); - -/* in libpq_fetch.c */ -extern void libpqProcessFileList(void); -extern char *libpqGetFile(const char *filename, size_t *filesize); -extern void libpq_executeFileMap(filemap_t *map); - -extern void libpqConnect(const char *connstr); -extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void); - -/* in copy_fetch.c */ -extern void copy_executeFileMap(filemap_t *map); - -typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target); -extern void traverse_datadir(const char *datadir, process_file_callback_t callback); - -#endif /* FETCH_H */ diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c index ec37d0b2e0..065368a220 100644 --- a/src/bin/pg_rewind/file_ops.c +++ b/src/bin/pg_rewind/file_ops.c @@ -15,6 +15,7 @@ #include "postgres_fe.h" #include +#include #include #include @@ -35,6 +36,9 @@ static void remove_target_dir(const char *path); static void create_target_symlink(const char *path, const char *link); static void remove_target_symlink(const char *path); +static void recurse_dir(const char *datadir, const char *parentpath, + process_file_callback_t callback); + /* * Open a target file for writing. If 'trunc' is true and the file already * exists, it will be truncated. @@ -83,7 +87,7 @@ close_target_file(void) void write_target_range(char *buf, off_t begin, size_t size) { - int writeleft; + size_t writeleft; char *p; /* update progress report */ @@ -101,7 +105,7 @@ write_target_range(char *buf, off_t begin, size_t size) p = buf; while (writeleft > 0) { - int writelen; + ssize_t writelen; errno = 0; writelen = write(dstfd, p, writeleft); @@ -305,9 +309,6 @@ sync_target_dir(void) * buffer is actually *filesize + 1. That's handy when reading a text file. * This function can be used to read binary files as well, you can just * ignore the zero-terminator in that case. - * - * This function is used to implement the fetchFile function in the "fetch" - * interface (see fetch.c), but is also called directly. */ char * slurpFile(const char *datadir, const char *path, size_t *filesize) @@ -352,3 +353,125 @@ slurpFile(const char *datadir, const char *path, size_t *filesize) *filesize = len; return buffer; } + +/* + * Traverse through all files in a data directory, calling 'callback' + * for each file. + */ +void +traverse_datadir(const char *datadir, process_file_callback_t callback) +{ + recurse_dir(datadir, NULL, callback); +} + +/* + * recursive part of traverse_datadir + * + * parentpath is the current subdirectory's path relative to datadir, + * or NULL at the top level. + */ +static void +recurse_dir(const char *datadir, const char *parentpath, + process_file_callback_t callback) +{ + DIR *xldir; + struct dirent *xlde; + char fullparentpath[MAXPGPATH]; + + if (parentpath) + snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath); + else + snprintf(fullparentpath, MAXPGPATH, "%s", datadir); + + xldir = opendir(fullparentpath); + if (xldir == NULL) + pg_fatal("could not open directory \"%s\": %m", + fullparentpath); + + while (errno = 0, (xlde = readdir(xldir)) != NULL) + { + struct stat fst; + char fullpath[MAXPGPATH * 2]; + char path[MAXPGPATH * 2]; + + if (strcmp(xlde->d_name, ".") == 0 || + strcmp(xlde->d_name, "..") == 0) + continue; + + snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name); + + if (lstat(fullpath, &fst) < 0) + { + if (errno == ENOENT) + { + /* + * File doesn't exist anymore. This is ok, if the new primary + * is running and the file was just removed. If it was a data + * file, there should be a WAL record of the removal. If it + * was something else, it couldn't have been anyway. + * + * TODO: But complain if we're processing the target dir! + */ + } + else + pg_fatal("could not stat file \"%s\": %m", + fullpath); + } + + if (parentpath) + snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name); + else + snprintf(path, sizeof(path), "%s", xlde->d_name); + + if (S_ISREG(fst.st_mode)) + callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL); + else if (S_ISDIR(fst.st_mode)) + { + callback(path, FILE_TYPE_DIRECTORY, 0, NULL); + /* recurse to handle subdirectories */ + recurse_dir(datadir, path, callback); + } +#ifndef WIN32 + else if (S_ISLNK(fst.st_mode)) +#else + else if (pgwin32_is_junction(fullpath)) +#endif + { +#if defined(HAVE_READLINK) || defined(WIN32) + char link_target[MAXPGPATH]; + int len; + + len = readlink(fullpath, link_target, sizeof(link_target)); + if (len < 0) + pg_fatal("could not read symbolic link \"%s\": %m", + fullpath); + if (len >= sizeof(link_target)) + pg_fatal("symbolic link \"%s\" target is too long", + fullpath); + link_target[len] = '\0'; + + callback(path, FILE_TYPE_SYMLINK, 0, link_target); + + /* + * If it's a symlink within pg_tblspc, we need to recurse into it, + * to process all the tablespaces. We also follow a symlink if + * it's for pg_wal. Symlinks elsewhere are ignored. + */ + if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) || + strcmp(path, "pg_wal") == 0) + recurse_dir(datadir, path, callback); +#else + pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform", + fullpath); +#endif /* HAVE_READLINK */ + } + } + + if (errno) + pg_fatal("could not read directory \"%s\": %m", + fullparentpath); + + if (closedir(xldir)) + pg_fatal("could not close directory \"%s\": %m", + fullparentpath); +} diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h index d8466385cf..c763085976 100644 --- a/src/bin/pg_rewind/file_ops.h +++ b/src/bin/pg_rewind/file_ops.h @@ -23,4 +23,7 @@ extern void sync_target_dir(void); extern char *slurpFile(const char *datadir, const char *path, size_t *filesize); +typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target); +extern void traverse_datadir(const char *datadir, process_file_callback_t callback); + #endif /* FILE_OPS_H */ diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_source.c similarity index 65% rename from src/bin/pg_rewind/libpq_fetch.c rename to src/bin/pg_rewind/libpq_source.c index 16d451ae16..c73e8bf470 100644 --- a/src/bin/pg_rewind/libpq_fetch.c +++ b/src/bin/pg_rewind/libpq_source.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * - * libpq_fetch.c - * Functions for fetching files from a remote server. + * libpq_source.c + * Functions for fetching files from a remote server via libpq. * * Copyright (c) 2013-2020, PostgreSQL Global Development Group * @@ -9,21 +9,14 @@ */ #include "postgres_fe.h" -#include -#include -#include -#include - #include "catalog/pg_type_d.h" #include "common/connect.h" #include "datapagemap.h" -#include "fetch.h" #include "file_ops.h" #include "filemap.h" #include "pg_rewind.h" #include "port/pg_bswap.h" - -PGconn *conn = NULL; +#include "rewind_source.h" /* * Files are fetched max CHUNKSIZE bytes at a time. @@ -34,30 +27,71 @@ PGconn *conn = NULL; */ #define CHUNKSIZE 1000000 -static void receiveFileChunks(const char *sql); -static void execute_pagemap(datapagemap_t *pagemap, const char *path); -static char *run_simple_query(const char *sql); -static void run_simple_command(const char *sql); - -void -libpqConnect(const char *connstr) +typedef struct +{ + rewind_source common; /* common interface functions */ + + PGconn *conn; + bool copy_started; +} libpq_source; + +static void init_libpq_conn(PGconn *conn); +static char *run_simple_query(PGconn *conn, const char *sql); +static void run_simple_command(PGconn *conn, const char *sql); + +/* public interface functions */ +static void libpq_traverse_files(rewind_source *source, + process_file_callback_t callback); +static void libpq_queue_fetch_range(rewind_source *source, const char *path, + off_t off, size_t len); +static void libpq_finish_fetch(rewind_source *source); +static char *libpq_fetch_file(rewind_source *source, const char *path, + size_t *filesize); +static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source); +static void libpq_destroy(rewind_source *source); + +/* + * Create a new libpq source. + * + * The caller has already established the connection, but should not try + * to use it while the source is active. + */ +rewind_source * +init_libpq_source(PGconn *conn) +{ + libpq_source *src; + + init_libpq_conn(conn); + + src = pg_malloc0(sizeof(libpq_source)); + + src->common.traverse_files = libpq_traverse_files; + src->common.fetch_file = libpq_fetch_file; + src->common.queue_fetch_range = libpq_queue_fetch_range; + src->common.finish_fetch = libpq_finish_fetch; + src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn; + src->common.destroy = libpq_destroy; + + src->conn = conn; + + return &src->common; +} + +/* + * Initialize a libpq connection for use. + */ +static void +init_libpq_conn(PGconn *conn) { - char *str; PGresult *res; - - conn = PQconnectdb(connstr); - if (PQstatus(conn) == CONNECTION_BAD) - pg_fatal("could not connect to server: %s", - PQerrorMessage(conn)); - - if (showprogress) - pg_log_info("connected to server"); + char *str; /* disable all types of timeouts */ - run_simple_command("SET statement_timeout = 0"); - run_simple_command("SET lock_timeout = 0"); - run_simple_command("SET idle_in_transaction_session_timeout = 0"); + run_simple_command(conn, "SET statement_timeout = 0"); + run_simple_command(conn, "SET lock_timeout = 0"); + run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0"); + /* secure search_path */ res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pg_fatal("could not clear search_path: %s", @@ -70,7 +104,7 @@ libpqConnect(const char *connstr) * currently because we use a temporary table. Better to check for it * explicitly than error out, for a better error message. */ - str = run_simple_query("SELECT pg_is_in_recovery()"); + str = run_simple_query(conn, "SELECT pg_is_in_recovery()"); if (strcmp(str, "f") != 0) pg_fatal("source server must not be in recovery mode"); pg_free(str); @@ -80,27 +114,19 @@ libpqConnect(const char *connstr) * a page is modified while we read it with pg_read_binary_file(), and we * rely on full page images to fix them. */ - str = run_simple_query("SHOW full_page_writes"); + str = run_simple_query(conn, "SHOW full_page_writes"); if (strcmp(str, "on") != 0) pg_fatal("full_page_writes must be enabled in the source server"); pg_free(str); - - /* - * Although we don't do any "real" updates, we do work with a temporary - * table. We don't care about synchronous commit for that. It doesn't - * otherwise matter much, but if the server is using synchronous - * replication, and replication isn't working for some reason, we don't - * want to get stuck, waiting for it to start working again. - */ - run_simple_command("SET synchronous_commit = off"); } /* - * Runs a query that returns a single value. + * Run a query that returns a single value. + * * The result should be pg_free'd after use. */ static char * -run_simple_query(const char *sql) +run_simple_query(PGconn *conn, const char *sql) { PGresult *res; char *result; @@ -123,11 +149,12 @@ run_simple_query(const char *sql) } /* - * Runs a command. + * Run a command. + * * In the event of a failure, exit immediately. */ static void -run_simple_command(const char *sql) +run_simple_command(PGconn *conn, const char *sql) { PGresult *res; @@ -141,17 +168,18 @@ run_simple_command(const char *sql) } /* - * Calls pg_current_wal_insert_lsn() function + * Call the pg_current_wal_insert_lsn() function in the remote system. */ -XLogRecPtr -libpqGetCurrentXlogInsertLocation(void) +static XLogRecPtr +libpq_get_current_wal_insert_lsn(rewind_source *source) { + PGconn *conn = ((libpq_source *) source)->conn; XLogRecPtr result; uint32 hi; uint32 lo; char *val; - val = run_simple_query("SELECT pg_current_wal_insert_lsn()"); + val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()"); if (sscanf(val, "%X/%X", &hi, &lo) != 2) pg_fatal("unrecognized result \"%s\" for current WAL insert location", val); @@ -166,9 +194,10 @@ libpqGetCurrentXlogInsertLocation(void) /* * Get a list of all files in the data directory. */ -void -libpqProcessFileList(void) +static void +libpq_traverse_files(rewind_source *source, process_file_callback_t callback) { + PGconn *conn = ((libpq_source *) source)->conn; PGresult *res; const char *sql; int i; @@ -246,30 +275,114 @@ libpqProcessFileList(void) PQclear(res); } -/*---- - * Runs a query, which returns pieces of files from the remote source data - * directory, and overwrites the corresponding parts of target files with - * the received parts. The result set is expected to be of format: - * - * path text -- path in the data directory, e.g "base/1/123" - * begin int8 -- offset within the file - * chunk bytea -- file content - *---- +/* + * Queue up a request to fetch a piece of a file from remote system. */ static void -receiveFileChunks(const char *sql) +libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, + size_t len) { - PGresult *res; + libpq_source *src = (libpq_source *) source; + uint64 begin = off; + uint64 end = off + len; - if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1) - pg_fatal("could not send query: %s", PQerrorMessage(conn)); + /* + * On first call, create a temporary table, and start COPYing to it. + * We will load it with the list of blocks that we need to fetch. + */ + if (!src->copy_started) + { + PGresult *res; + + run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)"); + + res = PQexec(src->conn, "COPY fetchchunks FROM STDIN"); + if (PQresultStatus(res) != PGRES_COPY_IN) + pg_fatal("could not send file list: %s", + PQresultErrorMessage(res)); + PQclear(res); + + src->copy_started = true; + } + + /* + * Write the file range to a temporary table in the server. + * + * The range is sent to the server as a COPY formatted line, to be inserted + * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses + * the temporary table to actually fetch the data. + */ + + /* Split the range into CHUNKSIZE chunks */ + while (end - begin > 0) + { + char linebuf[MAXPGPATH + 23]; + unsigned int len; + + /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */ + if (end - begin > CHUNKSIZE) + len = CHUNKSIZE; + else + len = (unsigned int) (end - begin); + + snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len); + + if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1) + pg_fatal("could not send COPY data: %s", + PQerrorMessage(src->conn)); + + begin += len; + } +} + +/* + * Receive all the queued chunks and write them to the target data directory. + */ +static void +libpq_finish_fetch(rewind_source *source) +{ + libpq_source *src = (libpq_source *) source; + PGresult *res; + const char *sql; + + if (PQputCopyEnd(src->conn, NULL) != 1) + pg_fatal("could not send end-of-COPY: %s", + PQerrorMessage(src->conn)); + + while ((res = PQgetResult(src->conn)) != NULL) + { + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("unexpected result while sending file list: %s", + PQresultErrorMessage(res)); + PQclear(res); + } + + /* + * We've now copied the list of file ranges that we need to fetch to the + * temporary table. Now, actually fetch all of those ranges. + */ + sql = + "SELECT path, begin,\n" + " pg_read_binary_file(path, begin, len, true) AS chunk\n" + "FROM fetchchunks\n"; + + if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1) + pg_fatal("could not send query: %s", PQerrorMessage(src->conn)); pg_log_debug("getting file chunks"); - if (PQsetSingleRowMode(conn) != 1) + if (PQsetSingleRowMode(src->conn) != 1) pg_fatal("could not set libpq connection to single row mode"); - while ((res = PQgetResult(conn)) != NULL) + /*---- + * The result set is of format: + * + * path text -- path in the data directory, e.g "base/1/123" + * begin int8 -- offset within the file + * chunk bytea -- file content + *---- + */ + while ((res = PQgetResult(src->conn)) != NULL) { char *filename; int filenamelen; @@ -349,8 +462,8 @@ receiveFileChunks(const char *sql) continue; } - pg_log_debug("received chunk for file \"%s\", offset %lld, size %d", - filename, (long long int) chunkoff, chunksize); + pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d", + filename, chunkoff, chunksize); open_target_file(filename, false); @@ -363,28 +476,29 @@ receiveFileChunks(const char *sql) } /* - * Receive a single file as a malloc'd buffer. + * Fetch a single file as a malloc'd buffer. */ -char * -libpqGetFile(const char *filename, size_t *filesize) +static char * +libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize) { + PGconn *conn = ((libpq_source *) source)->conn; PGresult *res; char *result; int len; const char *paramValues[1]; - paramValues[0] = filename; + paramValues[0] = path; res = PQexecParams(conn, "SELECT pg_read_binary_file($1)", 1, NULL, paramValues, NULL, NULL, 1); if (PQresultStatus(res) != PGRES_TUPLES_OK) pg_fatal("could not fetch remote file \"%s\": %s", - filename, PQresultErrorMessage(res)); + path, PQresultErrorMessage(res)); /* sanity check the result set */ if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0)) pg_fatal("unexpected result set while fetching remote file \"%s\"", - filename); + path); /* Read result to local variables */ len = PQgetlength(res, 0, 0); @@ -394,7 +508,7 @@ libpqGetFile(const char *filename, size_t *filesize) PQclear(res); - pg_log_debug("fetched file \"%s\", length %d", filename, len); + pg_log_debug("fetched file \"%s\", length %d", path, len); if (filesize) *filesize = len; @@ -402,142 +516,11 @@ libpqGetFile(const char *filename, size_t *filesize) } /* - * Write a file range to a temporary table in the server. - * - * The range is sent to the server as a COPY formatted line, to be inserted - * into the 'fetchchunks' temporary table. It is used in receiveFileChunks() - * function to actually fetch the data. + * Close a libpq source. */ static void -fetch_file_range(const char *path, uint64 begin, uint64 end) +libpq_destroy(rewind_source *source) { - char linebuf[MAXPGPATH + 23]; - - /* Split the range into CHUNKSIZE chunks */ - while (end - begin > 0) - { - unsigned int len; - - /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */ - if (end - begin > CHUNKSIZE) - len = CHUNKSIZE; - else - len = (unsigned int) (end - begin); - - snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len); - - if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1) - pg_fatal("could not send COPY data: %s", - PQerrorMessage(conn)); - - begin += len; - } -} - -/* - * Fetch all changed blocks from remote source data directory. - */ -void -libpq_executeFileMap(filemap_t *map) -{ - file_entry_t *entry; - const char *sql; - PGresult *res; - int i; - - /* - * First create a temporary table, and load it with the blocks that we - * need to fetch. - */ - sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);"; - run_simple_command(sql); - - sql = "COPY fetchchunks FROM STDIN"; - res = PQexec(conn, sql); - - if (PQresultStatus(res) != PGRES_COPY_IN) - pg_fatal("could not send file list: %s", - PQresultErrorMessage(res)); - PQclear(res); - - for (i = 0; i < map->nentries; i++) - { - entry = map->entries[i]; - - /* If this is a relation file, copy the modified blocks */ - execute_pagemap(&entry->target_pages_to_overwrite, entry->path); - - switch (entry->action) - { - case FILE_ACTION_NONE: - /* nothing else to do */ - break; - - case FILE_ACTION_COPY: - /* Truncate the old file out of the way, if any */ - open_target_file(entry->path, true); - fetch_file_range(entry->path, 0, entry->source_size); - break; - - case FILE_ACTION_TRUNCATE: - truncate_target_file(entry->path, entry->source_size); - break; - - case FILE_ACTION_COPY_TAIL: - fetch_file_range(entry->path, entry->target_size, entry->source_size); - break; - - case FILE_ACTION_REMOVE: - remove_target(entry); - break; - - case FILE_ACTION_CREATE: - create_target(entry); - break; - - case FILE_ACTION_UNDECIDED: - pg_fatal("no action decided for \"%s\"", entry->path); - break; - } - } - - if (PQputCopyEnd(conn, NULL) != 1) - pg_fatal("could not send end-of-COPY: %s", - PQerrorMessage(conn)); - - while ((res = PQgetResult(conn)) != NULL) - { - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_fatal("unexpected result while sending file list: %s", - PQresultErrorMessage(res)); - PQclear(res); - } - - /* - * We've now copied the list of file ranges that we need to fetch to the - * temporary table. Now, actually fetch all of those ranges. - */ - sql = - "SELECT path, begin,\n" - " pg_read_binary_file(path, begin, len, true) AS chunk\n" - "FROM fetchchunks\n"; - - receiveFileChunks(sql); -} - -static void -execute_pagemap(datapagemap_t *pagemap, const char *path) -{ - datapagemap_iterator_t *iter; - BlockNumber blkno; - off_t offset; - - iter = datapagemap_iterate(pagemap); - while (datapagemap_next(iter, &blkno)) - { - offset = blkno * BLCKSZ; - - fetch_file_range(path, offset, offset + BLCKSZ); - } - pg_free(iter); + pfree(source); + /* NOTE: we don't close the connection here, as it was not opened by us. */ } diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c new file mode 100644 index 0000000000..fa1b6e80ec --- /dev/null +++ b/src/bin/pg_rewind/local_source.c @@ -0,0 +1,131 @@ +/*------------------------------------------------------------------------- + * + * local_source.c + * Functions for using a local data directory as the source. + * + * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#include "postgres_fe.h" + +#include +#include + +#include "datapagemap.h" +#include "file_ops.h" +#include "filemap.h" +#include "pg_rewind.h" +#include "rewind_source.h" + +typedef struct +{ + rewind_source common; /* common interface functions */ + + const char *datadir; /* path to the source data directory */ +} local_source; + +static void local_traverse_files(rewind_source *source, + process_file_callback_t callback); +static char *local_fetch_file(rewind_source *source, const char *path, + size_t *filesize); +static void local_fetch_file_range(rewind_source *source, const char *path, + off_t off, size_t len); +static void local_finish_fetch(rewind_source *source); +static void local_destroy(rewind_source *source); + +rewind_source * +init_local_source(const char *datadir) +{ + local_source *src; + + src = pg_malloc0(sizeof(local_source)); + + src->common.traverse_files = local_traverse_files; + src->common.fetch_file = local_fetch_file; + src->common.queue_fetch_range = local_fetch_file_range; + src->common.finish_fetch = local_finish_fetch; + src->common.get_current_wal_insert_lsn = NULL; + src->common.destroy = local_destroy; + + src->datadir = datadir; + + return &src->common; +} + +static void +local_traverse_files(rewind_source *source, process_file_callback_t callback) +{ + traverse_datadir(((local_source *) source)->datadir, &process_source_file); +} + +static char * +local_fetch_file(rewind_source *source, const char *path, size_t *filesize) +{ + return slurpFile(((local_source *) source)->datadir, path, filesize); +} + +/* + * Copy a file from source to target, starting at 'off', for 'len' bytes. + */ +static void +local_fetch_file_range(rewind_source *source, const char *path, off_t off, + size_t len) +{ + const char *datadir = ((local_source *) source)->datadir; + PGAlignedBlock buf; + char srcpath[MAXPGPATH]; + int srcfd; + off_t begin = off; + off_t end = off + len; + + snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path); + + srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0); + if (srcfd < 0) + pg_fatal("could not open source file \"%s\": %m", + srcpath); + + if (lseek(srcfd, begin, SEEK_SET) == -1) + pg_fatal("could not seek in source file: %m"); + + open_target_file(path, false); + + while (end - begin > 0) + { + ssize_t readlen; + size_t len; + + if (end - begin > sizeof(buf)) + len = sizeof(buf); + else + len = end - begin; + + readlen = read(srcfd, buf.data, len); + + if (readlen < 0) + pg_fatal("could not read file \"%s\": %m", srcpath); + else if (readlen == 0) + pg_fatal("unexpected EOF while reading file \"%s\"", srcpath); + + write_target_range(buf.data, begin, readlen); + begin += readlen; + } + + if (close(srcfd) != 0) + pg_fatal("could not close file \"%s\": %m", srcpath); +} + +static void +local_finish_fetch(rewind_source *source) +{ + /* + * Nothing to do, local_fetch_file_range() copies the ranges immediately. + */ +} + +static void +local_destroy(rewind_source *source) +{ + pfree(source); +} diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c index 574d7f7163..421a45ef5b 100644 --- a/src/bin/pg_rewind/pg_rewind.c +++ b/src/bin/pg_rewind/pg_rewind.c @@ -23,20 +23,25 @@ #include "common/restricted_token.h" #include "common/string.h" #include "fe_utils/recovery_gen.h" -#include "fetch.h" #include "file_ops.h" #include "filemap.h" #include "getopt_long.h" #include "pg_rewind.h" +#include "rewind_source.h" #include "storage/bufpage.h" static void usage(const char *progname); +static void perform_rewind(filemap_t *filemap, rewind_source *source, + XLogRecPtr chkptrec, + TimeLineID chkpttli, + XLogRecPtr chkptredo); + static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli, XLogRecPtr checkpointloc); -static void digestControlFile(ControlFileData *ControlFile, char *source, - size_t size); +static void digestControlFile(ControlFileData *ControlFile, + const char *content, size_t size); static void getRestoreCommand(const char *argv0); static void sanityChecks(void); static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex); @@ -69,6 +74,8 @@ int targetNentries; uint64 fetch_size; uint64 fetch_done; +static PGconn *conn; +static rewind_source *source; static void usage(const char *progname) @@ -125,9 +132,6 @@ main(int argc, char **argv) char *buffer; bool no_ensure_shutdown = false; bool rewind_needed; - XLogRecPtr endrec; - TimeLineID endtli; - ControlFileData ControlFile_new; bool writerecoveryconf = false; filemap_t *filemap; @@ -269,19 +273,29 @@ main(int argc, char **argv) atexit(disconnect_atexit); - /* Connect to remote server */ - if (connstr_source) - libpqConnect(connstr_source); - /* - * Ok, we have all the options and we're ready to start. Read in all the - * information we need from both clusters. + * Ok, we have all the options and we're ready to start. First, connect to + * remote server. */ - buffer = slurpFile(datadir_target, "global/pg_control", &size); - digestControlFile(&ControlFile_target, buffer, size); - pg_free(buffer); + if (connstr_source) + { + conn = PQconnectdb(connstr_source); + + if (PQstatus(conn) == CONNECTION_BAD) + pg_fatal("could not connect to server: %s", + PQerrorMessage(conn)); + + if (showprogress) + pg_log_info("connected to server"); + + source = init_libpq_source(conn); + } + else + source = init_local_source(datadir_source); /* + * Check the status of the target instance. + * * If the target instance was not cleanly shut down, start and stop the * target cluster once in single-user mode to enforce recovery to finish, * ensuring that the cluster can be used by pg_rewind. Note that if @@ -289,6 +303,10 @@ main(int argc, char **argv) * need to make sure by themselves that the target cluster is in a clean * state. */ + buffer = slurpFile(datadir_target, "global/pg_control", &size); + digestControlFile(&ControlFile_target, buffer, size); + pg_free(buffer); + if (!no_ensure_shutdown && ControlFile_target.state != DB_SHUTDOWNED && ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY) @@ -300,17 +318,20 @@ main(int argc, char **argv) pg_free(buffer); } - buffer = fetchFile("global/pg_control", &size); + buffer = source->fetch_file(source, "global/pg_control", &size); digestControlFile(&ControlFile_source, buffer, size); pg_free(buffer); sanityChecks(); /* + * Find the common ancestor timeline between the clusters. + * * If both clusters are already on the same timeline, there's nothing to * do. */ - if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID) + if (ControlFile_target.checkPointCopy.ThisTimeLineID == + ControlFile_source.checkPointCopy.ThisTimeLineID) { pg_log_info("source and target cluster are on the same timeline"); rewind_needed = false; @@ -373,11 +394,11 @@ main(int argc, char **argv) filehash_init(); /* - * Collect information about all files in the target and source systems. + * Collect information about all files in the both data directories. */ if (showprogress) pg_log_info("reading source file list"); - fetchSourceFileList(); + source->traverse_files(source, &process_source_file); if (showprogress) pg_log_info("reading target file list"); @@ -421,11 +442,124 @@ main(int argc, char **argv) } /* - * This is the point of no return. Once we start copying things, we have - * modified the target directory and there is no turning back! + * We have now collected all the information we need from both systems, + * and we are ready to start modifying the target directory. + * + * This is the point of no return. Once we start copying things, there is + * no turning back! */ + perform_rewind(filemap, source, chkptrec, chkpttli, chkptredo); - execute_file_actions(filemap); + if (showprogress) + pg_log_info("syncing target data directory"); + sync_target_dir(); + + /* Also update the standby configuration, if requested. */ + if (writerecoveryconf && !dry_run) + WriteRecoveryConfig(conn, datadir_target, + GenerateRecoveryConfig(conn, NULL)); + + /* don't need the source connection anymore */ + source->destroy(source); + if (conn) + { + PQfinish(conn); + conn = NULL; + } + + pg_log_info("Done!"); + + return 0; +} + +/* + * Perform the rewind. + * + * We have already collected all the information we need from the + * target and the source. + */ +static void +perform_rewind(filemap_t *filemap, rewind_source *source, + XLogRecPtr chkptrec, + TimeLineID chkpttli, + XLogRecPtr chkptredo) +{ + XLogRecPtr endrec; + TimeLineID endtli; + ControlFileData ControlFile_new; + + /* + * Execute the actions in the file map, fetching data from the source + * system as needed. + */ + for (int i = 0; i < filemap->nentries; i++) + { + file_entry_t *entry = filemap->entries[i]; + + /* + * If this is a relation file, copy the modified blocks. + * + * This is in addition to any other changes. + */ + if (entry->target_pages_to_overwrite.bitmapsize > 0) + { + datapagemap_iterator_t *iter; + BlockNumber blkno; + off_t offset; + + iter = datapagemap_iterate(&entry->target_pages_to_overwrite); + while (datapagemap_next(iter, &blkno)) + { + offset = blkno * BLCKSZ; + source->queue_fetch_range(source, entry->path, offset, BLCKSZ); + } + pg_free(iter); + } + + switch (entry->action) + { + case FILE_ACTION_NONE: + /* nothing else to do */ + break; + + case FILE_ACTION_COPY: + /* Truncate the old file out of the way, if any */ + open_target_file(entry->path, true); + source->queue_fetch_range(source, entry->path, + 0, entry->source_size); + break; + + case FILE_ACTION_TRUNCATE: + truncate_target_file(entry->path, entry->source_size); + break; + + case FILE_ACTION_COPY_TAIL: + source->queue_fetch_range(source, entry->path, + entry->target_size, + entry->source_size - entry->target_size); + break; + + case FILE_ACTION_REMOVE: + remove_target(entry); + break; + + case FILE_ACTION_CREATE: + create_target(entry); + break; + + case FILE_ACTION_UNDECIDED: + pg_fatal("no action decided for \"%s\"", entry->path); + break; + } + } + + /* + * We've now copied the list of file ranges that we need to fetch to the + * temporary table. Now, actually fetch all of those ranges. + */ + source->finish_fetch(source); + + close_target_file(); progress_report(true); @@ -437,15 +571,15 @@ main(int argc, char **argv) * Update control file of target. Make it ready to perform archive * recovery when restarting. * - * minRecoveryPoint is set to the current WAL insert location in the - * source server. Like in an online backup, it's important that we recover - * all the WAL that was generated while we copied the files over. + * Like in an online backup, it's important that we replay all the WAL + * that was generated while we copied the files over. To enforce that, set + * 'minRecoveryPoint' in the control file. */ memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData)); if (connstr_source) { - endrec = libpqGetCurrentXlogInsertLocation(); + endrec = source->get_current_wal_insert_lsn(source); endtli = ControlFile_source.checkPointCopy.ThisTimeLineID; } else @@ -458,18 +592,6 @@ main(int argc, char **argv) ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY; if (!dry_run) update_controlfile(datadir_target, &ControlFile_new, do_sync); - - if (showprogress) - pg_log_info("syncing target data directory"); - sync_target_dir(); - - if (writerecoveryconf && !dry_run) - WriteRecoveryConfig(conn, datadir_target, - GenerateRecoveryConfig(conn, NULL)); - - pg_log_info("Done!"); - - return 0; } static void @@ -629,7 +751,7 @@ getTimelineHistory(ControlFileData *controlFile, int *nentries) /* Get history file from appropriate source */ if (controlFile == &ControlFile_source) - histfile = fetchFile(path, NULL); + histfile = source->fetch_file(source, path, NULL); else if (controlFile == &ControlFile_target) histfile = slurpFile(datadir_target, path, NULL); else @@ -785,16 +907,18 @@ checkControlFile(ControlFileData *ControlFile) } /* - * Verify control file contents in the buffer src, and copy it to *ControlFile. + * Verify control file contents in the buffer 'content', and copy it to + * *ControlFile. */ static void -digestControlFile(ControlFileData *ControlFile, char *src, size_t size) +digestControlFile(ControlFileData *ControlFile, const char *content, + size_t size) { if (size != PG_CONTROL_FILE_SIZE) pg_fatal("unexpected control file size %d, expected %d", (int) size, PG_CONTROL_FILE_SIZE); - memcpy(ControlFile, src, sizeof(ControlFileData)); + memcpy(ControlFile, content, sizeof(ControlFileData)); /* set and validate WalSegSz */ WalSegSz = ControlFile->xlog_seg_size; diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h index 67f90c2a38..0dc3dbd525 100644 --- a/src/bin/pg_rewind/pg_rewind.h +++ b/src/bin/pg_rewind/pg_rewind.h @@ -20,8 +20,6 @@ /* Configuration options */ extern char *datadir_target; -extern char *datadir_source; -extern char *connstr_source; extern bool showprogress; extern bool dry_run; extern bool do_sync; @@ -31,9 +29,6 @@ extern int WalSegSz; extern TimeLineHistoryEntry *targetHistory; extern int targetNentries; -/* general state */ -extern PGconn *conn; - /* Progress counters */ extern uint64 fetch_size; extern uint64 fetch_done; diff --git a/src/bin/pg_rewind/rewind_source.h b/src/bin/pg_rewind/rewind_source.h new file mode 100644 index 0000000000..e87f239a47 --- /dev/null +++ b/src/bin/pg_rewind/rewind_source.h @@ -0,0 +1,73 @@ +/*------------------------------------------------------------------------- + * + * rewind_source.h + * Abstraction for fetching from source server. + * + * The source server can be either a libpq connection to a live system, + * or a local data directory. The 'rewind_source' struct abstracts the + * operations to fetch data from the source system, so that the rest of + * the code doesn't need to care what kind of a source its dealing with. + * + * Copyright (c) 2013-2020, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef REWIND_SOURCE_H +#define REWIND_SOURCE_H + +#include "access/xlogdefs.h" +#include "file_ops.h" +#include "filemap.h" +#include "libpq-fe.h" + +typedef struct rewind_source +{ + /* + * Traverse all files in the source data directory, and call 'callback' on + * each file. + */ + void (*traverse_files) (struct rewind_source *, + process_file_callback_t callback); + + /* + * Fetch a single file into a malloc'd buffer. The file size is returned + * in *filesize. The returned buffer is always zero-terminated, which is + * handy for text files. + */ + char *(*fetch_file) (struct rewind_source *, const char *path, + size_t *filesize); + + /* + * Request to fetch (part of) a file in the source system, specified by an + * offset and length, and write it to the same offset in the corresponding + * target file. The source implementation may queue up the request and + * execute it later when convenient. Call finish_fetch() to flush the + * queue and execute all requests. + */ + void (*queue_fetch_range) (struct rewind_source *, const char *path, + off_t offset, size_t len); + + /* + * Execute all requests queued up with queue_fetch_range(). + */ + void (*finish_fetch) (struct rewind_source *); + + /* + * Get the current WAL insert position in the source system. + */ + XLogRecPtr (*get_current_wal_insert_lsn) (struct rewind_source *); + + /* + * Free this rewind_source object. + */ + void (*destroy) (struct rewind_source *); + +} rewind_source; + +/* in libpq_source.c */ +extern rewind_source *init_libpq_source(PGconn *conn); + +/* in local_source.c */ +extern rewind_source *init_local_source(const char *datadir); + +#endif /* FETCH_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index da3e5f73d0..f2ba92be53 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2956,9 +2956,11 @@ f_smgr fd_set fe_scram_state fe_scram_state_enum +fetch_range_request file_action_t file_entry_t file_type_t +filehash_hash filemap_t fill_string_relopt finalize_primnode_context @@ -3084,11 +3086,13 @@ lclContext lclTocEntry leafSegmentInfo leaf_item +libpq_source line_t lineno_t list_sort_comparator local_relopt local_relopts +local_source locale_t locate_agg_of_level_context locate_var_of_level_context @@ -3312,6 +3316,7 @@ rendezvousHashEntry replace_rte_variables_callback replace_rte_variables_context ret_type +rewind_source rewrite_event rijndael_ctx rm_detail_t