Allow the use of indexes other than PK and REPLICA IDENTITY on the subscriber.

Using REPLICA IDENTITY FULL on the publisher can lead to a full table scan
per tuple change on the subscription when REPLICA IDENTITY or PK index is
not available. This makes REPLICA IDENTITY FULL impractical to use apart
from some small number of use cases.

This patch allows using indexes other than PRIMARY KEY or REPLICA
IDENTITY on the subscriber during apply of update/delete. The index that
can be used must be a btree index, not a partial index, and it must have
at least one column reference (i.e. cannot consist of only expressions).
We can uplift these restrictions in the future. There is no smart
mechanism to pick the index. If there is more than one index that
satisfies these requirements, we just pick the first one. We discussed
using some of the optimizer's low-level APIs for this but ruled it out
as that can be a maintenance burden in the long run.

This patch improves the performance in the vast majority of cases and the
improvement is proportional to the amount of data in the table. However,
there could be some regression in a small number of cases where the indexes
have a lot of duplicate and dead rows. It was discussed that those are
mostly impractical cases but we can provide a table or subscription level
option to disable this feature if required.

Author: Onder Kalaci, Amit Kapila
Reviewed-by: Peter Smith, Shi yu, Hou Zhijie, Vignesh C, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/CACawEhVLqmAAyPXdHEPv1ssU2c=dqOniiGz7G73HfyS7+nGV4w@mail.gmail.com
This commit is contained in:
Amit Kapila 2023-03-15 08:36:38 +05:30
parent 720de00af4
commit 89e46da5e5
6 changed files with 325 additions and 69 deletions

View File

@ -132,7 +132,14 @@
certain additional requirements) can also be set to be the replica
identity. If the table does not have any suitable key, then it can be set
to replica identity <quote>full</quote>, which means the entire row becomes
the key. This, however, is very inefficient and should only be used as a
the key. When replica identity <quote>full</quote> is specified,
indexes can be used on the subscriber side for searching the rows. Candidate
indexes must be btree, non-partial, and have at least one column reference
(i.e. cannot consist of only expressions). These restrictions
on the non-unique index properties adhere to some of the restrictions that
are enforced for primary keys. If there are no such suitable indexes,
the search on the subscriber side can be very inefficient, therefore
replica identity <quote>full</quote> should only be used as a
fallback if no other solution is possible. If a replica identity other
than <quote>full</quote> is set on the publisher side, a replica identity
comprising the same or fewer columns must also be set on the subscriber

View File

@ -25,6 +25,7 @@
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "replication/logicalrelation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
@ -37,49 +38,63 @@
#include "utils/typcache.h"
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
TypeCacheEntry **eq);
/*
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
* is setup to match 'rel' (*NOT* idxrel!).
*
* Returns whether any column contains NULLs.
* Returns how many columns to use for the index scan.
*
* This is not generic routine, it expects the idxrel to be replication
* identity of a rel and meet all limitations associated with that.
* This is not generic routine, it expects the idxrel to be a btree, non-partial
* and have at least one column reference (i.e. cannot consist of only
* expressions).
*
* By definition, replication identity of a rel meets all limitations associated
* with that. Note that any other index could also meet these limitations.
*/
static bool
static int
build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
TupleTableSlot *searchslot)
{
int attoff;
int index_attoff;
int skey_attoff = 0;
bool isnull;
Datum indclassDatum;
oidvector *opclass;
int2vector *indkey = &idxrel->rd_index->indkey;
bool hasnulls = false;
Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
opclass = (oidvector *) DatumGetPointer(indclassDatum);
/* Build scankey for every attribute in the index. */
for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
/* Build scankey for every non-expression attribute in the index. */
for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
index_attoff++)
{
Oid operator;
Oid optype;
Oid opfamily;
RegProcedure regop;
int pkattno = attoff + 1;
int mainattno = indkey->values[attoff];
Oid optype = get_opclass_input_type(opclass->values[attoff]);
int table_attno = indkey->values[index_attoff];
if (!AttributeNumberIsValid(table_attno))
{
/*
* XXX: Currently, we don't support expressions in the scan key,
* see code below.
*/
continue;
}
/*
* Load the operator info. We need this to get the equality operator
* function for the scan key.
*/
opfamily = get_opclass_family(opclass->values[attoff]);
optype = get_opclass_input_type(opclass->values[index_attoff]);
opfamily = get_opclass_family(opclass->values[index_attoff]);
operator = get_opfamily_member(opfamily, optype,
optype,
@ -91,23 +106,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
regop = get_opcode(operator);
/* Initialize the scankey. */
ScanKeyInit(&skey[attoff],
pkattno,
ScanKeyInit(&skey[skey_attoff],
index_attoff + 1,
BTEqualStrategyNumber,
regop,
searchslot->tts_values[mainattno - 1]);
searchslot->tts_values[table_attno - 1]);
skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
/* Check for null value. */
if (searchslot->tts_isnull[mainattno - 1])
{
hasnulls = true;
skey[attoff].sk_flags |= SK_ISNULL;
}
if (searchslot->tts_isnull[table_attno - 1])
skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
skey_attoff++;
}
return hasnulls;
/* There must always be at least one attribute for the index scan. */
Assert(skey_attoff > 0);
return skey_attoff;
}
/*
@ -123,33 +140,58 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
TupleTableSlot *outslot)
{
ScanKeyData skey[INDEX_MAX_KEYS];
int skey_attoff;
IndexScanDesc scan;
SnapshotData snap;
TransactionId xwait;
Relation idxrel;
bool found;
TypeCacheEntry **eq = NULL;
bool isIdxSafeToSkipDuplicates;
/* Open the index. */
idxrel = index_open(idxoid, RowExclusiveLock);
/* Start an index scan. */
isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
InitDirtySnapshot(snap);
scan = index_beginscan(rel, idxrel, &snap,
IndexRelationGetNumberOfKeyAttributes(idxrel),
0);
/* Build scan key. */
build_replindex_scan_key(skey, rel, idxrel, searchslot);
skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
/* Start an index scan. */
scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
retry:
found = false;
index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
index_rescan(scan, skey, skey_attoff, NULL, 0);
/* Try to find the tuple */
if (index_getnext_slot(scan, ForwardScanDirection, outslot))
while (index_getnext_slot(scan, ForwardScanDirection, outslot))
{
found = true;
/*
* Avoid expensive equality check if the index is primary key or
* replica identity index.
*/
if (!isIdxSafeToSkipDuplicates)
{
if (eq == NULL)
{
#ifdef USE_ASSERT_CHECKING
/* apply assertions only once for the input idxoid */
IndexInfo *indexInfo = BuildIndexInfo(idxrel);
Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
#endif
eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
}
if (!tuples_equal(outslot, searchslot, eq))
continue;
}
ExecMaterializeSlot(outslot);
xwait = TransactionIdIsValid(snap.xmin) ?
@ -164,6 +206,10 @@ retry:
XactLockTableWait(xwait, NULL, NULL, XLTW_None);
goto retry;
}
/* Found our tuple and it's not locked */
found = true;
break;
}
/* Found tuple, try to lock it in the lockmode. */

View File

@ -17,8 +17,10 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/table.h"
#include "catalog/namespace.h"
#include "catalog/pg_am_d.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
@ -50,6 +52,9 @@ typedef struct LogicalRepPartMapEntry
LogicalRepRelMapEntry relmapentry;
} LogicalRepPartMapEntry;
static Oid FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
AttrMap *attrMap);
/*
* Relcache invalidation callback for our relation map cache.
*/
@ -439,6 +444,15 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
*/
logicalrep_rel_mark_updatable(entry);
/*
* Finding a usable index is an infrequent task. It occurs when an
* operation is first performed on the relation, or after invalidation
* of the relation cache entry (such as ANALYZE or CREATE/DROP index
* on the relation).
*/
entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel,
entry->attrmap);
entry->localrelvalid = true;
}
@ -697,10 +711,204 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
/* Set if the table's replica identity is enough to apply update/delete. */
logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
/* state and statelsn are left set to 0. */
MemoryContextSwitchTo(oldctx);
/*
* Finding a usable index is an infrequent task. It occurs when an
* operation is first performed on the relation, or after invalidation of
* the relation cache entry (such as ANALYZE or CREATE/DROP index on the
* relation).
*
* We also prefer to run this code on the oldctx so that we do not leak
* anything in the LogicalRepPartMapContext (hence CacheMemoryContext).
*/
entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel,
entry->attrmap);
entry->localrelvalid = true;
return entry;
}
/*
* Returns true if the given index consists only of expressions such as:
* CREATE INDEX idx ON table(foo(col));
*
* Returns false even if there is one column reference:
* CREATE INDEX idx ON table(foo(col), col_2);
*/
static bool
IsIndexOnlyOnExpression(IndexInfo *indexInfo)
{
for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
AttrNumber attnum = indexInfo->ii_IndexAttrNumbers[i];
if (AttributeNumberIsValid(attnum))
return false;
}
return true;
}
/*
* Returns true if the attrmap contains the leftmost column of the index.
* Otherwise returns false.
*
* attrmap is a map of local attributes to remote ones. We can consult this
* map to check whether the local index attribute has a corresponding remote
* attribute.
*/
static bool
RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap)
{
AttrNumber keycol;
Assert(indexInfo->ii_NumIndexAttrs >= 1);
keycol = indexInfo->ii_IndexAttrNumbers[0];
if (!AttributeNumberIsValid(keycol))
return false;
if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol))
return false;
return attrmap->attnums[AttrNumberGetAttrOffset(keycol)] >= 0;
}
/*
* Returns the oid of an index that can be used by the apply worker to scan
* the relation. The index must be btree, non-partial, and have at least
* one column reference (i.e. cannot consist of only expressions). These
* limitations help to keep the index scan similar to PK/RI index scans.
*
* Note that the limitations of index scans for replica identity full only
* adheres to a subset of the limitations of PK/RI. For example, we support
* columns that are marked as [NULL] or we are not interested in the [NOT
* DEFERRABLE] aspect of constraints here. It works for us because we always
* compare the tuples for non-PK/RI index scans. See
* RelationFindReplTupleByIndex().
*
* XXX: There are no fundamental problems for supporting non-btree indexes.
* We mostly need to relax the limitations in RelationFindReplTupleByIndex().
* For partial indexes, the required changes are likely to be larger. If
* none of the tuples satisfy the expression for the index scan, we fall-back
* to sequential execution, which might not be a good idea in some cases.
*
* We also skip indexes if the remote relation does not contain the leftmost
* column of the index. This is because in most such cases sequential scan is
* favorable over index scan.
*
* We expect to call this function when REPLICA IDENTITY FULL is defined for
* the remote relation.
*
* If no suitable index is found, returns InvalidOid.
*/
static Oid
FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
{
List *idxlist = RelationGetIndexList(localrel);
ListCell *lc;
foreach(lc, idxlist)
{
Oid idxoid = lfirst_oid(lc);
bool isUsableIdx;
bool containsLeftMostCol;
Relation idxRel;
IndexInfo *idxInfo;
idxRel = index_open(idxoid, AccessShareLock);
idxInfo = BuildIndexInfo(idxRel);
isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxInfo);
containsLeftMostCol =
RemoteRelContainsLeftMostColumnOnIdx(idxInfo, attrmap);
index_close(idxRel, AccessShareLock);
/* Return the first eligible index found */
if (isUsableIdx && containsLeftMostCol)
return idxoid;
}
return InvalidOid;
}
/*
* Returns true if the index is usable for replica identity full. For details,
* see FindUsableIndexForReplicaIdentityFull.
*/
bool
IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
{
bool is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
bool is_partial = (indexInfo->ii_Predicate != NIL);
bool is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
return is_btree && !is_partial && !is_only_on_expression;
}
/*
* Get replica identity index or if it is not defined a primary key.
*
* If neither is defined, returns InvalidOid
*/
Oid
GetRelationIdentityOrPK(Relation rel)
{
Oid idxoid;
idxoid = RelationGetReplicaIndex(rel);
if (!OidIsValid(idxoid))
idxoid = RelationGetPrimaryKeyIndex(rel);
return idxoid;
}
/*
* Returns the index oid if we can use an index for subscriber. Otherwise,
* returns InvalidOid.
*/
static Oid
FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
AttrMap *attrMap)
{
Oid idxoid;
/*
* We never need index oid for partitioned tables, always rely on leaf
* partition's index.
*/
if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
return InvalidOid;
/*
* Simple case, we already have a primary key or a replica identity index.
*/
idxoid = GetRelationIdentityOrPK(localrel);
if (OidIsValid(idxoid))
return idxoid;
if (remoterel->replident == REPLICA_IDENTITY_FULL)
{
/*
* We are looking for one more opportunity for using an index. If
* there are any indexes defined on the local relation, try to pick a
* suitable index.
*
* The index selection safely assumes that all the columns are going
* to be available for the index scan given that remote relation has
* replica identity full.
*
* Note that we are not using the planner to find the cheapest method
* to scan the relation as that would require us to either use lower
* level planner functions which would be a maintenance burden in the
* long run or use the full-fledged planner which could cause
* overhead.
*/
return FindUsableIndexForReplicaIdentityFull(localrel, attrMap);
}
return InvalidOid;
}

View File

@ -400,12 +400,15 @@ static void apply_handle_insert_internal(ApplyExecutionData *edata,
static void apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup);
LogicalRepTupleData *newtup,
Oid localindexoid);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot);
TupleTableSlot *remoteslot,
Oid localindexoid);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
@ -2350,24 +2353,6 @@ apply_handle_type(StringInfo s)
logicalrep_read_typ(s, &typ);
}
/*
* Get replica identity index or if it is not defined a primary key.
*
* If neither is defined, returns InvalidOid
*/
static Oid
GetRelationIdentityOrPK(Relation rel)
{
Oid idxoid;
idxoid = RelationGetReplicaIndex(rel);
if (!OidIsValid(idxoid))
idxoid = RelationGetPrimaryKeyIndex(rel);
return idxoid;
}
/*
* Check that we (the subscription owner) have sufficient privileges on the
* target relation to perform the given operation.
@ -2627,7 +2612,7 @@ apply_handle_update(StringInfo s)
remoteslot, &newtup, CMD_UPDATE);
else
apply_handle_update_internal(edata, edata->targetRelInfo,
remoteslot, &newtup);
remoteslot, &newtup, rel->localindexoid);
finish_edata(edata);
@ -2648,7 +2633,8 @@ static void
apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup)
LogicalRepTupleData *newtup,
Oid localindexoid)
{
EState *estate = edata->estate;
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
@ -2663,6 +2649,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel(estate, localrel,
&relmapentry->remoterel,
localindexoid,
remoteslot, &localslot);
ExecClearTuple(remoteslot);
@ -2767,7 +2754,7 @@ apply_handle_delete(StringInfo s)
remoteslot, NULL, CMD_DELETE);
else
apply_handle_delete_internal(edata, edata->targetRelInfo,
remoteslot);
remoteslot, rel->localindexoid);
finish_edata(edata);
@ -2787,7 +2774,8 @@ apply_handle_delete(StringInfo s)
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot)
TupleTableSlot *remoteslot,
Oid localindexoid)
{
EState *estate = edata->estate;
Relation localrel = relinfo->ri_RelationDesc;
@ -2799,7 +2787,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
found = FindReplTupleInLocalRel(estate, localrel, remoterel,
found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
remoteslot, &localslot);
/* If found delete it. */
@ -2833,17 +2821,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/*
* Try to find a tuple received from the publication side (in 'remoteslot') in
* the corresponding local relation using either replica identity index,
* primary key or if needed, sequential scan.
* primary key, index or if needed, sequential scan.
*
* Local tuple, if found, is returned in '*localslot'.
*/
static bool
FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot)
{
Oid idxoid;
bool found;
/*
@ -2854,12 +2842,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) ||
Assert(OidIsValid(localidxoid) ||
(remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid))
found = RelationFindReplTupleByIndex(localrel, idxoid,
if (OidIsValid(localidxoid))
found = RelationFindReplTupleByIndex(localrel, localidxoid,
LockTupleExclusive,
remoteslot, *localslot);
else
@ -2960,7 +2947,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
case CMD_DELETE:
apply_handle_delete_internal(edata, partrelinfo,
remoteslot_part);
remoteslot_part,
part_entry->localindexoid);
break;
case CMD_UPDATE:
@ -2980,6 +2968,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
part_entry->localindexoid,
remoteslot_part, &localslot);
if (!found)
{
@ -3076,7 +3065,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* DELETE old tuple found in the old partition. */
apply_handle_delete_internal(edata, partrelinfo,
localslot);
localslot,
part_entry->localindexoid);
/* INSERT new tuple into the new partition. */

View File

@ -13,6 +13,7 @@
#define LOGICALRELATION_H
#include "access/attmap.h"
#include "catalog/index.h"
#include "replication/logicalproto.h"
typedef struct LogicalRepRelMapEntry
@ -31,6 +32,7 @@ typedef struct LogicalRepRelMapEntry
Relation localrel; /* relcache entry (NULL when closed) */
AttrMap *attrmap; /* map of local attributes to remote ones */
bool updatable; /* Can apply updates/deletes? */
Oid localindexoid; /* which index to use, or InvalidOid if none */
/* Sync state. */
char state;
@ -46,5 +48,7 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
Relation partrel, AttrMap *map);
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo);
extern Oid GetRelationIdentityOrPK(Relation rel);
#endif /* LOGICALRELATION_H */

View File

@ -38,6 +38,7 @@ tests += {
't/029_on_error.pl',
't/030_origin.pl',
't/031_column_list.pl',
't/032_subscribe_use_index.pl',
't/100_bugs.pl',
],
},