diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 7e9b897020..2325a011a1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.114 2001/05/12 19:58:27 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.115 2001/05/16 22:35:12 tgl Exp $ * * * INTERFACE ROUTINES @@ -1317,7 +1317,7 @@ heap_insert(Relation relation, HeapTuple tup) #endif /* Find buffer for this tuple */ - buffer = RelationGetBufferForTuple(relation, tup->t_len); + buffer = RelationGetBufferForTuple(relation, tup->t_len, 0); /* NO ELOG(ERROR) from here till changes are logged */ START_CRIT_SECTION(); @@ -1578,6 +1578,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, newbuf; bool need_toast, already_marked; + Size newtupsize, + pagefree; int result; /* increment access statistics */ @@ -1685,8 +1687,10 @@ l2: HeapTupleHasExtended(newtup) || (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD)); - if (need_toast || - (unsigned) MAXALIGN(newtup->t_len) > PageGetFreeSpace((Page) dp)) + newtupsize = MAXALIGN(newtup->t_len); + pagefree = PageGetFreeSpace((Page) dp); + + if (need_toast || newtupsize > pagefree) { _locked_tuple_.node = relation->rd_node; _locked_tuple_.tid = oldtup.t_self; @@ -1704,17 +1708,60 @@ l2: /* Let the toaster do its thing */ if (need_toast) + { heap_tuple_toast_attrs(relation, newtup, &oldtup); + newtupsize = MAXALIGN(newtup->t_len); + } - /* Now, do we need a new page for the tuple, or not? */ - if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp)) - newbuf = buffer; + /* + * Now, do we need a new page for the tuple, or not? This is a bit + * tricky since someone else could have added tuples to the page + * while we weren't looking. We have to recheck the available space + * after reacquiring the buffer lock. But don't bother to do that + * if the former amount of free space is still not enough; it's + * unlikely there's more free now than before. + * + * What's more, if we need to get a new page, we will need to acquire + * buffer locks on both old and new pages. To avoid deadlock against + * some other backend trying to get the same two locks in the other + * order, we must be consistent about the order we get the locks in. + * We use the rule "lock the higher-numbered page of the relation + * first". To implement this, we must do RelationGetBufferForTuple + * while not holding the lock on the old page, and we must tell it + * to give us a page beyond the old page. + */ + if (newtupsize > pagefree) + { + /* Assume there's no chance to put newtup on same page. */ + newbuf = RelationGetBufferForTuple(relation, newtup->t_len, + BufferGetBlockNumber(buffer) + 1); + /* Now reacquire lock on old tuple's page. */ + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + } else - newbuf = RelationGetBufferForTuple(relation, newtup->t_len); - - /* Re-acquire the lock on the old tuple's page. */ - /* this seems to be deadlock free... */ - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + { + /* Re-acquire the lock on the old tuple's page. */ + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + /* Re-check using the up-to-date free space */ + pagefree = PageGetFreeSpace((Page) dp); + if (newtupsize > pagefree) + { + /* + * Rats, it doesn't fit anymore. We must now unlock and + * relock to avoid deadlock. Fortunately, this path should + * seldom be taken. + */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + newbuf = RelationGetBufferForTuple(relation, newtup->t_len, + BufferGetBlockNumber(buffer) + 1); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + } + else + { + /* OK, it fits here, so we're done. */ + newbuf = buffer; + } + } } else { @@ -1723,6 +1770,11 @@ l2: newbuf = buffer; } + /* + * At this point newbuf and buffer are both pinned and locked, + * and newbuf has enough space for the new tuple. + */ + /* NO ELOG(ERROR) from here till changes are logged */ START_CRIT_SECTION(); diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c index 7278ecd2de..1451dc2ecc 100644 --- a/src/backend/access/heap/hio.c +++ b/src/backend/access/heap/hio.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Id: hio.c,v 1.38 2001/05/12 19:58:27 tgl Exp $ + * $Id: hio.c,v 1.39 2001/05/16 22:35:12 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -66,18 +66,25 @@ RelationPutHeapTuple(Relation relation, /* * RelationGetBufferForTuple * - * Returns exclusive-locked buffer with free space >= given len. + * Returns exclusive-locked buffer with free space >= given len, + * being careful to select only a page at or beyond minblocknum + * in the relation. * - * Note that we use LockPage to lock relation for extension. We can - * do this as long as in all other places we use page-level locking - * for indices only. Alternatively, we could define pseudo-table as - * we do for transactions with XactLockTable. + * The minblocknum parameter is needed to prevent deadlock between + * concurrent heap_update operations; see heap_update for details. + * Pass zero if you don't particularly care which page you get. * - * ELOG(ERROR) is allowed here, so this routine *must* be called - * before any (unlogged) changes are made in buffer pool. + * Note that we use LockPage to lock relation for extension. We can + * do this as long as in all other places we use page-level locking + * for indices only. Alternatively, we could define pseudo-table as + * we do for transactions with XactLockTable. + * + * ELOG(ERROR) is allowed here, so this routine *must* be called + * before any (unlogged) changes are made in buffer pool. */ Buffer -RelationGetBufferForTuple(Relation relation, Size len) +RelationGetBufferForTuple(Relation relation, Size len, + BlockNumber minblocknum) { Buffer buffer = InvalidBuffer; Page pageHeader; @@ -103,16 +110,19 @@ RelationGetBufferForTuple(Relation relation, Size len) if (relation->rd_nblocks > 0) { lastblock = relation->rd_nblocks - 1; - buffer = ReadBuffer(relation, lastblock); - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - pageHeader = (Page) BufferGetPage(buffer); - if (len <= PageGetFreeSpace(pageHeader)) - return buffer; - /* - * Doesn't fit, so we'll have to try someplace else. - */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - /* buffer release will happen below... */ + if (lastblock >= minblocknum) + { + buffer = ReadBuffer(relation, lastblock); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + pageHeader = (Page) BufferGetPage(buffer); + if (len <= PageGetFreeSpace(pageHeader)) + return buffer; + /* + * Doesn't fit, so we'll have to try someplace else. + */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + /* buffer release will happen below... */ + } } /* @@ -137,32 +147,38 @@ RelationGetBufferForTuple(Relation relation, Size len) */ relation->rd_nblocks = RelationGetNumberOfBlocks(relation); - if (relation->rd_nblocks > oldnblocks) + if ((BlockNumber) relation->rd_nblocks > oldnblocks) { /* * Someone else has indeed extended the relation recently. * Try to fit our tuple into the new last page. */ lastblock = relation->rd_nblocks - 1; - buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false); - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - pageHeader = (Page) BufferGetPage(buffer); - if (len <= PageGetFreeSpace(pageHeader)) + if (lastblock >= minblocknum) { - /* OK, we don't need to extend again. */ - if (!relation->rd_myxactonly) - UnlockPage(relation, 0, ExclusiveLock); - return buffer; + buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + pageHeader = (Page) BufferGetPage(buffer); + if (len <= PageGetFreeSpace(pageHeader)) + { + /* OK, we don't need to extend again. */ + if (!relation->rd_myxactonly) + UnlockPage(relation, 0, ExclusiveLock); + return buffer; + } + /* + * Doesn't fit, so we'll have to extend the relation (again). + */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + /* buffer release will happen below... */ } - /* - * Doesn't fit, so we'll have to extend the relation (again). - */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - /* buffer release will happen below... */ } /* * Extend the relation by one page and update rd_nblocks for next time. + * + * Note: at this point minblocknum is ignored; we won't extend by more + * than one block... */ lastblock = relation->rd_nblocks; buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true); diff --git a/src/include/access/hio.h b/src/include/access/hio.h index 4147645134..8c50a128a2 100644 --- a/src/include/access/hio.h +++ b/src/include/access/hio.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: hio.h,v 1.17 2001/01/24 19:43:19 momjian Exp $ + * $Id: hio.h,v 1.18 2001/05/16 22:35:12 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -18,6 +18,7 @@ extern void RelationPutHeapTuple(Relation relation, Buffer buffer, HeapTuple tuple); -extern Buffer RelationGetBufferForTuple(Relation relation, Size len); +extern Buffer RelationGetBufferForTuple(Relation relation, Size len, + BlockNumber minblocknum); #endif /* HIO_H */