Refactor page compactifying code.

The logic to compact away removed tuples from page was duplicated with
small differences in PageRepairFragmentation, PageIndexMultiDelete, and
PageIndexDeleteNoCompact. Put it into a common function.

Reviewed by Peter Geoghegan.
This commit is contained in:
Heikki Linnakangas 2015-02-03 14:09:29 +02:00
parent 507627f5b5
commit 809d9a260b
1 changed files with 53 additions and 77 deletions

View File

@ -404,10 +404,9 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
*/ */
typedef struct itemIdSortData typedef struct itemIdSortData
{ {
int offsetindex; /* linp array index */ uint16 offsetindex; /* linp array index */
int itemoff; /* page offset of item data */ int16 itemoff; /* page offset of item data */
Size alignedlen; /* MAXALIGN(item data len) */ uint16 alignedlen; /* MAXALIGN(item data len) */
ItemIdData olditemid; /* used only in PageIndexMultiDelete */
} itemIdSortData; } itemIdSortData;
typedef itemIdSortData *itemIdSort; typedef itemIdSortData *itemIdSort;
@ -419,6 +418,38 @@ itemoffcompare(const void *itemidp1, const void *itemidp2)
((itemIdSort) itemidp1)->itemoff; ((itemIdSort) itemidp1)->itemoff;
} }
/*
* After removing or marking some line pointers unused, move the tuples to
* remove the gaps caused by the removed items.
*/
static void
compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
{
PageHeader phdr = (PageHeader) page;
Offset upper;
int i;
/* sort itemIdSortData array into decreasing itemoff order */
qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
itemoffcompare);
upper = phdr->pd_special;
for (i = 0; i < nitems; i++)
{
itemIdSort itemidptr = &itemidbase[i];
ItemId lp;
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
upper -= itemidptr->alignedlen;
memmove((char *) page + upper,
(char *) page + itemidptr->itemoff,
itemidptr->alignedlen);
lp->lp_off = upper;
}
phdr->pd_upper = upper;
}
/* /*
* PageRepairFragmentation * PageRepairFragmentation
* *
@ -441,7 +472,6 @@ PageRepairFragmentation(Page page)
nunused; nunused;
int i; int i;
Size totallen; Size totallen;
Offset upper;
/* /*
* It's worth the trouble to be more paranoid here than in most places, * It's worth the trouble to be more paranoid here than in most places,
@ -515,24 +545,7 @@ PageRepairFragmentation(Page page)
errmsg("corrupted item lengths: total %u, available space %u", errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower))); (unsigned int) totallen, pd_special - pd_lower)));
/* sort itemIdSortData array into decreasing itemoff order */ compactify_tuples(itemidbase, nstorage, page);
qsort((char *) itemidbase, nstorage, sizeof(itemIdSortData),
itemoffcompare);
/* compactify page */
upper = pd_special;
for (i = 0, itemidptr = itemidbase; i < nstorage; i++, itemidptr++)
{
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
upper -= itemidptr->alignedlen;
memmove((char *) page + upper,
(char *) page + itemidptr->itemoff,
itemidptr->alignedlen);
lp->lp_off = upper;
}
((PageHeader) page)->pd_upper = upper;
} }
/* Set hint bit for PageAddItem */ /* Set hint bit for PageAddItem */
@ -782,13 +795,12 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Offset pd_upper = phdr->pd_upper; Offset pd_upper = phdr->pd_upper;
Offset pd_special = phdr->pd_special; Offset pd_special = phdr->pd_special;
itemIdSortData itemidbase[MaxIndexTuplesPerPage]; itemIdSortData itemidbase[MaxIndexTuplesPerPage];
ItemIdData newitemids[MaxIndexTuplesPerPage];
itemIdSort itemidptr; itemIdSort itemidptr;
ItemId lp; ItemId lp;
int nline, int nline,
nused; nused;
int i;
Size totallen; Size totallen;
Offset upper;
Size size; Size size;
unsigned offset; unsigned offset;
int nextitm; int nextitm;
@ -857,9 +869,9 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{ {
itemidptr->offsetindex = nused; /* where it will go */ itemidptr->offsetindex = nused; /* where it will go */
itemidptr->itemoff = offset; itemidptr->itemoff = offset;
itemidptr->olditemid = *lp;
itemidptr->alignedlen = MAXALIGN(size); itemidptr->alignedlen = MAXALIGN(size);
totallen += itemidptr->alignedlen; totallen += itemidptr->alignedlen;
newitemids[nused] = *lp;
itemidptr++; itemidptr++;
nused++; nused++;
} }
@ -875,26 +887,15 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
errmsg("corrupted item lengths: total %u, available space %u", errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower))); (unsigned int) totallen, pd_special - pd_lower)));
/* sort itemIdSortData array into decreasing itemoff order */ /*
qsort((char *) itemidbase, nused, sizeof(itemIdSortData), * Looks good. Overwrite the line pointers with the copy, from which we've
itemoffcompare); * removed all the unused items.
*/
/* compactify page and install new itemids */ memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
upper = pd_special;
for (i = 0, itemidptr = itemidbase; i < nused; i++, itemidptr++)
{
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
upper -= itemidptr->alignedlen;
memmove((char *) page + upper,
(char *) page + itemidptr->itemoff,
itemidptr->alignedlen);
*lp = itemidptr->olditemid;
lp->lp_off = upper;
}
phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData); phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
phdr->pd_upper = upper;
/* and compactify the tuple data */
compactify_tuples(itemidbase, nused, page);
} }
/* /*
@ -1000,7 +1001,6 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
itemIdSort itemidptr; itemIdSort itemidptr;
int i; int i;
Size totallen; Size totallen;
Offset upper;
/* /*
* Scan the page taking note of each item that we need to preserve. * Scan the page taking note of each item that we need to preserve.
@ -1012,7 +1012,8 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
*/ */
itemidptr = itemidbase; itemidptr = itemidbase;
totallen = 0; totallen = 0;
for (i = 0; i < nline; i++, itemidptr++) PageClearHasFreeLinePointers(page);
for (i = 0; i < nline; i++)
{ {
ItemId lp; ItemId lp;
@ -1024,13 +1025,15 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
itemidptr->itemoff = ItemIdGetOffset(lp); itemidptr->itemoff = ItemIdGetOffset(lp);
itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp)); itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
totallen += itemidptr->alignedlen; totallen += itemidptr->alignedlen;
itemidptr++;
} }
else else
{ {
itemidptr->itemoff = 0; PageSetHasFreeLinePointers(page);
itemidptr->alignedlen = 0; ItemIdSetUnused(lp);
} }
} }
nline = itemidptr - itemidbase;
/* By here, there are exactly nline elements in itemidbase array */ /* By here, there are exactly nline elements in itemidbase array */
if (totallen > (Size) (pd_special - pd_lower)) if (totallen > (Size) (pd_special - pd_lower))
@ -1039,38 +1042,11 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
errmsg("corrupted item lengths: total %u, available space %u", errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower))); (unsigned int) totallen, pd_special - pd_lower)));
/* sort itemIdSortData array into decreasing itemoff order */
qsort((char *) itemidbase, nline, sizeof(itemIdSortData),
itemoffcompare);
/* /*
* Defragment the data areas of each tuple, being careful to preserve * Defragment the data areas of each tuple, being careful to preserve
* each item's position in the linp array. * each item's position in the linp array.
*/ */
upper = pd_special; compactify_tuples(itemidbase, nline, page);
PageClearHasFreeLinePointers(page);
for (i = 0, itemidptr = itemidbase; i < nline; i++, itemidptr++)
{
ItemId lp;
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
if (itemidptr->alignedlen == 0)
{
PageSetHasFreeLinePointers(page);
ItemIdSetUnused(lp);
continue;
}
upper -= itemidptr->alignedlen;
memmove((char *) page + upper,
(char *) page + itemidptr->itemoff,
itemidptr->alignedlen);
lp->lp_off = upper;
/* lp_flags and lp_len remain the same as originally */
}
/* Set the new page limits */
phdr->pd_upper = upper;
phdr->pd_lower = SizeOfPageHeaderData + i * sizeof(ItemIdData);
} }
} }