/*-------------------------------------------------------------------------
 *
 * heapam_handler.c
 *      heap table access method code
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/access/heap/heapam_handler.c
 *
 *
 * NOTES
 *      This file wires up the lower-level heapam.c et al. routines with the
 *      tableam abstraction.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"

static const TableAmRoutine heapam_methods;


/* ------------------------------------------------------------------------
 * Slot related callbacks for heap AM
 * ------------------------------------------------------------------------
 */

static const TupleTableSlotOps *
heapam_slot_callbacks(Relation relation)
{
    return &TTSOpsBufferHeapTuple;
}


/* ------------------------------------------------------------------------
 * Index Scan Callbacks for heap AM
 * ------------------------------------------------------------------------
 */

static IndexFetchTableData *
heapam_index_fetch_begin(Relation rel)
{
    IndexFetchHeapData *hscan = palloc0(sizeof(IndexFetchHeapData));

    hscan->xs_base.rel = rel;
    hscan->xs_cbuf = InvalidBuffer;

    return &hscan->xs_base;
}

static void
heapam_index_fetch_reset(IndexFetchTableData *scan)
{
    IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;

    if (BufferIsValid(hscan->xs_cbuf))
    {
        ReleaseBuffer(hscan->xs_cbuf);
        hscan->xs_cbuf = InvalidBuffer;
    }
}

static void
heapam_index_fetch_end(IndexFetchTableData *scan)
{
    IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;

    heapam_index_fetch_reset(scan);

    pfree(hscan);
}

static bool
heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
                         ItemPointer tid,
                         Snapshot snapshot,
                         TupleTableSlot *slot,
                         bool *call_again, bool *all_dead)
{
    IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
    BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
    bool        got_heap_tuple;

    Assert(TTS_IS_BUFFERTUPLE(slot));

    /* We can skip the buffer-switching logic if we're in mid-HOT chain. */
    if (!*call_again)
    {
        /* Switch to correct buffer if we don't have it already */
        Buffer      prev_buf = hscan->xs_cbuf;

        hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
                                              hscan->xs_base.rel,
                                              ItemPointerGetBlockNumber(tid));

        /*
         * Prune page, but only if we weren't already on this page
         */
        if (prev_buf != hscan->xs_cbuf)
            heap_page_prune_opt(hscan->xs_base.rel, hscan->xs_cbuf);
    }

    /* Obtain share-lock on the buffer so we can examine visibility */
    LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_SHARE);
    got_heap_tuple = heap_hot_search_buffer(tid,
                                            hscan->xs_base.rel,
                                            hscan->xs_cbuf,
                                            snapshot,
                                            &bslot->base.tupdata,
                                            all_dead,
                                            !*call_again);
    bslot->base.tupdata.t_self = *tid;
    LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK);

    if (got_heap_tuple)
    {
        /*
         * Only in a non-MVCC snapshot can more than one member of the HOT
         * chain be visible.
         */
        *call_again = !IsMVCCSnapshot(snapshot);

        slot->tts_tableOid = RelationGetRelid(scan->rel);
        ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf);
    }
    else
    {
        /* We've reached the end of the HOT chain. */
        *call_again = false;
    }

    return got_heap_tuple;
}


/* ------------------------------------------------------------------------
 * Callbacks for non-modifying operations on individual tuples for heap AM
 * ------------------------------------------------------------------------
 */

static bool
heapam_fetch_row_version(Relation relation,
                         ItemPointer tid,
                         Snapshot snapshot,
                         TupleTableSlot *slot)
{
    BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
    Buffer      buffer;

    Assert(TTS_IS_BUFFERTUPLE(slot));

    bslot->base.tupdata.t_self = *tid;
    if (heap_fetch(relation, snapshot, &bslot->base.tupdata, &buffer))
    {
        /* store in slot, transferring existing pin */
        ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata, slot, buffer);
        slot->tts_tableOid = RelationGetRelid(relation);

        return true;
    }

    return false;
}

static bool
heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
                                Snapshot snapshot)
{
    BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
    bool        res;

    Assert(TTS_IS_BUFFERTUPLE(slot));
    Assert(BufferIsValid(bslot->buffer));

    /*
     * We need buffer pin and lock to call HeapTupleSatisfiesVisibility.
     * Caller should be holding pin, but not lock.
     */
    LockBuffer(bslot->buffer, BUFFER_LOCK_SHARE);
    res = HeapTupleSatisfiesVisibility(bslot->base.tuple, snapshot,
                                       bslot->buffer);
    LockBuffer(bslot->buffer, BUFFER_LOCK_UNLOCK);

    return res;
}


/* ----------------------------------------------------------------------------
 *  Functions for manipulations of physical tuples for heap AM.
 * ----------------------------------------------------------------------------
 */

static void
heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
                    int options, BulkInsertState bistate)
{
    bool        shouldFree = true;
    HeapTuple   tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);

    /* Update the tuple with table oid */
    slot->tts_tableOid = RelationGetRelid(relation);
    tuple->t_tableOid = slot->tts_tableOid;

    /* Perform the insertion, and copy the resulting ItemPointer */
    heap_insert(relation, tuple, cid, options, bistate);
    ItemPointerCopy(&tuple->t_self, &slot->tts_tid);

    if (shouldFree)
        pfree(tuple);
}

static void
heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
                                CommandId cid, int options,
                                BulkInsertState bistate, uint32 specToken)
{
    bool        shouldFree = true;
    HeapTuple   tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);

    /* Update the tuple with table oid */
    slot->tts_tableOid = RelationGetRelid(relation);
    tuple->t_tableOid = slot->tts_tableOid;

    HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
    options |= HEAP_INSERT_SPECULATIVE;

    /* Perform the insertion, and copy the resulting ItemPointer */
    heap_insert(relation, tuple, cid, options, bistate);
    ItemPointerCopy(&tuple->t_self, &slot->tts_tid);

    if (shouldFree)
        pfree(tuple);
}

static void
heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
                                  uint32 spekToken, bool succeeded)
{
    bool        shouldFree = true;
    HeapTuple   tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);

    /*
     * Adjust the tuple's state accordingly: confirm the speculative
     * insertion if it succeeded, otherwise kill the speculatively inserted
     * tuple.
     */
    if (succeeded)
        heap_finish_speculative(relation, &slot->tts_tid);
    else
        heap_abort_speculative(relation, &slot->tts_tid);

    if (shouldFree)
        pfree(tuple);
}

static TM_Result
heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
                    Snapshot snapshot, Snapshot crosscheck, bool wait,
                    TM_FailureData *tmfd, bool changingPart)
{
    /*
     * Deletion of index tuples is currently handled at VACUUM time.  If the
     * storage were to clean up dead tuples by itself, this would also be the
     * place to remove the corresponding index tuples.
     */
    return heap_delete(relation, tid, cid, crosscheck, wait, tmfd,
                       changingPart);
}

static TM_Result
heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
                    CommandId cid, Snapshot snapshot, Snapshot crosscheck,
                    bool wait, TM_FailureData *tmfd,
                    LockTupleMode *lockmode, bool *update_indexes)
{
    bool        shouldFree = true;
    HeapTuple   tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
    TM_Result   result;

    /* Update the tuple with table oid */
    slot->tts_tableOid = RelationGetRelid(relation);
    tuple->t_tableOid = slot->tts_tableOid;

    result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
                         tmfd, lockmode);
    ItemPointerCopy(&tuple->t_self, &slot->tts_tid);

    /*
     * Decide whether new index entries are needed for the tuple.
     *
     * Note: heap_update returns the tid (location) of the new tuple in the
     * t_self field.
     *
     * If it's a HOT update, we mustn't insert new index entries.
     */
    *update_indexes = result == TM_Ok && !HeapTupleIsHeapOnly(tuple);

    if (shouldFree)
        pfree(tuple);

    return result;
}

static TM_Result
heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
                  TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
                  LockWaitPolicy wait_policy, uint8 flags,
                  TM_FailureData *tmfd)
{
    BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
    TM_Result   result;
    Buffer      buffer;
    HeapTuple   tuple = &bslot->base.tupdata;
    bool        follow_updates;

    follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
    tmfd->traversed = false;

    Assert(TTS_IS_BUFFERTUPLE(slot));

tuple_lock_retry:
    tuple->t_self = *tid;
    result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
                             follow_updates, &buffer, tmfd);

    if (result == TM_Updated &&
        (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
    {
        ReleaseBuffer(buffer);
        /* Should not encounter speculative tuple on recheck */
        Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));

        if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
        {
            SnapshotData SnapshotDirty;
            TransactionId priorXmax;

            /* it was updated, so look at the updated version */
            *tid = tmfd->ctid;
            /* updated row should have xmin matching this xmax */
            priorXmax = tmfd->xmax;

            /* signal that a tuple later in the chain is getting locked */
            tmfd->traversed = true;

            /*
             * fetch target tuple
             *
             * Loop here to deal with updated or busy tuples
             */
            InitDirtySnapshot(SnapshotDirty);
            for (;;)
            {
                if (ItemPointerIndicatesMovedPartitions(tid))
                    ereport(ERROR,
                            (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                             errmsg("tuple to be locked was already moved to another partition due to concurrent update")));

                tuple->t_self = *tid;
                if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer))
                {
                    /*
                     * If xmin isn't what we're expecting, the slot must have
                     * been recycled and reused for an unrelated tuple.  This
                     * implies that the latest version of the row was deleted,
                     * so we need do nothing.  (Should be safe to examine xmin
                     * without getting buffer's content lock.  We assume
                     * reading a TransactionId to be atomic, and Xmin never
                     * changes in an existing tuple, except to invalid or
                     * frozen, and neither of those can match priorXmax.)
                     */
                    if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
                                             priorXmax))
                    {
                        ReleaseBuffer(buffer);
                        return TM_Deleted;
                    }

                    /* otherwise xmin should not be dirty... */
                    if (TransactionIdIsValid(SnapshotDirty.xmin))
                        elog(ERROR, "t_xmin is uncommitted in tuple to be updated");

                    /*
                     * If the tuple is being updated by another transaction,
                     * we have to wait for its commit/abort, or die trying.
                     */
                    if (TransactionIdIsValid(SnapshotDirty.xmax))
                    {
                        ReleaseBuffer(buffer);
                        switch (wait_policy)
                        {
                            case LockWaitBlock:
                                XactLockTableWait(SnapshotDirty.xmax,
                                                  relation, &tuple->t_self,
                                                  XLTW_FetchUpdated);
                                break;
                            case LockWaitSkip:
                                if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
                                    /* skip instead of waiting */
                                    return TM_WouldBlock;
                                break;
                            case LockWaitError:
                                if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
                                    ereport(ERROR,
                                            (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                                             errmsg("could not obtain lock on row in relation \"%s\"",
                                                    RelationGetRelationName(relation))));
                                break;
                        }
                        continue;       /* loop back to repeat heap_fetch */
                    }

                    /*
                     * If tuple was inserted by our own transaction, we have
                     * to check cmin against cid: cmin >= current CID means
                     * our command cannot see the tuple, so we should ignore
                     * it. Otherwise heap_lock_tuple() will throw an error,
                     * and so would any later attempt to update or delete the
                     * tuple.  (We need not check cmax because
                     * HeapTupleSatisfiesDirty will consider a tuple deleted
                     * by our transaction dead, regardless of cmax.)  We just
                     * checked that priorXmax == xmin, so we can test that
                     * variable instead of doing HeapTupleHeaderGetXmin again.
                     */
                    if (TransactionIdIsCurrentTransactionId(priorXmax) &&
                        HeapTupleHeaderGetCmin(tuple->t_data) >= cid)
                    {
                        ReleaseBuffer(buffer);
                        return TM_Invisible;
                    }

                    /*
                     * This is a live tuple, so try to lock it again.
                     */
                    ReleaseBuffer(buffer);
                    goto tuple_lock_retry;
                }

                /*
                 * If the referenced slot was actually empty, the latest
                 * version of the row must have been deleted, so we need do
                 * nothing.
                 */
                if (tuple->t_data == NULL)
                {
                    return TM_Deleted;
                }

                /*
                 * As above, if xmin isn't what we're expecting, do nothing.
                 */
                if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
                                         priorXmax))
                {
                    if (BufferIsValid(buffer))
                        ReleaseBuffer(buffer);
                    return TM_Deleted;
                }

                /*
                 * If we get here, the tuple was found but failed
                 * SnapshotDirty.  Assuming the xmin is either a committed
                 * xact or our own xact (as it certainly should be if we're
                 * trying to modify the tuple), this must mean that the row
                 * was updated or deleted by either a committed xact or our
                 * own xact.  If it was deleted, we can ignore it; if it was
                 * updated then chain up to the next version and repeat the
                 * whole process.
                 *
                 * As above, it should be safe to examine xmax and t_ctid
                 * without the buffer content lock, because they can't be
                 * changing.
                 */
                if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
                {
                    /* deleted, so forget about it */
                    if (BufferIsValid(buffer))
                        ReleaseBuffer(buffer);
                    return TM_Deleted;
                }

                /* updated, so look at the updated row */
                *tid = tuple->t_data->t_ctid;
                /* updated row should have xmin matching this xmax */
                priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);

                if (BufferIsValid(buffer))
                    ReleaseBuffer(buffer);

                /* loop back to fetch next in chain */
            }
        }
        else
        {
            /* tuple was deleted, so give up */
            return TM_Deleted;
        }
    }

    slot->tts_tableOid = RelationGetRelid(relation);
    tuple->t_tableOid = slot->tts_tableOid;

    /* store in slot, transferring existing pin */
    ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);

    return result;
}


/* ------------------------------------------------------------------------
 * Definition of the heap table access method.
 * ------------------------------------------------------------------------
 */

static const TableAmRoutine heapam_methods = {
    .type = T_TableAmRoutine,

    .slot_callbacks = heapam_slot_callbacks,

    .scan_begin = heap_beginscan,
    .scan_end = heap_endscan,
    .scan_rescan = heap_rescan,
    .scan_getnextslot = heap_getnextslot,

    .parallelscan_estimate = table_block_parallelscan_estimate,
    .parallelscan_initialize = table_block_parallelscan_initialize,
    .parallelscan_reinitialize = table_block_parallelscan_reinitialize,

    .index_fetch_begin = heapam_index_fetch_begin,
    .index_fetch_reset = heapam_index_fetch_reset,
    .index_fetch_end = heapam_index_fetch_end,
    .index_fetch_tuple = heapam_index_fetch_tuple,

    .tuple_insert = heapam_tuple_insert,
    .tuple_insert_speculative = heapam_tuple_insert_speculative,
    .tuple_complete_speculative = heapam_tuple_complete_speculative,
    .tuple_delete = heapam_tuple_delete,
    .tuple_update = heapam_tuple_update,
    .tuple_lock = heapam_tuple_lock,

    .tuple_fetch_row_version = heapam_fetch_row_version,
    .tuple_get_latest_tid = heap_get_latest_tid,
    .tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
};


const TableAmRoutine *
GetHeapamTableAmRoutine(void)
{
    return &heapam_methods;
}

Datum
heap_tableam_handler(PG_FUNCTION_ARGS)
{
    PG_RETURN_POINTER(&heapam_methods);
}
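
/*
 * Usage sketch (illustrative, not part of the original file): the built-in
 * heap AM is registered via the bootstrap catalogs, so core needs nothing
 * beyond heap_tableam_handler() above.  An out-of-core table AM exposing a
 * handler function in the same way would typically be wired up from SQL
 * roughly as follows; all object names below are hypothetical.
 *
 *      CREATE FUNCTION my_tableam_handler(internal)
 *          RETURNS table_am_handler
 *          AS 'MODULE_PATHNAME', 'my_tableam_handler'
 *          LANGUAGE C STRICT;
 *
 *      CREATE ACCESS METHOD myam TYPE TABLE HANDLER my_tableam_handler;
 *
 *      CREATE TABLE t (a int) USING myam;
 */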