/**************************************************************************** * pending.c * $Id: pending.c,v 1.18 2004/05/26 00:08:26 wieck Exp $ * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.18 2004/05/26 00:08:26 wieck Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. * All tables that should be mirrored should have this trigger hooked up to it. * * Written by Steven Singer (ssinger@navtechinc.com) * (c) 2001-2002 Navtech Systems Support Inc. * ALL RIGHTS RESERVED * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement * is hereby granted, provided that the above copyright notice and this * paragraph and the following two paragraphs appear in all copies. * * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * ***************************************************************************/ #include #include #include #include #include enum FieldUsage { PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE }; int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupdesc, Oid tableOid, char cOp); int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid); int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid, enum FieldUsage eKeyUsage); #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 #define DEBUG_OUTPUT 1 extern Datum recordchange(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(recordchange); #if defined DEBUG_OUTPUT #define debug_msg2(x,y) elog(NOTICE,x,y) #define debug_msg(x) elog(NOTICE,x) #define debug_msg3(x,y,z) elog(NOTICE,x,y,z) #else #define debug_msg2(x,y) #define debug_msg(x) #define debug_msg3(x,y,z) #endif extern Datum nextval(PG_FUNCTION_ARGS); extern Datum setval(PG_FUNCTION_ARGS); int saveSequenceUpdate(const text * sequenceName, int nextSequenceValue); /***************************************************************************** * The entry point for the trigger function. * The Trigger takes a single SQL 'text' argument indicating the name of the * table the trigger was applied to. If this name is incorrect so will the * mirroring. ****************************************************************************/ Datum recordchange(PG_FUNCTION_ARGS) { TriggerData *trigdata; TupleDesc tupdesc; HeapTuple beforeTuple = NULL; HeapTuple afterTuple = NULL; HeapTuple retTuple = NULL; char *tblname; char op = 0; char *schemaname; char *fullyqualtblname; char *pkxpress=NULL; if (fcinfo->context != NULL) { if (SPI_connect() < 0) { ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE), errmsg("dbmirror:recordchange could not connect to SPI"))); return -1; } trigdata = (TriggerData *) fcinfo->context; /* Extract the table name */ tblname = SPI_getrelname(trigdata->tg_relation); #ifndef NOSCHEMAS schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation)); fullyqualtblname = SPI_palloc(strlen(tblname) + strlen(schemaname) + 6); sprintf(fullyqualtblname, "\"%s\".\"%s\"", schemaname, tblname); #else fullyqualtblname = SPI_palloc(strlen(tblname) + 3); sprintf(fullyqualtblname, "\"%s\"", tblname); #endif tupdesc = trigdata->tg_relation->rd_att; if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) { retTuple = trigdata->tg_newtuple; beforeTuple = trigdata->tg_trigtuple; afterTuple = trigdata->tg_newtuple; op = 'u'; } else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) { retTuple = trigdata->tg_trigtuple; afterTuple = trigdata->tg_trigtuple; op = 'i'; } else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) { retTuple = trigdata->tg_trigtuple; beforeTuple = trigdata->tg_trigtuple; op = 'd'; } else { ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), errmsg("dbmirror:recordchange Unknown operation"))); } if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, retTuple->t_tableOid, op)) { /* An error occoured. Skip the operation. */ ereport(ERROR, (errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), errmsg("operation could not be mirrored"))); return PointerGetDatum(NULL); } debug_msg("dbmirror:recordchange returning on success"); SPI_pfree(fullyqualtblname); if(pkxpress != NULL) SPI_pfree(pkxpress); SPI_finish(); return PointerGetDatum(retTuple); } else { /* * Not being called as a trigger. */ return PointerGetDatum(NULL); } } /***************************************************************************** * Constructs and executes an SQL query to write a record of this tuple change * to the pending table. *****************************************************************************/ int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, Oid tableOid, char cOp) { char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)"; int iResult = 0; HeapTuple tCurTuple; char nulls[3]=" "; /* Points the current tuple(before or after) */ Datum saPlanData[3]; Oid taPlanArgTypes[4] = {NAMEOID, CHAROID, INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); if (vpPlan == NULL) ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), errmsg("dbmirror:storePending error creating plan"))); saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); iResult = SPI_execp(vpPlan, saPlanData, nulls, 1); if (iResult < 0) elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); debug_msg("dbmirror:storePending row successfully stored in pending table"); if (cOp == 'd') { /** * This is a record of a delete operation. * Just store the key data. */ iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid); } else if (cOp == 'i') { /** * An Insert operation. * Store all data */ iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE); } else { /* op must be an update. */ iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid); iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE); } debug_msg("dbmirror:storePending done storing keyinfo"); return iResult; } int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid) { Oid saPlanArgTypes[1] = {NAMEOID}; char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; int iRetCode; pplan = SPI_prepare(insQuery, 1, saPlanArgTypes); if (pplan == NULL) { elog(NOTICE, "could not prepare INSERT plan"); return -1; } /* pplan = SPI_saveplan(pplan); */ cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), /* cpTableName already contains quotes... */ errmsg("there is no PRIMARY KEY for table %s", cpTableName))); debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData); saPlanData[0] = PointerGetDatum(cpKeyData); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); if (cpKeyData != NULL) SPI_pfree(cpKeyData); if (iRetCode != SPI_OK_INSERT) { ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION) ,errmsg("error inserting row in pendingDelete"))); return -1; } debug_msg("insert successful"); return 0; } int2vector * getPrimaryKey(Oid tblOid) { char *queryBase; char *query; bool isNull; int2vector *resultKey; int2vector *tpResultKey; HeapTuple resTuple; Datum resDatum; int ret; queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid="; query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN + 1); sprintf(query, "%s%d", queryBase, tblOid); ret = SPI_exec(query, 1); SPI_pfree(query); if (ret != SPI_OK_SELECT || SPI_processed != 1) return NULL; resTuple = SPI_tuptable->vals[0]; resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull); tpResultKey = (int2vector *) DatumGetPointer(resDatum); resultKey = SPI_palloc(sizeof(int2vector)); memcpy(resultKey, tpResultKey, sizeof(int2vector)); return resultKey; } /****************************************************************************** * Stores a copy of the non-key data for the row. *****************************************************************************/ int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; int iRetValue; pplan = SPI_prepare(insQuery, 1, planArgTypes); if (pplan == NULL) { elog(NOTICE, "could not prepare INSERT plan"); return -1; } /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) cpKeyData = packageData(tTupleData, tTupleDesc, tableOid, NONPRIMARY); else cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); if (cpKeyData != 0) SPI_pfree(cpKeyData); if (iRetValue != SPI_OK_INSERT) { elog(NOTICE, "error inserting row in pendingDelete"); return -1; } debug_msg("dbmirror:storeKeyData insert successful"); return 0; } /** * Packages the data in tTupleData into a string of the format * FieldName='value text' where any quotes inside of value text * are escaped with a backslash and any backslashes in value text * are esacped by a second back slash. * * tTupleDesc should be a description of the tuple stored in * tTupleData. * * eFieldUsage specifies which fields to use. * PRIMARY implies include only primary key fields. * NONPRIMARY implies include only non-primary key fields. * ALL implies include all fields. */ char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, enum FieldUsage eKeyUsage) { int iNumCols; int2vector *tpPKeys = NULL; int iColumnCounter; char *cpDataBlock; int iDataBlockSize; int iUsedDataBlock; iNumCols = tTupleDesc->natts; if (eKeyUsage != ALL) { tpPKeys = getPrimaryKey(tableOid); if (tpPKeys == NULL) return NULL; } if (tpPKeys != NULL) { debug_msg("dbmirror:packageData have primary keys"); } cpDataBlock = SPI_palloc(BUFFER_SIZE); iDataBlockSize = BUFFER_SIZE; iUsedDataBlock = 0; /* To account for the null */ for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++) { int iIsPrimaryKey; int iPrimaryKeyIndex; char *cpUnFormatedPtr; char *cpFormatedPtr; char *cpFieldName; char *cpFieldData; if (eKeyUsage != ALL) { /* Determine if this is a primary key or not. */ iIsPrimaryKey = 0; for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0; iPrimaryKeyIndex++) { if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter) { iIsPrimaryKey = 1; break; } } if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY)) { /** * Don't use. */ debug_msg("dbmirror:packageData skipping column"); continue; } } /* KeyUsage!=ALL */ if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) { /** * This column has been dropped. * Do not mirror it. */ continue; } cpFieldName = DatumGetPointer(NameGetDatum (&tTupleDesc->attrs [iColumnCounter - 1]->attname)); debug_msg2("dbmirror:packageData field name: %s", cpFieldName); while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6) { cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; } sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName); iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3; cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter); cpUnFormatedPtr = cpFieldData; cpFormatedPtr = cpDataBlock + iUsedDataBlock; if (cpFieldData != NULL) { *cpFormatedPtr = '\''; iUsedDataBlock++; cpFormatedPtr++; } else { sprintf(cpFormatedPtr, " "); iUsedDataBlock++; cpFormatedPtr++; continue; } debug_msg2("dbmirror:packageData field data: \"%s\"", cpFieldData); debug_msg("dbmirror:packageData starting format loop"); while (*cpUnFormatedPtr != 0) { while (iDataBlockSize - iUsedDataBlock < 2) { cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'') { *cpFormatedPtr = '\\'; cpFormatedPtr++; iUsedDataBlock++; } *cpFormatedPtr = *cpUnFormatedPtr; cpFormatedPtr++; cpUnFormatedPtr++; iUsedDataBlock++; } SPI_pfree(cpFieldData); while (iDataBlockSize - iUsedDataBlock < 3) { cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } sprintf(cpFormatedPtr, "' "); iUsedDataBlock = iUsedDataBlock + 2; debug_msg2("dbmirror:packageData data block: \"%s\"", cpDataBlock); } /* for iColumnCounter */ if (tpPKeys != NULL) SPI_pfree(tpPKeys); debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize, iUsedDataBlock); memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock); return cpDataBlock; } PG_FUNCTION_INFO_V1(setval); Datum setval(PG_FUNCTION_ARGS) { text * sequenceName; Oid setvalArgTypes[2] = {TEXTOID,INT4OID}; int nextValue; void * setvalPlan=NULL; Datum setvalData[2]; const char * setvalQuery = "SELECT setval_pg($1,$2)"; int ret; sequenceName = PG_GETARG_TEXT_P(0); nextValue = PG_GETARG_INT32(1); setvalData[0] = PointerGetDatum(sequenceName); setvalData[1] = Int32GetDatum(nextValue); if (SPI_connect() < 0) { ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("dbmirror:setval could not connect to SPI"))); return -1; } setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes); if(setvalPlan == NULL) { ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("dbmirror:setval could not prepare plan"))); return -1; } ret = SPI_execp(setvalPlan,setvalData,NULL,1); if(ret != SPI_OK_SELECT || SPI_processed != 1) return -1; debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue); ret = saveSequenceUpdate(sequenceName,nextValue); SPI_pfree(setvalPlan); SPI_finish(); debug_msg("dbmirror:setval about to return"); return Int64GetDatum(nextValue); } PG_FUNCTION_INFO_V1(nextval); Datum nextval(PG_FUNCTION_ARGS) { text * sequenceName; const char * nextvalQuery = "SELECT nextval_pg($1)"; Oid nextvalArgTypes[1] = {TEXTOID}; void * nextvalPlan=NULL; Datum nextvalData[1]; int ret; HeapTuple resTuple; char isNull; int nextSequenceValue; debug_msg("dbmirror:nextval Starting pending.so:nextval"); sequenceName = PG_GETARG_TEXT_P(0); if (SPI_connect() < 0) { ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("dbmirror:nextval could not connect to SPI"))); return -1; } nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes); debug_msg("prepared plan to call nextval_pg"); if(nextvalPlan==NULL) { ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("dbmirror:nextval error creating plan"))); return -1; } nextvalData[0] = PointerGetDatum(sequenceName); ret = SPI_execp(nextvalPlan,nextvalData,NULL,1); debug_msg("dbmirror:Executed call to nextval_pg"); if(ret != SPI_OK_SELECT || SPI_processed != 1) return -1; resTuple = SPI_tuptable->vals[0]; debug_msg("dbmirror:nextval Set resTuple"); nextSequenceValue =* (unsigned int *)(DatumGetPointer(SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1,&isNull))); debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue); saveSequenceUpdate(sequenceName,nextSequenceValue); SPI_pfree(resTuple); SPI_pfree(nextvalPlan); SPI_finish(); return Int64GetDatum(nextSequenceValue); } int saveSequenceUpdate(const text * sequenceName, int nextSequenceVal) { Oid insertArgTypes[2] = {TEXTOID,INT4OID}; Oid insertDataArgTypes[1] = {NAMEOID}; void * insertPlan=NULL; void * insertDataPlan=NULL; Datum insertDatum[2]; Datum insertDataDatum[1]; char nextSequenceText[32]; const char * insertQuery = "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \ "($1,'s',$2)"; const char * insertDataQuery = "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \ "(currval('dbmirror_pending_seqid_seq'),'t',$1)"; int ret; insertPlan = SPI_prepare(insertQuery,2,insertArgTypes); insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes); debug_msg("Prepared insert query"); if(insertPlan == NULL || insertDataPlan == NULL) { ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan"))); } insertDatum[1] = Int32GetDatum(GetCurrentTransactionId()); insertDatum[0] = PointerGetDatum(sequenceName); sprintf(nextSequenceText,"%d",nextSequenceVal); insertDataDatum[0] = PointerGetDatum(nextSequenceText); debug_msg2("dbmirror:savesequenceupdate: Setting value %s", nextSequenceText); debug_msg("dbmirror:About to execute insert query"); ret = SPI_execp(insertPlan,insertDatum,NULL,1); ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1); debug_msg("dbmirror:Insert query finished"); SPI_pfree(insertPlan); SPI_pfree(insertDataPlan); return ret; }