Compare commits

...

6 Commits

Author SHA1 Message Date
Michael Paquier 4f1882b960 Improve a bit the tests of pg_walinspect
This commit improves the tests of pg_walinspect on a few things:
- Remove aggregates for queries that should fail.  If the code is
reworked in such a way that the behavior of these queries is changed,
we would get more input from them, written this way.
- Expect at least one record reported in the valid queries doing scans
across ranges, rather than zero records.
- Adjust a few comments, for consistency.

Author: Bharath Rupireddy
Discussion: https://postgr.es/m/CALj2ACVaoXW3nJD9zq8E66BEf-phgJfFcKRVJq9GXkuX0b3ULQ@mail.gmail.com
2023-03-23 11:50:35 +09:00
Thomas Munro 8fba928fd7 Improve the naming of Parallel Hash Join phases.
* Commit 3048898e dropped -ING from PHJ wait event names.  Update the
  corresponding barrier phases names to match.

* Rename the "DONE" phases to "FREE".  That's symmetrical with
  "ALLOCATE", and names the activity that actually happens in that phase
  (as we do for the other phases) rather than a state.  The bug fixed by
  commit 8d578b9b might have been more obvious with this name.

* Rename the batch/bucket growth barriers' "ALLOCATE" phases to
  "REALLOCATE", a better description of what they do.

* Update the high level comments about phases to highlight phases
  are executed by a single process with an asterisk (mostly memory
  management phases).

No behavior change, as this is just improving internal identifiers.  The
only user-visible sign of this is that a couple of wait events' display
names change from "...Allocate" to "...Reallocate" in pg_stat_activity,
to stay in sync with the internal names.

Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BMDpwF2Eo2LAvzd%3DpOh81wUTsrwU1uAwR-v6OGBB6%2B7g%40mail.gmail.com
2023-03-23 13:14:25 +13:00
Alexander Korotkov 11470f544e Allow locking updated tuples in tuple_update() and tuple_delete()
Currently, in read committed transaction isolation mode (default), we have the
following sequence of actions when tuple_update()/tuple_delete() finds
the tuple updated by concurrent transaction.

1. Attempt to update/delete tuple with tuple_update()/tuple_delete(), which
   returns TM_Updated.
2. Lock tuple with tuple_lock().
3. Re-evaluate plan qual (recheck if we still need to update/delete and
   calculate the new tuple for update).
4. Second attempt to update/delete tuple with tuple_update()/tuple_delete().
   This attempt should be successful, since the tuple was previously locked.

This patch eliminates step 2 by taking the lock during first
tuple_update()/tuple_delete() call.  Heap table access method saves some
efforts by checking the updated tuple once instead of twice.  Future
undo-based table access methods, which will start from the latest row version,
can immediately place a lock there.

The code in nodeModifyTable.c is simplified by removing the nested switch/case.

Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com
Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp
Reviewed-by: Andres Freund, Chris Travers
2023-03-23 00:26:59 +03:00
Alexander Korotkov 764da7710b Evade extra table_tuple_fetch_row_version() in ExecUpdate()/ExecDelete()
When we lock tuple using table_tuple_lock() then we at the same time fetch
the locked tuple to the slot.  In this case we can skip extra
table_tuple_fetch_row_version() thank to we've already fetched the 'old' tuple
and nobody can change it concurrently since it's locked.

Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com
Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp
Reviewed-by: Andres Freund, Chris Travers
2023-03-23 00:26:59 +03:00
Tom Lane c75a623304 Fix new test case to work on (some?) big-endian architectures.
Use of pack("L") gets around the basic endian problem, but it doesn't
deal with the fact that the order of the bitfields within the struct
may differ.  This patch fixes it to work with gcc on NetBSD/macppc,
but I wonder whether that will be enough --- in principle, there
could be four different combinations of bitpatterns needed here.

Discussion: https://postgr.es/m/1650745.1679513221@sss.pgh.pa.us
2023-03-22 17:14:21 -04:00
Tom Lane b48af6d174 Fix initdb's handling of min_wal_size and max_wal_size.
In commit 3e51b278d, I misinterpreted the coding in setup_config()
as setting min_wal_size and max_wal_size to compile-time-constant
values.  But it's not: there's a hidden dependency on --wal-segsize.
Therefore leaving these variables commented out is the wrong thing.
Per report from Andres Freund.

Discussion: https://postgr.es/m/20230322200751.jvfvsuuhd3hgm6vv@awork3.anarazel.de
2023-03-22 16:37:41 -04:00
18 changed files with 466 additions and 341 deletions

View File

@ -1,4 +1,4 @@
-- test old extension version entry points
-- Test old extension version entry points.
CREATE EXTENSION pg_walinspect WITH VERSION '1.0';
-- Mask DETAIL messages as these could refer to current LSN positions.
\set VERBOSITY terse
@ -28,7 +28,7 @@ SELECT 'init' FROM pg_create_physical_replication_slot('regress_pg_walinspect_sl
CREATE TABLE sample_tbl(col1 int, col2 int);
SELECT pg_current_wal_lsn() AS wal_lsn1 \gset
INSERT INTO sample_tbl SELECT * FROM generate_series(1, 2);
-- Check bounds for these past functions.
-- Tests for the past functions.
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info_till_end_of_wal(:'wal_lsn1');
ok
----
@ -41,13 +41,14 @@ SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats_till_end_of_wal(:'wal_lsn1');
t
(1 row)
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info_till_end_of_wal('FFFFFFFF/FFFFFFFF');
-- Failures with start LSNs.
SELECT * FROM pg_get_wal_records_info_till_end_of_wal('FFFFFFFF/FFFFFFFF');
ERROR: WAL start LSN must be less than current LSN
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats_till_end_of_wal('FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_stats_till_end_of_wal('FFFFFFFF/FFFFFFFF');
ERROR: WAL start LSN must be less than current LSN
-- Move to new version 1.1
-- Move to new version 1.1.
ALTER EXTENSION pg_walinspect UPDATE TO '1.1';
-- List what version 1.1 contains
-- List what version 1.1 contains.
\dx+ pg_walinspect
Objects in extension "pg_walinspect"
Object description

View File

@ -9,7 +9,7 @@ SELECT 'init' FROM pg_create_physical_replication_slot('regress_pg_walinspect_sl
(1 row)
CREATE TABLE sample_tbl(col1 int, col2 int);
-- Save some LSNs for comparisons
-- Save some LSNs for comparisons.
SELECT pg_current_wal_lsn() AS wal_lsn1 \gset
INSERT INTO sample_tbl SELECT * FROM generate_series(1, 2);
SELECT pg_current_wal_lsn() AS wal_lsn2 \gset
@ -35,56 +35,56 @@ ERROR: WAL start LSN must be less than end LSN
SELECT * FROM pg_get_wal_block_info(:'wal_lsn2', :'wal_lsn1');
ERROR: WAL start LSN must be less than end LSN
-- LSNs with the highest value possible.
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_record_info('FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_record_info('FFFFFFFF/FFFFFFFF');
ERROR: WAL input LSN must be less than current LSN
-- Success with end LSNs.
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
ok
----
t
(1 row)
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_stats(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
ok
----
t
(1 row)
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
ok
----
t
(1 row)
-- failures with start LSNs
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_records_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
-- Failures with start LSNs.
SELECT * FROM pg_get_wal_records_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
ERROR: WAL start LSN must be less than current LSN
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_stats('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_stats('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
ERROR: WAL start LSN must be less than current LSN
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_block_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_block_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
ERROR: WAL start LSN must be less than current LSN
-- ===================================================================
-- Tests for all function executions
-- ===================================================================
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_record_info(:'wal_lsn1');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_record_info(:'wal_lsn1');
ok
----
t
(1 row)
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', :'wal_lsn2');
ok
----
t
(1 row)
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_stats(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats(:'wal_lsn1', :'wal_lsn2');
ok
----
t
(1 row)
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', :'wal_lsn2');
ok
----
t
@ -115,7 +115,7 @@ SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', :'wal_lsn2'
-- ===================================================================
-- Tests to get block information from WAL record
-- ===================================================================
-- Update table to generate some block data
-- Update table to generate some block data.
SELECT pg_current_wal_lsn() AS wal_lsn3 \gset
UPDATE sample_tbl SET col1 = col1 + 1 WHERE col1 = 1;
SELECT pg_current_wal_lsn() AS wal_lsn4 \gset
@ -172,7 +172,7 @@ SELECT has_function_privilege('regress_pg_walinspect',
f
(1 row)
-- Functions accessible by users with role pg_read_server_files
-- Functions accessible by users with role pg_read_server_files.
GRANT pg_read_server_files TO regress_pg_walinspect;
SELECT has_function_privilege('regress_pg_walinspect',
'pg_get_wal_record_info(pg_lsn)', 'EXECUTE'); -- yes
@ -203,7 +203,7 @@ SELECT has_function_privilege('regress_pg_walinspect',
(1 row)
REVOKE pg_read_server_files FROM regress_pg_walinspect;
-- Superuser can grant execute to other users
-- Superuser can grant execute to other users.
GRANT EXECUTE ON FUNCTION pg_get_wal_record_info(pg_lsn)
TO regress_pg_walinspect;
GRANT EXECUTE ON FUNCTION pg_get_wal_records_info(pg_lsn, pg_lsn)

View File

@ -1,4 +1,4 @@
-- test old extension version entry points
-- Test old extension version entry points.
CREATE EXTENSION pg_walinspect WITH VERSION '1.0';
@ -20,16 +20,17 @@ CREATE TABLE sample_tbl(col1 int, col2 int);
SELECT pg_current_wal_lsn() AS wal_lsn1 \gset
INSERT INTO sample_tbl SELECT * FROM generate_series(1, 2);
-- Check bounds for these past functions.
-- Tests for the past functions.
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info_till_end_of_wal(:'wal_lsn1');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats_till_end_of_wal(:'wal_lsn1');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info_till_end_of_wal('FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats_till_end_of_wal('FFFFFFFF/FFFFFFFF');
-- Failures with start LSNs.
SELECT * FROM pg_get_wal_records_info_till_end_of_wal('FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_stats_till_end_of_wal('FFFFFFFF/FFFFFFFF');
-- Move to new version 1.1
-- Move to new version 1.1.
ALTER EXTENSION pg_walinspect UPDATE TO '1.1';
-- List what version 1.1 contains
-- List what version 1.1 contains.
\dx+ pg_walinspect
SELECT pg_drop_replication_slot('regress_pg_walinspect_slot');

View File

@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_physical_replication_slot('regress_pg_walinspect_sl
CREATE TABLE sample_tbl(col1 int, col2 int);
-- Save some LSNs for comparisons
-- Save some LSNs for comparisons.
SELECT pg_current_wal_lsn() AS wal_lsn1 \gset
INSERT INTO sample_tbl SELECT * FROM generate_series(1, 2);
SELECT pg_current_wal_lsn() AS wal_lsn2 \gset
@ -32,24 +32,24 @@ SELECT * FROM pg_get_wal_stats(:'wal_lsn2', :'wal_lsn1');
SELECT * FROM pg_get_wal_block_info(:'wal_lsn2', :'wal_lsn1');
-- LSNs with the highest value possible.
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_record_info('FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_record_info('FFFFFFFF/FFFFFFFF');
-- Success with end LSNs.
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_stats(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
-- failures with start LSNs
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_records_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_stats('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_block_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
-- Failures with start LSNs.
SELECT * FROM pg_get_wal_records_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_stats('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
SELECT * FROM pg_get_wal_block_info('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
-- ===================================================================
-- Tests for all function executions
-- ===================================================================
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_record_info(:'wal_lsn1');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_stats(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 0 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_record_info(:'wal_lsn1');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_stats(:'wal_lsn1', :'wal_lsn2');
SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn1', :'wal_lsn2');
-- ===================================================================
-- Test for filtering out WAL records of a particular table
@ -72,7 +72,7 @@ SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_info(:'wal_lsn1', :'wal_lsn2'
-- Tests to get block information from WAL record
-- ===================================================================
-- Update table to generate some block data
-- Update table to generate some block data.
SELECT pg_current_wal_lsn() AS wal_lsn3 \gset
UPDATE sample_tbl SET col1 = col1 + 1 WHERE col1 = 1;
SELECT pg_current_wal_lsn() AS wal_lsn4 \gset
@ -103,9 +103,9 @@ SELECT has_function_privilege('regress_pg_walinspect',
SELECT has_function_privilege('regress_pg_walinspect',
'pg_get_wal_block_info(pg_lsn, pg_lsn) ', 'EXECUTE'); -- no
-- Functions accessible by users with role pg_read_server_files
-- Functions accessible by users with role pg_read_server_files.
GRANT pg_read_server_files TO regress_pg_walinspect;
SELECT has_function_privilege('regress_pg_walinspect',
'pg_get_wal_record_info(pg_lsn)', 'EXECUTE'); -- yes
SELECT has_function_privilege('regress_pg_walinspect',
@ -117,7 +117,7 @@ SELECT has_function_privilege('regress_pg_walinspect',
REVOKE pg_read_server_files FROM regress_pg_walinspect;
-- Superuser can grant execute to other users
-- Superuser can grant execute to other users.
GRANT EXECUTE ON FUNCTION pg_get_wal_record_info(pg_lsn)
TO regress_pg_walinspect;
GRANT EXECUTE ON FUNCTION pg_get_wal_records_info(pg_lsn, pg_lsn)

View File

@ -1700,11 +1700,6 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting for other Parallel Hash participants to finish partitioning
the outer relation.</entry>
</row>
<row>
<entry><literal>HashGrowBatchesAllocate</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to allocate more
batches.</entry>
</row>
<row>
<entry><literal>HashGrowBatchesDecide</literal></entry>
<entry>Waiting to elect a Parallel Hash participant to decide on future
@ -1720,21 +1715,26 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting for an elected Parallel Hash participant to decide on
future batch growth.</entry>
</row>
<row>
<entry><literal>HashGrowBatchesReallocate</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to allocate more
batches.</entry>
</row>
<row>
<entry><literal>HashGrowBatchesRepartition</literal></entry>
<entry>Waiting for other Parallel Hash participants to finish
repartitioning.</entry>
</row>
<row>
<entry><literal>HashGrowBucketsAllocate</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to finish
allocating more buckets.</entry>
</row>
<row>
<entry><literal>HashGrowBucketsElect</literal></entry>
<entry>Waiting to elect a Parallel Hash participant to allocate more
buckets.</entry>
</row>
<row>
<entry><literal>HashGrowBucketsReallocate</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to finish
allocating more buckets.</entry>
</row>
<row>
<entry><literal>HashGrowBucketsReinsert</literal></entry>
<entry>Waiting for other Parallel Hash participants to finish inserting

View File

@ -45,6 +45,12 @@
#include "utils/builtins.h"
#include "utils/rel.h"
static TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
Snapshot snapshot, TupleTableSlot *slot,
CommandId cid, LockTupleMode mode,
LockWaitPolicy wait_policy, uint8 flags,
TM_FailureData *tmfd, bool updated);
static void reform_and_rewrite_tuple(HeapTuple tuple,
Relation OldHeap, Relation NewHeap,
Datum *values, bool *isnull, RewriteState rwstate);
@ -299,14 +305,46 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
static TM_Result
heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
Snapshot snapshot, Snapshot crosscheck, bool wait,
TM_FailureData *tmfd, bool changingPart)
TM_FailureData *tmfd, bool changingPart,
LazyTupleTableSlot *lockedSlot)
{
TM_Result result;
/*
* Currently Deleting of index tuples are handled at vacuum, in case if
* the storage itself is cleaning the dead tuples by itself, it is the
* time to call the index tuple deletion also.
*/
return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
result = heap_delete(relation, tid, cid, crosscheck, wait,
tmfd, changingPart);
/*
* If the tuple has been concurrently updated, then get the lock on it.
* (Do this if caller asked for tat by providing a 'lockedSlot'.) With the
* lock held retry of delete should succeed even if there are more
* concurrent update attempts.
*/
if (result == TM_Updated && lockedSlot)
{
TupleTableSlot *evalSlot;
Assert(wait);
evalSlot = LAZY_TTS_EVAL(lockedSlot);
result = heapam_tuple_lock_internal(relation, tid, snapshot,
evalSlot, cid, LockTupleExclusive,
LockWaitBlock,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
tmfd, true);
if (result == TM_Ok)
{
tmfd->traversed = true;
return TM_Updated;
}
}
return result;
}
@ -314,7 +352,8 @@ static TM_Result
heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
bool wait, TM_FailureData *tmfd,
LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes)
LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes,
LazyTupleTableSlot *lockedSlot)
{
bool shouldFree = true;
HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
@ -352,6 +391,32 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
if (shouldFree)
pfree(tuple);
/*
* If the tuple has been concurrently updated, then get the lock on it.
* (Do this if caller asked for tat by providing a 'lockedSlot'.) With the
* lock held retry of update should succeed even if there are more
* concurrent update attempts.
*/
if (result == TM_Updated && lockedSlot)
{
TupleTableSlot *evalSlot;
Assert(wait);
evalSlot = LAZY_TTS_EVAL(lockedSlot);
result = heapam_tuple_lock_internal(relation, otid, snapshot,
evalSlot, cid, *lockmode,
LockWaitBlock,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
tmfd, true);
if (result == TM_Ok)
{
tmfd->traversed = true;
return TM_Updated;
}
}
return result;
}
@ -360,10 +425,26 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
LockWaitPolicy wait_policy, uint8 flags,
TM_FailureData *tmfd)
{
return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid,
mode, wait_policy, flags, tmfd, false);
}
/*
* This routine does the work for heapam_tuple_lock(), but also support
* `updated` argument to re-use the work done by heapam_tuple_update() or
* heapam_tuple_delete() on figuring out that tuple was concurrently updated.
*/
static TM_Result
heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
Snapshot snapshot, TupleTableSlot *slot,
CommandId cid, LockTupleMode mode,
LockWaitPolicy wait_policy, uint8 flags,
TM_FailureData *tmfd, bool updated)
{
BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
TM_Result result;
Buffer buffer;
Buffer buffer = InvalidBuffer;
HeapTuple tuple = &bslot->base.tupdata;
bool follow_updates;
@ -374,16 +455,26 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
tuple_lock_retry:
tuple->t_self = *tid;
result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
follow_updates, &buffer, tmfd);
if (!updated)
result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
follow_updates, &buffer, tmfd);
else
result = TM_Updated;
if (result == TM_Updated &&
(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
{
/* Should not encounter speculative tuple on recheck */
Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
if (!updated)
{
/* Should not encounter speculative tuple on recheck */
Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
ReleaseBuffer(buffer);
ReleaseBuffer(buffer);
}
else
{
updated = false;
}
if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
{

View File

@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
GetCurrentCommandId(true),
snapshot, InvalidSnapshot,
true /* wait for commit */ ,
&tmfd, false /* changingPart */ );
&tmfd, false /* changingPart */ ,
NULL);
switch (result)
{
@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
GetCurrentCommandId(true),
snapshot, InvalidSnapshot,
true /* wait for commit */ ,
&tmfd, &lockmode, update_indexes);
&tmfd, &lockmode, update_indexes,
NULL);
switch (result)
{

View File

@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
*/
pstate = hashtable->parallel_state;
build_barrier = &pstate->build_barrier;
Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
switch (BarrierPhase(build_barrier))
{
case PHJ_BUILD_ALLOCATING:
case PHJ_BUILD_ALLOCATE:
/*
* Either I just allocated the initial hash table in
@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
/* Fall through. */
case PHJ_BUILD_HASHING_INNER:
case PHJ_BUILD_HASH_INNER:
/*
* It's time to begin hashing, or if we just arrived here then
@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
* below.
*/
if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
PHJ_GROW_BATCHES_ELECTING)
PHJ_GROW_BATCHES_ELECT)
ExecParallelHashIncreaseNumBatches(hashtable);
if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
PHJ_GROW_BUCKETS_ELECTING)
PHJ_GROW_BUCKETS_ELECT)
ExecParallelHashIncreaseNumBuckets(hashtable);
ExecParallelHashEnsureBatchAccessors(hashtable);
ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@ -338,17 +338,17 @@ MultiExecParallelHash(HashState *node)
* Unless we're completely done and the batch state has been freed, make
* sure we have accessors.
*/
if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
ExecParallelHashEnsureBatchAccessors(hashtable);
/*
* The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
* case, which will bring the build phase to PHJ_BUILD_RUNNING (if it
* isn't there already).
* case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
* there already).
*/
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
}
/* ----------------------------------------------------------------
@ -592,8 +592,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
* Attach to the build barrier. The corresponding detach operation is
* in ExecHashTableDetach. Note that we won't attach to the
* batch_barrier for batch 0 yet. We'll attach later and start it out
* in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
* and then loaded while hashing (the standard hybrid hash join
* in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
* then loaded while hashing (the standard hybrid hash join
* algorithm), and we'll coordinate that using build_barrier.
*/
build_barrier = &pstate->build_barrier;
@ -606,7 +606,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
* SharedHashJoinBatch objects and the hash table for batch 0. One
* backend will be elected to do that now if necessary.
*/
if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
{
pstate->nbatch = nbatch;
@ -627,7 +627,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
/*
* The next Parallel Hash synchronization point is in
* MultiExecParallelHash(), which will progress it all the way to
* PHJ_BUILD_RUNNING. The caller must not return control from this
* PHJ_BUILD_RUN. The caller must not return control from this
* executor node between now and then.
*/
}
@ -1075,7 +1075,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
{
ParallelHashJoinState *pstate = hashtable->parallel_state;
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
/*
* It's unlikely, but we need to be prepared for new participants to show
@ -1084,7 +1084,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
*/
switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
{
case PHJ_GROW_BATCHES_ELECTING:
case PHJ_GROW_BATCHES_ELECT:
/*
* Elect one participant to prepare to grow the number of batches.
@ -1200,13 +1200,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
}
/* Fall through. */
case PHJ_GROW_BATCHES_ALLOCATING:
case PHJ_GROW_BATCHES_REALLOCATE:
/* Wait for the above to be finished. */
BarrierArriveAndWait(&pstate->grow_batches_barrier,
WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE);
WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
/* Fall through. */
case PHJ_GROW_BATCHES_REPARTITIONING:
case PHJ_GROW_BATCHES_REPARTITION:
/* Make sure that we have the current dimensions and buckets. */
ExecParallelHashEnsureBatchAccessors(hashtable);
ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@ -1219,7 +1219,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
/* Fall through. */
case PHJ_GROW_BATCHES_DECIDING:
case PHJ_GROW_BATCHES_DECIDE:
/*
* Elect one participant to clean up and decide whether further
@ -1274,7 +1274,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
}
/* Fall through. */
case PHJ_GROW_BATCHES_FINISHING:
case PHJ_GROW_BATCHES_FINISH:
/* Wait for the above to complete. */
BarrierArriveAndWait(&pstate->grow_batches_barrier,
WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
@ -1514,7 +1514,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
HashMemoryChunk chunk;
dsa_pointer chunk_s;
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
/*
* It's unlikely, but we need to be prepared for new participants to show
@ -1523,7 +1523,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
*/
switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
{
case PHJ_GROW_BUCKETS_ELECTING:
case PHJ_GROW_BUCKETS_ELECT:
/* Elect one participant to prepare to increase nbuckets. */
if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
@ -1552,13 +1552,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
}
/* Fall through. */
case PHJ_GROW_BUCKETS_ALLOCATING:
case PHJ_GROW_BUCKETS_REALLOCATE:
/* Wait for the above to complete. */
BarrierArriveAndWait(&pstate->grow_buckets_barrier,
WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE);
WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
/* Fall through. */
case PHJ_GROW_BUCKETS_REINSERTING:
case PHJ_GROW_BUCKETS_REINSERT:
/* Reinsert all tuples into the hash table. */
ExecParallelHashEnsureBatchAccessors(hashtable);
ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@ -1714,7 +1714,7 @@ retry:
/* Try to load it into memory. */
Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
PHJ_BUILD_HASHING_INNER);
PHJ_BUILD_HASH_INNER);
hashTuple = ExecParallelHashTupleAlloc(hashtable,
HJTUPLE_OVERHEAD + tuple->t_len,
&shared);
@ -2868,7 +2868,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
if (pstate->growth != PHJ_GROWTH_DISABLED)
{
Assert(curbatch == 0);
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
/*
* Check if our space limit would be exceeded. To avoid choking on
@ -2988,7 +2988,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
{
/* Batch 0 doesn't need to be loaded. */
BarrierAttach(&shared->batch_barrier);
while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE)
BarrierArriveAndWait(&shared->batch_barrier, 0);
BarrierDetach(&shared->batch_barrier);
}
@ -3063,7 +3063,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
/*
* We should never see a state where the batch-tracking array is freed,
* because we should have given up sooner if we join when the build
* barrier has reached the PHJ_BUILD_DONE phase.
* barrier has reached the PHJ_BUILD_FREE phase.
*/
Assert(DsaPointerIsValid(pstate->batches));
@ -3146,7 +3146,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
* longer attached, but since there is no way it's moving after
* this point it seems safe to make the following assertion.
*/
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
@ -3189,13 +3189,12 @@ ExecHashTableDetach(HashJoinTable hashtable)
/*
* If we're involved in a parallel query, we must either have gotten all
* the way to PHJ_BUILD_RUNNING, or joined too late and be in
* PHJ_BUILD_DONE.
* the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
*/
Assert(!pstate ||
BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
{
int i;
@ -3218,7 +3217,7 @@ ExecHashTableDetach(HashJoinTable hashtable)
* Late joining processes will see this state and give up
* immediately.
*/
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
if (DsaPointerIsValid(pstate->batches))
{

View File

@ -39,27 +39,30 @@
*
* One barrier called build_barrier is used to coordinate the hashing phases.
* The phase is represented by an integer which begins at zero and increments
* one by one, but in the code it is referred to by symbolic names as follows:
* one by one, but in the code it is referred to by symbolic names as follows.
* An asterisk indicates a phase that is performed by a single arbitrarily
* chosen process.
*
* PHJ_BUILD_ELECTING -- initial state
* PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
* PHJ_BUILD_HASHING_INNER -- all hash the inner rel
* PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
* PHJ_BUILD_RUNNING -- building done, probing can begin
* PHJ_BUILD_DONE -- all work complete, one frees batches
* PHJ_BUILD_ELECT -- initial state
* PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0
* PHJ_BUILD_HASH_INNER -- all hash the inner rel
* PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer
* PHJ_BUILD_RUN -- building done, probing can begin
* PHJ_BUILD_FREE* -- all work complete, one frees batches
*
* While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
* While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
* be used repeatedly as required to coordinate expansions in the number of
* batches or buckets. Their phases are as follows:
*
* PHJ_GROW_BATCHES_ELECTING -- initial state
* PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
* PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
* PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
* PHJ_GROW_BATCHES_ELECT -- initial state
* PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches
* PHJ_GROW_BATCHES_REPARTITION -- all repartition
* PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up
* PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
*
* PHJ_GROW_BUCKETS_ELECTING -- initial state
* PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
* PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
* PHJ_GROW_BUCKETS_ELECT -- initial state
* PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets
* PHJ_GROW_BUCKETS_REINSERT -- all insert tuples
*
* If the planner got the number of batches and buckets right, those won't be
* necessary, but on the other hand we might finish up needing to expand the
@ -67,27 +70,27 @@
* within our memory budget and load factor target. For that reason it's a
* separate pair of barriers using circular phases.
*
* The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
* The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
* because we need to divide the outer relation into batches up front in order
* to be able to process batches entirely independently. In contrast, the
* parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
* batches whenever it encounters them while scanning and probing, which it
* can do because it processes batches in serial order.
*
* Once PHJ_BUILD_RUNNING is reached, backends then split up and process
* Once PHJ_BUILD_RUN is reached, backends then split up and process
* different batches, or gang up and work together on probing batches if there
* aren't enough to go around. For each batch there is a separate barrier
* with the following phases:
*
* PHJ_BATCH_ELECTING -- initial state
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
* PHJ_BATCH_DONE -- end
* PHJ_BATCH_ELECT -- initial state
* PHJ_BATCH_ALLOCATE* -- one allocates buckets
* PHJ_BATCH_LOAD -- all load the hash table from disk
* PHJ_BATCH_PROBE -- all probe
* PHJ_BATCH_FREE* -- one frees memory
*
* Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBING; populating batch 0's hash table is done during
* PHJ_BUILD_HASHING_INNER so we can skip loading.
* PHJ_BATCH_PROBE; populating batch 0's hash table is done during
* PHJ_BUILD_HASH_INNER so we can skip loading.
*
* Initially we try to plan for a single-batch hash join using the combined
* hash_mem of all participants to create a large shared hash table. If that
@ -99,8 +102,8 @@
* finished. Practically, that means that we never emit a tuple while attached
* to a barrier, unless the barrier has reached a phase that means that no
* process will wait on it again. We emit tuples while attached to the build
* barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
* PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
* PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
* respectively without waiting, using BarrierArriveAndDetach(). The last to
* detach receives a different return value so that it knows that it's safe to
* clean up. Any straggler process that attaches after that phase is reached
@ -306,13 +309,12 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (parallel)
{
/*
* Advance the build barrier to PHJ_BUILD_RUNNING
* before proceeding so we can negotiate resource
* cleanup.
* Advance the build barrier to PHJ_BUILD_RUN before
* proceeding so we can negotiate resource cleanup.
*/
Barrier *build_barrier = &parallel_state->build_barrier;
while (BarrierPhase(build_barrier) < PHJ_BUILD_RUNNING)
while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
BarrierArriveAndWait(build_barrier, 0);
}
return NULL;
@ -336,10 +338,10 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
Barrier *build_barrier;
build_barrier = &parallel_state->build_barrier;
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
{
/*
* If multi-batch, we need to hash the outer relation
@ -350,7 +352,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
BarrierArriveAndWait(build_barrier,
WAIT_EVENT_HASH_BUILD_HASH_OUTER);
}
else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
{
/*
* If we attached so late that the job is finished and
@ -361,7 +363,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
}
/* Each backend should now select a batch to work on. */
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
hashtable->curbatch = -1;
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@ -1153,7 +1155,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
switch (BarrierAttach(batch_barrier))
{
case PHJ_BATCH_ELECTING:
case PHJ_BATCH_ELECT:
/* One backend allocates the hash table. */
if (BarrierArriveAndWait(batch_barrier,
@ -1161,13 +1163,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
ExecParallelHashTableAlloc(hashtable, batchno);
/* Fall through. */
case PHJ_BATCH_ALLOCATING:
case PHJ_BATCH_ALLOCATE:
/* Wait for allocation to complete. */
BarrierArriveAndWait(batch_barrier,
WAIT_EVENT_HASH_BATCH_ALLOCATE);
/* Fall through. */
case PHJ_BATCH_LOADING:
case PHJ_BATCH_LOAD:
/* Start (or join in) loading tuples. */
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
inner_tuples = hashtable->batches[batchno].inner_tuples;
@ -1187,7 +1189,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
WAIT_EVENT_HASH_BATCH_LOAD);
/* Fall through. */
case PHJ_BATCH_PROBING:
case PHJ_BATCH_PROBE:
/*
* This batch is ready to probe. Return control to
@ -1197,13 +1199,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* this barrier again (or else a deadlock could occur).
* All attached participants must eventually call
* BarrierArriveAndDetach() so that the final phase
* PHJ_BATCH_DONE can be reached.
* PHJ_BATCH_FREE can be reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
case PHJ_BATCH_DONE:
case PHJ_BATCH_FREE:
/*
* Already done. Detach and go around again (if any
@ -1523,7 +1525,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
/*
* It would be possible to reuse the shared hash table in single-batch
* cases by resetting and then fast-forwarding build_barrier to
* PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
* PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
* currently shared hash tables are already freed by now (by the last
* participant to detach from the batch). We could consider keeping it
* around for single-batch joins. We'd also need to adjust
@ -1542,7 +1544,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
/* Clear any shared batch files. */
SharedFileSetDeleteAll(&pstate->fileset);
/* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
BarrierInit(&pstate->build_barrier, 0);
}

View File

@ -1324,26 +1324,62 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
return true;
}
/*
* The implementation for LazyTupleTableSlot wrapper for EPQ slot to be passed
* to table_tuple_update()/table_tuple_delete().
*/
typedef struct
{
EPQState *epqstate;
ResultRelInfo *resultRelInfo;
} GetEPQSlotArg;
static TupleTableSlot *
GetEPQSlot(void *arg)
{
GetEPQSlotArg *slotArg = (GetEPQSlotArg *) arg;
return EvalPlanQualSlot(slotArg->epqstate,
slotArg->resultRelInfo->ri_RelationDesc,
slotArg->resultRelInfo->ri_RangeTableIndex);
}
/*
* ExecDeleteAct -- subroutine for ExecDelete
*
* Actually delete the tuple from a plain table.
*
* If the 'lockUpdated' flag is set and the target tuple is updated, then
* the latest version gets locked and fetched into the EPQ slot.
*
* Caller is in charge of doing EvalPlanQual as necessary
*/
static TM_Result
ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ItemPointer tupleid, bool changingPart)
ItemPointer tupleid, bool changingPart, bool lockUpdated)
{
EState *estate = context->estate;
GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
LazyTupleTableSlot lazyEPQSlot,
*lazyEPQSlotPtr;
if (lockUpdated)
{
MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
lazyEPQSlotPtr = &lazyEPQSlot;
}
else
{
lazyEPQSlotPtr = NULL;
}
return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
estate->es_output_cid,
estate->es_snapshot,
estate->es_crosscheck_snapshot,
true /* wait for commit */ ,
&context->tmfd,
changingPart);
changingPart,
lazyEPQSlotPtr);
}
/*
@ -1488,7 +1524,8 @@ ExecDelete(ModifyTableContext *context,
* transaction-snapshot mode transactions.
*/
ldelete:
result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart);
result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart,
!IsolationUsesXactSnapshot());
switch (result)
{
@ -1541,87 +1578,49 @@ ldelete:
errmsg("could not serialize access due to concurrent update")));
/*
* Already know that we're going to need to do EPQ, so
* fetch tuple directly into the right slot.
* ExecDeleteAct() has already locked the old tuple for
* us. Now we need to copy it to the right slot.
*/
EvalPlanQualBegin(context->epqstate);
inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
resultRelInfo->ri_RangeTableIndex);
result = table_tuple_lock(resultRelationDesc, tupleid,
estate->es_snapshot,
inputslot, estate->es_output_cid,
LockTupleExclusive, LockWaitBlock,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
&context->tmfd);
switch (result)
/*
* Save locked table for further processing for RETURNING
* clause.
*/
if (processReturning &&
resultRelInfo->ri_projectReturning &&
!resultRelInfo->ri_FdwRoutine)
{
case TM_Ok:
Assert(context->tmfd.traversed);
epqslot = EvalPlanQual(context->epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
inputslot);
if (TupIsNull(epqslot))
/* Tuple not passing quals anymore, exiting... */
return NULL;
TupleTableSlot *returningSlot;
/*
* If requested, skip delete and pass back the
* updated row.
*/
if (epqreturnslot)
{
*epqreturnslot = epqslot;
return NULL;
}
else
goto ldelete;
case TM_SelfModified:
/*
* This can be reached when following an update
* chain from a tuple updated by another session,
* reaching a tuple that was already updated in
* this transaction. If previously updated by this
* command, ignore the delete, otherwise error
* out.
*
* See also TM_SelfModified response to
* table_tuple_delete() above.
*/
if (context->tmfd.cmax != estate->es_output_cid)
ereport(ERROR,
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be deleted was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
return NULL;
case TM_Deleted:
/* tuple already deleted; nothing to do */
return NULL;
default:
/*
* TM_Invisible should be impossible because we're
* waiting for updated row versions, and would
* already have errored out if the first version
* is invisible.
*
* TM_Updated should be impossible, because we're
* locking the latest version via
* TUPLE_LOCK_FLAG_FIND_LAST_VERSION.
*/
elog(ERROR, "unexpected table_tuple_lock status: %u",
result);
return NULL;
returningSlot = ExecGetReturningSlot(estate,
resultRelInfo);
ExecCopySlot(returningSlot, inputslot);
ExecMaterializeSlot(returningSlot);
}
Assert(false);
break;
Assert(context->tmfd.traversed);
epqslot = EvalPlanQual(context->epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
inputslot);
if (TupIsNull(epqslot))
/* Tuple not passing quals anymore, exiting... */
return NULL;
/*
* If requested, skip delete and pass back the updated
* row.
*/
if (epqreturnslot)
{
*epqreturnslot = epqslot;
return NULL;
}
else
goto ldelete;
}
case TM_Deleted:
@ -1673,12 +1672,17 @@ ldelete:
}
else
{
/*
* Tuple can be already fetched to the returning slot in case
* we've previously locked it. Fetch the tuple only if the slot
* is empty.
*/
slot = ExecGetReturningSlot(estate, resultRelInfo);
if (oldtuple != NULL)
{
ExecForceStoreHeapTuple(oldtuple, slot, false);
}
else
else if (TupIsNull(slot))
{
if (!table_tuple_fetch_row_version(resultRelationDesc, tupleid,
SnapshotAny, slot))
@ -1961,12 +1965,15 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo,
static TM_Result
ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
bool canSetTag, UpdateContext *updateCxt)
bool canSetTag, bool lockUpdated, UpdateContext *updateCxt)
{
EState *estate = context->estate;
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
bool partition_constraint_failed;
TM_Result result;
GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
LazyTupleTableSlot lazyEPQSlot,
*lazyEPQSlotPtr;
updateCxt->crossPartUpdate = false;
@ -2092,13 +2099,23 @@ lreplace:
* for referential integrity updates in transaction-snapshot mode
* transactions.
*/
if (lockUpdated)
{
MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
lazyEPQSlotPtr = &lazyEPQSlot;
}
else
{
lazyEPQSlotPtr = NULL;
}
result = table_tuple_update(resultRelationDesc, tupleid, slot,
estate->es_output_cid,
estate->es_snapshot,
estate->es_crosscheck_snapshot,
true /* wait for commit */ ,
&context->tmfd, &updateCxt->lockmode,
&updateCxt->updateIndexes);
&updateCxt->updateIndexes,
lazyEPQSlotPtr);
if (result == TM_Ok)
updateCxt->updated = true;
@ -2252,7 +2269,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
static TupleTableSlot *
ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
bool canSetTag)
bool canSetTag, bool locked)
{
EState *estate = context->estate;
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
@ -2314,7 +2331,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
*/
redo_act:
result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
canSetTag, &updateCxt);
canSetTag, !IsolationUsesXactSnapshot(),
&updateCxt);
/*
* If ExecUpdateAct reports that a cross-partition update was done,
@ -2373,80 +2391,39 @@ redo_act:
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
Assert(!locked);
/*
* Already know that we're going to need to do EPQ, so
* fetch tuple directly into the right slot.
* ExecUpdateAct() has already locked the old tuple for
* us. Now we need to copy it to the right slot.
*/
inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
resultRelInfo->ri_RangeTableIndex);
result = table_tuple_lock(resultRelationDesc, tupleid,
estate->es_snapshot,
inputslot, estate->es_output_cid,
updateCxt.lockmode, LockWaitBlock,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
&context->tmfd);
/* Make sure ri_oldTupleSlot is initialized. */
if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
ExecInitUpdateProjection(context->mtstate,
resultRelInfo);
switch (result)
{
case TM_Ok:
Assert(context->tmfd.traversed);
/*
* Save the locked tuple for further calculation of the
* new tuple.
*/
oldSlot = resultRelInfo->ri_oldTupleSlot;
ExecCopySlot(oldSlot, inputslot);
ExecMaterializeSlot(oldSlot);
Assert(context->tmfd.traversed);
epqslot = EvalPlanQual(context->epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
inputslot);
if (TupIsNull(epqslot))
/* Tuple not passing quals anymore, exiting... */
return NULL;
/* Make sure ri_oldTupleSlot is initialized. */
if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
ExecInitUpdateProjection(context->mtstate,
resultRelInfo);
/* Fetch the most recent version of old tuple. */
oldSlot = resultRelInfo->ri_oldTupleSlot;
if (!table_tuple_fetch_row_version(resultRelationDesc,
tupleid,
SnapshotAny,
oldSlot))
elog(ERROR, "failed to fetch tuple being updated");
slot = ExecGetUpdateNewTuple(resultRelInfo,
epqslot, oldSlot);
goto redo_act;
case TM_Deleted:
/* tuple already deleted; nothing to do */
return NULL;
case TM_SelfModified:
/*
* This can be reached when following an update
* chain from a tuple updated by another session,
* reaching a tuple that was already updated in
* this transaction. If previously modified by
* this command, ignore the redundant update,
* otherwise error out.
*
* See also TM_SelfModified response to
* table_tuple_update() above.
*/
if (context->tmfd.cmax != estate->es_output_cid)
ereport(ERROR,
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
return NULL;
default:
/* see table_tuple_lock call in ExecDelete() */
elog(ERROR, "unexpected table_tuple_lock status: %u",
result);
return NULL;
}
epqslot = EvalPlanQual(context->epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
inputslot);
if (TupIsNull(epqslot))
/* Tuple not passing quals anymore, exiting... */
return NULL;
slot = ExecGetUpdateNewTuple(resultRelInfo,
epqslot, oldSlot);
goto redo_act;
}
break;
@ -2688,7 +2665,7 @@ ExecOnConflictUpdate(ModifyTableContext *context,
*returning = ExecUpdate(context, resultRelInfo,
conflictTid, NULL,
resultRelInfo->ri_onConflict->oc_ProjSlot,
canSetTag);
canSetTag, true);
/*
* Clear out existing tuple, as there might not be another conflict among
@ -2891,7 +2868,7 @@ lmerge_matched:
break; /* concurrent update/delete */
}
result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
newslot, false, &updateCxt);
newslot, false, false, &updateCxt);
if (result == TM_Ok && updateCxt.updated)
{
ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
@ -2909,7 +2886,8 @@ lmerge_matched:
return true; /* "do nothing" */
break; /* concurrent update/delete */
}
result = ExecDeleteAct(context, resultRelInfo, tupleid, false);
result = ExecDeleteAct(context, resultRelInfo, tupleid,
false, false);
if (result == TM_Ok)
{
ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
@ -3815,7 +3793,7 @@ ExecModifyTable(PlanState *pstate)
/* Now apply the update. */
slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
slot, node->canSetTag);
slot, node->canSetTag, false);
break;
case CMD_DELETE:

View File

@ -367,9 +367,6 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BUILD_HASH_OUTER:
event_name = "HashBuildHashOuter";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE:
event_name = "HashGrowBatchesAllocate";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_DECIDE:
event_name = "HashGrowBatchesDecide";
break;
@ -379,15 +376,18 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_GROW_BATCHES_FINISH:
event_name = "HashGrowBatchesFinish";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE:
event_name = "HashGrowBatchesReallocate";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION:
event_name = "HashGrowBatchesRepartition";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE:
event_name = "HashGrowBucketsAllocate";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_ELECT:
event_name = "HashGrowBucketsElect";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE:
event_name = "HashGrowBucketsReallocate";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
event_name = "HashGrowBucketsReinsert";
break;

View File

@ -1279,6 +1279,13 @@ setup_config(void)
conflines = replace_guc_value(conflines, "dynamic_shared_memory_type",
dynamic_shared_memory_type, false);
/* Caution: these depend on wal_segment_size_mb, they're not constants */
conflines = replace_guc_value(conflines, "min_wal_size",
pretty_wal_size(DEFAULT_MIN_WAL_SEGS), false);
conflines = replace_guc_value(conflines, "max_wal_size",
pretty_wal_size(DEFAULT_MAX_WAL_SEGS), false);
/*
* Fix up various entries to match the true compile-time defaults. Since
* these are indeed defaults, keep the postgresql.conf lines commented.
@ -1289,12 +1296,6 @@ setup_config(void)
conflines = replace_guc_value(conflines, "port",
DEF_PGPORT_STR, true);
conflines = replace_guc_value(conflines, "min_wal_size",
pretty_wal_size(DEFAULT_MIN_WAL_SEGS), true);
conflines = replace_guc_value(conflines, "max_wal_size",
pretty_wal_size(DEFAULT_MAX_WAL_SEGS), true);
#if DEFAULT_BACKEND_FLUSH_AFTER > 0
snprintf(repltok, sizeof(repltok), "%dkB",
DEFAULT_BACKEND_FLUSH_AFTER * (BLCKSZ / 1024));

View File

@ -630,7 +630,7 @@ for (my $tupidx = 0; $tupidx < $ROWCOUNT; $tupidx++)
die "offnum $offnum should be a redirect" if defined $tup;
sysseek($file, 92, 0) or BAIL_OUT("sysseek failed: $!");
syswrite($file,
pack("L", $ENDIANNESS eq 'little' ? 0x00010011 : 0x11000100))
pack("L", $ENDIANNESS eq 'little' ? 0x00010011 : 0x00230000))
or BAIL_OUT("syswrite failed: $!");
push @expected,
qr/${header}redirected line pointer points to another redirected line pointer at offset \d+/;
@ -647,7 +647,7 @@ for (my $tupidx = 0; $tupidx < $ROWCOUNT; $tupidx++)
# rewrite line pointer with lp.off = 25, lp_flags = 2, lp_len = 0
sysseek($file, 108, 0) or BAIL_OUT("sysseek failed: $!");
syswrite($file,
pack("L", $ENDIANNESS eq 'little' ? 0x00010019 : 0x19000100))
pack("L", $ENDIANNESS eq 'little' ? 0x00010019 : 0x00330000))
or BAIL_OUT("syswrite failed: $!");
push @expected,
qr/${header}redirect line pointer points to offset \d+, but offset \d+ also points there/;

View File

@ -530,7 +530,8 @@ typedef struct TableAmRoutine
Snapshot crosscheck,
bool wait,
TM_FailureData *tmfd,
bool changingPart);
bool changingPart,
LazyTupleTableSlot *lockedSlot);
/* see table_tuple_update() for reference about parameters */
TM_Result (*tuple_update) (Relation rel,
@ -542,7 +543,8 @@ typedef struct TableAmRoutine
bool wait,
TM_FailureData *tmfd,
LockTupleMode *lockmode,
TU_UpdateIndexes *update_indexes);
TU_UpdateIndexes *update_indexes,
LazyTupleTableSlot *lockedSlot);
/* see table_tuple_lock() for reference about parameters */
TM_Result (*tuple_lock) (Relation rel,
@ -1457,7 +1459,7 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
}
/*
* Delete a tuple.
* Delete a tuple (or lock last tuple version if lockedSlot is given).
*
* NB: do not call this directly unless prepared to deal with
* concurrent-update conditions. Use simple_table_tuple_delete instead.
@ -1473,6 +1475,8 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
* tmfd - filled in failure cases (see below)
* changingPart - true iff the tuple is being moved to another partition
* table due to an update of the partition key. Otherwise, false.
* lockedSlot - lazy slot to save the locked tuple if should lock the last
* row version during the concurrent update. NULL if not needed.
*
* Normal, successful return value is TM_Ok, which means we did actually
* delete it. Failure return codes are TM_SelfModified, TM_Updated, and
@ -1485,15 +1489,17 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
static inline TM_Result
table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
Snapshot snapshot, Snapshot crosscheck, bool wait,
TM_FailureData *tmfd, bool changingPart)
TM_FailureData *tmfd, bool changingPart,
LazyTupleTableSlot *lockedSlot)
{
return rel->rd_tableam->tuple_delete(rel, tid, cid,
snapshot, crosscheck,
wait, tmfd, changingPart);
wait, tmfd, changingPart,
lockedSlot);
}
/*
* Update a tuple.
* Update a tuple (or lock last tuple version if lockedSlot is given).
*
* NB: do not call this directly unless you are prepared to deal with
* concurrent-update conditions. Use simple_table_tuple_update instead.
@ -1511,7 +1517,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
* lockmode - filled with lock mode acquired on tuple
* update_indexes - in success cases this is set to true if new index entries
* are required for this tuple
*
* lockedSlot - lazy slot to save the locked tuple if should lock the last
* row version during the concurrent update. NULL if not needed.
* Normal, successful return value is TM_Ok, which means we did actually
* update it. Failure return codes are TM_SelfModified, TM_Updated, and
* TM_BeingModified (the last only possible if wait == false).
@ -1530,12 +1538,14 @@ static inline TM_Result
table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
TU_UpdateIndexes *update_indexes)
TU_UpdateIndexes *update_indexes,
LazyTupleTableSlot *lockedSlot)
{
return rel->rd_tableam->tuple_update(rel, otid, slot,
cid, snapshot, crosscheck,
wait, tmfd,
lockmode, update_indexes);
lockmode, update_indexes,
lockedSlot);
}
/*

View File

@ -254,32 +254,32 @@ typedef struct ParallelHashJoinState
} ParallelHashJoinState;
/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECTING 0
#define PHJ_BUILD_ALLOCATING 1
#define PHJ_BUILD_HASHING_INNER 2
#define PHJ_BUILD_HASHING_OUTER 3
#define PHJ_BUILD_RUNNING 4
#define PHJ_BUILD_DONE 5
#define PHJ_BUILD_ELECT 0
#define PHJ_BUILD_ALLOCATE 1
#define PHJ_BUILD_HASH_INNER 2
#define PHJ_BUILD_HASH_OUTER 3
#define PHJ_BUILD_RUN 4
#define PHJ_BUILD_FREE 5
/* The phases for probing each batch, used by for batch_barrier. */
#define PHJ_BATCH_ELECTING 0
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
#define PHJ_BATCH_DONE 4
#define PHJ_BATCH_ELECT 0
#define PHJ_BATCH_ALLOCATE 1
#define PHJ_BATCH_LOAD 2
#define PHJ_BATCH_PROBE 3
#define PHJ_BATCH_FREE 4
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
#define PHJ_GROW_BATCHES_ALLOCATING 1
#define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDING 3
#define PHJ_GROW_BATCHES_FINISHING 4
#define PHJ_GROW_BATCHES_ELECT 0
#define PHJ_GROW_BATCHES_REALLOCATE 1
#define PHJ_GROW_BATCHES_REPARTITION 2
#define PHJ_GROW_BATCHES_DECIDE 3
#define PHJ_GROW_BATCHES_FINISH 4
#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECTING 0
#define PHJ_GROW_BUCKETS_ALLOCATING 1
#define PHJ_GROW_BUCKETS_REINSERTING 2
#define PHJ_GROW_BUCKETS_ELECT 0
#define PHJ_GROW_BUCKETS_REALLOCATE 1
#define PHJ_GROW_BUCKETS_REINSERT 2
#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
typedef struct HashJoinTableData

View File

@ -300,6 +300,44 @@ typedef struct MinimalTupleTableSlot
#define TupIsNull(slot) \
((slot) == NULL || TTS_EMPTY(slot))
/*----------
* LazyTupleTableSlot -- a lazy version of TupleTableSlot.
*
* Sometimes caller might need to pass to the function a slot, which most
* likely will reain undemanded. Preallocating such slot would be a waste of
* resources in the majority of cases. Lazy slot is aimed to resolve this
* problem. It is basically a promise to allocate the slot once it's needed.
* Once callee needs the slot, it could get it using LAZY_TTS_EVAL(lazySlot)
* macro.
*/
typedef struct
{
TupleTableSlot *slot; /* cached slot or NULL if not yet allocated */
TupleTableSlot *(*getSlot) (void *arg); /* callback for slot allocation */
void *getSlotArg; /* argument for the callback above */
} LazyTupleTableSlot;
/*
* A constructor for the lazy slot.
*/
#define MAKE_LAZY_TTS(lazySlot, callback, arg) \
do { \
(lazySlot)->slot = NULL; \
(lazySlot)->getSlot = callback; \
(lazySlot)->getSlotArg = arg; \
} while (false)
/*
* Macro for lazy slot evaluation. NULL lazy slot evaluates to NULL slot.
* Cached version is used if present. Use the callback otherwise.
*/
#define LAZY_TTS_EVAL(lazySlot) \
((lazySlot) ? \
((lazySlot)->slot ? \
(lazySlot)->slot : \
((lazySlot)->slot = (lazySlot)->getSlot((lazySlot)->getSlotArg))) : \
NULL)
/* in executor/execTuples.c */
extern TupleTableSlot *MakeTupleTableSlot(TupleDesc tupleDesc,
const TupleTableSlotOps *tts_ops);

View File

@ -98,13 +98,13 @@ typedef enum
WAIT_EVENT_HASH_BUILD_ELECT,
WAIT_EVENT_HASH_BUILD_HASH_INNER,
WAIT_EVENT_HASH_BUILD_HASH_OUTER,
WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE,
WAIT_EVENT_HASH_GROW_BATCHES_DECIDE,
WAIT_EVENT_HASH_GROW_BATCHES_ELECT,
WAIT_EVENT_HASH_GROW_BATCHES_FINISH,
WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE,
WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION,
WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT,
WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE,
WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
WAIT_EVENT_LOGICAL_APPLY_SEND_DATA,
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE,

View File

@ -955,6 +955,7 @@ GenerationPointer
GenericCosts
GenericXLogState
GeqoPrivateData
GetEPQSlotArg
GetForeignJoinPaths_function
GetForeignModifyBatchSize_function
GetForeignPaths_function
@ -1399,6 +1400,7 @@ LagTracker
LargeObjectDesc
LastAttnumInfo
Latch
LazyTupleTableSlot
LerpFunc
LexDescr
LexemeEntry