/*------------------------------------------------------------------------- * * heapam.c * heap access method code * * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.130 2002/03/02 21:39:17 momjian Exp $ * * * INTERFACE ROUTINES * relation_open - open any relation by relation OID * relation_openr - open any relation by name * relation_close - close any relation * heap_open - open a heap relation by relation OID * heap_openr - open a heap relation by name * heap_close - (now just a macro for relation_close) * heap_beginscan - begin relation scan * heap_rescan - restart a relation scan * heap_endscan - end relation scan * heap_getnext - retrieve next tuple in scan * heap_fetch - retrive tuple with tid * heap_insert - insert tuple into a relation * heap_delete - delete a tuple from a relation * heap_update - replace a tuple in a relation with another tuple * heap_markpos - mark scan position * heap_restrpos - restore position to marked location * * NOTES * This file contains the heap_ routines which implement * the POSTGRES heap access method used for all POSTGRES * relations. * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/heapam.h" #include "access/hio.h" #include "access/tuptoaster.h" #include "access/valid.h" #include "access/xlogutils.h" #include "catalog/catalog.h" #include "miscadmin.h" #include "utils/inval.h" #include "utils/relcache.h" #include "pgstat.h" /* comments are in heap_update */ static xl_heaptid _locked_tuple_; static void _heap_unlock_tuple(void *data); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move); /* ---------------------------------------------------------------- * heap support routines * ---------------------------------------------------------------- */ /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- */ static void initscan(HeapScanDesc scan, Relation relation, int atend, unsigned nkeys, ScanKey key) { /* * Make sure we have up-to-date idea of number of blocks in relation. * It is sufficient to do this once at scan start, since any tuples * added while the scan is in progress will be invisible to my * transaction anyway... */ relation->rd_nblocks = RelationGetNumberOfBlocks(relation); scan->rs_ctup.t_datamcxt = NULL; scan->rs_ctup.t_data = NULL; scan->rs_cbuf = InvalidBuffer; /* we don't have a marked position... */ ItemPointerSetInvalid(&(scan->rs_mctid)); /* * copy the scan key, if appropriate */ if (key != NULL) memmove(scan->rs_key, key, nkeys * sizeof(ScanKeyData)); } /* ---------------- * heapgettup - fetch next heap tuple * * routine used by heap_getnext() which does most of the * real work in scanning tuples. * * The passed-in *buffer must be either InvalidBuffer or the pinned * current page of the scan. If we have to move to another page, * we will unpin this buffer (if valid). On return, *buffer is either * InvalidBuffer or the ID of a pinned buffer. 
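 *
 * The "dir" argument gives the scan direction: dir == 0 means refetch
 * the tuple identified by tuple->t_self, dir < 0 means scan backward,
 * and dir > 0 means scan forward.  At end of scan, tuple->t_data is set
 * to NULL and *buffer is set to InvalidBuffer.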
* ---------------- */ static void heapgettup(Relation relation, int dir, HeapTuple tuple, Buffer *buffer, Snapshot snapshot, int nkeys, ScanKey key) { ItemId lpp; Page dp; BlockNumber page; BlockNumber pages; int lines; OffsetNumber lineoff; int linesleft; ItemPointer tid; /* * increment access statistics */ IncrHeapAccessStat(local_heapgettup); IncrHeapAccessStat(global_heapgettup); tid = (tuple->t_data == NULL) ? (ItemPointer) NULL : &(tuple->t_self); /* * debugging stuff * * check validity of arguments, here and for other functions too Note: no * locking manipulations needed--this is a local function */ #ifdef HEAPDEBUGALL if (ItemPointerIsValid(tid)) { elog(LOG, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)", RelationGetRelationName(relation), tid, tid->ip_blkid, tid->ip_posid, dir); } else { elog(LOG, "heapgettup(%s, tid=0x%x, dir=%d, ...)", RelationGetRelationName(relation), tid, dir); } elog(LOG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key); elog(LOG, "heapgettup: relation(%c)=`%s', %p", relation->rd_rel->relkind, RelationGetRelationName(relation), snapshot); #endif /* !defined(HEAPLOGALL) */ if (!ItemPointerIsValid(tid)) { Assert(!PointerIsValid(tid)); tid = NULL; } tuple->t_tableOid = relation->rd_id; /* * return null immediately if relation is empty */ if ((pages = relation->rd_nblocks) == 0) { if (BufferIsValid(*buffer)) ReleaseBuffer(*buffer); *buffer = InvalidBuffer; tuple->t_datamcxt = NULL; tuple->t_data = NULL; return; } /* * calculate next starting lineoff, given scan direction */ if (!dir) { /* * ``no movement'' scan direction: refetch same tuple */ if (tid == NULL) { if (BufferIsValid(*buffer)) ReleaseBuffer(*buffer); *buffer = InvalidBuffer; tuple->t_datamcxt = NULL; tuple->t_data = NULL; return; } *buffer = ReleaseAndReadBuffer(*buffer, relation, ItemPointerGetBlockNumber(tid)); if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); LockBuffer(*buffer, BUFFER_LOCK_SHARE); dp = (Page) BufferGetPage(*buffer); lineoff = ItemPointerGetOffsetNumber(tid); lpp = PageGetItemId(dp, lineoff); tuple->t_datamcxt = NULL; tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); tuple->t_len = ItemIdGetLength(lpp); LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); return; } else if (dir < 0) { /* * reverse scan direction */ if (tid == NULL) { page = pages - 1; /* final page */ } else { page = ItemPointerGetBlockNumber(tid); /* current page */ } Assert(page < pages); *buffer = ReleaseAndReadBuffer(*buffer, relation, page); if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); LockBuffer(*buffer, BUFFER_LOCK_SHARE); dp = (Page) BufferGetPage(*buffer); lines = PageGetMaxOffsetNumber(dp); if (tid == NULL) { lineoff = lines; /* final offnum */ } else { lineoff = /* previous offnum */ OffsetNumberPrev(ItemPointerGetOffsetNumber(tid)); } /* page and lineoff now reference the physically previous tid */ } else { /* * forward scan direction */ if (tid == NULL) { page = 0; /* first page */ lineoff = FirstOffsetNumber; /* first offnum */ } else { page = ItemPointerGetBlockNumber(tid); /* current page */ lineoff = /* next offnum */ OffsetNumberNext(ItemPointerGetOffsetNumber(tid)); } Assert(page < pages); *buffer = ReleaseAndReadBuffer(*buffer, relation, page); if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); LockBuffer(*buffer, BUFFER_LOCK_SHARE); dp = (Page) BufferGetPage(*buffer); lines = PageGetMaxOffsetNumber(dp); /* page and lineoff now reference the physically next tid */ } /* 'dir' is now non-zero */ /* * 
calculate line pointer and number of remaining items to check on * this page. */ lpp = PageGetItemId(dp, lineoff); if (dir < 0) linesleft = lineoff - 1; else linesleft = lines - lineoff; /* * advance the scan until we find a qualifying tuple or run out of * stuff to scan */ for (;;) { while (linesleft >= 0) { if (ItemIdIsUsed(lpp)) { tuple->t_datamcxt = NULL; tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); tuple->t_len = ItemIdGetLength(lpp); ItemPointerSet(&(tuple->t_self), page, lineoff); /* * if current tuple qualifies, return it. */ HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp, snapshot, nkeys, key); if (tuple->t_data != NULL) { LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); return; } } /* * otherwise move to the next item on the page */ --linesleft; if (dir < 0) { --lpp; /* move back in this page's ItemId array */ --lineoff; } else { ++lpp; /* move forward in this page's ItemId * array */ ++lineoff; } } /* * if we get here, it means we've exhausted the items on this page * and it's time to move to the next. */ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); /* * return NULL if we've exhausted all the pages */ if ((dir < 0) ? (page == 0) : (page + 1 >= pages)) { if (BufferIsValid(*buffer)) ReleaseBuffer(*buffer); *buffer = InvalidBuffer; tuple->t_datamcxt = NULL; tuple->t_data = NULL; return; } page = (dir < 0) ? (page - 1) : (page + 1); Assert(page < pages); *buffer = ReleaseAndReadBuffer(*buffer, relation, page); if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); LockBuffer(*buffer, BUFFER_LOCK_SHARE); dp = (Page) BufferGetPage(*buffer); lines = PageGetMaxOffsetNumber((Page) dp); linesleft = lines - 1; if (dir < 0) { lineoff = lines; lpp = PageGetItemId(dp, lines); } else { lineoff = FirstOffsetNumber; lpp = PageGetItemId(dp, FirstOffsetNumber); } } } #if defined(DISABLE_COMPLEX_MACRO) /* * This is formatted so oddly so that the correspondence to the macro * definition in access/heapam.h is maintained. */ Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull) { return ( (attnum) > 0 ? ( ((isnull) ? (*(isnull) = false) : (dummyret) NULL), HeapTupleNoNulls(tup) ? ( (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ? ( fetchatt((tupleDesc)->attrs[(attnum) - 1], (char *) (tup)->t_data + (tup)->t_data->t_hoff + (tupleDesc)->attrs[(attnum) - 1]->attcacheoff) ) : nocachegetattr((tup), (attnum), (tupleDesc), (isnull)) ) : ( att_isnull((attnum) - 1, (tup)->t_data->t_bits) ? ( ((isnull) ? (*(isnull) = true) : (dummyret) NULL), (Datum) NULL ) : ( nocachegetattr((tup), (attnum), (tupleDesc), (isnull)) ) ) ) : ( (Datum) NULL ) ); } #endif /* defined(DISABLE_COMPLEX_MACRO) */ /* ---------------------------------------------------------------- * heap access method interface * ---------------------------------------------------------------- */ /* ---------------- * relation_open - open any relation by relation OID * * If lockmode is not "NoLock", the specified kind of lock is * obtained on the relation. (Generally, NoLock should only be * used if the caller knows it has some appropriate lock on the * relation already.) * * An error is raised if the relation does not exist. * * NB: a "relation" is anything with a pg_class entry. The caller is * expected to check whether the relkind is something it can handle. 
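 *
 * A minimal usage sketch (the OID variable and the lock mode here are
 * only illustrative):
 *
 *		Relation	rel = relation_open(relid, AccessShareLock);
 *
 *		... check rel->rd_rel->relkind, do the real work ...
 *
 *		relation_close(rel, AccessShareLock);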
* ---------------- */ Relation relation_open(Oid relationId, LOCKMODE lockmode) { Relation r; Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES); /* * increment access statistics */ IncrHeapAccessStat(local_open); IncrHeapAccessStat(global_open); /* The relcache does all the real work... */ r = RelationIdGetRelation(relationId); if (!RelationIsValid(r)) elog(ERROR, "Relation %u does not exist", relationId); if (lockmode != NoLock) LockRelation(r, lockmode); return r; } /* ---------------- * relation_openr - open any relation by name * * As above, but lookup by name instead of OID. * ---------------- */ Relation relation_openr(const char *relationName, LOCKMODE lockmode) { Relation r; Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES); /* * increment access statistics */ IncrHeapAccessStat(local_openr); IncrHeapAccessStat(global_openr); /* * Check for shared-cache-inval messages before trying to open the * relation. This is needed to cover the case where the name * identifies a rel that has been dropped and recreated since the * start of our transaction: if we don't flush the old relcache entry * then we'll latch onto that entry and suffer an error when we do * LockRelation. Note that relation_open does not need to do this, * since a relation's OID never changes. * * We skip this if asked for NoLock, on the assumption that the caller * has already ensured some appropriate lock is held. */ if (lockmode != NoLock) AcceptInvalidationMessages(); /* The relcache does all the real work... */ r = RelationNameGetRelation(relationName); if (!RelationIsValid(r)) elog(ERROR, "Relation \"%s\" does not exist", relationName); if (lockmode != NoLock) LockRelation(r, lockmode); return r; } /* ---------------- * relation_close - close any relation * * If lockmode is not "NoLock", we first release the specified lock. * * Note that it is often sensible to hold a lock beyond relation_close; * in that case, the lock is released automatically at xact end. * ---------------- */ void relation_close(Relation relation, LOCKMODE lockmode) { Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES); /* * increment access statistics */ IncrHeapAccessStat(local_close); IncrHeapAccessStat(global_close); if (lockmode != NoLock) UnlockRelation(relation, lockmode); /* The relcache does the real work... */ RelationClose(relation); } /* ---------------- * heap_open - open a heap relation by relation OID * * This is essentially relation_open plus check that the relation * is not an index or special relation. (The caller should also check * that it's not a view before assuming it has storage.) * ---------------- */ Relation heap_open(Oid relationId, LOCKMODE lockmode) { Relation r; r = relation_open(relationId, lockmode); if (r->rd_rel->relkind == RELKIND_INDEX) elog(ERROR, "%s is an index relation", RelationGetRelationName(r)); else if (r->rd_rel->relkind == RELKIND_SPECIAL) elog(ERROR, "%s is a special relation", RelationGetRelationName(r)); pgstat_initstats(&r->pgstat_info, r); return r; } /* ---------------- * heap_openr - open a heap relation by name * * As above, but lookup by name instead of OID. 
* ---------------- */ Relation heap_openr(const char *relationName, LOCKMODE lockmode) { Relation r; r = relation_openr(relationName, lockmode); if (r->rd_rel->relkind == RELKIND_INDEX) elog(ERROR, "%s is an index relation", RelationGetRelationName(r)); else if (r->rd_rel->relkind == RELKIND_SPECIAL) elog(ERROR, "%s is a special relation", RelationGetRelationName(r)); pgstat_initstats(&r->pgstat_info, r); return r; } /* ---------------- * heap_beginscan - begin relation scan * ---------------- */ HeapScanDesc heap_beginscan(Relation relation, int atend, Snapshot snapshot, unsigned nkeys, ScanKey key) { HeapScanDesc scan; /* * increment access statistics */ IncrHeapAccessStat(local_beginscan); IncrHeapAccessStat(global_beginscan); /* * sanity checks */ if (!RelationIsValid(relation)) elog(ERROR, "heap_beginscan: !RelationIsValid(relation)"); /* * increment relation ref count while scanning relation * * This is just to make really sure the relcache entry won't go away * while the scan has a pointer to it. Caller should be holding the * rel open anyway, so this is redundant in all normal scenarios... */ RelationIncrementReferenceCount(relation); /* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */ if (relation->rd_rel->relkind == RELKIND_UNCATALOGED) snapshot = SnapshotSelf; /* * allocate and initialize scan descriptor */ scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData)); scan->rs_rd = relation; scan->rs_snapshot = snapshot; scan->rs_nkeys = (short) nkeys; pgstat_initstats(&scan->rs_pgstat_info, relation); /* * we do this here instead of in initscan() because heap_rescan also * calls initscan() and we don't want to allocate memory again */ if (nkeys) scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys); else scan->rs_key = NULL; initscan(scan, relation, atend, nkeys, key); return scan; } /* ---------------- * heap_rescan - restart a relation scan * ---------------- */ void heap_rescan(HeapScanDesc scan, bool scanFromEnd, ScanKey key) { /* * increment access statistics */ IncrHeapAccessStat(local_rescan); IncrHeapAccessStat(global_rescan); /* * unpin scan buffers */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); /* * reinitialize scan descriptor */ initscan(scan, scan->rs_rd, scanFromEnd, scan->rs_nkeys, key); pgstat_reset_heap_scan(&scan->rs_pgstat_info); } /* ---------------- * heap_endscan - end relation scan * * See how to integrate with index scans. * Check handling if reldesc caching. * ---------------- */ void heap_endscan(HeapScanDesc scan) { /* * increment access statistics */ IncrHeapAccessStat(local_endscan); IncrHeapAccessStat(global_endscan); /* Note: no locking manipulations needed */ /* * unpin scan buffers */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); /* * decrement relation reference count and free scan descriptor storage */ RelationDecrementReferenceCount(scan->rs_rd); if (scan->rs_key) pfree(scan->rs_key); pfree(scan); } /* ---------------- * heap_getnext - retrieve next tuple in scan * * Fix to work with index relations. * We don't return the buffer anymore, but you can get it from the * returned HeapTuple. 
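 *
 * A typical forward scan loop looks like this (a sketch; error handling
 * omitted):
 *
 *		while ((tuple = heap_getnext(scan, 0)) != NULL)
 *			... process tuple ...
 *
 * A NULL result means the scan has reached end-of-scan.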
* ---------------- */ #ifdef HEAPDEBUGALL #define HEAPDEBUG_1 \ elog(LOG, "heap_getnext([%s,nkeys=%d],backw=%d) called", \ RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, backw) #define HEAPDEBUG_2 \ elog(LOG, "heap_getnext returning EOS") #define HEAPDEBUG_3 \ elog(LOG, "heap_getnext returning tuple"); #else #define HEAPDEBUG_1 #define HEAPDEBUG_2 #define HEAPDEBUG_3 #endif /* !defined(HEAPDEBUGALL) */ HeapTuple heap_getnext(HeapScanDesc scan, int backw) { /* * increment access statistics */ IncrHeapAccessStat(local_getnext); IncrHeapAccessStat(global_getnext); /* Note: no locking manipulations needed */ /* * argument checks */ if (scan == NULL) elog(ERROR, "heap_getnext: NULL relscan"); HEAPDEBUG_1; /* heap_getnext( info ) */ if (backw) { /* * handle reverse scan */ heapgettup(scan->rs_rd, -1, &(scan->rs_ctup), &(scan->rs_cbuf), scan->rs_snapshot, scan->rs_nkeys, scan->rs_key); if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf)) { HEAPDEBUG_2; /* heap_getnext returning EOS */ return NULL; } } else { /* * handle forward scan */ heapgettup(scan->rs_rd, 1, &(scan->rs_ctup), &(scan->rs_cbuf), scan->rs_snapshot, scan->rs_nkeys, scan->rs_key); if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf)) { HEAPDEBUG_2; /* heap_getnext returning EOS */ return NULL; } } pgstat_count_heap_scan(&scan->rs_pgstat_info); /* * if we get here it means we have a new current scan tuple, so point * to the proper return buffer and return the tuple. */ HEAPDEBUG_3; /* heap_getnext returning tuple */ if (scan->rs_ctup.t_data != NULL) pgstat_count_heap_getnext(&scan->rs_pgstat_info); return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup)); } /* ---------------- * heap_fetch - retrive tuple with tid * * Currently ignores LP_IVALID during processing! * * Because this is not part of a scan, there is no way to * automatically lock/unlock the shared buffers. * For this reason, we require that the user retrieve the buffer * value, and they are required to BufferRelease() it when they * are done. If they want to make a copy of it before releasing it, * they can call heap_copytyple(). * ---------------- */ void heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, IndexScanDesc iscan) { ItemId lp; Buffer buffer; PageHeader dp; ItemPointer tid = &(tuple->t_self); OffsetNumber offnum; /* * increment access statistics */ IncrHeapAccessStat(local_fetch); IncrHeapAccessStat(global_fetch); /* * get the buffer from the relation descriptor. Note that this does a * buffer pin. 
*/ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); if (!BufferIsValid(buffer)) elog(ERROR, "heap_fetch: %s relation: ReadBuffer(%lx) failed", RelationGetRelationName(relation), (long) tid); LockBuffer(buffer, BUFFER_LOCK_SHARE); /* * get the item line pointer corresponding to the requested tid */ dp = (PageHeader) BufferGetPage(buffer); offnum = ItemPointerGetOffsetNumber(tid); lp = PageGetItemId(dp, offnum); /* * more sanity checks */ if (!ItemIdIsUsed(lp)) { LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); *userbuf = InvalidBuffer; tuple->t_datamcxt = NULL; tuple->t_data = NULL; return; } tuple->t_datamcxt = NULL; tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); tuple->t_len = ItemIdGetLength(lp); tuple->t_tableOid = relation->rd_id; /* * check time qualification of tid */ HeapTupleSatisfies(tuple, relation, buffer, dp, snapshot, 0, (ScanKey) NULL); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); if (tuple->t_data == NULL) { /* Tuple failed time check, so we can release now. */ ReleaseBuffer(buffer); *userbuf = InvalidBuffer; } else { /* * All checks passed, so return the tuple as valid. Caller is now * responsible for releasing the buffer. */ *userbuf = buffer; if (iscan != NULL) pgstat_count_heap_fetch(&iscan->xs_pgstat_info); else pgstat_count_heap_fetch(&relation->pgstat_info); } } /* ---------------- * heap_get_latest_tid - get the latest tid of a specified tuple * * ---------------- */ ItemPointer heap_get_latest_tid(Relation relation, Snapshot snapshot, ItemPointer tid) { ItemId lp = NULL; Buffer buffer; PageHeader dp; OffsetNumber offnum; HeapTupleData tp; HeapTupleHeader t_data; ItemPointerData ctid; bool invalidBlock, linkend; /* * get the buffer from the relation descriptor Note that this does a * buffer pin. */ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); if (!BufferIsValid(buffer)) elog(ERROR, "heap_get_latest_tid: %s relation: ReadBuffer(%lx) failed", RelationGetRelationName(relation), (long) tid); LockBuffer(buffer, BUFFER_LOCK_SHARE); /* * get the item line pointer corresponding to the requested tid */ dp = (PageHeader) BufferGetPage(buffer); offnum = ItemPointerGetOffsetNumber(tid); invalidBlock = true; if (!PageIsNew(dp)) { lp = PageGetItemId(dp, offnum); if (ItemIdIsUsed(lp)) invalidBlock = false; } if (invalidBlock) { LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); return NULL; } /* * more sanity checks */ tp.t_datamcxt = NULL; t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); tp.t_len = ItemIdGetLength(lp); tp.t_self = *tid; ctid = tp.t_data->t_ctid; /* * check time qualification of tid */ HeapTupleSatisfies(&tp, relation, buffer, dp, snapshot, 0, (ScanKey) NULL); linkend = true; if ((t_data->t_infomask & HEAP_XMAX_COMMITTED) && !ItemPointerEquals(tid, &ctid)) linkend = false; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); if (tp.t_data == NULL) { if (linkend) return NULL; heap_get_latest_tid(relation, snapshot, &ctid); *tid = ctid; } return tid; } /* ---------------- * heap_insert - insert tuple into a heap * * The assignment of t_min (and thus the others) should be * removed eventually. * ---------------- */ Oid heap_insert(Relation relation, HeapTuple tup) { Buffer buffer; /* increment access statistics */ IncrHeapAccessStat(local_insert); IncrHeapAccessStat(global_insert); if (relation->rd_rel->relhasoids) { /* * If the object id of this tuple has already been assigned, trust * the caller. There are a couple of ways this can happen. 
At * initial db creation, the backend program sets oids for tuples. * When we define an index, we set the oid. Finally, in the * future, we may allow users to set their own object ids in order * to support a persistent object store (objects need to contain * pointers to one another). */ if (!OidIsValid(tup->t_data->t_oid)) tup->t_data->t_oid = newoid(); else CheckMaxObjectId(tup->t_data->t_oid); } TransactionIdStore(GetCurrentTransactionId(), &(tup->t_data->t_xmin)); tup->t_data->t_cmin = GetCurrentCommandId(); StoreInvalidTransactionId(&(tup->t_data->t_xmax)); tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; tup->t_tableOid = relation->rd_id; #ifdef TUPLE_TOASTER_ACTIVE /* * If the new tuple is too big for storage or contains already toasted * attributes from some other relation, invoke the toaster. */ if (HeapTupleHasExtended(tup) || (MAXALIGN(tup->t_len) > TOAST_TUPLE_THRESHOLD)) heap_tuple_toast_attrs(relation, tup, NULL); #endif /* Find buffer to insert this tuple into */ buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer); /* NO ELOG(ERROR) from here till changes are logged */ START_CRIT_SECTION(); RelationPutHeapTuple(relation, buffer, tup); pgstat_count_heap_insert(&relation->pgstat_info); /* XLOG stuff */ { xl_heap_insert xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; XLogRecData rdata[3]; Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; xlrec.target.node = relation->rd_node; xlrec.target.tid = tup->t_self; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapInsert; rdata[0].next = &(rdata[1]); xlhdr.t_oid = tup->t_data->t_oid; xlhdr.t_natts = tup->t_data->t_natts; xlhdr.t_hoff = tup->t_data->t_hoff; xlhdr.mask = tup->t_data->t_infomask; rdata[1].buffer = buffer; rdata[1].data = (char *) &xlhdr; rdata[1].len = SizeOfHeapHeader; rdata[1].next = &(rdata[2]); rdata[2].buffer = buffer; rdata[2].data = (char *) tup->t_data + offsetof(HeapTupleHeaderData, t_bits); rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits); rdata[2].next = NULL; /* If this is the single and first tuple on page... */ if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; rdata[1].buffer = rdata[2].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID, info, rdata); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); } END_CRIT_SECTION(); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); /* * If tuple is cachable, mark it for rollback from the caches in case * we abort. Note it is OK to do this after WriteBuffer releases the * buffer, because the "tup" data structure is all in local memory, * not in the shared buffer. */ RelationMark4RollbackHeapTuple(relation, tup); return tup->t_data->t_oid; } /* * heap_delete - delete a tuple * * NB: do not call this directly unless you are prepared to deal with * concurrent-update conditions. Use simple_heap_delete instead. 
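 *
 * Callers that are prepared to handle concurrent updates typically do
 * something like this with the result (a sketch only):
 *
 *		result = heap_delete(rel, tid, &ctid);
 *		if (result == HeapTupleUpdated)
 *			... deleted/updated by a concurrent xact; ctid now points
 *			... at the newer version of the tuple (its t_ctid)
 *		else if (result == HeapTupleSelfUpdated)
 *			... already deleted/updated by the current command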
*/ int heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid) { ItemId lp; HeapTupleData tp; PageHeader dp; Buffer buffer; int result; /* increment access statistics */ IncrHeapAccessStat(local_delete); IncrHeapAccessStat(global_delete); Assert(ItemPointerIsValid(tid)); buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); if (!BufferIsValid(buffer)) elog(ERROR, "heap_delete: failed ReadBuffer"); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); dp = (PageHeader) BufferGetPage(buffer); lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid)); tp.t_datamcxt = NULL; tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); tp.t_len = ItemIdGetLength(lp); tp.t_self = *tid; tp.t_tableOid = relation->rd_id; l1: result = HeapTupleSatisfiesUpdate(&tp); if (result == HeapTupleInvisible) { LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); elog(ERROR, "heap_delete: (am)invalid tid"); } else if (result == HeapTupleBeingUpdated) { TransactionId xwait = tp.t_data->t_xmax; /* sleep until concurrent transaction ends */ LockBuffer(buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); if (TransactionIdDidAbort(xwait)) goto l1; /* * xwait is committed but if xwait had just marked the tuple for * update then some other xaction could update this tuple before * we got to this point. */ if (!TransactionIdEquals(tp.t_data->t_xmax, xwait)) goto l1; if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED)) { tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED; SetBufferCommitInfoNeedsSave(buffer); } /* if tuple was marked for update but not updated... */ if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) result = HeapTupleMayBeUpdated; else result = HeapTupleUpdated; } if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); *ctid = tp.t_data->t_ctid; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); return result; } START_CRIT_SECTION(); /* store transaction information of xact deleting the tuple */ TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax)); tp.t_data->t_cmax = GetCurrentCommandId(); tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); /* XLOG stuff */ { xl_heap_delete xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; xlrec.target.node = relation->rd_node; xlrec.target.tid = tp.t_self; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapDelete; rdata[0].next = &(rdata[1]); rdata[1].buffer = buffer; rdata[1].data = NULL; rdata[1].len = 0; rdata[1].next = NULL; recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); PageSetLSN(dp, recptr); PageSetSUI(dp, ThisStartUpID); } END_CRIT_SECTION(); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); #ifdef TUPLE_TOASTER_ACTIVE /* * If the relation has toastable attributes, we need to delete no * longer needed items there too. We have to do this before * WriteBuffer because we need to look at the contents of the tuple, * but it's OK to release the context lock on the buffer first. */ if (HeapTupleHasExtended(&tp)) heap_tuple_toast_attrs(relation, NULL, &(tp)); #endif pgstat_count_heap_delete(&relation->pgstat_info); /* * Mark tuple for invalidation from system caches at next command * boundary. We have to do this before WriteBuffer because we need to * look at the contents of the tuple, so we need to hold our refcount * on the buffer. 
*/ RelationInvalidateHeapTuple(relation, &tp); WriteBuffer(buffer); return HeapTupleMayBeUpdated; } /* * simple_heap_delete - delete a tuple * * This routine may be used to delete a tuple when concurrent updates of * the target tuple are not expected (for example, because we have a lock * on the relation associated with the tuple). Any failure is reported * via elog(). */ void simple_heap_delete(Relation relation, ItemPointer tid) { ItemPointerData ctid; int result; result = heap_delete(relation, tid, &ctid); switch (result) { case HeapTupleSelfUpdated: /* Tuple was already updated in current command? */ elog(ERROR, "simple_heap_delete: tuple already updated by self"); break; case HeapTupleMayBeUpdated: /* done successfully */ break; case HeapTupleUpdated: elog(ERROR, "simple_heap_delete: tuple concurrently updated"); break; default: elog(ERROR, "Unknown status %u from heap_delete", result); break; } } /* * heap_update - replace a tuple * * NB: do not call this directly unless you are prepared to deal with * concurrent-update conditions. Use simple_heap_update instead. */ int heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, ItemPointer ctid) { ItemId lp; HeapTupleData oldtup; PageHeader dp; Buffer buffer, newbuf; bool need_toast, already_marked; Size newtupsize, pagefree; int result; /* increment access statistics */ IncrHeapAccessStat(local_replace); IncrHeapAccessStat(global_replace); Assert(ItemPointerIsValid(otid)); buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid)); if (!BufferIsValid(buffer)) elog(ERROR, "heap_update: failed ReadBuffer"); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); dp = (PageHeader) BufferGetPage(buffer); lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid)); oldtup.t_datamcxt = NULL; oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp); oldtup.t_len = ItemIdGetLength(lp); oldtup.t_self = *otid; /* * Note: beyond this point, use oldtup not otid to refer to old tuple. * otid may very well point at newtup->t_self, which we will overwrite * with the new tuple's location, so there's great risk of confusion * if we use otid anymore. */ l2: result = HeapTupleSatisfiesUpdate(&oldtup); if (result == HeapTupleInvisible) { LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); elog(ERROR, "heap_update: (am)invalid tid"); } else if (result == HeapTupleBeingUpdated) { TransactionId xwait = oldtup.t_data->t_xmax; /* sleep untill concurrent transaction ends */ LockBuffer(buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); if (TransactionIdDidAbort(xwait)) goto l2; /* * xwait is committed but if xwait had just marked the tuple for * update then some other xaction could update this tuple before * we got to this point. */ if (!TransactionIdEquals(oldtup.t_data->t_xmax, xwait)) goto l2; if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED)) { oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED; SetBufferCommitInfoNeedsSave(buffer); } /* if tuple was marked for update but not updated... 
*/ if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) result = HeapTupleMayBeUpdated; else result = HeapTupleUpdated; } if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); *ctid = oldtup.t_data->t_ctid; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); return result; } /* Fill in OID and transaction status data for newtup */ newtup->t_data->t_oid = oldtup.t_data->t_oid; TransactionIdStore(GetCurrentTransactionId(), &(newtup->t_data->t_xmin)); newtup->t_data->t_cmin = GetCurrentCommandId(); StoreInvalidTransactionId(&(newtup->t_data->t_xmax)); newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK); newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED); /* * If the toaster needs to be activated, OR if the new tuple will not * fit on the same page as the old, then we need to release the * context lock (but not the pin!) on the old tuple's buffer while we * are off doing TOAST and/or table-file-extension work. We must mark * the old tuple to show that it's already being updated, else other * processes may try to update it themselves. To avoid second XLOG log * record, we use xact mgr hook to unlock old tuple without reading * log if xact will abort before update is logged. In the event of * crash prio logging, TQUAL routines will see HEAP_XMAX_UNLOGGED * flag... * * NOTE: this trick is useless currently but saved for future when we'll * implement UNDO and will re-use transaction IDs after postmaster * startup. * * We need to invoke the toaster if there are already any toasted values * present, or if the new tuple is over-threshold. */ need_toast = (HeapTupleHasExtended(&oldtup) || HeapTupleHasExtended(newtup) || (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD)); newtupsize = MAXALIGN(newtup->t_len); pagefree = PageGetFreeSpace((Page) dp); if (need_toast || newtupsize > pagefree) { _locked_tuple_.node = relation->rd_node; _locked_tuple_.tid = oldtup.t_self; XactPushRollback(_heap_unlock_tuple, (void *) &_locked_tuple_); TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); oldtup.t_data->t_cmax = GetCurrentCommandId(); oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); oldtup.t_data->t_infomask |= HEAP_XMAX_UNLOGGED; already_marked = true; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Let the toaster do its thing */ if (need_toast) { heap_tuple_toast_attrs(relation, newtup, &oldtup); newtupsize = MAXALIGN(newtup->t_len); } /* * Now, do we need a new page for the tuple, or not? This is a * bit tricky since someone else could have added tuples to the * page while we weren't looking. We have to recheck the * available space after reacquiring the buffer lock. But don't * bother to do that if the former amount of free space is still * not enough; it's unlikely there's more free now than before. * * What's more, if we need to get a new page, we will need to acquire * buffer locks on both old and new pages. To avoid deadlock * against some other backend trying to get the same two locks in * the other order, we must be consistent about the order we get * the locks in. We use the rule "lock the lower-numbered page of * the relation first". To implement this, we must do * RelationGetBufferForTuple while not holding the lock on the old * page, and we must rely on it to get the locks on both pages in * the correct order. */ if (newtupsize > pagefree) { /* Assume there's no chance to put newtup on same page. 
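			 * RelationGetBufferForTuple will find (or extend the relation
			 * to create) another page with enough room, and, per the lock
			 * ordering rule described above, will acquire the buffer locks
			 * on the old and new pages in the correct order.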
*/ newbuf = RelationGetBufferForTuple(relation, newtup->t_len, buffer); } else { /* Re-acquire the lock on the old tuple's page. */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* Re-check using the up-to-date free space */ pagefree = PageGetFreeSpace((Page) dp); if (newtupsize > pagefree) { /* * Rats, it doesn't fit anymore. We must now unlock and * relock to avoid deadlock. Fortunately, this path * should seldom be taken. */ LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, newtup->t_len, buffer); } else { /* OK, it fits here, so we're done. */ newbuf = buffer; } } } else { /* No TOAST work needed, and it'll fit on same page */ already_marked = false; newbuf = buffer; } pgstat_count_heap_update(&relation->pgstat_info); /* * At this point newbuf and buffer are both pinned and locked, and * newbuf has enough space for the new tuple. If they are the same * buffer, only one pin is held. */ /* NO ELOG(ERROR) from here till changes are logged */ START_CRIT_SECTION(); RelationPutHeapTuple(relation, newbuf, newtup); /* insert new tuple */ if (already_marked) { oldtup.t_data->t_infomask &= ~HEAP_XMAX_UNLOGGED; XactPopRollback(); } else { TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); oldtup.t_data->t_cmax = GetCurrentCommandId(); oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); } /* record address of new tuple in t_ctid of old one */ oldtup.t_data->t_ctid = newtup->t_self; /* XLOG stuff */ { XLogRecPtr recptr = log_heap_update(relation, buffer, oldtup.t_self, newbuf, newtup, false); if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); PageSetSUI(BufferGetPage(newbuf), ThisStartUpID); } PageSetLSN(BufferGetPage(buffer), recptr); PageSetSUI(BufferGetPage(buffer), ThisStartUpID); } END_CRIT_SECTION(); if (newbuf != buffer) LockBuffer(newbuf, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* * Mark old tuple for invalidation from system caches at next command * boundary. We have to do this before WriteBuffer because we need to * look at the contents of the tuple, so we need to hold our refcount. */ RelationInvalidateHeapTuple(relation, &oldtup); if (newbuf != buffer) WriteBuffer(newbuf); WriteBuffer(buffer); /* * If new tuple is cachable, mark it for rollback from the caches in * case we abort. Note it is OK to do this after WriteBuffer releases * the buffer, because the "newtup" data structure is all in local * memory, not in the shared buffer. */ RelationMark4RollbackHeapTuple(relation, newtup); return HeapTupleMayBeUpdated; } /* * simple_heap_update - replace a tuple * * This routine may be used to update a tuple when concurrent updates of * the target tuple are not expected (for example, because we have a lock * on the relation associated with the tuple). Any failure is reported * via elog(). */ void simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) { ItemPointerData ctid; int result; result = heap_update(relation, otid, tup, &ctid); switch (result) { case HeapTupleSelfUpdated: /* Tuple was already updated in current command? 
*/ elog(ERROR, "simple_heap_update: tuple already updated by self"); break; case HeapTupleMayBeUpdated: /* done successfully */ break; case HeapTupleUpdated: elog(ERROR, "simple_heap_update: tuple concurrently updated"); break; default: elog(ERROR, "Unknown status %u from heap_update", result); break; } } /* * heap_mark4update - mark a tuple for update */ int heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer) { ItemPointer tid = &(tuple->t_self); ItemId lp; PageHeader dp; int result; /* increment access statistics */ IncrHeapAccessStat(local_mark4update); IncrHeapAccessStat(global_mark4update); *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); if (!BufferIsValid(*buffer)) elog(ERROR, "heap_mark4update: failed ReadBuffer"); LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); dp = (PageHeader) BufferGetPage(*buffer); lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid)); tuple->t_datamcxt = NULL; tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); tuple->t_len = ItemIdGetLength(lp); l3: result = HeapTupleSatisfiesUpdate(tuple); if (result == HeapTupleInvisible) { LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(*buffer); elog(ERROR, "heap_mark4update: (am)invalid tid"); } else if (result == HeapTupleBeingUpdated) { TransactionId xwait = tuple->t_data->t_xmax; /* sleep untill concurrent transaction ends */ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); if (TransactionIdDidAbort(xwait)) goto l3; /* * xwait is committed but if xwait had just marked the tuple for * update then some other xaction could update this tuple before * we got to this point. */ if (!TransactionIdEquals(tuple->t_data->t_xmax, xwait)) goto l3; if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED)) { tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; SetBufferCommitInfoNeedsSave(*buffer); } /* if tuple was marked for update but not updated... */ if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) result = HeapTupleMayBeUpdated; else result = HeapTupleUpdated; } if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); tuple->t_self = tuple->t_data->t_ctid; LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); return result; } /* * XLOG stuff: no logging is required as long as we have no * savepoints. For savepoints private log could be used... */ ((PageHeader) BufferGetPage(*buffer))->pd_sui = ThisStartUpID; /* store transaction information of xact marking the tuple */ TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax)); tuple->t_data->t_cmax = GetCurrentCommandId(); tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID); tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE; LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); WriteNoReleaseBuffer(*buffer); return HeapTupleMayBeUpdated; } /* ---------------- * heap_markpos - mark scan position * * Note: * Should only one mark be maintained per scan at one time. * Check if this can be done generally--say calls to get the * next/previous tuple and NEVER pass struct scandesc to the * user AM's. Now, the mark is sent to the executor for safekeeping. * Probably can store this info into a GENERAL scan structure. * * May be best to change this call to store the marked position * (up to 2?) in the scan structure itself. * Fix to use the proper caching structure. 
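 *
 * Typical use is to remember the current scan position and come back to
 * it later (a sketch):
 *
 *		heap_markpos(scan);
 *		... some number of heap_getnext() calls ...
 *		heap_restrpos(scan);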
* ---------------- */ void heap_markpos(HeapScanDesc scan) { /* * increment access statistics */ IncrHeapAccessStat(local_markpos); IncrHeapAccessStat(global_markpos); /* Note: no locking manipulations needed */ if (scan->rs_ctup.t_data != NULL) scan->rs_mctid = scan->rs_ctup.t_self; else ItemPointerSetInvalid(&scan->rs_mctid); } /* ---------------- * heap_restrpos - restore position to marked location * * Note: there are bad side effects here. If we were past the end * of a relation when heapmarkpos is called, then if the relation is * extended via insert, then the next call to heaprestrpos will set * cause the added tuples to be visible when the scan continues. * Problems also arise if the TID's are rearranged!!! * * XXX might be better to do direct access instead of * using the generality of heapgettup(). * * XXX It is very possible that when a scan is restored, that a tuple * XXX which previously qualified may fail for time range purposes, unless * XXX some form of locking exists (ie., portals currently can act funny. * ---------------- */ void heap_restrpos(HeapScanDesc scan) { /* * increment access statistics */ IncrHeapAccessStat(local_restrpos); IncrHeapAccessStat(global_restrpos); /* XXX no amrestrpos checking that ammarkpos called */ /* Note: no locking manipulations needed */ /* * unpin scan buffers */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; if (!ItemPointerIsValid(&scan->rs_mctid)) { scan->rs_ctup.t_datamcxt = NULL; scan->rs_ctup.t_data = NULL; } else { scan->rs_ctup.t_self = scan->rs_mctid; scan->rs_ctup.t_datamcxt = NULL; scan->rs_ctup.t_data = (HeapTupleHeader) 0x1; /* for heapgettup */ heapgettup(scan->rs_rd, 0, &(scan->rs_ctup), &(scan->rs_cbuf), scan->rs_snapshot, 0, (ScanKey) NULL); } } XLogRecPtr log_heap_clean(Relation reln, Buffer buffer, char *unused, int unlen) { xl_heap_clean xlrec; XLogRecPtr recptr; XLogRecData rdata[3]; xlrec.node = reln->rd_node; xlrec.block = BufferGetBlockNumber(buffer); rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapClean; rdata[0].next = &(rdata[1]); if (unlen > 0) { rdata[1].buffer = buffer; rdata[1].data = unused; rdata[1].len = unlen; rdata[1].next = &(rdata[2]); } else rdata[0].next = &(rdata[2]); rdata[2].buffer = buffer; rdata[2].data = NULL; rdata[2].len = 0; rdata[2].next = NULL; recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata); return (recptr); } static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move) { /* * Note: xlhdr is declared to have adequate size and correct alignment * for an xl_heap_header. However the two tids, if present at all, * will be packed in with no wasted space after the xl_heap_header; * they aren't necessarily aligned as implied by this struct * declaration. */ struct { xl_heap_header hdr; TransactionId tid1; TransactionId tid2; } xlhdr; int hsize = SizeOfHeapHeader; xl_heap_update xlrec; XLogRecPtr recptr; XLogRecData rdata[4]; Page page = BufferGetPage(newbuf); uint8 info = (move) ? 
XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE; xlrec.target.node = reln->rd_node; xlrec.target.tid = from; xlrec.newtid = newtup->t_self; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; rdata[0].next = &(rdata[1]); rdata[1].buffer = oldbuf; rdata[1].data = NULL; rdata[1].len = 0; rdata[1].next = &(rdata[2]); xlhdr.hdr.t_oid = newtup->t_data->t_oid; xlhdr.hdr.t_natts = newtup->t_data->t_natts; xlhdr.hdr.t_hoff = newtup->t_data->t_hoff; xlhdr.hdr.mask = newtup->t_data->t_infomask; if (move) /* remember xmin & xmax */ { TransactionId xmax; if (newtup->t_data->t_infomask & HEAP_XMAX_INVALID || newtup->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) xmax = InvalidTransactionId; else xmax = newtup->t_data->t_xmax; memcpy((char *) &xlhdr + hsize, &xmax, sizeof(TransactionId)); memcpy((char *) &xlhdr + hsize + sizeof(TransactionId), &(newtup->t_data->t_xmin), sizeof(TransactionId)); hsize += 2 * sizeof(TransactionId); } rdata[2].buffer = newbuf; rdata[2].data = (char *) &xlhdr; rdata[2].len = hsize; rdata[2].next = &(rdata[3]); rdata[3].buffer = newbuf; rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits); rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); rdata[3].next = NULL; /* If new tuple is the single and first tuple on page... */ if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; rdata[2].buffer = rdata[3].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID, info, rdata); return (recptr); } XLogRecPtr log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup) { return (log_heap_update(reln, oldbuf, from, newbuf, newtup, true)); } static void heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record); Relation reln; Buffer buffer; Page page; if (!redo || (record->xl_info & XLR_BKP_BLOCK_1)) return; reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node); if (!RelationIsValid(reln)) return; buffer = XLogReadBuffer(false, reln, xlrec->block); if (!BufferIsValid(buffer)) elog(PANIC, "heap_clean_redo: no block"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) elog(PANIC, "heap_clean_redo: uninitialized page"); if (XLByteLE(lsn, PageGetLSN(page))) { UnlockAndReleaseBuffer(buffer); return; } if (record->xl_len > SizeOfHeapClean) { OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)]; OffsetNumber *unused = unbuf; char *unend; ItemId lp; Assert((record->xl_len - SizeOfHeapClean) <= BLCKSZ); memcpy((char *) unbuf, (char *) xlrec + SizeOfHeapClean, record->xl_len - SizeOfHeapClean); unend = (char *) unbuf + (record->xl_len - SizeOfHeapClean); while ((char *) unused < unend) { lp = ((PageHeader) page)->pd_linp + *unused; lp->lp_flags &= ~LP_USED; unused++; } } PageRepairFragmentation(page, NULL); UnlockAndWriteBuffer(buffer); } static void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); Buffer buffer; Page page; OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) return; if (!RelationIsValid(reln)) return; buffer = XLogReadBuffer(false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) elog(PANIC, "heap_delete_%sdo: no block", (redo) ? 
"re" : "un"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) elog(PANIC, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un"); if (redo) { if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { UnlockAndReleaseBuffer(buffer); return; } } else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied * ?! */ elog(PANIC, "heap_delete_undo: bad page LSN"); offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp)) elog(PANIC, "heap_delete_%sdo: invalid lp", (redo) ? "re" : "un"); htup = (HeapTupleHeader) PageGetItem(page, lp); if (redo) { htup->t_xmax = record->xl_xid; htup->t_cmax = FirstCommandId; htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID); UnlockAndWriteBuffer(buffer); return; } elog(PANIC, "heap_delete_undo: unimplemented"); } static void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); Buffer buffer; Page page; OffsetNumber offnum; if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) return; if (!RelationIsValid(reln)) return; buffer = XLogReadBuffer((redo) ? true : false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page) && (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE))) elog(PANIC, "heap_insert_%sdo: uninitialized page", (redo) ? "re" : "un"); if (redo) { struct { HeapTupleHeaderData hdr; char data[MaxTupleSize]; } tbuf; HeapTupleHeader htup; xl_heap_header xlhdr; uint32 newlen; if (record->xl_info & XLOG_HEAP_INIT_PAGE) PageInit(page, BufferGetPageSize(buffer), 0); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { UnlockAndReleaseBuffer(buffer); return; } offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_insert_redo: invalid max offset number"); newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader; Assert(newlen <= MaxTupleSize); memcpy((char *) &xlhdr, (char *) xlrec + SizeOfHeapInsert, SizeOfHeapHeader); memcpy((char *) &tbuf + offsetof(HeapTupleHeaderData, t_bits), (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); htup = &tbuf.hdr; htup->t_oid = xlhdr.t_oid; htup->t_natts = xlhdr.t_natts; htup->t_hoff = xlhdr.t_hoff; htup->t_xmin = record->xl_xid; htup->t_cmin = FirstCommandId; htup->t_xmax = InvalidTransactionId; htup->t_cmax = FirstCommandId; htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask; offnum = PageAddItem(page, (Item) htup, newlen, offnum, LP_USED | OverwritePageMode); if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_insert_redo: failed to add tuple"); PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID); /* prev sui */ UnlockAndWriteBuffer(buffer); return; } /* undo insert */ if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied * ?! 
*/ elog(PANIC, "heap_insert_undo: bad page LSN"); elog(PANIC, "heap_insert_undo: unimplemented"); } /* * Handles UPDATE & MOVE */ static void heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) { xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); Buffer buffer; bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) == ItemPointerGetBlockNumber(&(xlrec->target.tid))); Page page; OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; if (!RelationIsValid(reln)) return; if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) goto newt; /* Deal with old tuple version */ buffer = XLogReadBuffer(false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) elog(PANIC, "heap_update_%sdo: no block", (redo) ? "re" : "un"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) elog(PANIC, "heap_update_%sdo: uninitialized old page", (redo) ? "re" : "un"); if (redo) { if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { UnlockAndReleaseBuffer(buffer); if (samepage) return; goto newt; } } else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied * ?! */ elog(PANIC, "heap_update_undo: bad old tuple page LSN"); offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp)) elog(PANIC, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un"); htup = (HeapTupleHeader) PageGetItem(page, lp); if (redo) { if (move) { TransactionIdStore(record->xl_xid, (TransactionId *) &(htup->t_cmin)); htup->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); htup->t_infomask |= HEAP_MOVED_OFF; } else { htup->t_xmax = record->xl_xid; htup->t_cmax = FirstCommandId; htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); } if (samepage) goto newsame; PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID); UnlockAndWriteBuffer(buffer); goto newt; } elog(PANIC, "heap_update_undo: unimplemented"); /* Deal with new tuple */ newt:; if (redo && ((record->xl_info & XLR_BKP_BLOCK_2) || ((record->xl_info & XLR_BKP_BLOCK_1) && samepage))) return; buffer = XLogReadBuffer((redo) ? true : false, reln, ItemPointerGetBlockNumber(&(xlrec->newtid))); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); newsame:; if (PageIsNew((PageHeader) page) && (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE))) elog(PANIC, "heap_update_%sdo: uninitialized page", (redo) ? 
"re" : "un"); if (redo) { struct { HeapTupleHeaderData hdr; char data[MaxTupleSize]; } tbuf; xl_heap_header xlhdr; int hsize; uint32 newlen; if (record->xl_info & XLOG_HEAP_INIT_PAGE) PageInit(page, BufferGetPageSize(buffer), 0); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { UnlockAndReleaseBuffer(buffer); return; } offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); hsize = SizeOfHeapUpdate + SizeOfHeapHeader; if (move) hsize += (2 * sizeof(TransactionId)); newlen = record->xl_len - hsize; Assert(newlen <= MaxTupleSize); memcpy((char *) &xlhdr, (char *) xlrec + SizeOfHeapUpdate, SizeOfHeapHeader); memcpy((char *) &tbuf + offsetof(HeapTupleHeaderData, t_bits), (char *) xlrec + hsize, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); htup = &tbuf.hdr; htup->t_oid = xlhdr.t_oid; htup->t_natts = xlhdr.t_natts; htup->t_hoff = xlhdr.t_hoff; if (move) { hsize = SizeOfHeapUpdate + SizeOfHeapHeader; memcpy(&(htup->t_xmax), (char *) xlrec + hsize, sizeof(TransactionId)); memcpy(&(htup->t_xmin), (char *) xlrec + hsize + sizeof(TransactionId), sizeof(TransactionId)); TransactionIdStore(record->xl_xid, (TransactionId *) &(htup->t_cmin)); htup->t_infomask = xlhdr.mask; htup->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF); htup->t_infomask |= HEAP_MOVED_IN; } else { htup->t_xmin = record->xl_xid; htup->t_cmin = FirstCommandId; htup->t_xmax = InvalidTransactionId; htup->t_cmax = FirstCommandId; htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask; } offnum = PageAddItem(page, (Item) htup, newlen, offnum, LP_USED | OverwritePageMode); if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID); /* prev sui */ UnlockAndWriteBuffer(buffer); return; } /* undo */ if (XLByteLT(PageGetLSN(page), lsn)) /* changes not applied?! 
*/ elog(PANIC, "heap_update_undo: bad new tuple page LSN"); elog(PANIC, "heap_update_undo: unimplemented"); } static void _heap_unlock_tuple(void *data) { xl_heaptid *xltid = (xl_heaptid *) data; Relation reln = XLogOpenRelation(false, RM_HEAP_ID, xltid->node); Buffer buffer; Page page; OffsetNumber offnum; ItemId lp; HeapTupleHeader htup; if (!RelationIsValid(reln)) elog(PANIC, "_heap_unlock_tuple: can't open relation"); buffer = XLogReadBuffer(false, reln, ItemPointerGetBlockNumber(&(xltid->tid))); if (!BufferIsValid(buffer)) elog(PANIC, "_heap_unlock_tuple: can't read buffer"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) elog(PANIC, "_heap_unlock_tuple: uninitialized page"); offnum = ItemPointerGetOffsetNumber(&(xltid->tid)); if (offnum > PageGetMaxOffsetNumber(page)) elog(PANIC, "_heap_unlock_tuple: invalid itemid"); lp = PageGetItemId(page, offnum); if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) elog(PANIC, "_heap_unlock_tuple: unused/deleted tuple in rollback"); htup = (HeapTupleHeader) PageGetItem(page, lp); if (!TransactionIdEquals(htup->t_xmax, GetCurrentTransactionId()) || htup->t_cmax != GetCurrentCommandId()) elog(PANIC, "_heap_unlock_tuple: invalid xmax/cmax in rollback"); htup->t_infomask &= ~HEAP_XMAX_UNLOGGED; htup->t_infomask |= HEAP_XMAX_INVALID; UnlockAndWriteBuffer(buffer); return; } void heap_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; info &= XLOG_HEAP_OPMASK; if (info == XLOG_HEAP_INSERT) heap_xlog_insert(true, lsn, record); else if (info == XLOG_HEAP_DELETE) heap_xlog_delete(true, lsn, record); else if (info == XLOG_HEAP_UPDATE) heap_xlog_update(true, lsn, record, false); else if (info == XLOG_HEAP_MOVE) heap_xlog_update(true, lsn, record, true); else if (info == XLOG_HEAP_CLEAN) heap_xlog_clean(true, lsn, record); else elog(PANIC, "heap_redo: unknown op code %u", info); } void heap_undo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; info &= XLOG_HEAP_OPMASK; if (info == XLOG_HEAP_INSERT) heap_xlog_insert(false, lsn, record); else if (info == XLOG_HEAP_DELETE) heap_xlog_delete(false, lsn, record); else if (info == XLOG_HEAP_UPDATE) heap_xlog_update(false, lsn, record, false); else if (info == XLOG_HEAP_MOVE) heap_xlog_update(false, lsn, record, true); else if (info == XLOG_HEAP_CLEAN) heap_xlog_clean(false, lsn, record); else elog(PANIC, "heap_undo: unknown op code %u", info); } static void out_target(char *buf, xl_heaptid *target) { sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u", target->node.tblNode, target->node.relNode, ItemPointerGetBlockNumber(&(target->tid)), ItemPointerGetOffsetNumber(&(target->tid))); } void heap_desc(char *buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; info &= XLOG_HEAP_OPMASK; if (info == XLOG_HEAP_INSERT) { xl_heap_insert *xlrec = (xl_heap_insert *) rec; strcat(buf, "insert: "); out_target(buf, &(xlrec->target)); } else if (info == XLOG_HEAP_DELETE) { xl_heap_delete *xlrec = (xl_heap_delete *) rec; strcat(buf, "delete: "); out_target(buf, &(xlrec->target)); } else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE) { xl_heap_update *xlrec = (xl_heap_update *) rec; if (info == XLOG_HEAP_UPDATE) strcat(buf, "update: "); else strcat(buf, "move: "); out_target(buf, &(xlrec->target)); sprintf(buf + strlen(buf), "; new %u/%u", ItemPointerGetBlockNumber(&(xlrec->newtid)), ItemPointerGetOffsetNumber(&(xlrec->newtid))); } else if (info == XLOG_HEAP_CLEAN) { xl_heap_clean *xlrec = (xl_heap_clean *) rec; 
		sprintf(buf + strlen(buf), "clean: node %u/%u; blk %u",
				xlrec->node.tblNode, xlrec->node.relNode, xlrec->block);
	}
	else
		strcat(buf, "UNKNOWN");
}
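
/* ----------------
 * Example of the basic heap access method calls (a sketch; the relation
 * name, lock mode and snapshot below are only illustrative):
 *
 *		Relation	rel = heap_openr("mytable", RowExclusiveLock);
 *		HeapScanDesc scan = heap_beginscan(rel, false, SnapshotNow,
 *										   0, (ScanKey) NULL);
 *		HeapTuple	tuple;
 *
 *		while ((tuple = heap_getnext(scan, 0)) != NULL)
 *			simple_heap_delete(rel, &tuple->t_self);
 *
 *		heap_endscan(scan);
 *		heap_close(rel, RowExclusiveLock);
 * ----------------
 */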