Redesign archive modules

A new callback named startup_cb, called shortly after a module is
loaded, is added.  This makes possible the initialization of any
additional state data required by a module.  This initial state data can
be saved in a ArchiveModuleState, that is now passed down to all the
callbacks that can be defined in a module.  With this design, it is
possible to have a per-module state, aimed at opening the door to the
support of more than one archive module.

The initialization of the callbacks is changed so as
_PG_archive_module_init() does not anymore give in input a
ArchiveModuleCallbacks that a module has to fill in with callback
definitions.  Instead, a module now needs to return a const
ArchiveModuleCallbacks.

All the structure and callback definitions of archive modules are moved
into their own header, named archive_module.h, from pgarch.h.
Command-based archiving follows the same line, with a new set of files
named shell_archive.{c,h}.

There are a few more items that are under discussion to improve the
design of archive modules, like the fact that basic_archive calls
sigsetjmp() by itself to define its own error handling flow.  These will
be adjusted later, the changes done here cover already a good portion
of what has been discussed.

Any modules created for v15 will need to be adjusted to this new
design.

Author: Nathan Bossart
Reviewed-by: Andres Freund
Discussion: https://postgr.es/m/20230130194810.6fztfgbn32e7qarj@awork3.anarazel.de
This commit is contained in:
Michael Paquier 2023-02-17 14:26:42 +09:00
parent d2ea2d310d
commit 35739b87dc
15 changed files with 253 additions and 90 deletions

View File

@ -30,9 +30,9 @@
#include <sys/time.h>
#include <unistd.h>
#include "archive/archive_module.h"
#include "common/int.h"
#include "miscadmin.h"
#include "postmaster/pgarch.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "utils/guc.h"
@ -40,14 +40,27 @@
PG_MODULE_MAGIC;
static char *archive_directory = NULL;
static MemoryContext basic_archive_context;
typedef struct BasicArchiveData
{
MemoryContext context;
} BasicArchiveData;
static bool basic_archive_configured(void);
static bool basic_archive_file(const char *file, const char *path);
static char *archive_directory = NULL;
static void basic_archive_startup(ArchiveModuleState *state);
static bool basic_archive_configured(ArchiveModuleState *state);
static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
static void basic_archive_file_internal(const char *file, const char *path);
static bool check_archive_directory(char **newval, void **extra, GucSource source);
static bool compare_files(const char *file1, const char *file2);
static void basic_archive_shutdown(ArchiveModuleState *state);
static const ArchiveModuleCallbacks basic_archive_callbacks = {
.startup_cb = basic_archive_startup,
.check_configured_cb = basic_archive_configured,
.archive_file_cb = basic_archive_file,
.shutdown_cb = basic_archive_shutdown
};
/*
* _PG_init
@ -67,10 +80,6 @@ _PG_init(void)
check_archive_directory, NULL, NULL);
MarkGUCPrefixReserved("basic_archive");
basic_archive_context = AllocSetContextCreate(TopMemoryContext,
"basic_archive",
ALLOCSET_DEFAULT_SIZES);
}
/*
@ -78,11 +87,28 @@ _PG_init(void)
*
* Returns the module's archiving callbacks.
*/
void
_PG_archive_module_init(ArchiveModuleCallbacks *cb)
const ArchiveModuleCallbacks *
_PG_archive_module_init(void)
{
cb->check_configured_cb = basic_archive_configured;
cb->archive_file_cb = basic_archive_file;
return &basic_archive_callbacks;
}
/*
* basic_archive_startup
*
* Creates the module's memory context.
*/
void
basic_archive_startup(ArchiveModuleState *state)
{
BasicArchiveData *data;
data = (BasicArchiveData *) MemoryContextAllocZero(TopMemoryContext,
sizeof(BasicArchiveData));
data->context = AllocSetContextCreate(TopMemoryContext,
"basic_archive",
ALLOCSET_DEFAULT_SIZES);
state->private_data = (void *) data;
}
/*
@ -133,7 +159,7 @@ check_archive_directory(char **newval, void **extra, GucSource source)
* Checks that archive_directory is not blank.
*/
static bool
basic_archive_configured(void)
basic_archive_configured(ArchiveModuleState *state)
{
return archive_directory != NULL && archive_directory[0] != '\0';
}
@ -144,10 +170,12 @@ basic_archive_configured(void)
* Archives one file.
*/
static bool
basic_archive_file(const char *file, const char *path)
basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
{
sigjmp_buf local_sigjmp_buf;
MemoryContext oldcontext;
BasicArchiveData *data = (BasicArchiveData *) state->private_data;
MemoryContext basic_archive_context = data->context;
/*
* We run basic_archive_file_internal() in our own memory context so that
@ -366,3 +394,35 @@ compare_files(const char *file1, const char *file2)
return ret;
}
/*
* basic_archive_shutdown
*
* Frees our allocated state.
*/
static void
basic_archive_shutdown(ArchiveModuleState *state)
{
BasicArchiveData *data = (BasicArchiveData *) state->private_data;
MemoryContext basic_archive_context;
/*
* If we didn't get to storing the pointer to our allocated state, we don't
* have anything to clean up.
*/
if (data == NULL)
return;
basic_archive_context = data->context;
Assert(CurrentMemoryContext != basic_archive_context);
if (MemoryContextIsValid(basic_archive_context))
MemoryContextDelete(basic_archive_context);
data->context = NULL;
/*
* Finally, free the state.
*/
pfree(data);
state->private_data = NULL;
}

View File

@ -47,18 +47,22 @@
normal library search path is used to locate the library. To provide the
required archive module callbacks and to indicate that the library is
actually an archive module, it needs to provide a function named
<function>_PG_archive_module_init</function>. This function is passed a
struct that needs to be filled with the callback function pointers for
individual actions.
<function>_PG_archive_module_init</function>. The result of the function
must be a pointer to a struct of type
<structname>ArchiveModuleCallbacks</structname>, which contains everything
that the core code needs to know how to make use of the archive module. The
return value needs to be of server lifetime, which is typically achieved by
defining it as a <literal>static const</literal> variable in global scope.
<programlisting>
typedef struct ArchiveModuleCallbacks
{
ArchiveStartupCB startup_cb;
ArchiveCheckConfiguredCB check_configured_cb;
ArchiveFileCB archive_file_cb;
ArchiveShutdownCB shutdown_cb;
} ArchiveModuleCallbacks;
typedef void (*ArchiveModuleInit) (struct ArchiveModuleCallbacks *cb);
typedef const ArchiveModuleCallbacks *(*ArchiveModuleInit) (void);
</programlisting>
Only the <function>archive_file_cb</function> callback is required. The
@ -73,6 +77,20 @@ typedef void (*ArchiveModuleInit) (struct ArchiveModuleCallbacks *cb);
The server will call them as required to process each individual WAL file.
</para>
<sect2 id="archive-module-startup">
<title>Startup Callback</title>
<para>
The <function>startup_cb</function> callback is called shortly after the
module is loaded. This callback can be used to perform any additional
initialization required. If the archive module has a state, it can use
<structfield>state->private_data</structfield> to store it.
<programlisting>
typedef void (*ArchiveStartupCB) (ArchiveModuleState *state);
</programlisting>
</para>
</sect2>
<sect2 id="archive-module-check">
<title>Check Callback</title>
<para>
@ -83,7 +101,7 @@ typedef void (*ArchiveModuleInit) (struct ArchiveModuleCallbacks *cb);
assumes the module is configured.
<programlisting>
typedef bool (*ArchiveCheckConfiguredCB) (void);
typedef bool (*ArchiveCheckConfiguredCB) (ArchiveModuleState *state);
</programlisting>
If <literal>true</literal> is returned, the server will proceed with
@ -105,7 +123,7 @@ WARNING: archive_mode enabled, yet archiving is not configured
single WAL file.
<programlisting>
typedef bool (*ArchiveFileCB) (const char *file, const char *path);
typedef bool (*ArchiveFileCB) (ArchiveModuleState *state, const char *file, const char *path);
</programlisting>
If <literal>true</literal> is returned, the server proceeds as if the file
@ -125,10 +143,11 @@ typedef bool (*ArchiveFileCB) (const char *file, const char *path);
process exits (e.g., after an error) or the value of
<xref linkend="guc-archive-library"/> changes. If no
<function>shutdown_cb</function> is defined, no special action is taken in
these situations.
these situations. If the archive module has a state, this callback should
free it to avoid leaks.
<programlisting>
typedef void (*ArchiveShutdownCB) (void);
typedef void (*ArchiveShutdownCB) (ArchiveModuleState *state);
</programlisting>
</para>
</sect2>

View File

@ -17,7 +17,7 @@ subdir = src/backend
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
SUBDIRS = access backup bootstrap catalog parser commands executor \
SUBDIRS = access archive backup bootstrap catalog parser commands executor \
foreign lib libpq \
main nodes optimizer partitioning port postmaster \
regex replication rewrite \

View File

@ -0,0 +1,18 @@
#-------------------------------------------------------------------------
#
# Makefile--
# Makefile for src/backend/archive
#
# IDENTIFICATION
# src/backend/archive/Makefile
#
#-------------------------------------------------------------------------
subdir = src/backend/archive
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = \
shell_archive.o
include $(top_srcdir)/src/backend/common.mk

View File

@ -0,0 +1,5 @@
# Copyright (c) 2023, PostgreSQL Global Development Group
backend_sources += files(
'shell_archive.c'
)

View File

@ -9,7 +9,7 @@
* Copyright (c) 2022-2023, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/postmaster/shell_archive.c
* src/backend/archive/shell_archive.c
*
*-------------------------------------------------------------------------
*/
@ -18,30 +18,39 @@
#include <sys/wait.h>
#include "access/xlog.h"
#include "archive/archive_module.h"
#include "archive/shell_archive.h"
#include "common/percentrepl.h"
#include "pgstat.h"
#include "postmaster/pgarch.h"
static bool shell_archive_configured(void);
static bool shell_archive_file(const char *file, const char *path);
static void shell_archive_shutdown(void);
static bool shell_archive_configured(ArchiveModuleState *state);
static bool shell_archive_file(ArchiveModuleState *state,
const char *file,
const char *path);
static void shell_archive_shutdown(ArchiveModuleState *state);
void
shell_archive_init(ArchiveModuleCallbacks *cb)
static const ArchiveModuleCallbacks shell_archive_callbacks = {
.startup_cb = NULL,
.check_configured_cb = shell_archive_configured,
.archive_file_cb = shell_archive_file,
.shutdown_cb = shell_archive_shutdown
};
const ArchiveModuleCallbacks *
shell_archive_init(void)
{
cb->check_configured_cb = shell_archive_configured;
cb->archive_file_cb = shell_archive_file;
cb->shutdown_cb = shell_archive_shutdown;
return &shell_archive_callbacks;
}
static bool
shell_archive_configured(void)
shell_archive_configured(ArchiveModuleState *state)
{
return XLogArchiveCommand[0] != '\0';
}
static bool
shell_archive_file(const char *file, const char *path)
shell_archive_file(ArchiveModuleState *state, const char *file,
const char *path)
{
char *xlogarchcmd;
char *nativePath = NULL;
@ -53,7 +62,9 @@ shell_archive_file(const char *file, const char *path)
make_native_path(nativePath);
}
xlogarchcmd = replace_percent_placeholders(XLogArchiveCommand, "archive_command", "fp", file, nativePath);
xlogarchcmd = replace_percent_placeholders(XLogArchiveCommand,
"archive_command", "fp",
file, nativePath);
if (nativePath)
pfree(nativePath);
@ -123,7 +134,7 @@ shell_archive_file(const char *file, const char *path)
}
static void
shell_archive_shutdown(void)
shell_archive_shutdown(ArchiveModuleState *state)
{
elog(DEBUG1, "archiver process shutting down");
}

View File

@ -7,6 +7,7 @@ backend_link_with = [pgport_srv, common_srv]
generated_backend_sources = []
subdir('access')
subdir('archive')
subdir('backup')
subdir('bootstrap')
subdir('catalog')

View File

@ -22,7 +22,6 @@ OBJS = \
interrupt.o \
pgarch.o \
postmaster.o \
shell_archive.o \
startup.o \
syslogger.o \
walwriter.o

View File

@ -10,7 +10,6 @@ backend_sources += files(
'interrupt.c',
'pgarch.c',
'postmaster.c',
'shell_archive.c',
'startup.c',
'syslogger.c',
'walwriter.c',

View File

@ -31,6 +31,8 @@
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "archive/archive_module.h"
#include "archive/shell_archive.h"
#include "lib/binaryheap.h"
#include "libpq/pqsignal.h"
#include "pgstat.h"
@ -97,7 +99,8 @@ char *XLogArchiveLibrary = "";
*/
static time_t last_sigterm_time = 0;
static PgArchData *PgArch = NULL;
static ArchiveModuleCallbacks ArchiveContext;
static const ArchiveModuleCallbacks *ArchiveCallbacks;
static ArchiveModuleState *archive_module_state;
/*
@ -406,8 +409,8 @@ pgarch_ArchiverCopyLoop(void)
HandlePgArchInterrupts();
/* can't do anything if not configured ... */
if (ArchiveContext.check_configured_cb != NULL &&
!ArchiveContext.check_configured_cb())
if (ArchiveCallbacks->check_configured_cb != NULL &&
!ArchiveCallbacks->check_configured_cb(archive_module_state))
{
ereport(WARNING,
(errmsg("archive_mode enabled, yet archiving is not configured")));
@ -508,7 +511,7 @@ pgarch_archiveXlog(char *xlog)
snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
set_ps_display(activitymsg);
ret = ArchiveContext.archive_file_cb(xlog, pathname);
ret = ArchiveCallbacks->archive_file_cb(archive_module_state, xlog, pathname);
if (ret)
snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
else
@ -814,7 +817,7 @@ HandlePgArchInterrupts(void)
/*
* LoadArchiveLibrary
*
* Loads the archiving callbacks into our local ArchiveContext.
* Loads the archiving callbacks into our local ArchiveCallbacks.
*/
static void
LoadArchiveLibrary(void)
@ -827,8 +830,6 @@ LoadArchiveLibrary(void)
errmsg("both archive_command and archive_library set"),
errdetail("Only one of archive_command, archive_library may be set.")));
memset(&ArchiveContext, 0, sizeof(ArchiveModuleCallbacks));
/*
* If shell archiving is enabled, use our special initialization function.
* Otherwise, load the library and call its _PG_archive_module_init().
@ -844,12 +845,16 @@ LoadArchiveLibrary(void)
ereport(ERROR,
(errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
(*archive_init) (&ArchiveContext);
ArchiveCallbacks = (*archive_init) ();
if (ArchiveContext.archive_file_cb == NULL)
if (ArchiveCallbacks->archive_file_cb == NULL)
ereport(ERROR,
(errmsg("archive modules must register an archive callback")));
archive_module_state = (ArchiveModuleState *) palloc0(sizeof(ArchiveModuleState));
if (ArchiveCallbacks->startup_cb != NULL)
ArchiveCallbacks->startup_cb(archive_module_state);
before_shmem_exit(pgarch_call_module_shutdown_cb, 0);
}
@ -859,6 +864,6 @@ LoadArchiveLibrary(void)
static void
pgarch_call_module_shutdown_cb(int code, Datum arg)
{
if (ArchiveContext.shutdown_cb != NULL)
ArchiveContext.shutdown_cb();
if (ArchiveCallbacks->shutdown_cb != NULL)
ArchiveCallbacks->shutdown_cb(archive_module_state);
}

View File

@ -33,6 +33,7 @@
#include "access/xlog_internal.h"
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "archive/archive_module.h"
#include "catalog/namespace.h"
#include "catalog/storage.h"
#include "commands/async.h"

View File

@ -0,0 +1,59 @@
/*-------------------------------------------------------------------------
*
* archive_module.h
* Exports for archive modules.
*
* Copyright (c) 2022-2023, PostgreSQL Global Development Group
*
* src/include/archive/archive_module.h
*
*-------------------------------------------------------------------------
*/
#ifndef _ARCHIVE_MODULE_H
#define _ARCHIVE_MODULE_H
/*
* The value of the archive_library GUC.
*/
extern PGDLLIMPORT char *XLogArchiveLibrary;
typedef struct ArchiveModuleState
{
/*
* Private data pointer for use by an archive module. This can be used to
* store state for the module that will be passed to each of its
* callbacks.
*/
void *private_data;
} ArchiveModuleState;
/*
* Archive module callbacks
*
* These callback functions should be defined by archive libraries and returned
* via _PG_archive_module_init(). ArchiveFileCB is the only required callback.
* For more information about the purpose of each callback, refer to the
* archive modules documentation.
*/
typedef void (*ArchiveStartupCB) (ArchiveModuleState *state);
typedef bool (*ArchiveCheckConfiguredCB) (ArchiveModuleState *state);
typedef bool (*ArchiveFileCB) (ArchiveModuleState *state, const char *file, const char *path);
typedef void (*ArchiveShutdownCB) (ArchiveModuleState *state);
typedef struct ArchiveModuleCallbacks
{
ArchiveStartupCB startup_cb;
ArchiveCheckConfiguredCB check_configured_cb;
ArchiveFileCB archive_file_cb;
ArchiveShutdownCB shutdown_cb;
} ArchiveModuleCallbacks;
/*
* Type of the shared library symbol _PG_archive_module_init that is looked
* up when loading an archive library.
*/
typedef const ArchiveModuleCallbacks *(*ArchiveModuleInit) (void);
extern PGDLLEXPORT const ArchiveModuleCallbacks *_PG_archive_module_init(void);
#endif /* _ARCHIVE_MODULE_H */

View File

@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* shell_archive.h
* Exports for archiving via shell.
*
* Copyright (c) 2022-2023, PostgreSQL Global Development Group
*
* src/include/archive/shell_archive.h
*
*-------------------------------------------------------------------------
*/
#ifndef _SHELL_ARCHIVE_H
#define _SHELL_ARCHIVE_H
#include "archive/archive_module.h"
/*
* Since the logic for archiving via a shell command is in the core server
* and does not need to be loaded via a shared library, it has a special
* initialization function.
*/
extern const ArchiveModuleCallbacks *shell_archive_init(void);
#endif /* _SHELL_ARCHIVE_H */

View File

@ -33,43 +33,4 @@ extern void PgArchiverMain(void) pg_attribute_noreturn();
extern void PgArchWakeup(void);
extern void PgArchForceDirScan(void);
/*
* The value of the archive_library GUC.
*/
extern PGDLLIMPORT char *XLogArchiveLibrary;
/*
* Archive module callbacks
*
* These callback functions should be defined by archive libraries and returned
* via _PG_archive_module_init(). ArchiveFileCB is the only required callback.
* For more information about the purpose of each callback, refer to the
* archive modules documentation.
*/
typedef bool (*ArchiveCheckConfiguredCB) (void);
typedef bool (*ArchiveFileCB) (const char *file, const char *path);
typedef void (*ArchiveShutdownCB) (void);
typedef struct ArchiveModuleCallbacks
{
ArchiveCheckConfiguredCB check_configured_cb;
ArchiveFileCB archive_file_cb;
ArchiveShutdownCB shutdown_cb;
} ArchiveModuleCallbacks;
/*
* Type of the shared library symbol _PG_archive_module_init that is looked
* up when loading an archive library.
*/
typedef void (*ArchiveModuleInit) (ArchiveModuleCallbacks *cb);
extern PGDLLEXPORT void _PG_archive_module_init(ArchiveModuleCallbacks *cb);
/*
* Since the logic for archiving via a shell command is in the core server
* and does not need to be loaded via a shared library, it has a special
* initialization function.
*/
extern void shell_archive_init(ArchiveModuleCallbacks *cb);
#endif /* _PGARCH_H */

View File

@ -128,6 +128,7 @@ ArchiveHandle
ArchiveMode
ArchiveModuleCallbacks
ArchiveModuleInit
ArchiveModuleState
ArchiveOpts
ArchiveShutdownCB
ArchiveStreamState