pgstat: Split out relation stats handling from AtEO[Sub]Xact_PgStat() etc.

An upcoming patch will add additional work to these functions. To avoid the
functions getting too complicated / doing too many things at once, split out
sub-tasks into their own functions.

Author: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/20210405092914.mmxqe7j56lsjfsej@alap3.anarazel.de
This commit is contained in:
Andres Freund 2021-09-20 13:56:16 -07:00
parent d3014fff4c
commit e1f958d759
1 changed files with 217 additions and 170 deletions

View File

@ -2374,18 +2374,68 @@ pgstat_update_heap_dead_tuples(Relation rel, int delta)
pgstat_info->t_counts.t_delta_dead_tuples -= delta;
}
/* ----------
* AtEOXact_PgStat
/*
* Perform relation stats specific end-of-transaction work. Helper for
* AtEOXact_PgStat.
*
* Called from access/transam/xact.c at top-level transaction commit/abort.
* ----------
* Transfer transactional insert/update counts into the base tabstat entries.
* We don't bother to free any of the transactional state, since it's all in
* TopTransactionContext and will go away anyway.
*/
void
AtEOXact_PgStat(bool isCommit, bool parallel)
static void
AtEOXact_PgStat_Relations(PgStat_SubXactStatus *xact_state, bool isCommit)
{
PgStat_SubXactStatus *xact_state;
PgStat_TableXactStatus *trans;
for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat;
Assert(trans->nest_level == 1);
Assert(trans->upper == NULL);
tabstat = trans->parent;
Assert(tabstat->trans == trans);
/* restore pre-truncate stats (if any) in case of aborted xact */
if (!isCommit)
pgstat_truncate_restore_counters(trans);
/* count attempted actions regardless of commit/abort */
tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
if (isCommit)
{
tabstat->t_counts.t_truncated = trans->truncated;
if (trans->truncated)
{
/* forget live/dead stats seen by backend thus far */
tabstat->t_counts.t_delta_live_tuples = 0;
tabstat->t_counts.t_delta_dead_tuples = 0;
}
/* insert adds a live tuple, delete removes one */
tabstat->t_counts.t_delta_live_tuples +=
trans->tuples_inserted - trans->tuples_deleted;
/* update and delete each create a dead tuple */
tabstat->t_counts.t_delta_dead_tuples +=
trans->tuples_updated + trans->tuples_deleted;
/* insert, update, delete each count as one change event */
tabstat->t_counts.t_changed_tuples +=
trans->tuples_inserted + trans->tuples_updated +
trans->tuples_deleted;
}
else
{
/* inserted tuples are dead, deleted tuples are unaffected */
tabstat->t_counts.t_delta_dead_tuples +=
trans->tuples_inserted + trans->tuples_updated;
/* an aborted xact generates no changed_tuple events */
}
tabstat->trans = NULL;
}
}
static void
AtEOXact_PgStat_Database(bool isCommit, bool parallel)
{
/* Don't count parallel worker transaction stats */
if (!parallel)
{
@ -2398,63 +2448,29 @@ AtEOXact_PgStat(bool isCommit, bool parallel)
else
pgStatXactRollback++;
}
}
/*
* Transfer transactional insert/update counts into the base tabstat
* entries. We don't bother to free any of the transactional state, since
* it's all in TopTransactionContext and will go away anyway.
*/
/* ----------
* AtEOXact_PgStat
*
* Called from access/transam/xact.c at top-level transaction commit/abort.
* ----------
*/
void
AtEOXact_PgStat(bool isCommit, bool parallel)
{
PgStat_SubXactStatus *xact_state;
AtEOXact_PgStat_Database(isCommit, parallel);
/* handle transactional stats information */
xact_state = pgStatXactStack;
if (xact_state != NULL)
{
PgStat_TableXactStatus *trans;
Assert(xact_state->nest_level == 1);
Assert(xact_state->prev == NULL);
for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat;
Assert(trans->nest_level == 1);
Assert(trans->upper == NULL);
tabstat = trans->parent;
Assert(tabstat->trans == trans);
/* restore pre-truncate stats (if any) in case of aborted xact */
if (!isCommit)
pgstat_truncate_restore_counters(trans);
/* count attempted actions regardless of commit/abort */
tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
if (isCommit)
{
tabstat->t_counts.t_truncated = trans->truncated;
if (trans->truncated)
{
/* forget live/dead stats seen by backend thus far */
tabstat->t_counts.t_delta_live_tuples = 0;
tabstat->t_counts.t_delta_dead_tuples = 0;
}
/* insert adds a live tuple, delete removes one */
tabstat->t_counts.t_delta_live_tuples +=
trans->tuples_inserted - trans->tuples_deleted;
/* update and delete each create a dead tuple */
tabstat->t_counts.t_delta_dead_tuples +=
trans->tuples_updated + trans->tuples_deleted;
/* insert, update, delete each count as one change event */
tabstat->t_counts.t_changed_tuples +=
trans->tuples_inserted + trans->tuples_updated +
trans->tuples_deleted;
}
else
{
/* inserted tuples are dead, deleted tuples are unaffected */
tabstat->t_counts.t_delta_dead_tuples +=
trans->tuples_inserted + trans->tuples_updated;
/* an aborted xact generates no changed_tuple events */
}
tabstat->trans = NULL;
}
AtEOXact_PgStat_Relations(xact_state, isCommit);
}
pgStatXactStack = NULL;
@ -2462,6 +2478,90 @@ AtEOXact_PgStat(bool isCommit, bool parallel)
pgstat_clear_snapshot();
}
/*
* Perform relation stats specific end-of-sub-transaction work. Helper for
* AtEOSubXact_PgStat.
*
* Transfer transactional insert/update counts into the next higher
* subtransaction state.
*/
static void
AtEOSubXact_PgStat_Relations(PgStat_SubXactStatus *xact_state, bool isCommit, int nestDepth)
{
PgStat_TableXactStatus *trans;
PgStat_TableXactStatus *next_trans;
for (trans = xact_state->first; trans != NULL; trans = next_trans)
{
PgStat_TableStatus *tabstat;
next_trans = trans->next;
Assert(trans->nest_level == nestDepth);
tabstat = trans->parent;
Assert(tabstat->trans == trans);
if (isCommit)
{
if (trans->upper && trans->upper->nest_level == nestDepth - 1)
{
if (trans->truncated)
{
/* propagate the truncate status one level up */
pgstat_truncate_save_counters(trans->upper);
/* replace upper xact stats with ours */
trans->upper->tuples_inserted = trans->tuples_inserted;
trans->upper->tuples_updated = trans->tuples_updated;
trans->upper->tuples_deleted = trans->tuples_deleted;
}
else
{
trans->upper->tuples_inserted += trans->tuples_inserted;
trans->upper->tuples_updated += trans->tuples_updated;
trans->upper->tuples_deleted += trans->tuples_deleted;
}
tabstat->trans = trans->upper;
pfree(trans);
}
else
{
/*
* When there isn't an immediate parent state, we can just
* reuse the record instead of going through a
* palloc/pfree pushup (this works since it's all in
* TopTransactionContext anyway). We have to re-link it
* into the parent level, though, and that might mean
* pushing a new entry into the pgStatXactStack.
*/
PgStat_SubXactStatus *upper_xact_state;
upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
trans->next = upper_xact_state->first;
upper_xact_state->first = trans;
trans->nest_level = nestDepth - 1;
}
}
else
{
/*
* On abort, update top-level tabstat counts, then forget the
* subtransaction
*/
/* first restore values obliterated by truncate */
pgstat_truncate_restore_counters(trans);
/* count attempted actions regardless of commit/abort */
tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
/* inserted tuples are dead, deleted tuples are unaffected */
tabstat->t_counts.t_delta_dead_tuples +=
trans->tuples_inserted + trans->tuples_updated;
tabstat->trans = trans->upper;
pfree(trans);
}
}
}
/* ----------
* AtEOSubXact_PgStat
*
@ -2473,99 +2573,57 @@ AtEOSubXact_PgStat(bool isCommit, int nestDepth)
{
PgStat_SubXactStatus *xact_state;
/*
* Transfer transactional insert/update counts into the next higher
* subtransaction state.
*/
/* merge the sub-transaction's transactional stats into the parent */
xact_state = pgStatXactStack;
if (xact_state != NULL &&
xact_state->nest_level >= nestDepth)
{
PgStat_TableXactStatus *trans;
PgStat_TableXactStatus *next_trans;
/* delink xact_state from stack immediately to simplify reuse case */
pgStatXactStack = xact_state->prev;
for (trans = xact_state->first; trans != NULL; trans = next_trans)
{
PgStat_TableStatus *tabstat;
AtEOSubXact_PgStat_Relations(xact_state, isCommit, nestDepth);
next_trans = trans->next;
Assert(trans->nest_level == nestDepth);
tabstat = trans->parent;
Assert(tabstat->trans == trans);
if (isCommit)
{
if (trans->upper && trans->upper->nest_level == nestDepth - 1)
{
if (trans->truncated)
{
/* propagate the truncate status one level up */
pgstat_truncate_save_counters(trans->upper);
/* replace upper xact stats with ours */
trans->upper->tuples_inserted = trans->tuples_inserted;
trans->upper->tuples_updated = trans->tuples_updated;
trans->upper->tuples_deleted = trans->tuples_deleted;
}
else
{
trans->upper->tuples_inserted += trans->tuples_inserted;
trans->upper->tuples_updated += trans->tuples_updated;
trans->upper->tuples_deleted += trans->tuples_deleted;
}
tabstat->trans = trans->upper;
pfree(trans);
}
else
{
/*
* When there isn't an immediate parent state, we can just
* reuse the record instead of going through a
* palloc/pfree pushup (this works since it's all in
* TopTransactionContext anyway). We have to re-link it
* into the parent level, though, and that might mean
* pushing a new entry into the pgStatXactStack.
*/
PgStat_SubXactStatus *upper_xact_state;
upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
trans->next = upper_xact_state->first;
upper_xact_state->first = trans;
trans->nest_level = nestDepth - 1;
}
}
else
{
/*
* On abort, update top-level tabstat counts, then forget the
* subtransaction
*/
/* first restore values obliterated by truncate */
pgstat_truncate_restore_counters(trans);
/* count attempted actions regardless of commit/abort */
tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
/* inserted tuples are dead, deleted tuples are unaffected */
tabstat->t_counts.t_delta_dead_tuples +=
trans->tuples_inserted + trans->tuples_updated;
tabstat->trans = trans->upper;
pfree(trans);
}
}
pfree(xact_state);
}
}
/*
* Generate 2PC records for all the pending transaction-dependent relation
* stats.
*/
static void
AtPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state)
{
PgStat_TableXactStatus *trans;
for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat;
TwoPhasePgStatRecord record;
Assert(trans->nest_level == 1);
Assert(trans->upper == NULL);
tabstat = trans->parent;
Assert(tabstat->trans == trans);
record.tuples_inserted = trans->tuples_inserted;
record.tuples_updated = trans->tuples_updated;
record.tuples_deleted = trans->tuples_deleted;
record.inserted_pre_trunc = trans->inserted_pre_trunc;
record.updated_pre_trunc = trans->updated_pre_trunc;
record.deleted_pre_trunc = trans->deleted_pre_trunc;
record.t_id = tabstat->t_id;
record.t_shared = tabstat->t_shared;
record.t_truncated = trans->truncated;
RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
&record, sizeof(TwoPhasePgStatRecord));
}
}
/*
* AtPrepare_PgStat
* Save the transactional stats state at 2PC transaction prepare.
*
* In this phase we just generate 2PC records for all the pending
* transaction-dependent stats work.
*/
void
AtPrepare_PgStat(void)
@ -2575,33 +2633,32 @@ AtPrepare_PgStat(void)
xact_state = pgStatXactStack;
if (xact_state != NULL)
{
PgStat_TableXactStatus *trans;
Assert(xact_state->nest_level == 1);
Assert(xact_state->prev == NULL);
for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat;
TwoPhasePgStatRecord record;
Assert(trans->nest_level == 1);
Assert(trans->upper == NULL);
tabstat = trans->parent;
Assert(tabstat->trans == trans);
AtPrepare_PgStat_Relations(xact_state);
}
}
record.tuples_inserted = trans->tuples_inserted;
record.tuples_updated = trans->tuples_updated;
record.tuples_deleted = trans->tuples_deleted;
record.inserted_pre_trunc = trans->inserted_pre_trunc;
record.updated_pre_trunc = trans->updated_pre_trunc;
record.deleted_pre_trunc = trans->deleted_pre_trunc;
record.t_id = tabstat->t_id;
record.t_shared = tabstat->t_shared;
record.t_truncated = trans->truncated;
/*
* All we need do here is unlink the transaction stats state from the
* nontransactional state. The nontransactional action counts will be
* reported to the stats collector immediately, while the effects on
* live and dead tuple counts are preserved in the 2PC state file.
*
* Note: AtEOXact_PgStat_Relations is not called during PREPARE.
*/
static void
PostPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state)
{
PgStat_TableXactStatus *trans;
RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
&record, sizeof(TwoPhasePgStatRecord));
}
for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat;
tabstat = trans->parent;
tabstat->trans = NULL;
}
}
@ -2609,11 +2666,6 @@ AtPrepare_PgStat(void)
* PostPrepare_PgStat
* Clean up after successful PREPARE.
*
* All we need do here is unlink the transaction stats state from the
* nontransactional state. The nontransactional action counts will be
* reported to the stats collector immediately, while the effects on live
* and dead tuple counts are preserved in the 2PC state file.
*
* Note: AtEOXact_PgStat is not called during PREPARE.
*/
void
@ -2628,15 +2680,10 @@ PostPrepare_PgStat(void)
xact_state = pgStatXactStack;
if (xact_state != NULL)
{
PgStat_TableXactStatus *trans;
Assert(xact_state->nest_level == 1);
Assert(xact_state->prev == NULL);
for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat;
tabstat = trans->parent;
tabstat->trans = NULL;
}
PostPrepare_PgStat_Relations(xact_state);
}
pgStatXactStack = NULL;