diff --git a/contrib/dblink/Makefile b/contrib/dblink/Makefile index 8b730fab73..dee7865a5f 100644 --- a/contrib/dblink/Makefile +++ b/contrib/dblink/Makefile @@ -1,4 +1,4 @@ -# $Header: /cvsroot/pgsql/contrib/dblink/Makefile,v 1.4 2001/09/06 10:49:29 petere Exp $ +# $Header: /cvsroot/pgsql/contrib/dblink/Makefile,v 1.5 2002/06/23 21:58:07 momjian Exp $ subdir = contrib/dblink top_builddir = ../.. diff --git a/contrib/dbmirror/AddTrigger.sql b/contrib/dbmirror/AddTrigger.sql new file mode 100644 index 0000000000..da786ec5dd --- /dev/null +++ b/contrib/dbmirror/AddTrigger.sql @@ -0,0 +1,5 @@ + +CREATE TRIGGER "MyTableName_Trig" AFTER INSERT OR DELETE OR UPDATE +ON "MyTableName" FOR EACH ROW EXECUTE PROCEDURE +"recordchange" (); + diff --git a/contrib/dbmirror/COPYING b/contrib/dbmirror/COPYING new file mode 100644 index 0000000000..60549be514 --- /dev/null +++ b/contrib/dbmirror/COPYING @@ -0,0 +1,340 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc. + 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Library General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) 19yy + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) 19yy name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + , 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Library General +Public License instead of this License. diff --git a/contrib/dbmirror/DBMirror.pl b/contrib/dbmirror/DBMirror.pl new file mode 100755 index 0000000000..c934c869b7 --- /dev/null +++ b/contrib/dbmirror/DBMirror.pl @@ -0,0 +1,869 @@ +#!/usr/bin/perl +############################################################################# +# +# DBMirror.pl +# Contains the Database mirroring script. +# This script queries the pending table off the database specified +# (along with the associated schema) for updates that are pending on a +# specific host. The database on that host is then updated with the changes. +# +# +# Written by Steven Singer (ssinger@navtechinc.com) +# (c) 2001-2002 Navtech Systems Support Inc. +# Released under the GNU Public License version 2. See COPYING. +# +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +############################################################################## +# $Id: DBMirror.pl,v 1.1 2002/06/23 21:58:07 momjian Exp $ +# +############################################################################## + +=head1 NAME + +DBMirror.pl - A Perl module to mirror database changes from a master database +to a slave. + +=head1 SYNPOSIS + + +DBMirror.pl slaveConfigfile.conf + + +=head1 DESCRIPTION + +This Perl script will connect to the master database and query its pending +table for a list of pending changes. + +The transactions of the original changes to the master will be preserved +when sending things to the slave. + +=cut + + +=head1 METHODS + +=over 4 + +=cut + + +BEGIN { + # add in a global path to files + # Pg should be included. +} + + +use strict; +use Pg; +use IO::Handle; +sub mirrorCommand($$$$$$); +sub mirrorInsert($$$$$); +sub mirrorDelete($$$$$); +sub mirrorUpdate($$$$$); +sub sendQueryToSlaves($$); +sub logErrorMessage($); +sub openSlaveConnection($); +sub updateMirrorHostTable($$); + sub extractData($$); +local $::masterHost; +local $::masterDb; +local $::masterUser; +local $::masterPassword; +local $::errorThreshold=5; +local $::errorEmailAddr=undef; + +my %slaveInfoHash; +local $::slaveInfo = \%slaveInfoHash; + +my $lastErrorMsg; +my $repeatErrorCount=0; + +my $lastXID; +my $commandCount=0; + +my $masterConn; + +Main(); + +sub Main() { + +#run the configuration file. + if ($#ARGV != 0) { + die "usage: DBMirror.pl configFile\n"; + } + if( ! defined do $ARGV[0]) { + logErrorMessage("Invalid Configuration file $ARGV[0]"); + die; + } + + + my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; + + $masterConn = Pg::connectdb($connectString); + + unless($masterConn->status == PGRES_CONNECTION_OK) { + logErrorMessage("Can't connect to master database\n" . + $masterConn->errorMessage); + die; + } + + + my $firstTime = 1; + while(1) { + if($firstTime == 0) { + sleep 60; + } + $firstTime = 0; +# Open up the connection to the slave. + if(! defined $::slaveInfo->{"status"} || + $::slaveInfo->{"status"} == -1) { + openSlaveConnection($::slaveInfo); + } + + + + sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); + + + #Obtain a list of pending transactions using ordering by our approximation + #to the commit time. The commit time approximation is taken to be the + #SeqId of the last row edit in the transaction. + my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; + $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; + $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; + $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; + $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; + $pendingTransQuery .= " ON pd.\"XID\""; + $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; + $pendingTransQuery .= " GROUP BY pd.\"XID\" "; + $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")"; + + + my $pendingTransResults = $masterConn->exec($pendingTransQuery); + unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) { + logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); + die; + } + + my $numPendingTrans = $pendingTransResults->ntuples; + my $curTransTuple = 0; + + + # + # This loop loops through each pending transaction in the proper order. + # The Pending row edits for that transaction will be queried from the + # master and sent + committed to the slaves. + while($curTransTuple < $numPendingTrans) { + my $XID = $pendingTransResults->getvalue($curTransTuple,0); + my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); + my $seqId; + + my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\","; + $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; + $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; + $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; + + $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC"; + + + my $pendingResults = $masterConn->exec($pendingQuery); + unless($pendingResults->resultStatus==PGRES_TUPLES_OK) { + logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); + die; + } + + + + my $numPending = $pendingResults->ntuples; + my $curTuple = 0; + sendQueryToSlaves(undef,"BEGIN"); + while ($curTuple < $numPending) { + $seqId = $pendingResults->getvalue($curTuple,0); + my $tableName = $pendingResults->getvalue($curTuple,1); + my $op = $pendingResults->getvalue($curTuple,2); + + $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, + $pendingResults,$curTuple) +1; + if($::slaveInfo->{"status"}==-1) { + last; + } + + } + #Now commit the transaction. + if($::slaveInfo->{"status"}==-1) { + last; + } + sendQueryToSlaves(undef,"COMMIT"); + updateMirrorHostTable($XID,$seqId); + if($commandCount > 5000) { + $commandCount = 0; + $::slaveInfo->{"status"} = -1; + $::slaveInfo->{"slaveConn"}->reset; + #Open the connection right away. + openSlaveConnection($::slaveInfo); + + } + + $pendingResults = undef; + $curTransTuple = $curTransTuple +1; + }#while transactions left. + + $pendingTransResults = undef; + + }#while(1) +}#Main + + + +=item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple) + +Mirrors a single SQL Command(change to a single row) to the slave. + +=over 4 + +=item * SeqId + +The id number of the change to mirror. This is the +primary key of the pending table. + + +=item * tableName + +The name of the table the transaction takes place on. + +=item * op + +The type of operation this transaction is. 'i' for insert, 'u' for update or +'d' for delete. + +=item * transId + +The Transaction of of the Transaction that this command is part of. + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. If the command is an update then this points to the row +with IsKey equal to true. The next row, curTuple+1 is the contains the +PendingData with IsKey false for the update. + + +=item returns + + +The tuple number of last tuple for this command. This might be equal to +currentTuple or it might be larger (+1 in the case of an Update). + + +=back + +=cut + + +sub mirrorCommand($$$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $op = $_[2]; + my $transId = $_[3]; + my $pendingResults = $_[4]; + my $currentTuple = $_[5]; + + if($op eq 'i') { + $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults + ,$currentTuple); + } + if($op eq 'd') { + $currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults, + $currentTuple); + } + if($op eq 'u') { + $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, + $currentTuple); + } + $commandCount = $commandCount +1; + if($commandCount % 100 == 0) { + # print "Sent 100 commmands on SeqId $seqId \n"; + # flush STDOUT; + } + return $currentTuple + } + + +=item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple) + +Mirrors an INSERT operation to the slave database. A new row is placed +in the slave database containing the primary key from pendingKeys along with +the data fields contained in the row identified by sourceOid. + +=over 4 + +=item * transId + +The sequence id of the INSERT operation being mirrored. This is the primary +key of the pending table. + +=item * tableName + + +The name of the table the transaction takes place on. + +=item * sourceOid + +The OID of the row in the master database for which this transaction effects. +If the transaction is a delete then the operation is not valid. + +=item * transId + +The Transaction Id of transaction that this insert is part of. + + + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. In the case of an insert this should point to the one +row for the row edit. + +=item returns + +The tuple number of the last tuple for the row edit. This should be +currentTuple. + + +=back + +=cut + + +sub mirrorInsert($$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $transId = $_[2]; + my $pendingResults = $_[3]; + my $currentTuple = $_[4]; + my $counter; + my $column; + + my $firstIteration=1; + my %recordValues = extractData($pendingResults,$currentTuple); + + + #Now build the insert query. + my $insertQuery = "INSERT INTO \"$tableName\" ("; + my $valuesQuery = ") VALUES ("; + foreach $column (keys (%recordValues)) { + if($firstIteration==0) { + $insertQuery .= " ,"; + $valuesQuery .= " ,"; + } + $insertQuery .= "\"$column\""; + if(defined $recordValues{$column}) { + my $quotedValue = $recordValues{$column}; + $quotedValue =~ s/\\/\\\\/g; + $quotedValue =~ s/'/\\'/g; + $valuesQuery .= "'$quotedValue'"; + } + else { + $valuesQuery .= "null"; + } + $firstIteration=0; + } + $valuesQuery .= ")"; + sendQueryToSlaves(undef,$insertQuery . $valuesQuery); + return $currentTuple; +} + +=item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple) + +Deletes a single row from the slave database. The row is identified by the +primary key for the transaction in the pendingKeys table. + +=over 4 + +=item * SeqId + +The Sequence id for this delete request. + +=item * tableName + +The name of the table to delete the row from. + +=item * transId + +The Transaction Id of the transaction that this command is part of. + + + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. In the case of a delete this should point to the one +row for the row edit. + +=item returns + +The tuple number of the last tuple for the row edit. This should be +currentTuple. + + +=back + +=cut + + +sub mirrorDelete($$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $transId = $_[2]; + my $pendingResult = $_[3]; + my $currentTuple = $_[4]; + my %dataHash; + my $currentField; + my $firstField=1; + %dataHash = extractData($pendingResult,$currentTuple); + + my $counter=0; + my $deleteQuery = "DELETE FROM \"$tableName\" WHERE "; + foreach $currentField (keys %dataHash) { + if($firstField==0) { + $deleteQuery .= " AND "; + } + my $currentValue = $dataHash{$currentField}; + $deleteQuery .= "\""; + $deleteQuery .= $currentField; + if(defined $currentValue) { + $deleteQuery .= "\"='"; + $deleteQuery .= $currentValue; + $deleteQuery .= "'"; + } + else { + $deleteQuery .= " is null "; + } + $counter++; + $firstField=0; + } + + sendQueryToSlaves($transId,$deleteQuery); + return $currentTuple; +} + + +=item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple) + +Mirrors over an edit request to a single row of the database. +The primary key from before the edit is used to determine which row in the +slave should be changed. + +After the edit takes place on the slave its primary key will match the primary +key the master had immediatly following the edit. All other fields will be set +to the current values. + +Data integrity is maintained because the mirroring is performed in an +SQL transcation so either all pending changes are made or none are. + +=over 4 + +=item * seqId + +The Sequence id of the update. + +=item * tableName + +The name of the table to perform the update on. + +=item * transId + +The transaction Id for the transaction that this command is part of. + + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. In the case of a delete this should point to the one +row for the row edit. + +=item returns + +The tuple number of the last tuple for the row edit. This should be +currentTuple +1. Which points to the non key row of the update. + + +=back + +=cut + +sub mirrorUpdate($$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $transId = $_[2]; + my $pendingResult = $_[3]; + my $currentTuple = $_[4]; + + my $counter; + my $quotedValue; + my $updateQuery = "UPDATE \"$tableName\" SET "; + my $currentField; + + + + my %keyValueHash; + my %dataValueHash; + my $firstIteration=1; + + #Extract the Key values. This row contains the values of the + # key fields before the update occours(the WHERE clause) + %keyValueHash = extractData($pendingResult,$currentTuple); + + + #Extract the data values. This is a SET clause that contains + #values for the entire row AFTER the update. + %dataValueHash = extractData($pendingResult,$currentTuple+1); + + $firstIteration=1; + foreach $currentField (keys (%dataValueHash)) { + if($firstIteration==0) { + $updateQuery .= ", "; + } + $updateQuery .= " \"$currentField\"="; + my $currentValue = $dataValueHash{$currentField}; + if(defined $currentValue ) { + $quotedValue = $currentValue; + $quotedValue =~ s/\\/\\\\/g; + $quotedValue =~ s/'/\\'/g; + $updateQuery .= "'$quotedValue'"; + } + else { + $updateQuery .= "null "; + } + $firstIteration=0; + } + + + $updateQuery .= " WHERE "; + $firstIteration=1; + foreach $currentField (keys (%keyValueHash)) { + my $currentValue; + if($firstIteration==0) { + $updateQuery .= " AND "; + } + $updateQuery .= "\"$currentField\"="; + $currentValue = $keyValueHash{$currentField}; + if(defined $currentValue) { + $quotedValue = $currentValue; + $quotedValue =~ s/\\/\\\\/g; + $quotedValue =~ s/'/\\'/g; + $updateQuery .= "'$quotedValue'"; + } + else { + $updateQuery .= " null "; + } + $firstIteration=0; + } + + sendQueryToSlaves($transId,$updateQuery); + return $currentTuple+1; +} + + + +=item sendQueryToSlaves(seqId,sqlQuery) + +Sends an SQL query to the slave. + + +=over 4 + +=item * seqId + +The sequence Id of the command being sent. Undef if no command is associated +with the query being sent. + +=item * sqlQuery + + +SQL operation to perform on the slave. + +=back + +=cut + +sub sendQueryToSlaves($$) { + my $seqId = $_[0]; + my $sqlQuery = $_[1]; + + if($::slaveInfo->{"status"} == 0) { + my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); + unless($queryResult->resultStatus == PGRES_COMMAND_OK) { + my $errorMessage; + $errorMessage = "Error sending query $seqId to " ; + $errorMessage .= $::slaveInfo->{"slaveHost"}; + $errorMessage .=$::slaveInfo->{"slaveConn"}->errorMessage; + $errorMessage .= "\n" . $sqlQuery; + logErrorMessage($errorMessage); + $::slaveInfo->{"slaveConn"}->exec("ROLLBACK"); + $::slaveInfo->{"status"} = -1; + } + } + +} + + +=item logErrorMessage(error) + +Mails an error message to the users specified $errorEmailAddr +The error message is also printed to STDERR. + +=over 4 + +=item * error + +The error message to log. + +=back + +=cut + +sub logErrorMessage($) { + my $error = $_[0]; + + if(defined $lastErrorMsg and $error eq $lastErrorMsg) { + if($repeatErrorCount<$::errorThreshold) { + $repeatErrorCount++; + warn($error); + return; + } + + } + $repeatErrorCount=0; + if(defined $::errorEmailAddr) { + my $mailPipe; + open (mailPipe, "|/bin/mail -s DBMirror.pl $::errorEmailAddr"); + print mailPipe "=====================================================\n"; + print mailPipe " DBMirror.pl \n"; + print mailPipe "\n"; + print mailPipe " The DBMirror.pl script has encountred an error. \n"; + print mailPipe " It might indicate that either the master database has\n"; + print mailPipe " gone down or that the connection to a slave database can\n"; + print mailPipe " not be made. \n"; + print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n"; + print mailPipe "\n"; + print mailPipe $error; + print mailPipe "\n\n\n=================================================\n"; + close mailPipe; + } + warn($error); + + $lastErrorMsg = $error; + +} + +sub openSlaveConnection($) { + my $slavePtr = $_[0]; + my $slaveConn; + + + my $slaveConnString = "host=" . $slavePtr->{"slaveHost"}; + $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; + $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; + $slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; + + $slaveConn = Pg::connectdb($slaveConnString); + + if($slaveConn->status !=PGRES_CONNECTION_OK) { + my $errorMessage = "Can't connect to slave database " ; + $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; + $errorMessage .= $slaveConn->errorMessage; + logErrorMessage($errorMessage); + $slavePtr->{"status"} = -1; + } + else { + $slavePtr->{"slaveConn"} = $slaveConn; + $slavePtr->{"status"} = 0; + #Determine the MirrorHostId for the slave from the master's database + my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' + . ' "MirrorHost" WHERE "HostName"' + . '=\'' . $slavePtr->{"slaveHost"} + . '\''); + if($resultSet->ntuples !=1) { + my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; + $errorMessage .= "Has no MirrorHost entry on master\n"; + logErrorMessage($errorMessage); + $slavePtr->{"status"}=-1; + return; + + } + $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); + + + + } + +} + + +=item updateMirrorHostTable(lastTransId,lastSeqId) + +Updates the MirroredTransaction table to reflect the fact that +this transaction has been sent to the current slave. + +=over 4 + +=item * lastTransId + +The Transaction id for the last transaction that has been succesfully mirrored to +the currently open slaves. + +=item * lastSeqId + +The Sequence Id of the last command that has been succefully mirrored + + +=back + + +=cut + +sub updateMirrorHostTable($$) { + my $lastTransId = shift; + my $lastSeqId = shift; + + if($::slaveInfo->{"status"}==0) { + my $deleteTransactionQuery; + my $deleteResult; + my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; + $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; + $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; + + my $updateResult = $masterConn->exec($updateMasterQuery); + unless($updateResult->resultStatus == PGRES_COMMAND_OK) { + my $errorMessage = $masterConn->errorMessage . "\n"; + $errorMessage .= $updateMasterQuery; + logErrorMessage($errorMessage); + die; + } +# print "Updated slaves to transaction $lastTransId\n" ; +# flush STDOUT; + + #If this transaction has now been mirrored to all mirror hosts + #then it can be deleted. + $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' + . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' + . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' + . ' "MirrorHost")'; + + $deleteResult = $masterConn->exec($deleteTransactionQuery); + if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { + logErrorMessage($masterConn->errorMessage . "\n" . + $deleteTransactionQuery); + die; + } + + } + +} + + +sub extractData($$) { + my $pendingResult = $_[0]; + my $currentTuple = $_[1]; + my $fnumber; + my %valuesHash; + $fnumber = 4; + my $dataField = $pendingResult->getvalue($currentTuple,$fnumber); + + while(length($dataField)>0) { + # Extract the field name that is surronded by double quotes + $dataField =~ m/(\".*?\")/s; + my $fieldName = $1; + $dataField = substr $dataField ,length($fieldName); + $fieldName =~ s/\"//g; #Remove the surronding " signs. + + if($dataField =~ m/(^= )/s) { + #Matched null + $dataField = substr $dataField , length($1); + $valuesHash{$fieldName}=undef; + } + elsif ($dataField =~ m/(^=\')/s) { + #Has data. + my $value; + $dataField = substr $dataField ,2; #Skip the =' + LOOP: { #This is to allow us to use last from a do loop. + #Recommended in perlsyn manpage. + do { + my $matchString; + #Find the substring ending with the first ' or first \ + $dataField =~ m/(.*?[\'\\])?/s; + $matchString = $1; + $value .= substr $matchString,0,length($matchString)-1; + + if($matchString =~ m/(\'$)/s) { + # $1 runs to the end of the field value. + $dataField = substr $dataField,length($matchString)+1; + last; + + } + else { + #deal with the escape character. + #It The character following the escape gets appended. + $dataField = substr $dataField,length($matchString); + $dataField =~ s/(^.)//s; + $value .= $1; + + + + } + + + } until(length($dataField)==0); + } + $valuesHash{$fieldName} = $value; + + + }#else if + else { + + logErrorMessage "Error in PendingData Sequence Id " . + $pendingResult->getvalue($currentTuple,0); + die; + } + + + + } #while + return %valuesHash; + +} diff --git a/contrib/dbmirror/Makefile b/contrib/dbmirror/Makefile new file mode 100644 index 0000000000..668cdfbd27 --- /dev/null +++ b/contrib/dbmirror/Makefile @@ -0,0 +1,10 @@ +# $Header: /cvsroot/pgsql/contrib/dbmirror/Attic/Makefile,v 1.1 2002/06/23 21:58:07 momjian Exp $ + +subdir = contrib/dbmirror +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global + +MODULES = pending +DOCS = README.dbmirror + +include $(top_srcdir)/contrib/contrib-global.mk diff --git a/contrib/dbmirror/MirrorSetup.sql b/contrib/dbmirror/MirrorSetup.sql new file mode 100644 index 0000000000..b28b8e4104 --- /dev/null +++ b/contrib/dbmirror/MirrorSetup.sql @@ -0,0 +1,42 @@ + +CREATE FUNCTION "recordchange" () RETURNS opaque AS +'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C'; + +CREATE TABLE "MirrorHost" ( +"MirrorHostId" serial, +"HostName" varchar NOT NULL +); + + + + + +CREATE TABLE "Pending" ( +"SeqId" serial, +"TableName" varchar NOT NULL, +"Op" character, +"XID" int4 NOT NULL, +PRIMARY KEY ("SeqId") + +); + +CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID"); + +CREATE TABLE "PendingData" ( +"SeqId" int4 NOT NULL, +"IsKey" bool NOT NULL, +"Data" varchar, +PRIMARY KEY ("SeqId", "IsKey") , +FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE +); + + +CREATE TABLE "MirroredTransaction" ( +"XID" int4 NOT NULL, +"LastSeqId" int4 NOT NULL, +"MirrorHostId" int4 NOT NULL, +PRIMARY KEY ("XID","MirrorHostId"), +FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE, +FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE +CASCADE ON DELETE CASCADE +); diff --git a/contrib/dbmirror/README.dbmirror b/contrib/dbmirror/README.dbmirror new file mode 100644 index 0000000000..8788ffd4e5 --- /dev/null +++ b/contrib/dbmirror/README.dbmirror @@ -0,0 +1,204 @@ +DBMirror - Postgres Database Mirroring +=================================================== + + +DBMirror is a database mirroring system developed for the Postgres +database Written and maintened by Steven Singer(ssinger@navtechinc.com) + + +(c) 2001-2002 Navtech Systems Support Inc. +Released under the GNU Public License version 2. See COPYING. + + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + + +Overrview +-------------------------------------------------------------------- + +The mirroring system is trigger based and provides the following key features: + +-Support for multiple mirror slaves +-Transactions are maintained +-Per table selection of what gets mirrored. + + +The system is based on the idea that a master database exist where all +edits are made to the tables being mirrored. A trigger attatched to the +tables being mirrored runs logging information about the edit to +the Pending table and PendingData table. + +A perl script(DBMirror.pl) runs continiously for each slave database(A database +that the change is supposed to be mirrored to) examining the Pending +table; searching for transactions that need to be sent to that particular slave +database. Those transactions are then mirrored to the slave database and +the MirroredTransaction table is updated to reflect that the transaction has +been sent. + +If the transaction has been sent to all know slave hosts (All entries +in the MirrorHost table) then all records of it are purged from the +Pending tables. + +Installation Instructions +------------------------------------------------------------------------ + +1) Compile pending.c + +The file pending.c contains the recordchange trigger. This runs every +time a row inside of a table being mirrored changes. + + +To build the trigger run make on the "Makefile" in the DBMirror directory. + +The Makefile supplied assumes that the postgres include files are in +/usr/local/pgsql/include/server. + +Postgres-7.1.x installations should change this to +/usr/local/pgsql/include (The server part is for 7.2+) + +If you have installed the postgres include files to another location then +modify the Makefile to reflect this. + +The trigger requires that all postgres headers be installed, this is +accomplished in postgresql(7.1 or 7.2) by running "make install-all-headers" +in the postgres source directory. + +The Makefile should create a file named pending.so that contains the trigger. + +Install this file in /usr/local/pgsql/lib (or another suitable location). + +If you choose a different location the MirrorSetup.sql script will need +to be modified to reflect your new location. The CREATE FUNCTION command +in the MirrorSetup.sql script associates the trigger function with the +pending.so shared library. Modify the arguments to this command if you +choose to install the trigger elsewhere. + +2) Run MirroSetup.sql + +This file contains SQL commands to setup the Mirroring environment. +This includes + +-Telling Postgres about the "recordchange" trigger function. +-Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables + + +To execute the script use psql as follows + +"psql -f MirrorSetup.sql MyDatabaseName" + +where MyDatabaseName is the name of the database you wish to install mirroring +on(Your master). + + +3) Create slaveDatabase.conf files. + +Each slave database needs its own configuration file for the +DBMirror.pl script. See slaveDatabase.conf for a sample. + +The master settings refer to the master database(The one that is +being mirrored). + +The slave settings refer to the database that the data is being mirrored to. +The slaveHost parameter must refer to the machine name of the slave (Either +a resolvable hostname or an IP address). The value for slave host +must match the Hostname field in the MirrorHost table(See step 6). + +The master user must have sufficient permissions to modify the Pending +tables and to read all of the tables being mirrored. + +The slave user must have enough permissions on the slave database to +modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being +mirrored. + +4) Add the trigger to tables. + +Execute the SQL code in AddTrigger.sql once for each table that should +be mirrored. Replace MyTableName with the name of the table that should +be mirrored. + +5) Create the slave database. + +The DBMirror system keeps the contents of mirrored tables identical on the +master and slave databases. When you first install the mirror triggers the +master and slave databases must be the same. + +If you are starting with an empty master database then the slave should +be empty as well. Otherwise use pg_dump to ensure that the slave database +tables are initially identical to the master. + +6) Add entries in the MirrorHost table. + +Each slave database must have an entry in the MirrorHost table. + +The name of the host in the MirrorHost table must exactly match the +slaveHost variable for that slave in the configuration file. + +For example +INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com'); + + +6) Start DBMirror.pl + + +DBMirror.pl is the perl script that handles the mirroring. + +It requires the Perl library Pg(See src/interfaces/perl5 in the postgres +source distribution). + +It takes its configuration file as an argument(The one from step 3) +One instance of DBMirror.pl runs for each slave machine that is receiving +mirrored data. + +Any errors are printed to standard out and emailed to the address specified in +the configuration file. + +DBMirror can be run from the master, the slave, or a third machine as long +as it is able to access both the master and slave databases. + +7) Periodically run clean_pending.pl +clean_pending.pl cleans out any entries from the Pending tables that +have already been mirrored to all hosts in the MirrorHost table. +It uses the same configuration file as DBMirror.pl. + +Normally DBMirror.pl will clean these tables as it goes but in some +circumstances this will not happen. + +For example if a transaction has been mirrored to all slaves except for +one, then that host is removed from the MirrorHost table(It stops being +a mirror slave) the transactions that had already been mirrored to +all the other hosts will not be deleted from the Pending tables by +DBMirror.pl since DBMirror.pl will run against these transactions again +since they have already been sent to all the other hosts. + +clean_pending.pl will remove these transactions. + +TODO(Current Limitations) +---------- +-Support for selective mirroring based on the content of data. +-Support for BLOB's. +-Support for conflict resolution. +-Batching SQL commands in DBMirror for better performance over WAN's. +-Better support for dealing with Schema changes. + +Tested Platforms: +------------------ + +DBMirror has been tested on the following configurations but should +work on any platform with Postgres >= 7.1 and Perl 5.6. + +RedHat Linux 7.1 & 6.2 + -Postgres 7.1.2 + -Perl 5.6 + +Mandrake Linux 8.0(Limited Testing) + -Postgres 7.2 + -Perl 5.6 + + +Steven Singer +Navtech Systems Support Inc. +ssinger@navtechinc.com diff --git a/contrib/dbmirror/clean_pending.pl b/contrib/dbmirror/clean_pending.pl new file mode 100755 index 0000000000..2729288177 --- /dev/null +++ b/contrib/dbmirror/clean_pending.pl @@ -0,0 +1,106 @@ +#!/usr/bin/perl +# clean_pending.pl +# This perl script removes entries from the pending,pendingKeys, +# pendingDeleteData tables that have already been mirrored to all hosts. +# +# +# +# Written by Steven Singer (ssinger@navtechinc.com) +# (c) 2001-2002 Navtech Systems Support Inc. +# Released under the GNU Public License version 2. See COPYING. +# +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +############################################################################## +# $Id: clean_pending.pl,v 1.1 2002/06/23 21:58:08 momjian Exp $ +############################################################################## + + + +=head1 NAME + +clean_pending.pl - A Perl script to remove old entries from the +pending, pendingKeys, and pendingDeleteData tables. + + +=head1 SYNPOSIS + + +clean_pending.pl databasename + + +=head1 DESCRIPTION + + +This Perl script connects to the database specified as a command line argument +on the local system. It uses a hard-coded username and password. +It then removes any entries from the pending, pendingDeleteData, and +pendingKeys tables that have already been sent to all hosts in mirrorHosts. + + +=cut + +BEGIN { + # add in a global path to files + #Ensure that Pg is in the path. +} + + +use strict; +use Pg; +if ($#ARGV != 0) { + die "usage: clean_pending.pl configFile\n"; +} + +if( ! defined do $ARGV[0]) { + die("Invalid Configuration file $ARGV[0]"); +} + +#connect to the database. + +my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; + +my $dbConn = Pg::connectdb($connectString); +unless($dbConn->status == PGRES_CONNECTION_OK) { + printf("Can't connect to database\n"); + die; +} +my $result = $dbConn->exec("BEGIN"); +unless($result->resultStatus == PGRES_COMMAND_OK) { + die $dbConn->errorMessage; +} + + +#delete all transactions that have been sent to all mirrorhosts +#or delete everything if no mirror hosts are defined. +# Postgres takes the "SELECT COUNT(*) FROM "MirrorHost" and makes it into +# an InitPlan. EXPLAIN show's this. +my $deletePendingQuery = 'DELETE FROM "Pending" WHERE (SELECT '; +$deletePendingQuery .= ' COUNT(*) FROM "MirroredTransaction" WHERE '; +$deletePendingQuery .= ' "XID"="Pending"."XID") = (SELECT COUNT(*) FROM '; +$deletePendingQuery .= ' "MirrorHost") OR (SELECT COUNT(*) FROM '; +$deletePendingQuery .= ' "MirrorHost") = 0'; + +my $result = $dbConn->exec($deletePendingQuery); +unless ($result->resultStatus == PGRES_COMMAND_OK ) { + printf($dbConn->errorMessage); + die; +} +$dbConn->exec("COMMIT"); +$result = $dbConn->exec('VACUUM "Pending"'); +unless ($result->resultStatus == PGRES_COMMAND_OK) { + printf($dbConn->errorMessage); +} +$result = $dbConn->exec('VACUUM "PendingData"'); +unless($result->resultStatus == PGRES_COMMAND_OK) { + printf($dbConn->errorMessage); +} +$result = $dbConn->exec('VACUUM "MirroredTransaction"'); +unless($result->resultStatus == PGRES_COMMAND_OK) { + printf($dbConn->errorMessage); +} + diff --git a/contrib/dbmirror/pending.c b/contrib/dbmirror/pending.c new file mode 100644 index 0000000000..a9027dbe50 --- /dev/null +++ b/contrib/dbmirror/pending.c @@ -0,0 +1,447 @@ +/**************************************************************************** + * pending.c + * $Id: pending.c,v 1.1 2002/06/23 21:58:08 momjian Exp $ + * + * This file contains a trigger for Postgresql-7.x to record changes to tables + * to a pending table for mirroring. + * All tables that should be mirrored should have this trigger hooked up to it. + * + * Written by Steven Singer (ssinger@navtechinc.com) + * (c) 2001-2002 Navtech Systems Support Inc. + * Released under the GNU Public License version 2. See COPYING. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * + ***************************************************************************/ +#include +#include +#include + +enum FieldUsage {PRIMARY=0,NONPRIMARY,ALL,NUM_FIELDUSAGE}; + +int storePending(char * cpTableName, HeapTuple tBeforeTuple, + HeapTuple tAfterTuple, + TupleDesc tTupdesc, + TriggerData * tpTrigdata,char cOp); + +int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, + TriggerData * tpTrigdata); +int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc, + TriggerData * tpTrigData,int iIncludeKeyData); + +int2vector * getPrimaryKey(Oid tblOid); + +char * packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, + TriggerData * tTrigData, + enum FieldUsage eKeyUsage ); + +#define BUFFER_SIZE 256 +#define MAX_OID_LEN 10 + + +extern Datum recordchange(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(recordchange); + + +/***************************************************************************** + * The entry point for the trigger function. + * The Trigger takes a single SQL 'text' argument indicating the name of the + * table the trigger was applied to. If this name is incorrect so will the + * mirroring. + ****************************************************************************/ +Datum recordchange(PG_FUNCTION_ARGS) { + TriggerData * trigdata; + TupleDesc tupdesc; + HeapTuple beforeTuple=NULL; + HeapTuple afterTuple=NULL; + HeapTuple retTuple=NULL; + char * tblname; + char op; + if(fcinfo->context!=NULL) { + + if(SPI_connect() < 0) { + elog(NOTICE,"storePending could not connect to SPI"); + return -1; + } + trigdata = (TriggerData*)fcinfo->context; + /* Extract the table name */ + tblname = SPI_getrelname(trigdata->tg_relation); + tupdesc = trigdata->tg_relation->rd_att; + if(TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) { + retTuple = trigdata->tg_newtuple; + beforeTuple = trigdata->tg_trigtuple; + afterTuple = trigdata->tg_newtuple; + op='u'; + + } + else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) { + retTuple = trigdata->tg_trigtuple; + afterTuple = trigdata->tg_trigtuple; + op = 'i'; + } + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) { + retTuple = trigdata->tg_trigtuple; + beforeTuple = trigdata->tg_trigtuple; + op = 'd'; + } + + if(storePending(tblname,beforeTuple,afterTuple,tupdesc,trigdata,op)) { + /* An error occoured. Skip the operation. */ + elog(ERROR,"Operation could not be mirrored"); + return PointerGetDatum(NULL); + + } +#if defined DEBUG_OUTPUT + elog(NOTICE,"Returning on success"); +#endif + SPI_finish(); + return PointerGetDatum(retTuple); + } + else { + /* + * Not being called as a trigger. + */ + return PointerGetDatum(NULL); + } +} + + +/***************************************************************************** + * Constructs and executes an SQL query to write a record of this tuple change + * to the pending table. + *****************************************************************************/ +int storePending(char * cpTableName, HeapTuple tBeforeTuple, + HeapTuple tAfterTuple, + TupleDesc tTupDesc, + TriggerData * tpTrigData,char cOp) { + char * cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; + + int iResult=0; + HeapTuple tCurTuple; // Points the current tuple(before or after) + Datum saPlanData[4]; + Oid taPlanArgTypes[3] = {NAMEOID,CHAROID,INT4OID}; + void * vpPlan; + + tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; + + + + + vpPlan = SPI_prepare(cpQueryBase,3,taPlanArgTypes); + if(vpPlan==NULL) { + elog(NOTICE,"Error creating plan"); + } + // SPI_saveplan(vpPlan); + + saPlanData[0] = PointerGetDatum(cpTableName); + saPlanData[1] = CharGetDatum(cOp); + saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); + + + iResult = SPI_execp(vpPlan,saPlanData,NULL,1); + if(iResult < 0) { + elog(NOTICE,"storedPending fired (%s) returned %d",cpQueryBase,iResult); + } + + +#if defined DEBUG_OUTPUT + elog(NOTICE,"row successfully stored in pending table"); +#endif + + if(cOp=='d') { + /** + * This is a record of a delete operation. + * Just store the key data. + */ + iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData); + } + else if (cOp=='i') { + /** + * An Insert operation. + * Store all data + */ + iResult = storeData(cpTableName,tAfterTuple,tTupDesc,tpTrigData,TRUE); + + } + else { + /* op must be an update. */ + iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData); + iResult = iResult ? iResult : storeData(cpTableName,tAfterTuple,tTupDesc, + tpTrigData,TRUE); + } + +#if defined DEBUG_OUTPUT + elog(NOTICE,"DOne storing keyinfo"); +#endif + + return iResult; + +} + +int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, + TupleDesc tTupleDesc, + TriggerData * tpTrigData) { + + Oid saPlanArgTypes[1] = {NAMEOID}; + char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; + void * pplan; + Datum saPlanData[1]; + char * cpKeyData; + int iRetCode; + + pplan = SPI_prepare(insQuery,1,saPlanArgTypes); + if(pplan==NULL) { + elog(NOTICE,"Could not prepare INSERT plan"); + return -1; + } + + // pplan = SPI_saveplan(pplan); + cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,PRIMARY); +#if defined DEBUG_OUTPUT + elog(NOTICE,cpKeyData); +#endif + saPlanData[0] = PointerGetDatum(cpKeyData); + + iRetCode = SPI_execp(pplan,saPlanData,NULL,1); + + if(cpKeyData!=NULL) { + SPI_pfree(cpKeyData); + } + + if(iRetCode != SPI_OK_INSERT ) { + elog(NOTICE,"Error inserting row in pendingDelete"); + return -1; + } +#if defined DEBUG_OUTPUT + elog(NOTICE,"INSERT SUCCESFULL"); +#endif + + return 0; + +} + + + + +int2vector * getPrimaryKey(Oid tblOid) { + char * queryBase; + char * query; + bool isNull; + int2vector * resultKey; + int2vector * tpResultKey; + HeapTuple resTuple; + Datum resDatum; + int ret; + queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid="; + query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN+1); + sprintf(query,"%s%d",queryBase,tblOid); + ret = SPI_exec(query,1); + if(ret != SPI_OK_SELECT || SPI_processed != 1 ) { + elog(NOTICE,"Could not select primary index key"); + return NULL; + } + + resTuple = SPI_tuptable->vals[0]; + resDatum = SPI_getbinval(resTuple,SPI_tuptable->tupdesc,1,&isNull); + + tpResultKey = (int2vector*) DatumGetPointer(resDatum); + resultKey = SPI_palloc(sizeof(int2vector)); + memcpy(resultKey,tpResultKey,sizeof(int2vector)); + + SPI_pfree(query); + return resultKey; +} + +/****************************************************************************** + * Stores a copy of the non-key data for the row. + *****************************************************************************/ +int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc, + TriggerData * tpTrigData,int iIncludeKeyData) { + + Oid planArgTypes[1] = {NAMEOID}; + char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; + void * pplan; + Datum planData[1]; + char * cpKeyData; + int iRetValue; + + pplan = SPI_prepare(insQuery,1,planArgTypes); + if(pplan==NULL) { + elog(NOTICE,"Could not prepare INSERT plan"); + return -1; + } + + // pplan = SPI_saveplan(pplan); + if(iIncludeKeyData==0) { + cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,NONPRIMARY); + } + else { + cpKeyData = packageData(tTupleData,tTupleDesc,tpTrigData,ALL); + } + + planData[0] = PointerGetDatum(cpKeyData); + iRetValue = SPI_execp(pplan,planData,NULL,1); + + if(cpKeyData!=0) { + SPI_pfree(cpKeyData); + } + + if(iRetValue != SPI_OK_INSERT ) { + elog(NOTICE,"Error inserting row in pendingDelete"); + return -1; + } +#if defined DEBUG_OUTPUT + elog(NOTICE,"INSERT SUCCESFULL"); +#endif + + return 0; + +} + +/** + * Packages the data in tTupleData into a string of the format + * FieldName='value text' where any quotes inside of value text + * are escaped with a backslash and any backslashes in value text + * are esacped by a second back slash. + * + * tTupleDesc should be a description of the tuple stored in + * tTupleData. + * + * eFieldUsage specifies which fields to use. + * PRIMARY implies include only primary key fields. + * NONPRIMARY implies include only non-primary key fields. + * ALL implies include all fields. + */ +char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, + TriggerData * tpTrigData, + enum FieldUsage eKeyUsage ) { + int iNumCols; + int2vector * tpPKeys=NULL; + int iColumnCounter; + char * cpDataBlock; + int iDataBlockSize; + int iUsedDataBlock; + + iNumCols = tTupleDesc->natts; + + if(eKeyUsage!=ALL) { + tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); + if(tpPKeys==NULL) { + return NULL; + } + } +#if defined DEBUG_OUTPUT + if(tpPKeys!=NULL) { + elog(NOTICE,"Have primary keys"); + } +#endif + cpDataBlock = SPI_palloc(BUFFER_SIZE); + iDataBlockSize = BUFFER_SIZE; + iUsedDataBlock = 0; /* To account for the null */ + + for(iColumnCounter=1; iColumnCounter <=iNumCols; iColumnCounter++) { + int iIsPrimaryKey; + int iPrimaryKeyIndex; + char * cpUnFormatedPtr; + char * cpFormatedPtr; + + char * cpFieldName; + char * cpFieldData; + if(eKeyUsage!=ALL) { + //Determine if this is a primary key or not. + iIsPrimaryKey=0; + for(iPrimaryKeyIndex=0; (*tpPKeys)[iPrimaryKeyIndex]!=0; + iPrimaryKeyIndex++) { + if((*tpPKeys)[iPrimaryKeyIndex]==iColumnCounter) { + iIsPrimaryKey=1; + break; + } + } + if( iIsPrimaryKey ? (eKeyUsage!=PRIMARY) : (eKeyUsage!=NONPRIMARY)) { + /** + * Don't use. + */ +#if defined DEBUG_OUTPUT + elog(NOTICE,"Skipping column"); +#endif + continue; + } + } /* KeyUsage!=ALL */ + cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs + [iColumnCounter-1]->attname)); +#if defined DEBUG_OUTPUT + elog(NOTICE,cpFieldName); +#endif + while(iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +4) { + cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize + BUFFER_SIZE); + iDataBlockSize = iDataBlockSize + BUFFER_SIZE; + } + sprintf(cpDataBlock+iUsedDataBlock,"\"%s\"=",cpFieldName); + iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName)+3; + cpFieldData=SPI_getvalue(tTupleData,tTupleDesc,iColumnCounter); + + cpUnFormatedPtr = cpFieldData; + cpFormatedPtr = cpDataBlock + iUsedDataBlock; + if(cpFieldData!=NULL) { + *cpFormatedPtr='\''; + iUsedDataBlock++; + cpFormatedPtr++; + } + else { + *cpFormatedPtr=' '; + iUsedDataBlock++; + cpFormatedPtr++; + continue; + + } +#if defined DEBUG_OUTPUT + elog(NOTICE,cpFieldData); + elog(NOTICE,"Starting format loop"); +#endif + while(*cpUnFormatedPtr!=0) { + while(iDataBlockSize - iUsedDataBlock < 2) { + cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE); + iDataBlockSize = iDataBlockSize + BUFFER_SIZE; + cpFormatedPtr = cpDataBlock + iUsedDataBlock; + } + if(*cpUnFormatedPtr=='\\' || *cpUnFormatedPtr=='\'') { + *cpFormatedPtr='\\'; + cpFormatedPtr++; + iUsedDataBlock++; + } + *cpFormatedPtr=*cpUnFormatedPtr; + cpFormatedPtr++; + cpUnFormatedPtr++; + iUsedDataBlock++; + } + + SPI_pfree(cpFieldData); + + while(iDataBlockSize - iUsedDataBlock < 3) { + cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE); + iDataBlockSize = iDataBlockSize + BUFFER_SIZE; + cpFormatedPtr = cpDataBlock + iUsedDataBlock; + } + sprintf(cpFormatedPtr,"' "); + iUsedDataBlock = iUsedDataBlock +2; +#if defined DEBUG_OUTPUT + elog(NOTICE,cpDataBlock); +#endif + + } /* for iColumnCounter */ + if(tpPKeys!=NULL) { + SPI_pfree(tpPKeys); + } +#if defined DEBUG_OUTPUT + elog(NOTICE,"Returning"); +#endif + memset(cpDataBlock + iUsedDataBlock,0,iDataBlockSize - iUsedDataBlock); + + return cpDataBlock; + +} diff --git a/contrib/dbmirror/slaveDatabase.conf b/contrib/dbmirror/slaveDatabase.conf new file mode 100644 index 0000000000..c6199bf850 --- /dev/null +++ b/contrib/dbmirror/slaveDatabase.conf @@ -0,0 +1,22 @@ +######################################################################### +# Config file for DBMirror.pl +# This file contains a sample configuration file for DBMirror.pl +# It contains configuration information to mirror data from +# the master database to a single slave system. +# +# $Id: slaveDatabase.conf,v 1.1 2002/06/23 21:58:08 momjian Exp $ +####################################################################### + +$masterHost = "masterMachine.mydomain.com"; +$masterDb = "myDatabase"; +$masterUser = "postgres"; +$masterPassword = "postgrespassword"; + +# Where to email Error messages to +# $errorEmailAddr = "me@mydomain.com"; + +$slaveInfo->{"slaveHost"} = "backupMachine.mydomain.com"; +$slaveInfo->{"slaveDb"} = "myDatabase"; +$slaveInfo->{"slaveUser"} = "postgres"; +$slaveInfo->{"slavePassword"} = "postgrespassword"; +