/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *      Fast insert routines for the Postgres inverted index access method.
 *      Pending entries are stored in a linear list of pages.  Later on
 *      (typically during VACUUM), ginInsertCleanup() will be invoked to
 *      transfer pending entries into the regular index structure.  This
 *      wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *      $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.2 2009/03/24 22:06:03 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/gin.h"
#include "access/tuptoaster.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/memutils.h"

#define GIN_PAGE_FREESIZE \
    ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )

typedef struct DatumArray
{
    Datum      *values;         /* expansible array */
    int32       nvalues;        /* current number of valid entries */
    int32       maxvalues;      /* allocated size of array */
} DatumArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 * Returns the amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
              IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
    Page        page = BufferGetPage(buffer);
    int         i,
                freesize,
                size = 0;
    OffsetNumber l,
                off;
    char       *workspace;
    char       *ptr;

    /* workspace could be a local array; we use palloc for alignment */
    workspace = palloc(BLCKSZ);

    START_CRIT_SECTION();

    GinInitBuffer(buffer, GIN_LIST);

    off = FirstOffsetNumber;
    ptr = workspace;

    for (i = 0; i < ntuples; i++)
    {
        int         this_size = IndexTupleSize(tuples[i]);

        memcpy(ptr, tuples[i], this_size);
        ptr += this_size;
        size += this_size;

        l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

        if (l == InvalidOffsetNumber)
            elog(ERROR, "failed to add item to index page in \"%s\"",
                 RelationGetRelationName(index));

        off++;
    }

    Assert(size <= BLCKSZ);     /* else we overran workspace */

    GinPageGetOpaque(page)->rightlink = rightlink;

    /*
     * The tail page may contain only whole row(s), or the final part of a
     * row whose earlier entries were placed on previous pages.
     */
    if (rightlink == InvalidBlockNumber)
    {
        GinPageSetFullRow(page);
        GinPageGetOpaque(page)->maxoff = 1;
    }
    else
    {
        GinPageGetOpaque(page)->maxoff = 0;
    }

    freesize = PageGetFreeSpace(page);

    MarkBufferDirty(buffer);

    if (!index->rd_istemp)
    {
        XLogRecData rdata[2];
        ginxlogInsertListPage data;
        XLogRecPtr  recptr;

        rdata[0].buffer = buffer;
        rdata[0].buffer_std = true;
        rdata[0].data = (char *) &data;
        rdata[0].len = sizeof(ginxlogInsertListPage);
        rdata[0].next = rdata + 1;

        rdata[1].buffer = InvalidBuffer;
        rdata[1].data = workspace;
        rdata[1].len = size;
        rdata[1].next = NULL;

        data.blkno = BufferGetBlockNumber(buffer);
        data.rightlink = rightlink;
        data.ntuples = ntuples;

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
    }

    UnlockReleaseBuffer(buffer);

    END_CRIT_SECTION();

    pfree(workspace);

    return freesize;
}

static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
            GinMetaPageData *res)
{
    Buffer      curBuffer = InvalidBuffer;
    Buffer      prevBuffer = InvalidBuffer;
    int         i,
                size = 0,
                tupsize;
    int         startTuple = 0;

    Assert(ntuples > 0);

    /*
     * Split tuples into pages
     */
    for (i = 0; i < ntuples; i++)
    {
        if (curBuffer == InvalidBuffer)
        {
            curBuffer = GinNewBuffer(index);

            if (prevBuffer != InvalidBuffer)
            {
                res->nPendingPages++;
                writeListPage(index, prevBuffer,
                              tuples + startTuple, i - startTuple,
                              BufferGetBlockNumber(curBuffer));
            }
            else
            {
                res->head = BufferGetBlockNumber(curBuffer);
            }

            prevBuffer = curBuffer;
            startTuple = i;
            size = 0;
        }

        tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

        if (size + tupsize >= GinListPageSize)
        {
            /* won't fit, force a new page and reprocess */
            i--;
            curBuffer = InvalidBuffer;
        }
        else
        {
            size += tupsize;
        }
    }
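    /*
     * On loop exit, curBuffer holds the last, still-unwritten page, and
     * tuples[startTuple .. ntuples-1] are the tuples destined for it (no
     * single tuple can exceed GinListPageSize, so the loop always leaves a
     * valid curBuffer behind).
     */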
    /*
     * Write last page
     */
    res->tail = BufferGetBlockNumber(curBuffer);
    res->tailFreeSize = writeListPage(index, curBuffer,
                                      tuples + startTuple,
                                      ntuples - startTuple,
                                      InvalidBlockNumber);
    res->nPendingPages++;
    /* that was only one heap tuple */
    res->nPendingHeapTuples = 1;
}

/*
 * Insert the collected values during normal insertion.  The function
 * guarantees that all entries of one heap tuple are stored sequentially,
 * preserving order.
 */
void
ginHeapTupleFastInsert(Relation index, GinState *ginstate,
                       GinTupleCollector *collector)
{
    Buffer      metabuffer;
    Page        metapage;
    GinMetaPageData *metadata = NULL;
    XLogRecData rdata[2];
    Buffer      buffer = InvalidBuffer;
    Page        page = NULL;
    ginxlogUpdateMeta data;
    bool        separateList = false;
    bool        needCleanup = false;

    if (collector->ntuples == 0)
        return;

    data.node = index->rd_node;
    data.ntuples = 0;
    data.newRightlink = data.prevTail = InvalidBlockNumber;

    rdata[0].buffer = InvalidBuffer;
    rdata[0].data = (char *) &data;
    rdata[0].len = sizeof(ginxlogUpdateMeta);
    rdata[0].next = NULL;

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);

    if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE)
    {
        /*
         * Total size is greater than one page => make sublist
         */
        separateList = true;
    }
    else
    {
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber ||
            collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
        {
            /*
             * Pending list is empty, or total size is greater than the free
             * space on the tail page => make sublist
             *
             * We unlock metabuffer to keep concurrency high
             */
            separateList = true;
            LockBuffer(metabuffer, GIN_UNLOCK);
        }
    }

    if (separateList)
    {
        GinMetaPageData sublist;

        /*
         * We should make the sublist separately and append it to the tail
         */
        memset(&sublist, 0, sizeof(GinMetaPageData));
        makeSublist(index, collector->tuples, collector->ntuples, &sublist);

        /*
         * metapage was unlocked, see above
         */
        LockBuffer(metabuffer, GIN_EXCLUSIVE);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber)
        {
            /*
             * Sublist becomes the main list
             */
            START_CRIT_SECTION();
            memcpy(metadata, &sublist, sizeof(GinMetaPageData));
            memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData));
        }
        else
        {
            /*
             * Merge the lists
             */
            data.prevTail = metadata->tail;
            buffer = ReadBuffer(index, metadata->tail);
            LockBuffer(buffer, GIN_EXCLUSIVE);
            page = BufferGetPage(buffer);
            Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

            START_CRIT_SECTION();

            GinPageGetOpaque(page)->rightlink = sublist.head;
            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;

            metadata->nPendingPages += sublist.nPendingPages;
            metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

            memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
            data.newRightlink = sublist.head;

            MarkBufferDirty(buffer);
        }
    }
    else
    {
        /*
         * Insert into the tail page; the metapage is already locked
         */
        OffsetNumber l,
                    off;
        int         i,
                    tupsize;
        char       *ptr;

        buffer = ReadBuffer(index, metadata->tail);
        LockBuffer(buffer, GIN_EXCLUSIVE);
        page = BufferGetPage(buffer);

        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
            OffsetNumberNext(PageGetMaxOffsetNumber(page));
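        /*
         * Build a two-part WAL record: rdata[0] carries the metapage
         * update, while rdata[1] references the tail page and carries
         * copies of the inserted tuples, so redo can reapply the insertion
         * when no full-page image is taken.
         */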
        rdata[0].next = rdata + 1;

        rdata[1].buffer = buffer;
        rdata[1].buffer_std = true;
        ptr = rdata[1].data = (char *) palloc(collector->sumsize);
        rdata[1].len = collector->sumsize;
        rdata[1].next = NULL;

        data.ntuples = collector->ntuples;

        START_CRIT_SECTION();

        /*
         * Increase the counter of heap tuples
         */
        Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
        GinPageGetOpaque(page)->maxoff++;
        metadata->nPendingHeapTuples++;

        for (i = 0; i < collector->ntuples; i++)
        {
            tupsize = IndexTupleSize(collector->tuples[i]);
            l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

            if (l == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index page in \"%s\"",
                     RelationGetRelationName(index));

            memcpy(ptr, collector->tuples[i], tupsize);
            ptr += tupsize;

            off++;
        }

        metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

        MarkBufferDirty(buffer);
    }

    /*
     * Write out the changes and XLOG them
     */
    MarkBufferDirty(metabuffer);

    if (!index->rd_istemp)
    {
        XLogRecPtr  recptr;

        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
        PageSetLSN(metapage, recptr);
        PageSetTLI(metapage, ThisTimeLineID);

        if (buffer != InvalidBuffer)
        {
            PageSetLSN(page, recptr);
            PageSetTLI(page, ThisTimeLineID);
        }
    }

    if (buffer != InvalidBuffer)
        UnlockReleaseBuffer(buffer);

    /*
     * Force pending list cleanup when it becomes too long.  Also,
     * ginInsertCleanup could take a significant amount of time, so we
     * prefer to call it when it can do all the work in a single collection
     * cycle.  In non-vacuum mode, it shouldn't require maintenance_work_mem,
     * so fire it while the pending list is still small enough to fit into
     * work_mem.
     *
     * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     */
    if (metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L)
        needCleanup = true;

    UnlockReleaseBuffer(metabuffer);

    END_CRIT_SECTION();

    if (needCleanup)
        ginInsertCleanup(index, ginstate, false, NULL);
}

/*
 * Collect values from one tuple to be indexed.  All values for one tuple
 * must be written at once, to guarantee a consistent state.
 */
uint32
ginHeapTupleFastCollect(Relation index, GinState *ginstate,
                        GinTupleCollector *collector,
                        OffsetNumber attnum, Datum value, ItemPointer item)
{
    Datum      *entries;
    int32       i,
                nentries;

    entries = extractEntriesSU(ginstate, attnum, value, &nentries);

    if (nentries == 0)
        /* nothing to insert */
        return 0;

    /*
     * Allocate/reallocate memory for storing collected tuples
     */
    if (collector->tuples == NULL)
    {
        collector->lentuples = nentries * index->rd_att->natts;
        collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
    }

    while (collector->ntuples + nentries > collector->lentuples)
    {
        collector->lentuples *= 2;
        collector->tuples = (IndexTuple *) repalloc(collector->tuples,
                                                    sizeof(IndexTuple) * collector->lentuples);
    }

    /*
     * Build the tuple array
     */
    for (i = 0; i < nentries; i++)
    {
        int32       tupsize;

        collector->tuples[collector->ntuples + i] =
            GinFormTuple(ginstate, attnum, entries[i], NULL, 0);
        collector->tuples[collector->ntuples + i]->t_tid = *item;
        tupsize = IndexTupleSize(collector->tuples[collector->ntuples + i]);

        if (tupsize > TOAST_INDEX_TARGET || tupsize >= GinMaxItemSize)
            elog(ERROR, "huge tuple");

        collector->sumsize += tupsize;
    }

    collector->ntuples += nentries;

    return nentries;
}
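/*
 * Typical caller pattern (a sketch modeled on the fastupdate path in
 * gininsert.c; variable names are from that caller's context): collect all
 * key datums of one heap tuple, then flush the collector in a single call,
 * so the tuple's entries reach the pending list together and in order:
 *
 *      GinTupleCollector collector;
 *
 *      memset(&collector, 0, sizeof(GinTupleCollector));
 *      for (i = 0; i < ginstate->origTupdesc->natts; i++)
 *          if (!isnull[i])
 *              ginHeapTupleFastCollect(index, ginstate, &collector,
 *                                      (OffsetNumber) (i + 1),
 *                                      values[i], ht_ctid);
 *      ginHeapTupleFastInsert(index, ginstate, &collector);
 */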
/*
 * Deletes pending list pages up to (not including) newHead page.
 * If newHead == InvalidBlockNumber then the function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 *
 * Returns true if another cleanup process is running concurrently
 * (if so, we can just abandon our own efforts)
 */
static bool
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
          IndexBulkDeleteResult *stats)
{
    Page        metapage;
    GinMetaPageData *metadata;
    BlockNumber blknoToDelete;

    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);
    blknoToDelete = metadata->head;

    do
    {
        Page        page;
        int         i;
        int64       nDeletedHeapTuples = 0;
        ginxlogDeleteListPages data;
        XLogRecData rdata[1];
        Buffer      buffers[GIN_NDELETE_AT_ONCE];

        data.node = index->rd_node;

        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &data;
        rdata[0].len = sizeof(ginxlogDeleteListPages);
        rdata[0].next = NULL;

        /*
         * Delete pages in batches of at most GIN_NDELETE_AT_ONCE, so that
         * each WAL record and critical section stays bounded.
         */
        data.ndeleted = 0;
        while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
        {
            data.toDelete[data.ndeleted] = blknoToDelete;
            buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
            LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
            page = BufferGetPage(buffers[data.ndeleted]);

            data.ndeleted++;

            if (GinPageIsDeleted(page))
            {
                /* concurrent cleanup process is detected */
                for (i = 0; i < data.ndeleted; i++)
                    UnlockReleaseBuffer(buffers[i]);

                return true;
            }

            nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
            blknoToDelete = GinPageGetOpaque(page)->rightlink;
        }

        if (stats)
            stats->pages_deleted += data.ndeleted;

        START_CRIT_SECTION();

        metadata->head = blknoToDelete;

        Assert(metadata->nPendingPages >= data.ndeleted);
        metadata->nPendingPages -= data.ndeleted;
        Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
        metadata->nPendingHeapTuples -= nDeletedHeapTuples;

        if (blknoToDelete == InvalidBlockNumber)
        {
            metadata->tail = InvalidBlockNumber;
            metadata->tailFreeSize = 0;
            metadata->nPendingPages = 0;
            metadata->nPendingHeapTuples = 0;
        }
        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

        MarkBufferDirty(metabuffer);

        for (i = 0; i < data.ndeleted; i++)
        {
            page = BufferGetPage(buffers[i]);
            GinPageGetOpaque(page)->flags = GIN_DELETED;
            MarkBufferDirty(buffers[i]);
        }

        if (!index->rd_istemp)
        {
            XLogRecPtr  recptr;

            recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
            PageSetLSN(metapage, recptr);
            PageSetTLI(metapage, ThisTimeLineID);

            for (i = 0; i < data.ndeleted; i++)
            {
                page = BufferGetPage(buffers[i]);
                PageSetLSN(page, recptr);
                PageSetTLI(page, ThisTimeLineID);
            }
        }

        for (i = 0; i < data.ndeleted; i++)
            UnlockReleaseBuffer(buffers[i]);

        END_CRIT_SECTION();
    } while (blknoToDelete != newHead);

    return false;
}

/*
 * Add datum to DatumArray, resizing the array if it's overflowed
 */
static void
addDatum(DatumArray *datums, Datum datum)
{
    if (datums->nvalues >= datums->maxvalues)
    {
        datums->maxvalues *= 2;
        datums->values = (Datum *) repalloc(datums->values,
                                            sizeof(Datum) * datums->maxvalues);
    }

    datums->values[datums->nvalues++] = datum;
}
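/*
 * Note: the doubling above makes appends amortized O(1).  The array is only
 * reset (nvalues = 0) between pages, not freed, so one allocation is reused
 * for every page processed within an accumulation cycle; ginInsertCleanup()
 * re-pallocs it after each MemoryContextReset.
 */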
/*
 * Go through all tuples >= startoff on page and collect values in memory
 *
 * Note that da is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, DatumArray *da,
                   Page page, OffsetNumber startoff)
{
    ItemPointerData heapptr;
    OffsetNumber i,
                maxoff;
    OffsetNumber attrnum,
                curattnum;

    /* reset *da to empty */
    da->nvalues = 0;

    maxoff = PageGetMaxOffsetNumber(page);
    Assert(maxoff >= FirstOffsetNumber);
    ItemPointerSetInvalid(&heapptr);
    attrnum = 0;

    for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
    {
        IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));

        curattnum = gintuple_get_attrnum(accum->ginstate, itup);

        if (!ItemPointerIsValid(&heapptr))
        {
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }
        else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
                   curattnum == attrnum))
        {
            /*
             * We can insert several datums per call, but only for one heap
             * tuple and one column.
             */
            ginInsertRecordBA(accum, &heapptr, attrnum, da->values, da->nvalues);
            da->nvalues = 0;
            heapptr = itup->t_tid;
            attrnum = curattnum;
        }
        addDatum(da, gin_index_getattr(accum->ginstate, itup));
    }

    ginInsertRecordBA(accum, &heapptr, attrnum, da->values, da->nvalues);
}

/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * This can be called concurrently by multiple backends, so it must cope.
 * At first glance it looks neither concurrent-safe nor crash-safe.  The
 * reason it's okay is that multiple insertion of the same entry is detected
 * and treated as a no-op by gininsert.c.  If we crash after posting entries
 * to the main index and before removing them from the pending list, it's
 * okay because when we redo the posting later on, nothing bad will happen.
 * Likewise, if two backends simultaneously try to post a pending entry into
 * the main index, one will succeed and one will do nothing.  We try to
 * notice when someone else is a little bit ahead of us in the process, but
 * that's just to avoid wasting cycles.  Only the action of removing a page
 * from the pending list really needs exclusive lock.
 *
 * vac_delay indicates that ginInsertCleanup is called from the vacuum
 * process, so call vacuum_delay_point() periodically.
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(Relation index, GinState *ginstate,
                 bool vac_delay, IndexBulkDeleteResult *stats)
{
    Buffer      metabuffer,
                buffer;
    Page        metapage,
                page;
    GinMetaPageData *metadata;
    MemoryContext opCtx,
                oldCtx;
    BuildAccumulator accum;
    DatumArray  datums;
    BlockNumber blkno;

    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    LockBuffer(metabuffer, GIN_SHARE);
    metapage = BufferGetPage(metabuffer);
    metadata = GinPageGetMeta(metapage);

    if (metadata->head == InvalidBlockNumber)
    {
        /* Nothing to do */
        UnlockReleaseBuffer(metabuffer);
        return;
    }

    /*
     * Read and lock head of pending list
     */
    blkno = metadata->head;
    buffer = ReadBuffer(index, blkno);
    LockBuffer(buffer, GIN_SHARE);
    page = BufferGetPage(buffer);

    LockBuffer(metabuffer, GIN_UNLOCK);

    /*
     * Initialize.  All temporary space will be in opCtx.
     */
    opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                  "GIN insert cleanup temporary context",
                                  ALLOCSET_DEFAULT_MINSIZE,
                                  ALLOCSET_DEFAULT_INITSIZE,
                                  ALLOCSET_DEFAULT_MAXSIZE);

    oldCtx = MemoryContextSwitchTo(opCtx);

    datums.maxvalues = 128;
    datums.nvalues = 0;
    datums.values = (Datum *) palloc(sizeof(Datum) * datums.maxvalues);

    ginInitBA(&accum);
    accum.ginstate = ginstate;
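    /*
     * Outline of the main loop: read one pending page and accumulate its
     * entries; once we reach the end of the list, or hit a whole-row
     * boundary with memory nearly full, flush the accumulated entries into
     * the main index and unlink the processed pages via shiftList().
     */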
    /*
     * At the top of this loop, we have pin and lock on the current page of
     * the pending list.  However, we'll release that before exiting the
     * loop.  Note we also have pin but not lock on the metapage.
     */
    for (;;)
    {
        if (GinPageIsDeleted(page))
        {
            /* another cleanup process is running concurrently */
            UnlockReleaseBuffer(buffer);
            break;
        }

        /*
         * read page's datums into memory
         */
        processPendingPage(&accum, &datums, page, FirstOffsetNumber);

        if (vac_delay)
            vacuum_delay_point();

        /*
         * Is it time to flush memory to disk?  Flush if we are at the end
         * of the pending list, or if we have a full row and memory is
         * getting full.
         *
         * XXX using up maintenance_work_mem here is probably unreasonably
         * much, since vacuum might already be using that much.
         */
        if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
            (GinPageHasFullRow(page) &&
             (accum.allocatedMemory >= maintenance_work_mem * 1024L ||
              accum.maxdepth > GIN_MAX_TREE_DEPTH)))
        {
            ItemPointerData *list;
            uint32      nlist;
            Datum       entry;
            OffsetNumber maxoff,
                        attnum;

            /*
             * Unlock current page to increase performance.  Changes of the
             * page will be checked later by comparing maxoff after
             * completion of the memory flush.
             */
            maxoff = PageGetMaxOffsetNumber(page);
            LockBuffer(buffer, GIN_UNLOCK);

            /*
             * Moving collected data into the regular structure can take a
             * significant amount of time, so do it without locking the
             * pending list.
             */
            while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
            {
                ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
                if (vac_delay)
                    vacuum_delay_point();
            }

            /*
             * Lock the whole list to remove pages
             */
            LockBuffer(metabuffer, GIN_EXCLUSIVE);
            LockBuffer(buffer, GIN_SHARE);

            if (GinPageIsDeleted(page))
            {
                /* another cleanup process is running concurrently */
                UnlockReleaseBuffer(buffer);
                LockBuffer(metabuffer, GIN_UNLOCK);
                break;
            }

            /*
             * While we left the page unlocked, more stuff might have gotten
             * added to it.  If so, process those entries immediately.  There
             * shouldn't be very many, so we don't worry about the fact that
             * we're doing this with exclusive lock.  The insertion algorithm
             * guarantees that inserted row(s) will not continue on the next
             * page.  NOTE: intentionally no vacuum_delay_point in this loop.
             */
            if (PageGetMaxOffsetNumber(page) != maxoff)
            {
                ginInitBA(&accum);
                processPendingPage(&accum, &datums, page, maxoff + 1);

                while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
                    ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
            }

            /*
             * Remember the next page - it will become the new list head
             */
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
                                             * locking */

            /*
             * Remove processed pages from the pending list; at this point
             * all their content has been moved into the regular structure.
             */
            if (shiftList(index, metabuffer, blkno, stats))
            {
                /* another cleanup process is running concurrently */
                LockBuffer(metabuffer, GIN_UNLOCK);
                break;
            }

            Assert(blkno == metadata->head);
            LockBuffer(metabuffer, GIN_UNLOCK);

            /*
             * if we removed the whole pending list just exit
             */
            if (blkno == InvalidBlockNumber)
                break;

            /*
             * release memory used so far and reinit state
             */
            MemoryContextReset(opCtx);
            ginInitBA(&accum);
            datums.nvalues = 0;
            datums.values = (Datum *) palloc(sizeof(Datum) * datums.maxvalues);
        }
        else
        {
            blkno = GinPageGetOpaque(page)->rightlink;
            UnlockReleaseBuffer(buffer);
        }

        /*
         * Read next page in pending list
         */
        CHECK_FOR_INTERRUPTS();
        buffer = ReadBuffer(index, blkno);
        LockBuffer(buffer, GIN_SHARE);
        page = BufferGetPage(buffer);
    }

    ReleaseBuffer(metabuffer);

    /* Clean up temporary space */
    MemoryContextSwitchTo(oldCtx);
    MemoryContextDelete(opCtx);
}