Add new wal_level, logical, sufficient for logical decoding.

When wal_level=logical, we'll log columns from the old tuple as
configured by the REPLICA IDENTITY facility added in commit
07cacba983.  This makes it possible for
a properly-configured logical replication solution to correctly
follow table updates even if they change the chosen key columns,
or, with REPLICA IDENTITY FULL, even if the table has no key at
all.  Note that updates which do not modify the replica identity
columns won't log anything extra, making the choice of a good key
(i.e. one that will rarely be changed) important to performance
when wal_level=logical is configured.

Each insert, update, or delete to a catalog table will also log
the CMIN and/or CMAX values stamped by the current transaction.
This is necessary because logical decoding will require access to
historical snapshots of the catalog in order to decode some data
types, and the CMIN/CMAX values that we may need in order to judge
row visibility may have been overwritten by the time we need them.
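
For illustration, the choice of which command IDs to log reduces to
roughly the following (a simplified sketch of the log_heap_new_cid()
logic added by this patch; the real function also handles the
locked-only-xmax case and records the combocid itself):

    /* Sketch: what an XLOG_HEAP2_NEW_CID record carries for a catalog tuple */
    if (hdr->t_infomask & HEAP_COMBOCID)
    {
        /* inserted and updated/deleted in the same transaction: log both */
        xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
        xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
    }
    else if (hdr->t_infomask & HEAP_XMAX_INVALID)
    {
        /* tuple was only inserted by this transaction */
        xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
        xlrec.cmax = InvalidCommandId;
    }
    else
    {
        /* a tuple from an earlier transaction was updated or deleted */
        xlrec.cmin = InvalidCommandId;
        xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
    }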

Andres Freund, reviewed in various versions by myself, Heikki
Linnakangas, KONDO Mitsumasa, and many others.
Robert Haas 2013-12-10 18:33:45 -05:00
parent 9ec6199d18
commit e55704d8b2
22 changed files with 746 additions and 158 deletions


@ -587,7 +587,7 @@ tar -cf backup.tar /usr/local/pgsql/data
<para>
To enable WAL archiving, set the <xref linkend="guc-wal-level">
configuration parameter to <literal>archive</> (or <literal>hot_standby</>),
configuration parameter to <literal>archive</> or higher,
<xref linkend="guc-archive-mode"> to <literal>on</>,
and specify the shell command to use in the <xref
linkend="guc-archive-command"> configuration parameter. In practice
@ -1259,7 +1259,7 @@ restore_command = 'cp /mnt/server/archivedir/%f %p'
If more flexibility in copying the backup files is needed, a lower
level process can be used for standalone hot backups as well.
To prepare for low level standalone hot backups, set <varname>wal_level</> to
<literal>archive</> (or <literal>hot_standby</>), <varname>archive_mode</> to
<literal>archive</> or higher, <varname>archive_mode</> to
<literal>on</>, and set up an <varname>archive_command</> that performs
archiving only when a <emphasis>switch file</> exists. For example:
<programlisting>


@ -1648,10 +1648,12 @@ include 'filename'
<varname>wal_level</> determines how much information is written
to the WAL. The default value is <literal>minimal</>, which writes
only the information needed to recover from a crash or immediate
shutdown. <literal>archive</> adds logging required for WAL archiving,
and <literal>hot_standby</> further adds information required to run
read-only queries on a standby server.
This parameter can only be set at server start.
shutdown. <literal>archive</> adds logging required for WAL archiving;
<literal>hot_standby</> further adds information required to run
read-only queries on a standby server; and, finally,
<literal>logical</> adds information necessary to support logical
decoding. Each level includes the information logged at all lower
levels. This parameter can only be set at server start.
</para>
<para>
In <literal>minimal</> level, WAL-logging of some bulk
@ -1665,24 +1667,30 @@ include 'filename'
<member><command>COPY</> into tables that were created or truncated in the same
transaction</member>
</simplelist>
But minimal WAL does not contain
enough information to reconstruct the data from a base backup and the
WAL logs, so either <literal>archive</> or <literal>hot_standby</>
level must be used to enable
WAL archiving (<xref linkend="guc-archive-mode">) and streaming
replication.
But minimal WAL does not contain enough information to reconstruct the
data from a base backup and the WAL logs, so <literal>archive</> or
higher must be used to enable WAL archiving
(<xref linkend="guc-archive-mode">) and streaming replication.
</para>
<para>
In <literal>hot_standby</> level, the same information is logged as
with <literal>archive</>, plus information needed to reconstruct
the status of running transactions from the WAL. To enable read-only
queries on a standby server, <varname>wal_level</> must be set to
<literal>hot_standby</> on the primary, and
<literal>hot_standby</> or higher on the primary, and
<xref linkend="guc-hot-standby"> must be enabled in the standby. It is
thought that there is
little measurable difference in performance between using
<literal>hot_standby</> and <literal>archive</> levels, so feedback
is welcome if any production impacts are noticeable.
thought that there is little measurable difference in performance
between using <literal>hot_standby</> and <literal>archive</> levels,
so feedback is welcome if any production impacts are noticeable.
</para>
<para>
In <literal>logical</> level, the same information is logged as
with <literal>hot_standby</>, plus information needed to allow
extracting logical changesets from the WAL. Using a level of
<literal>logical</> will increase the WAL volume, particularly if many
tables are configured for <literal>REPLICA IDENTITY FULL</literal> and
many <command>UPDATE</> and <command>DELETE</> statements are
executed.
</para>
</listitem>
</varlistentry>
@ -2239,9 +2247,9 @@ include 'filename'
disabled. WAL sender processes count towards the total number
of connections, so the parameter cannot be set higher than
<xref linkend="guc-max-connections">. This parameter can only
be set at server start. <varname>wal_level</> must be set
to <literal>archive</> or <literal>hot_standby</> to allow
connections from standby servers.
be set at server start. <varname>wal_level</> must be set to
<literal>archive</> or higher to allow connections from standby
servers.
</para>
</listitem>
</varlistentry>


@ -1861,8 +1861,9 @@ LOG: database system is ready to accept read only connections
Consistency information is recorded once per checkpoint on the primary.
It is not possible to enable hot standby when reading WAL
written during a period when <varname>wal_level</> was not set to
<literal>hot_standby</> on the primary. Reaching a consistent state can
also be delayed in the presence of both of these conditions:
<literal>hot_standby</> or <literal>logical</> on the primary. Reaching
a consistent state can also be delayed in the presence of both of these
conditions:
<itemizedlist>
<listitem>


@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup,
HeapTuple newtup, bool all_visible_cleared,
bool new_all_visible_cleared);
HeapTuple newtup, HeapTuple old_key_tup,
bool all_visible_cleared, bool new_all_visible_cleared);
static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
Bitmapset *hot_attrs, Bitmapset *key_attrs,
bool *satisfies_hot, bool *satisfies_key,
HeapTuple oldtup, HeapTuple newtup);
Bitmapset *hot_attrs,
Bitmapset *key_attrs, Bitmapset *id_attrs,
bool *satisfies_hot, bool *satisfies_key,
bool *satisfies_id,
HeapTuple oldtup, HeapTuple newtup);
static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
uint16 old_infomask2, TransactionId add_to_xmax,
LockTupleMode mode, bool is_update,
@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
static bool ConditionalMultiXactIdWait(MultiXactId multi,
MultiXactStatus status, int *remaining,
uint16 infomask);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified,
bool *copy);
/*
@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
XLogRecData rdata[3];
XLogRecData rdata[4];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
bool need_tuple_data;
xlrec.all_visible_cleared = all_visible_cleared;
/*
* For logical decoding, we need the tuple even if we're doing a
* full page write, so make sure to log it separately. (XXX We could
* alternatively store a pointer into the FPW).
*
* Also, if this is a catalog, we need to transmit combocids to
* properly decode, so log that as well.
*/
need_tuple_data = RelationIsLogicallyLogged(relation);
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, heaptup);
xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = heaptup->t_self;
rdata[0].data = (char *) &xlrec;
@ -2126,17 +2144,35 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
rdata[1].data = (char *) &xlhdr;
rdata[1].len = SizeOfHeapHeader;
rdata[1].buffer = buffer;
rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
rdata[2].buffer = buffer;
rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
/*
* Make a separate rdata entry for the tuple's buffer if we're
* doing logical decoding, so that an eventual FPW doesn't
* remove the tuple's data.
*/
if (need_tuple_data)
{
rdata[2].next = &(rdata[3]);
rdata[3].data = NULL;
rdata[3].len = 0;
rdata[3].buffer = buffer;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
}
/*
* If this is the single and first tuple on page, we can reinit the
* page instead of restoring the whole thing. Set flag, and hide
@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
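
The rdata pattern above deserves a note: when an XLogRecData entry points
at a buffer and XLogInsert() takes a full-page image of that buffer, the
entry's payload is omitted from the record. Detaching the payload from
the buffer and appending a zero-length, buffer-only entry keeps the tuple
data available to logical decoding while the full-page-write bookkeeping
still works. A sketch of the idiom (tuple_data stands in for the actual
tuple body pointer used above):

    /* payload entry: stored unconditionally, never elided by an FPW */
    rdata[2].data = tuple_data;
    rdata[2].buffer = InvalidBuffer;
    rdata[2].next = &(rdata[3]);

    /* zero-length entry: no payload, only ties the record to the page */
    rdata[3].data = NULL;
    rdata[3].len = 0;
    rdata[3].buffer = buffer;
    rdata[3].buffer_std = true;
    rdata[3].next = NULL;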
@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
Page page;
bool needwal;
Size saveFreeSpace;
bool need_tuple_data = RelationIsLogicallyLogged(relation);
bool need_cids = RelationIsAccessibleInLogicalDecoding(relation);
needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
{
XLogRecPtr recptr;
xl_heap_multi_insert *xlrec;
XLogRecData rdata[2];
XLogRecData rdata[3];
uint8 info = XLOG_HEAP2_MULTI_INSERT;
char *tupledata;
int totaldatalen;
@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
/* the rest of the scratch space is used for tuple data */
tupledata = scratchptr;
xlrec->all_visible_cleared = all_visible_cleared;
xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec->node = relation->rd_node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntuples = nthispage;
@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
datalen);
tuphdr->datalen = datalen;
scratchptr += datalen;
/*
* We don't use heap_multi_insert for catalog tuples yet, but
* better be prepared...
*/
if (need_cids)
log_heap_new_cid(relation, heaptup);
}
totaldatalen = scratchptr - tupledata;
Assert((scratchptr - scratch) < BLCKSZ);
@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
rdata[1].data = tupledata;
rdata[1].len = totaldatalen;
rdata[1].buffer = buffer;
rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
rdata[1].buffer_std = true;
rdata[1].next = NULL;
/*
* Make a separate rdata entry for the tuple's buffer if
* we're doing logical decoding, so that an eventual FPW
* doesn't remove the tuple's data.
*/
if (need_tuple_data)
{
rdata[1].next = &(rdata[2]);
rdata[2].data = NULL;
rdata[2].len = 0;
rdata[2].buffer = buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
}
/*
* If we're going to reinitialize the whole page using the WAL
* record, hide buffer reference from XLogInsert.
*/
if (init)
{
rdata[1].buffer = InvalidBuffer;
rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
info |= XLOG_HEAP_INIT_PAGE;
}
@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid,
bool have_tuple_lock = false;
bool iscombo;
bool all_visible_cleared = false;
HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */
bool old_key_copied = false;
Assert(ItemPointerIsValid(tid));
@ -2734,6 +2798,12 @@ l1:
/* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
/*
* Compute replica identity tuple before entering the critical section so
* we don't PANIC upon a memory allocation failure.
*/
old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
START_CRIT_SECTION();
/*
@ -2786,9 +2856,13 @@ l1:
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
XLogRecData rdata[4];
xlrec.all_visible_cleared = all_visible_cleared;
/* For logical decode we need combocids to properly decode the catalog */
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, &tp);
xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
tp.t_data->t_infomask2);
xlrec.target.node = relation->rd_node;
@ -2805,6 +2879,37 @@ l1:
rdata[1].buffer_std = true;
rdata[1].next = NULL;
/*
* Log replica identity of the deleted tuple if there is one
*/
if (old_key_tuple != NULL)
{
xl_heap_header xlhdr;
xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
rdata[1].next = &(rdata[2]);
rdata[2].data = (char*)&xlhdr;
rdata[2].len = SizeOfHeapHeader;
rdata[2].buffer = InvalidBuffer;
rdata[2].next = NULL;
rdata[2].next = &(rdata[3]);
rdata[3].data = (char *) old_key_tuple->t_data
+ offsetof(HeapTupleHeaderData, t_bits);
rdata[3].len = old_key_tuple->t_len
- offsetof(HeapTupleHeaderData, t_bits);
rdata[3].buffer = InvalidBuffer;
rdata[3].next = NULL;
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
else
xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
}
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
PageSetLSN(page, recptr);
@ -2850,6 +2955,9 @@ l1:
pgstat_count_heap_delete(relation);
if (old_key_tuple != NULL && old_key_copied)
heap_freetuple(old_key_tuple);
return HeapTupleMayBeUpdated;
}
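
On the consuming side, the new flag bits (defined in the header changes
later in this patch) tell a reader what follows a delete record. A
hypothetical decoding-side check, illustration only:

    /* Sketch: interpreting xl_heap_delete->flags */
    if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD_TUPLE)
        ;                       /* full old tuple (REPLICA IDENTITY FULL) */
    else if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD_KEY)
        ;                       /* only the replica identity key columns */
    else
        ;                       /* nothing logged: REPLICA IDENTITY NOTHING,
                                 * no usable index, or not logged logically */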
@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
TransactionId xid = GetCurrentTransactionId();
Bitmapset *hot_attrs;
Bitmapset *key_attrs;
Bitmapset *id_attrs;
ItemId lp;
HeapTupleData oldtup;
HeapTuple heaptup;
HeapTuple old_key_tuple = NULL;
bool old_key_copied = false;
Page page;
BlockNumber block;
MultiXactStatus mxact_status;
@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
bool iscombo;
bool satisfies_hot;
bool satisfies_key;
bool satisfies_id;
bool use_hot_update = false;
bool key_intact;
bool all_visible_cleared = false;
@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
* Note that we get a copy here, so we need not worry about relcache flush
* happening midway through.
*/
hot_attrs = RelationGetIndexAttrBitmap(relation, false);
key_attrs = RelationGetIndexAttrBitmap(relation, true);
hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);
key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
id_attrs = RelationGetIndexAttrBitmap(relation,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
block = ItemPointerGetBlockNumber(otid);
buffer = ReadBuffer(relation, block);
@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
* is updates that don't manipulate key columns, not those that
serendipitously arrive at the same key values.
*/
HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs,
HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs,
&satisfies_hot, &satisfies_key,
&oldtup, newtup);
&satisfies_id, &oldtup, newtup);
if (satisfies_key)
{
*lockmode = LockTupleNoKeyExclusive;
@ -3514,6 +3628,14 @@ l2:
PageSetFull(page);
}
/*
* Compute replica identity tuple before entering the critical section so
* we don't PANIC upon a memory allocation failure.
* ExtractReplicaIdentity() will return NULL if nothing needs to be
* logged.
*/
old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied);
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@ -3589,11 +3711,23 @@ l2:
/* XLOG stuff */
if (RelationNeedsWAL(relation))
{
XLogRecPtr recptr = log_heap_update(relation, buffer,
newbuf, &oldtup, heaptup,
all_visible_cleared,
all_visible_cleared_new);
XLogRecPtr recptr;
/*
* For logical decoding we need combocids to properly decode the
* catalog.
*/
if (RelationIsAccessibleInLogicalDecoding(relation))
{
log_heap_new_cid(relation, &oldtup);
log_heap_new_cid(relation, heaptup);
}
recptr = log_heap_update(relation, buffer,
newbuf, &oldtup, heaptup,
old_key_tuple,
all_visible_cleared,
all_visible_cleared_new);
if (newbuf != buffer)
{
PageSetLSN(BufferGetPage(newbuf), recptr);
@ -3644,6 +3778,9 @@ l2:
heap_freetuple(heaptup);
}
if (old_key_tuple != NULL && old_key_copied)
heap_freetuple(old_key_tuple);
bms_free(hot_attrs);
bms_free(key_attrs);
@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
/*
* Check which columns are being updated.
*
* This simultaneously checks conditions for HOT updates and for FOR KEY
* SHARE updates. Since much of the time they will be checking very similar
* sets of columns, and doing the same tests on them, it makes sense to
* optimize and do them together.
* This simultaneously checks conditions for HOT updates, for FOR KEY
* SHARE updates, and for REPLICA IDENTITY concerns. Since much of the time they
* will be checking very similar sets of columns, and doing the same tests on
* them, it makes sense to optimize and do them together.
*
* We receive two bitmapsets comprising the two sets of columns we're
* We receive three bitmapsets comprising the three sets of columns we're
* interested in. Note these are destructively modified; that is OK since
* this is invoked at most once in heap_update.
*
* hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not
* modify indexed columns); key_result is set to TRUE if the update does not
* modify columns used in the key.
* modify columns used in the key; id_result is set to TRUE if the update does
* not modify columns in any index marked as the REPLICA IDENTITY.
*/
static void
HeapSatisfiesHOTandKeyUpdate(Relation relation,
Bitmapset *hot_attrs, Bitmapset *key_attrs,
HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs,
Bitmapset *key_attrs, Bitmapset *id_attrs,
bool *satisfies_hot, bool *satisfies_key,
bool *satisfies_id,
HeapTuple oldtup, HeapTuple newtup)
{
int next_hot_attnum;
int next_key_attnum;
int next_id_attnum;
bool hot_result = true;
bool key_result = true;
bool key_done = false;
bool hot_done = false;
bool id_result = true;
/* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */
Assert(bms_is_subset(id_attrs, key_attrs));
Assert(bms_is_subset(key_attrs, hot_attrs));
/*
* If one of these sets contains no remaining bits, bms_first_member will
* return -1, and after adding FirstLowInvalidHeapAttributeNumber (which
* is negative!) we'll get an attribute number that can't possibly be
* real, and thus won't match any actual attribute number.
*/
next_hot_attnum = bms_first_member(hot_attrs);
if (next_hot_attnum == -1)
hot_done = true;
else
/* Adjust for system attributes */
next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
next_key_attnum = bms_first_member(key_attrs);
if (next_key_attnum == -1)
key_done = true;
else
/* Adjust for system attributes */
next_key_attnum += FirstLowInvalidHeapAttributeNumber;
next_key_attnum += FirstLowInvalidHeapAttributeNumber;
next_id_attnum = bms_first_member(id_attrs);
next_id_attnum += FirstLowInvalidHeapAttributeNumber;
for (;;)
{
int check_now;
bool changed;
int check_now;
/* both bitmapsets are now empty */
if (key_done && hot_done)
/*
* Since the HOT attributes are a superset of the key attributes and
* the key attributes are a superset of the id attributes, this logic
* is guaranteed to identify the next column that needs to be
* checked.
*/
if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber)
check_now = next_hot_attnum;
else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber)
check_now = next_key_attnum;
else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber)
check_now = next_id_attnum;
else
break;
/* XXX there's probably an easier way ... */
if (hot_done)
check_now = next_key_attnum;
if (key_done)
check_now = next_hot_attnum;
else
check_now = Min(next_hot_attnum, next_key_attnum);
/* See whether it changed. */
changed = !heap_tuple_attr_equals(RelationGetDescr(relation),
check_now, oldtup, newtup);
if (changed)
@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation,
hot_result = false;
if (check_now == next_key_attnum)
key_result = false;
if (check_now == next_id_attnum)
id_result = false;
/* if all are false now, we can stop checking */
if (!hot_result && !key_result && !id_result)
break;
}
/* if both are false now, we can stop checking */
if (!hot_result && !key_result)
break;
if (check_now == next_hot_attnum)
/*
* Advance the next attribute numbers for the sets that contain
* the attribute we just checked. As we work our way through the
* columns, the next_attnum values will rise; but when each set
* becomes empty, bms_first_member() will return -1 and the attribute
* number will end up with a value less than
* FirstLowInvalidHeapAttributeNumber.
*/
if (hot_result && check_now == next_hot_attnum)
{
next_hot_attnum = bms_first_member(hot_attrs);
if (next_hot_attnum == -1)
hot_done = true;
else
/* Adjust for system attributes */
next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
}
if (check_now == next_key_attnum)
if (key_result && check_now == next_key_attnum)
{
next_key_attnum = bms_first_member(key_attrs);
if (next_key_attnum == -1)
key_done = true;
else
/* Adjust for system attributes */
next_key_attnum += FirstLowInvalidHeapAttributeNumber;
next_key_attnum += FirstLowInvalidHeapAttributeNumber;
}
if (id_result && check_now == next_id_attnum)
{
next_id_attnum = bms_first_member(id_attrs);
next_id_attnum += FirstLowInvalidHeapAttributeNumber;
}
}
*satisfies_hot = hot_result;
*satisfies_key = key_result;
*satisfies_id = id_result;
}
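
The attribute-number bookkeeping above relies on a bitmapset idiom that
is easy to misread, so here it is in isolation (a sketch; attrs stands
for any of the three bitmapsets):

    /* bms_first_member() destructively removes and returns the smallest
     * member, or returns -1 if the set is empty */
    int attnum = bms_first_member(attrs);

    /* members are stored offset by FirstLowInvalidHeapAttributeNumber (a
     * negative constant) so system attributes fit; undo that offset */
    attnum += FirstLowInvalidHeapAttributeNumber;

    /* for an empty set, attnum now sits below
     * FirstLowInvalidHeapAttributeNumber and matches no real column */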
/*
@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
static XLogRecPtr
log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
HeapTuple old_key_tuple,
bool all_visible_cleared, bool new_all_visible_cleared)
{
xl_heap_update xlrec;
xl_heap_header xlhdr;
xl_heap_header_len xlhdr;
xl_heap_header_len xlhdr_idx;
uint8 info;
XLogRecPtr recptr;
XLogRecData rdata[4];
XLogRecData rdata[7];
Page page = BufferGetPage(newbuf);
bool need_tuple_data = RelationIsLogicallyLogged(reln);
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
oldtup->t_data->t_infomask2);
xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
xlrec.all_visible_cleared = all_visible_cleared;
xlrec.flags = 0;
if (all_visible_cleared)
xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
xlrec.newtid = newtup->t_self;
xlrec.new_all_visible_cleared = new_all_visible_cleared;
if (new_all_visible_cleared)
xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapUpdate;
@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
xlhdr.t_infomask = newtup->t_data->t_infomask;
xlhdr.t_hoff = newtup->t_data->t_hoff;
xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
xlhdr.header.t_infomask = newtup->t_data->t_infomask;
xlhdr.header.t_hoff = newtup->t_data->t_hoff;
xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
/*
* As with insert records, we need not store the rdata[2] segment if we
* decide to store the whole buffer instead.
* As with insert records, we need not store the rdata[2] segment
* if we decide to store the whole buffer instead, unless we're
* doing logical decoding.
*/
rdata[2].data = (char *) &xlhdr;
rdata[2].len = SizeOfHeapHeader;
rdata[2].buffer = newbuf;
rdata[2].len = SizeOfHeapHeaderLen;
rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[3].data = (char *) newtup->t_data
+ offsetof(HeapTupleHeaderData, t_bits);
rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
rdata[3].buffer = newbuf;
rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
/*
* Separate storage for the FPW buffer reference of the new page in the
* wal_level >= logical case.
*/
if (need_tuple_data)
{
rdata[3].next = &(rdata[4]);
rdata[4].data = NULL;
rdata[4].len = 0;
rdata[4].buffer = newbuf;
rdata[4].buffer_std = true;
rdata[4].next = NULL;
xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
/* We need to log a tuple identity */
if (old_key_tuple)
{
/* don't really need this, but it's more convenient for decoding */
xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
xlhdr_idx.t_len = old_key_tuple->t_len;
rdata[4].next = &(rdata[5]);
rdata[5].data = (char *) &xlhdr_idx;
rdata[5].len = SizeOfHeapHeaderLen;
rdata[5].buffer = InvalidBuffer;
rdata[5].next = &(rdata[6]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[6].data = (char *) old_key_tuple->t_data
+ offsetof(HeapTupleHeaderData, t_bits);
rdata[6].len = old_key_tuple->t_len
- offsetof(HeapTupleHeaderData, t_bits);
rdata[6].buffer = InvalidBuffer;
rdata[6].next = NULL;
if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
else
xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
}
}
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
XLogRecData *rcur = &rdata[2];
info |= XLOG_HEAP_INIT_PAGE;
rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
while (rcur != NULL)
{
rcur->buffer = InvalidBuffer;
rcur = rcur->next;
}
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@ -6339,6 +6552,184 @@ log_newpage_buffer(Buffer buffer, bool page_std)
return log_newpage(&rnode, forkNum, blkno, page, page_std);
}
/*
* Perform XLogInsert of a XLOG_HEAP2_NEW_CID record
*
* This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
* tuples.
*/
static XLogRecPtr
log_heap_new_cid(Relation relation, HeapTuple tup)
{
xl_heap_new_cid xlrec;
XLogRecPtr recptr;
XLogRecData rdata[1];
HeapTupleHeader hdr = tup->t_data;
Assert(ItemPointerIsValid(&tup->t_self));
Assert(tup->t_tableOid != InvalidOid);
xlrec.top_xid = GetTopTransactionId();
xlrec.target.node = relation->rd_node;
xlrec.target.tid = tup->t_self;
/*
* If the tuple got inserted & deleted in the same TX we definitely have a
* combocid; set both cmin and cmax.
*/
if (hdr->t_infomask & HEAP_COMBOCID)
{
Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID));
xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
}
/* No combocid, so only cmin or cmax can be set by this TX */
else
{
/*
* Tuple inserted.
*
* We need to check for LOCK ONLY because multixacts might be
* transferred to the new tuple in case of FOR KEY SHARE updates in
* which case there will be an xmax, although the tuple just got
* inserted.
*/
if (hdr->t_infomask & HEAP_XMAX_INVALID ||
HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
{
xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
xlrec.cmax = InvalidCommandId;
}
/* Tuple from a different tx updated or deleted. */
else
{
xlrec.cmin = InvalidCommandId;
xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
}
xlrec.combocid = InvalidCommandId;
}
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapNewCid;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = NULL;
recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
return recptr;
}
/*
* Build a heap tuple representing the configured REPLICA IDENTITY to log
* as the old tuple in an UPDATE or DELETE.
*
* Returns NULL if there's no need to log an identity or if there's no
* suitable key in the relation.
*/
static HeapTuple
ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
{
TupleDesc desc = RelationGetDescr(relation);
Relation idx_rel;
TupleDesc idx_desc;
char replident = relation->rd_rel->relreplident;
HeapTuple key_tuple = NULL;
bool copy_oid = false;
bool nulls[MaxHeapAttributeNumber];
Datum values[MaxHeapAttributeNumber];
int natt;
*copy = false;
if (!RelationIsLogicallyLogged(relation))
return NULL;
if (replident == REPLICA_IDENTITY_NOTHING)
return NULL;
if (replident == REPLICA_IDENTITY_FULL)
{
/*
* When logging the entire old tuple, it very well could contain
* toasted columns. If so, force them to be inlined.
*/
if (HeapTupleHasExternal(tp))
{
*copy = true;
tp = toast_flatten_tuple(tp, RelationGetDescr(relation));
}
return tp;
}
/* if the key hasn't changed and we're only logging the key, we're done */
if (!key_changed)
return NULL;
/* needs to already have been fetched? */
if (relation->rd_indexvalid == 0)
RelationGetIndexList(relation);
if (!OidIsValid(relation->rd_replidindex))
{
elog(DEBUG4, "Could not find configured replica identity for table \"%s\"",
RelationGetRelationName(relation));
return NULL;
}
idx_rel = RelationIdGetRelation(relation->rd_replidindex);
idx_desc = RelationGetDescr(idx_rel);
/* deform tuple, so we have fast access to columns */
heap_deform_tuple(tp, desc, values, nulls);
/* set all columns to NULL, regardless of whether they actually are */
memset(nulls, 1, sizeof(nulls));
/*
* Now set all columns contained in the index to NOT NULL; they cannot
* currently be NULL.
*/
for (natt = 0; natt < idx_desc->natts; natt++)
{
int attno = idx_rel->rd_index->indkey.values[natt];
if (attno == ObjectIdAttributeNumber)
copy_oid = true;
else if (attno < 0)
elog(ERROR, "system column in index");
else
nulls[attno - 1] = false;
}
key_tuple = heap_form_tuple(desc, values, nulls);
*copy = true;
RelationClose(idx_rel);
/* XXX: we could also do this unconditionally, the space is used anyway */
if (copy_oid)
HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp));
/*
* If the tuple, which by now only contains indexed columns, still has
* toasted columns, force them to be inlined. This is somewhat unlikely
* since there are limits on the size of indexed columns, so we don't
* duplicate toast_flatten_tuple()'s functionality in the above loop over
* the indexed columns, even if it would be more efficient.
*/
if (HeapTupleHasExternal(key_tuple))
{
HeapTuple oldtup = key_tuple;
key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation));
heap_freetuple(oldtup);
}
return key_tuple;
}
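
For reference, callers follow the pattern already visible in heap_delete
and heap_update above; roughly (a sketch, with key_changed standing in
for the caller's condition):

    bool        copy;
    HeapTuple   old_key;

    /* compute before the critical section; this may allocate memory */
    old_key = ExtractReplicaIdentity(relation, &oldtup, key_changed, &copy);

    /* ... build and insert the WAL record, including old_key if set ... */

    if (old_key != NULL && copy)
        heap_freetuple(old_key);    /* free only if a copy was made */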
/*
* Handles CLEANUP_INFO
*/
@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
Buffer vmbuffer = InvalidBuffer;
@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, record->xl_xid);
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/* Make sure there is no forward chain link in t_ctid */
@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
Buffer vmbuffer = InvalidBuffer;
@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
MarkBufferDirty(buffer);
@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->node);
Buffer vmbuffer = InvalidBuffer;
@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
MarkBufferDirty(buffer);
@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
HeapTupleHeaderData hdr;
char data[MaxHeapTupleSize];
} tbuf;
xl_heap_header xlhdr;
xl_heap_header_len xlhdr;
int hsize;
uint32 newlen;
Size freespace;
@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, record->xl_xid);
if (xlrec->all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/*
@ -7164,7 +7555,7 @@ newt:;
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
if (xlrec->new_all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@ -7222,13 +7613,13 @@ newsame:;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
newlen = record->xl_len - hsize;
Assert(newlen <= MaxHeapTupleSize);
memcpy((char *) &xlhdr,
(char *) xlrec + SizeOfHeapUpdate,
SizeOfHeapHeader);
SizeOfHeapHeaderLen);
newlen = xlhdr.t_len;
Assert(newlen <= MaxHeapTupleSize);
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
@ -7236,9 +7627,9 @@ newsame:;
(char *) xlrec + hsize,
newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits);
htup->t_infomask2 = xlhdr.t_infomask2;
htup->t_infomask = xlhdr.t_infomask;
htup->t_hoff = xlhdr.t_hoff;
htup->t_infomask2 = xlhdr.header.t_infomask2;
htup->t_infomask = xlhdr.header.t_infomask;
htup->t_hoff = xlhdr.header.t_hoff;
HeapTupleHeaderSetXmin(htup, record->xl_xid);
HeapTupleHeaderSetCmin(htup, FirstCommandId);
@ -7250,7 +7641,7 @@ newsame:;
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_update_redo: failed to add tuple");
if (xlrec->new_all_visible_cleared)
if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_HEAP2_LOCK_UPDATED:
heap_xlog_lock_updated(lsn, record);
break;
case XLOG_HEAP2_NEW_CID:
/*
* Nothing to do on a real replay, only used during logical
* decoding.
*/
break;
default:
elog(PANIC, "heap2_redo: unknown op code %u", info);
}


@ -184,6 +184,15 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
xlrec->infobits_set);
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_HEAP2_NEW_CID)
{
xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec;
appendStringInfo(buf, "new_cid: ");
out_target(buf, &(xlrec->target));
appendStringInfo(buf, "; cmin: %u, cmax: %u, combo: %u",
xlrec->cmin, xlrec->cmax, xlrec->combocid);
}
else
appendStringInfoString(buf, "UNKNOWN");
}


@ -28,6 +28,7 @@ const struct config_enum_entry wal_level_options[] = {
{"minimal", WAL_LEVEL_MINIMAL, false},
{"archive", WAL_LEVEL_ARCHIVE, false},
{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
{"logical", WAL_LEVEL_LOGICAL, false},
{NULL, 0, false}
};


@ -47,6 +47,7 @@
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
@ -1920,7 +1921,8 @@ RecoverPreparedTransactions(void)
* the prepared transaction generated xid assignment records. Test
* here must match one used in AssignTransactionId().
*/
if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
XLogLogicalInfoActive()))
overwriteOK = true;
/*


@ -148,6 +148,7 @@ typedef struct TransactionStateData
int prevSecContext; /* previous SecurityRestrictionContext */
bool prevXactReadOnly; /* entry-time xact r/o state */
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@ -177,6 +178,7 @@ static TransactionStateData TopTransactionStateData = {
0, /* previous SecurityRestrictionContext */
false, /* entry-time xact r/o state */
false, /* startedInRecovery */
false, /* didLogXid */
NULL /* link to parent state block */
};
@ -394,6 +396,19 @@ GetCurrentTransactionIdIfAny(void)
return CurrentTransactionState->transactionId;
}
/*
* MarkCurrentTransactionIdLoggedIfAny
*
* Remember that the current xid, if one is assigned, has now been WAL-logged.
*/
void
MarkCurrentTransactionIdLoggedIfAny(void)
{
if (TransactionIdIsValid(CurrentTransactionState->transactionId))
CurrentTransactionState->didLogXid = true;
}
/*
* GetStableLatestTransactionId
*
@ -435,6 +450,7 @@ AssignTransactionId(TransactionState s)
{
bool isSubXact = (s->parent != NULL);
ResourceOwner currentOwner;
bool log_unknown_top = false;
/* Assert that caller didn't screw up */
Assert(!TransactionIdIsValid(s->transactionId));
@ -469,6 +485,20 @@ AssignTransactionId(TransactionState s)
pfree(parents);
}
/*
* When wal_level=logical, guarantee that a subtransaction's xid can only
* be seen in the WAL stream if its toplevel xid has been logged
* before. If necessary we log an xact_assignment record with fewer than
* PGPROC_MAX_CACHED_SUBXIDS entries. Note that it is fine if didLogXid
* isn't set for a transaction even though it appears in a WAL record; we
* just might superfluously log something. That can happen when an xid is
* included somewhere inside a WAL record, but not in XLogRecord->xl_xid,
* like in
* xl_standby_locks.
*/
if (isSubXact && XLogLogicalInfoActive() &&
!TopTransactionStateData.didLogXid)
log_unknown_top = true;
/*
* Generate a new Xid and record it in PG_PROC and pg_subtrans.
*
@ -523,6 +553,9 @@ AssignTransactionId(TransactionState s)
* top-level transaction that each subxact belongs to. This is correct in
* recovery only because aborted subtransactions are separately WAL
* logged.
*
* This is correct even for the case where several levels above us didn't
* have an xid assigned as we recursed up to them beforehand.
*/
if (isSubXact && XLogStandbyInfoActive())
{
@ -533,7 +566,8 @@ AssignTransactionId(TransactionState s)
* ensure this test matches similar one in
* RecoverPreparedTransactions()
*/
if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
log_unknown_top)
{
XLogRecData rdata[2];
xl_xact_assignment xlrec;
@ -552,13 +586,15 @@ AssignTransactionId(TransactionState s)
rdata[0].next = &rdata[1];
rdata[1].data = (char *) unreportedXids;
rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
rdata[1].len = nUnreportedXids * sizeof(TransactionId);
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
nUnreportedXids = 0;
/* mark top, not current xact as having been logged */
TopTransactionStateData.didLogXid = true;
}
}
}
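
To see why this guarantee matters, consider the following hypothetical
sequence (xids 1000 and 1001 are made up for the illustration):

    /* Illustration only, not code from the patch:
     *
     *   BEGIN;                  -- top-level xid 1000 assigned lazily,
     *                           -- nothing WAL-logged yet
     *   SAVEPOINT s;
     *   INSERT INTO t ...;      -- subxact xid 1001 writes the first record
     *
     * Without an assignment record, a decoder seeing xid 1001 could not
     * associate it with top-level xid 1000. With wal_level=logical,
     * AssignTransactionId() now emits the XLOG_XACT_ASSIGNMENT record
     * mapping 1001 -> 1000 before the INSERT's WAL record is written. */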
@ -1737,6 +1773,7 @@ StartTransaction(void)
* initialize reported xid accounting
*/
nUnreportedXids = 0;
s->didLogXid = false;
/*
* must initialize resource-management stuff first


@ -1191,6 +1191,8 @@ begin:;
*/
WALInsertSlotRelease();
MarkCurrentTransactionIdLoggedIfAny();
END_CRIT_SECTION();
/*
@ -5961,7 +5963,7 @@ CheckRequiredParameterValues(void)
{
if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY)
ereport(ERROR,
(errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" on the master server"),
(errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" or higher on the master server"),
errhint("Either set wal_level to \"hot_standby\" on the master, or turn off hot_standby here.")));
/* We ignore autovacuum_max_workers when we make this test. */
@ -9650,7 +9652,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAL level not sufficient for making an online backup"),
errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
if (strlen(backupidstr) > MAXPGPATH)
ereport(ERROR,
@ -9988,7 +9990,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAL level not sufficient for making an online backup"),
errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
/*
* OK to update backup counters and forcePageWrites


@ -3321,7 +3321,7 @@ reindex_relation(Oid relid, int flags)
/* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */
if (is_pg_class)
(void) RelationGetIndexAttrBitmap(rel, false);
(void) RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_ALL);
PG_TRY();
{


@ -2355,7 +2355,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
* concurrency.
*/
modifiedCols = GetModifiedColumns(relinfo, estate);
keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, true);
keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc,
INDEX_ATTR_BITMAP_KEY);
if (bms_overlap(keyCols, modifiedCols))
lockmode = LockTupleExclusive;
else


@ -816,10 +816,10 @@ PostmasterMain(int argc, char *argv[])
}
if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL)
ereport(ERROR,
(errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\" or \"hot_standby\"")));
(errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\" or \"logical\"")));
if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
ereport(ERROR,
(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\" or \"logical\"")));
/*
* Other one-time internal sanity checks can go here, if they are fast.


@ -3818,8 +3818,9 @@ RelationGetIndexPredicate(Relation relation)
* simple index keys, but attributes used in expressions and partial-index
* predicates.)
*
* If "keyAttrs" is true, only attributes that can be referenced by foreign
* keys are considered.
* Depending on attrKind, a bitmap covering the attnums of all index columns,
* of all key columns, or of all columns in the configured replica identity
* is returned.
*
* Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
* we can include system attributes (e.g., OID) in the bitmap representation.
@ -3832,17 +3833,28 @@ RelationGetIndexPredicate(Relation relation)
* be bms_free'd when not needed anymore.
*/
Bitmapset *
RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
{
Bitmapset *indexattrs;
Bitmapset *uindexattrs;
Bitmapset *indexattrs; /* indexed columns */
Bitmapset *uindexattrs; /* columns in unique indexes */
Bitmapset *idindexattrs; /* columns in the replica identity */
List *indexoidlist;
ListCell *l;
MemoryContext oldcxt;
/* Quick exit if we already computed the result. */
if (relation->rd_indexattr != NULL)
return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr);
switch(attrKind)
{
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return bms_copy(relation->rd_idattr);
case INDEX_ATTR_BITMAP_KEY:
return bms_copy(relation->rd_keyattr);
case INDEX_ATTR_BITMAP_ALL:
return bms_copy(relation->rd_indexattr);
default:
elog(ERROR, "unknown attrKind %u", attrKind);
}
/* Fast path if definitely no indexes */
if (!RelationGetForm(relation)->relhasindex)
@ -3869,13 +3881,16 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
*/
indexattrs = NULL;
uindexattrs = NULL;
idindexattrs = NULL;
foreach(l, indexoidlist)
{
Oid indexOid = lfirst_oid(l);
Relation indexDesc;
IndexInfo *indexInfo;
int i;
bool isKey;
bool isKey; /* candidate key */
bool isIDKey; /* replica identity index */
indexDesc = index_open(indexOid, AccessShareLock);
@ -3887,6 +3902,9 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
indexInfo->ii_Expressions == NIL &&
indexInfo->ii_Predicate == NIL;
/* Is this index the configured (or default) replica identity? */
isIDKey = indexOid == relation->rd_replidindex;
/* Collect simple attribute references */
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
{
@ -3896,6 +3914,11 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
{
indexattrs = bms_add_member(indexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
if (isIDKey)
idindexattrs = bms_add_member(idindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
if (isKey)
uindexattrs = bms_add_member(uindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
@ -3917,10 +3940,21 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
relation->rd_indexattr = bms_copy(indexattrs);
relation->rd_keyattr = bms_copy(uindexattrs);
relation->rd_idattr = bms_copy(idindexattrs);
MemoryContextSwitchTo(oldcxt);
/* We return our original working copy for caller to play with */
return keyAttrs ? uindexattrs : indexattrs;
switch(attrKind)
{
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return idindexattrs;
case INDEX_ATTR_BITMAP_KEY:
return uindexattrs;
case INDEX_ATTR_BITMAP_ALL:
return indexattrs;
default:
elog(ERROR, "unknown attrKind %u", attrKind);
}
}
/*


@ -170,7 +170,7 @@
# - Settings -
#wal_level = minimal # minimal, archive, or hot_standby
#wal_level = minimal # minimal, archive, hot_standby, or logical
# (change requires restart)
#fsync = on # turns forced synchronization on or off
#synchronous_commit = on # synchronization level;


@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level)
return "archive";
case WAL_LEVEL_HOT_STANDBY:
return "hot_standby";
case WAL_LEVEL_LOGICAL:
return "logical";
}
return _("unrecognized wal_level");
}


@ -55,6 +55,22 @@
#define XLOG_HEAP2_VISIBLE 0x40
#define XLOG_HEAP2_MULTI_INSERT 0x50
#define XLOG_HEAP2_LOCK_UPDATED 0x60
#define XLOG_HEAP2_NEW_CID 0x70
/*
* xl_heap_* ->flag values, 8 bits are available.
*/
/* PD_ALL_VISIBLE was cleared */
#define XLOG_HEAP_ALL_VISIBLE_CLEARED (1<<0)
/* PD_ALL_VISIBLE was cleared in the 2nd page */
#define XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED (1<<1)
#define XLOG_HEAP_CONTAINS_OLD_TUPLE (1<<2)
#define XLOG_HEAP_CONTAINS_OLD_KEY (1<<3)
#define XLOG_HEAP_CONTAINS_NEW_TUPLE (1<<4)
/* convenience macro for checking whether any form of old tuple was logged */
#define XLOG_HEAP_CONTAINS_OLD \
(XLOG_HEAP_CONTAINS_OLD_TUPLE | XLOG_HEAP_CONTAINS_OLD_KEY)
/*
* All we need to find the changed tuple
@ -78,10 +94,10 @@ typedef struct xl_heap_delete
xl_heaptid target; /* deleted tuple id */
TransactionId xmax; /* xmax of the deleted tuple */
uint8 infobits_set; /* infomask bits */
bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
uint8 flags;
} xl_heap_delete;
#define SizeOfHeapDelete (offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))
#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
/*
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
@ -100,15 +116,29 @@ typedef struct xl_heap_header
#define SizeOfHeapHeader (offsetof(xl_heap_header, t_hoff) + sizeof(uint8))
/*
* Variant of xl_heap_header that contains the length of the tuple, which is
* useful if the length of the tuple cannot be computed using the overall
* record length. E.g. because there are several tuples inside a single
* record.
*/
typedef struct xl_heap_header_len
{
uint16 t_len;
xl_heap_header header;
} xl_heap_header_len;
#define SizeOfHeapHeaderLen (offsetof(xl_heap_header_len, header) + SizeOfHeapHeader)
/* This is what we need to know about insert */
typedef struct xl_heap_insert
{
xl_heaptid target; /* inserted tuple id */
bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
uint8 flags;
/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_heap_insert;
#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
#define SizeOfHeapInsert (offsetof(xl_heap_insert, flags) + sizeof(uint8))
/*
* This is what we need to know about a multi-insert. The record consists of
@ -120,7 +150,7 @@ typedef struct xl_heap_multi_insert
{
RelFileNode node;
BlockNumber blkno;
bool all_visible_cleared;
uint8 flags;
uint16 ntuples;
OffsetNumber offsets[1];
@ -147,13 +177,12 @@ typedef struct xl_heap_update
TransactionId old_xmax; /* xmax of the old tuple */
TransactionId new_xmax; /* xmax of the new tuple */
ItemPointerData newtid; /* new inserted tuple id */
uint8 old_infobits_set; /* infomask bits to set on old tuple */
bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
bool new_all_visible_cleared; /* same for the page of newtid */
uint8 old_infobits_set; /* infomask bits to set on old tuple */
uint8 flags;
/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_heap_update;
#define SizeOfHeapUpdate (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
#define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(uint8))
/*
* This is what we need to know about vacuum page cleanup/redirect
@ -263,6 +292,29 @@ typedef struct xl_heap_visible
#define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId))
typedef struct xl_heap_new_cid
{
/*
* store toplevel xid so we don't have to merge cids from different
* transactions
*/
TransactionId top_xid;
CommandId cmin;
CommandId cmax;
/*
* don't really need the combocid since we have the actual values
* right in this struct, but the padding makes it free and it's
* useful for debugging.
*/
CommandId combocid;
/*
* Store the relfilenode/ctid pair to facilitate lookups.
*/
xl_heaptid target;
} xl_heap_new_cid;
#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid)
extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
TransactionId *latestRemovedXid);


@ -215,6 +215,7 @@ extern TransactionId GetCurrentTransactionId(void);
extern TransactionId GetCurrentTransactionIdIfAny(void);
extern TransactionId GetStableLatestTransactionId(void);
extern SubTransactionId GetCurrentSubTransactionId(void);
extern void MarkCurrentTransactionIdLoggedIfAny(void);
extern bool SubTransactionIsActive(SubTransactionId subxid);
extern CommandId GetCurrentCommandId(bool used);
extern TimestampTz GetCurrentTransactionStartTimestamp(void);


@ -197,7 +197,8 @@ typedef enum WalLevel
{
WAL_LEVEL_MINIMAL = 0,
WAL_LEVEL_ARCHIVE,
WAL_LEVEL_HOT_STANDBY
WAL_LEVEL_HOT_STANDBY,
WAL_LEVEL_LOGICAL
} WalLevel;
extern int wal_level;
@ -210,9 +211,12 @@ extern int wal_level;
*/
#define XLogIsNeeded() (wal_level >= WAL_LEVEL_ARCHIVE)
/* Do we need to WAL-log information required only for Hot Standby? */
/* Do we need to WAL-log information required only for Hot Standby and logical replication? */
#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
/* Do we need to WAL-log information required only for logical replication? */
#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
#ifdef WAL_DEBUG
extern bool XLOG_DEBUG;
#endif
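
The new macro follows the same gating pattern as the existing ones; for
instance, the xact.c change earlier in this patch uses it like this:

    /* force logging of the top-level xid before a subxact's first record */
    if (isSubXact && XLogLogicalInfoActive() &&
        !TopTransactionStateData.didLogXid)
        log_unknown_top = true;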


@ -55,7 +55,7 @@ typedef struct BkpBlock
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD078 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD079 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{


@ -104,6 +104,7 @@ typedef struct RelationData
List *rd_indexlist; /* list of OIDs of indexes on relation */
Bitmapset *rd_indexattr; /* identifies columns used in indexes */
Bitmapset *rd_keyattr; /* cols that can be ref'd by foreign keys */
Bitmapset *rd_idattr; /* included in replica identity index */
Oid rd_oidindex; /* OID of unique index on OID, if any */
LockInfoData rd_lockInfo; /* lock mgr's info for locking relation */
RuleLock *rd_rules; /* rewrite rules */
@ -453,6 +454,29 @@ typedef struct StdRdOptions
*/
#define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated)
/*
* RelationIsAccessibleInLogicalDecoding
* True if we need to log enough information for the relation to be
* accessible via a logical decoding snapshot.
*/
#define RelationIsAccessibleInLogicalDecoding(relation) \
(XLogLogicalInfoActive() && \
RelationNeedsWAL(relation) && \
IsCatalogRelation(relation))
/*
* RelationIsLogicallyLogged
* True if we need to log enough information to extract the data from the
* WAL stream.
*
* We don't log information for unlogged tables (since they aren't WAL-logged
* anyway) or for system tables (their content is hard to make sense of, and
* it would complicate decoding slightly for little gain).
*/
#define RelationIsLogicallyLogged(relation) \
(XLogLogicalInfoActive() && \
RelationNeedsWAL(relation) && \
!IsCatalogRelation(relation))
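
Taken together, the two macros split the work for wal_level=logical; the
pattern below is the one heap_insert() uses earlier in this patch
(sketch, context abbreviated):

    /* user tables: log full tuple data in an FPW-proof way */
    need_tuple_data = RelationIsLogicallyLogged(relation);

    /* catalog tables: additionally log cmin/cmax for historic snapshots */
    if (RelationIsAccessibleInLogicalDecoding(relation))
        log_heap_new_cid(relation, heaptup);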
/* routines in utils/cache/relcache.c */
extern void RelationIncrementReferenceCount(Relation rel);


@ -41,7 +41,17 @@ extern List *RelationGetIndexList(Relation relation);
extern Oid RelationGetOidIndex(Relation relation);
extern List *RelationGetIndexExpressions(Relation relation);
extern List *RelationGetIndexPredicate(Relation relation);
extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs);
typedef enum IndexAttrBitmapKind
{
INDEX_ATTR_BITMAP_ALL,
INDEX_ATTR_BITMAP_KEY,
INDEX_ATTR_BITMAP_IDENTITY_KEY
} IndexAttrBitmapKind;
extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation,
IndexAttrBitmapKind keyAttrs);
extern void RelationGetExclusionInfo(Relation indexRelation,
Oid **operators,
Oid **procs,


@ -791,6 +791,7 @@ IdentifySystemCmd
IncrementVarSublevelsUp_context
Index
IndexArrayKeyInfo
IndexAttrBitmapKind
IndexBuildCallback
IndexBuildResult
IndexBulkDeleteCallback
@ -2419,11 +2420,13 @@ xl_heap_cleanup_info
xl_heap_delete
xl_heap_freeze
xl_heap_header
xl_heap_header_len
xl_heap_inplace
xl_heap_insert
xl_heap_lock
xl_heap_lock_updated
xl_heap_multi_insert
xl_heap_new_cid
xl_heap_newpage
xl_heap_update
xl_heap_visible