Add Generic WAL interface

This interface is designed to give an access to WAL for extensions which
could implement new access method, for example. Previously it was
impossible because restoring from custom WAL would need to access system
catalog to find a redo custom function. This patch suggests generic way
to describe changes on page with standart layout.

Bump XLOG_PAGE_MAGIC because of new record type.

Author: Alexander Korotkov with a help of Petr Jelinek, Markus Nullmeier and
	minor editorization by my
Reviewers: Petr Jelinek, Alvaro Herrera, Teodor Sigaev, Jim Nasby,
	Michael Paquier
This commit is contained in:
Teodor Sigaev 2016-04-01 12:21:48 +03:00
parent c202ecf902
commit 65578341af
14 changed files with 685 additions and 6 deletions

View File

@ -100,6 +100,7 @@
<!ENTITY sources SYSTEM "sources.sgml">
<!ENTITY storage SYSTEM "storage.sgml">
<!ENTITY tablesample-method SYSTEM "tablesample-method.sgml">
<!ENTITY generic-wal SYSTEM "generic-wal.sgml">
<!-- contrib information -->
<!ENTITY contrib SYSTEM "contrib.sgml">

View File

@ -0,0 +1,141 @@
<!-- doc/src/sgml/generic-wal.sgml -->
<chapter id="generic-wal">
<title>Generic WAL records</title>
<para>
Despite all built-in access methods and WAL-logged modules having their own
types of WAL records, there is also a generic WAL record type, which describes
changes to pages in a generic way. This is useful for extensions that
provide custom access methods, because they cannot register their own
WAL redo routines.
</para>
<para>
The API for contructing generic WAL records is defined in
<filename>generic_xlog.h</> and implemented in <filename>generic_xlog.c</>.
Each generic WAL record must be constructed by following these steps:
<orderedlist>
<listitem>
<para>
<function>state = GenericXLogStart(relation)</> &mdash; start
construction of a generic xlog record for the given relation.
</para>
</listitem>
<listitem>
<para>
<function>page = GenericXLogRegister(state, buffer, isNew)</> &mdash;
register one or more buffers (one at a time) for the current generic
xlog record. This function returns a copy of the page image, where
modifications can be made. The second argument indicates if the page
is new (eventually, this will result in a full page image being put into
the xlog record).
</para>
</listitem>
<listitem>
<para>
Apply modifications to page images obtained in the previous step.
</para>
</listitem>
<listitem>
<para>
<function>GenericXLogAbort(state)</> &mdash; finish construction of
a generic xlog record.
</para>
</listitem>
</orderedlist>
</para>
<para>
The xlog record construction can be canceled between any of the above
steps by calling <function>GenericXLogAbort()</>. This will discard all
changes to the page image copies.
</para>
<para>
Please note the following points when constructing generic xlog records:
<itemizedlist>
<listitem>
<para>
No direct modifications of page images are allowed! All modifications
must be done in copies acquired from <function>GenericXLogRegister()</>.
In other words, code which makes generic xlog records must never call
<function>BufferGetPage()</>.
</para>
</listitem>
<listitem>
<para>
Registrations of buffers (step 2) and modifications of page images
(step 3) can be mixed freely, i.e., both steps may be repeated in any
sequence. The only restriction is that you can modify a page image
only after the registration of the corresponding buffer.
</para>
</listitem>
<listitem>
<para>
After registration, the buffer can also be unregistered by calling
<function>GenericXLogUnregister(buffer)</>. In this case, the changes
made to that particular page image copy will be discarded.
</para>
</listitem>
<listitem>
<para>
Generic xlog assumes that pages are using standard layout. I.e., all
information between pd_lower and pd_upper will be discarded.
</para>
</listitem>
<listitem>
<para>
The maximum number of buffers that can be simultaneously registered
for a generic xlog is <literal>MAX_GENERIC_XLOG_PAGES</>. An error will
be thrown if this limit is exceeded.
</para>
</listitem>
<listitem>
<para>
Since you modify copies of page images, <function>GenericXLogStart()</>
does not start a critical section. Thus, you can do memory allocation,
error throwing, etc. between <function>GenericXLogStart()</> and
<function>GenericXLogFinish()</>. The actual critical section is present
inside <function>GenericXLogFinish()</>.
</para>
</listitem>
<listitem>
<para>
<function>GenericXLogFinish()</> takes care of marking buffers as dirty
and setting their LSNs. You do not need to do this explicitly.
</para>
</listitem>
<listitem>
<para>
For unlogged relations, everything works the same except there is no
WAL record produced. Thus, you typically do not need to do any explicit
checks for unlogged relations.
</para>
</listitem>
<listitem>
<para>
If a registered buffer is not new, the generic xlog record contains
a delta between the old and the new page images. This delta is produced
using per byte comparison. The current delta mechanism is not effective
for moving data within a page and may be improved in the future.
</para>
</listitem>
<listitem>
<para>
The generic xlog redo function will acquire exclusive locks to buffers
in the same order as they were registered. After redoing all changes,
the locks will be released in the same order.
</para>
</listitem>
</itemizedlist>
</para>
</chapter>

View File

@ -247,6 +247,7 @@
&custom-scan;
&geqo;
&indexam;
&generic-wal;
&gist;
&spgist;
&gin;

View File

@ -8,9 +8,9 @@ subdir = src/backend/access/rmgrdesc
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
gindesc.o gistdesc.o hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o \
relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk

View File

@ -0,0 +1,58 @@
/*-------------------------------------------------------------------------
*
* genericdesc.c
* rmgr descriptor routines for access/transam/generic_xlog.c
*
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/access/rmgrdesc/genericdesc.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/generic_xlog.h"
#include "lib/stringinfo.h"
#include "storage/relfilenode.h"
/*
* Description of generic xlog record: write page regions that this record
* overrides.
*/
void
generic_desc(StringInfo buf, XLogReaderState *record)
{
Pointer ptr = XLogRecGetData(record),
end = ptr + XLogRecGetDataLen(record);
while (ptr < end)
{
OffsetNumber offset,
length;
memcpy(&offset, ptr, sizeof(offset));
ptr += sizeof(offset);
memcpy(&length, ptr, sizeof(length));
ptr += sizeof(length);
ptr += length;
if (ptr < end)
appendStringInfo(buf, "offset %u, length %u; ", offset, length);
else
appendStringInfo(buf, "offset %u, length %u", offset, length);
}
return;
}
/*
* Identification of generic xlog record: we don't distinguish any subtypes
* inside generic xlog records.
*/
const char *
generic_identify(uint8 info)
{
return "Generic";
}

View File

@ -12,8 +12,8 @@ subdir = src/backend/access/transam
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
OBJS = clog.o commit_ts.o generic_xlog.o multixact.o parallel.o rmgr.o slru.o \
subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
xact.o xlog.o xlogarchive.o xlogfuncs.o \
xloginsert.o xlogreader.o xlogutils.o

View File

@ -0,0 +1,431 @@
/*-------------------------------------------------------------------------
*
* generic_xlog.c
* Implementation of generic xlog records.
*
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/access/transam/generic_xlog.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/generic_xlog.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "utils/memutils.h"
/*-------------------------------------------------------------------------
* Internally, a delta between pages consists of a set of fragments. Each
* fragment represents changes made in a given region of a page. A fragment
* is made up as follows:
*
* - offset of page region (OffsetNumber)
* - length of page region (OffsetNumber)
* - data - the data to place into the region ('length' number of bytes)
*
* Unchanged regions of a page are not represented in its delta. As a
* result, a delta can be more compact than the full page image. But having
* an unchanged region in the middle of two fragments that is smaller than
* the fragment header (offset and length) does not pay off in terms of the
* overall size of the delta. For this reason, we break fragments only if
* the unchanged region is bigger than MATCH_THRESHOLD.
*
* The worst case for delta sizes occurs when we did not find any unchanged
* region in the page. The size of the delta will be the size of the page plus
* the size of the fragment header in that case.
*-------------------------------------------------------------------------
*/
#define FRAGMENT_HEADER_SIZE (2 * sizeof(OffsetNumber))
#define MATCH_THRESHOLD FRAGMENT_HEADER_SIZE
#define MAX_DELTA_SIZE BLCKSZ + FRAGMENT_HEADER_SIZE
/* Struct of generic xlog data for single page */
typedef struct
{
Buffer buffer; /* registered buffer */
char image[BLCKSZ]; /* copy of page image for modification */
char data[MAX_DELTA_SIZE]; /* delta between page images */
int dataLen; /* space consumed in data field */
bool fullImage; /* are we taking a full image of this page? */
} PageData;
/* State of generic xlog record construction */
struct GenericXLogState
{
bool isLogged;
PageData pages[MAX_GENERIC_XLOG_PAGES];
};
static void writeFragment(PageData *pageData, OffsetNumber offset,
OffsetNumber len, Pointer data);
static void writeDelta(PageData *pageData);
static void applyPageRedo(Page page, Pointer data, Size dataSize);
/*
* Write next fragment into delta.
*/
static void
writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
Pointer data)
{
Pointer ptr = pageData->data + pageData->dataLen;
/* Check if we have enough space */
Assert(pageData->dataLen + sizeof(offset) +
sizeof(length) + length <= sizeof(pageData->data));
/* Write fragment data */
memcpy(ptr, &offset, sizeof(offset));
ptr += sizeof(offset);
memcpy(ptr, &length, sizeof(length));
ptr += sizeof(length);
memcpy(ptr, data, length);
ptr += length;
pageData->dataLen = ptr - pageData->data;
}
/*
* Make delta for given page.
*/
static void
writeDelta(PageData *pageData)
{
Page page = BufferGetPage(pageData->buffer),
image = (Page) pageData->image;
int i,
fragmentBegin = -1,
fragmentEnd = -1;
uint16 pageLower = ((PageHeader) page)->pd_lower,
pageUpper = ((PageHeader) page)->pd_upper,
imageLower = ((PageHeader) image)->pd_lower,
imageUpper = ((PageHeader) image)->pd_upper;
for (i = 0; i < BLCKSZ; i++)
{
bool match;
/*
* Check if bytes in old and new page images match. We do not care
* about data in the unallocated area between pd_lower and pd_upper.
* We assume the unallocated area to expand with unmatched bytes.
* Bytes inside the unallocated area are assumed to always match.
*/
if (i < pageLower)
{
if (i < imageLower)
match = (page[i] == image[i]);
else
match = false;
}
else if (i >= pageUpper)
{
if (i >= imageUpper)
match = (page[i] == image[i]);
else
match = false;
}
else
{
match = true;
}
if (match)
{
if (fragmentBegin >= 0)
{
/* Matched byte is potentially part of a fragment. */
if (fragmentEnd < 0)
fragmentEnd = i;
/*
* Write next fragment if sequence of matched bytes is longer
* than MATCH_THRESHOLD.
*/
if (i - fragmentEnd >= MATCH_THRESHOLD)
{
writeFragment(pageData, fragmentBegin,
fragmentEnd - fragmentBegin,
page + fragmentBegin);
fragmentBegin = -1;
fragmentEnd = -1;
}
}
}
else
{
/* On unmatched byte, start new fragment if it is not done yet */
if (fragmentBegin < 0)
fragmentBegin = i;
fragmentEnd = -1;
}
}
if (fragmentBegin >= 0)
writeFragment(pageData, fragmentBegin,
BLCKSZ - fragmentBegin,
page + fragmentBegin);
#ifdef WAL_DEBUG
/*
* If xlog debug is enabled, then check produced delta. Result of delta
* application to saved image should be the same as current page state.
*/
if (XLOG_DEBUG)
{
char tmp[BLCKSZ];
memcpy(tmp, image, BLCKSZ);
applyPageRedo(tmp, pageData->data, pageData->dataLen);
if (memcmp(tmp, page, pageLower)
|| memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper))
elog(ERROR, "result of generic xlog apply does not match");
}
#endif
}
/*
* Start new generic xlog record.
*/
GenericXLogState *
GenericXLogStart(Relation relation)
{
int i;
GenericXLogState *state;
state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
state->isLogged = RelationNeedsWAL(relation);
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
state->pages[i].buffer = InvalidBuffer;
return state;
}
/*
* Register new buffer for generic xlog record.
*/
Page
GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
{
int block_id;
/* Place new buffer to unused slot in array */
for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
{
PageData *page = &state->pages[block_id];
if (BufferIsInvalid(page->buffer))
{
page->buffer = buffer;
memcpy(page->image, BufferGetPage(buffer), BLCKSZ);
page->dataLen = 0;
page->fullImage = isNew;
return (Page)page->image;
}
else if (page->buffer == buffer)
{
/*
* Buffer is already registered. Just return the image, which is
* already prepared.
*/
return (Page)page->image;
}
}
elog(ERROR, "maximum number of %d generic xlog buffers is exceeded",
MAX_GENERIC_XLOG_PAGES);
/* keep compiler quiet */
return NULL;
}
/*
* Unregister particular buffer for generic xlog record.
*/
void
GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
{
int block_id;
/* Find block in array to unregister */
for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
{
if (state->pages[block_id].buffer == buffer)
{
/*
* Preserve order of pages in array because it could matter for
* concurrency.
*/
memmove(&state->pages[block_id], &state->pages[block_id + 1],
(MAX_GENERIC_XLOG_PAGES - block_id - 1) * sizeof(PageData));
state->pages[MAX_GENERIC_XLOG_PAGES - 1].buffer = InvalidBuffer;
return;
}
}
elog(ERROR, "registered generic xlog buffer not found");
}
/*
* Put all changes in registered buffers to generic xlog record.
*/
XLogRecPtr
GenericXLogFinish(GenericXLogState *state)
{
XLogRecPtr lsn = InvalidXLogRecPtr;
int i;
if (state->isLogged)
{
/* Logged relation: make xlog record in critical section. */
XLogBeginInsert();
START_CRIT_SECTION();
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
{
char tmp[BLCKSZ];
PageData *page = &state->pages[i];
if (BufferIsInvalid(page->buffer))
continue;
/* Swap current and saved page image. */
memcpy(tmp, page->image, BLCKSZ);
memcpy(page->image, BufferGetPage(page->buffer), BLCKSZ);
memcpy(BufferGetPage(page->buffer), tmp, BLCKSZ);
if (page->fullImage)
{
/* A full page image does not require anything special */
XLogRegisterBuffer(i, page->buffer, REGBUF_FORCE_IMAGE);
}
else
{
/*
* In normal mode, calculate delta and write it as data
* associated with this page.
*/
XLogRegisterBuffer(i, page->buffer, REGBUF_STANDARD);
writeDelta(page);
XLogRegisterBufData(i, page->data, page->dataLen);
}
}
/* Insert xlog record */
lsn = XLogInsert(RM_GENERIC_ID, 0);
/* Set LSN and mark buffers dirty */
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
{
PageData *page = &state->pages[i];
if (BufferIsInvalid(page->buffer))
continue;
PageSetLSN(BufferGetPage(page->buffer), lsn);
MarkBufferDirty(page->buffer);
}
END_CRIT_SECTION();
}
else
{
/* Unlogged relation: skip xlog-related stuff */
START_CRIT_SECTION();
for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
{
PageData *page = &state->pages[i];
if (BufferIsInvalid(page->buffer))
continue;
memcpy(BufferGetPage(page->buffer), page->image, BLCKSZ);
MarkBufferDirty(page->buffer);
}
END_CRIT_SECTION();
}
pfree(state);
return lsn;
}
/*
* Abort generic xlog record.
*/
void
GenericXLogAbort(GenericXLogState *state)
{
pfree(state);
}
/*
* Apply delta to given page image.
*/
static void
applyPageRedo(Page page, Pointer data, Size dataSize)
{
Pointer ptr = data, end = data + dataSize;
while (ptr < end)
{
OffsetNumber offset,
length;
memcpy(&offset, ptr, sizeof(offset));
ptr += sizeof(offset);
memcpy(&length, ptr, sizeof(length));
ptr += sizeof(length);
memcpy(page + offset, ptr, length);
ptr += length;
}
}
/*
* Redo function for generic xlog record.
*/
void
generic_redo(XLogReaderState *record)
{
uint8 block_id;
Buffer buffers[MAX_GENERIC_XLOG_PAGES] = {InvalidBuffer};
XLogRecPtr lsn = record->EndRecPtr;
Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
/* Iterate over blocks */
for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
XLogRedoAction action;
if (!XLogRecHasBlockRef(record, block_id))
continue;
action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
/* Apply redo to given block if needed */
if (action == BLK_NEEDS_REDO)
{
Pointer blockData;
Size blockDataSize;
Page page;
page = BufferGetPage(buffers[block_id]);
blockData = XLogRecGetBlockData(record, block_id, &blockDataSize);
applyPageRedo(page, blockData, blockDataSize);
PageSetLSN(page, lsn);
MarkBufferDirty(buffers[block_id]);
}
}
/* Changes are done: unlock and release all buffers */
for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
if (BufferIsValid(buffers[block_id]))
UnlockReleaseBuffer(buffers[block_id]);
}
}

View File

@ -11,6 +11,7 @@
#include "access/commit_ts.h"
#include "access/gin.h"
#include "access/gist_private.h"
#include "access/generic_xlog.h"
#include "access/hash.h"
#include "access/heapam_xlog.h"
#include "access/brin_xlog.h"

View File

@ -143,6 +143,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
case RM_REPLORIGIN_ID:
case RM_GENERIC_ID:
/* just deal with xid, and done */
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
buf.origptr);

View File

@ -4,6 +4,7 @@
/clogdesc.c
/committsdesc.c
/dbasedesc.c
/genericdesc.c
/gindesc.c
/gistdesc.c
/hashdesc.c

View File

@ -11,6 +11,7 @@
#include "access/brin_xlog.h"
#include "access/clog.h"
#include "access/commit_ts.h"
#include "access/generic_xlog.h"
#include "access/gin.h"
#include "access/gist_private.h"
#include "access/hash.h"

View File

@ -0,0 +1,42 @@
/*-------------------------------------------------------------------------
*
* generic_xlog.h
* Generic xlog API definition.
*
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/access/generic_xlog.h
*
*-------------------------------------------------------------------------
*/
#ifndef GENERIC_XLOG_H
#define GENERIC_XLOG_H
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "storage/bufpage.h"
#include "utils/rel.h"
#define MAX_GENERIC_XLOG_PAGES XLR_NORMAL_MAX_BLOCK_ID
/* state of generic xlog record construction */
struct GenericXLogState;
typedef struct GenericXLogState GenericXLogState;
/* API for construction of generic xlog records */
extern GenericXLogState *GenericXLogStart(Relation relation);
extern Page GenericXLogRegister(GenericXLogState *state, Buffer buffer,
bool isNew);
extern void GenericXLogUnregister(GenericXLogState *state, Buffer buffer);
extern XLogRecPtr GenericXLogFinish(GenericXLogState *state);
extern void GenericXLogAbort(GenericXLogState *state);
/* functions defined for rmgr */
extern void generic_redo(XLogReaderState *record);
extern const char *generic_identify(uint8 info);
extern void generic_desc(StringInfo buf, XLogReaderState *record);
#endif /* GENERIC_XLOG_H */

View File

@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)

View File

@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD089 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{