postgresql/src/backend/commands/sequence.c

899 lines
22 KiB
C
Raw Normal View History

1997-04-02 05:51:23 +02:00
/*-------------------------------------------------------------------------
*
* sequence.c
* PostgreSQL sequences support code.
1997-04-02 05:51:23 +02:00
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.67 2001/11/05 17:46:24 momjian Exp $
*
1997-04-02 05:51:23 +02:00
*-------------------------------------------------------------------------
*/
#include "postgres.h"
1997-04-02 05:51:23 +02:00
#include <ctype.h>
#include "access/heapam.h"
#include "commands/creatinh.h"
#include "commands/sequence.h"
1999-07-16 07:00:38 +02:00
#include "miscadmin.h"
#include "utils/acl.h"
1999-07-16 07:00:38 +02:00
#include "utils/builtins.h"
#include "utils/int8.h"
#ifdef MULTIBYTE
#include "mb/pg_wchar.h"
#endif
1997-04-02 05:51:23 +02:00
#define SEQ_MAGIC 0x1717
1997-04-02 05:51:23 +02:00
#ifndef INT64_IS_BUSTED
#ifdef HAVE_LL_CONSTANTS
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFFLL)
#else
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFFFFFFFFFF)
#endif
#else /* INT64_IS_BUSTED */
#define SEQ_MAXVALUE ((int64) 0x7FFFFFFF)
#endif /* INT64_IS_BUSTED */
#define SEQ_MINVALUE (-SEQ_MAXVALUE)
1997-04-02 05:51:23 +02:00
/*
* We don't want to log each fetching values from sequences,
* so we pre-log a few fetches in advance. In the event of
* crash we can lose as much as we pre-logged.
*/
2001-03-22 05:01:46 +01:00
#define SEQ_LOG_VALS 32
typedef struct sequence_magic
{
uint32 magic;
} sequence_magic;
1997-04-02 05:51:23 +02:00
typedef struct SeqTableData
{
char *name;
Oid relid;
Relation rel; /* NULL if rel is not open in cur xact */
int64 cached;
int64 last;
int64 increment;
struct SeqTableData *next;
} SeqTableData;
1997-04-02 05:51:23 +02:00
typedef SeqTableData *SeqTable;
static SeqTable seqtab = NULL;
static char *get_seq_name(text *seqin);
static SeqTable init_sequence(char *caller, char *name);
1998-09-01 05:29:17 +02:00
static Form_pg_sequence read_info(char *caller, SeqTable elm, Buffer *buf);
static void init_params(CreateSeqStmt *seq, Form_pg_sequence new);
static int64 get_param(DefElem *def);
static void do_setval(char *seqname, int64 next, bool iscalled);
1997-04-02 05:51:23 +02:00
/*
1999-05-25 18:15:34 +02:00
* DefineSequence
* Creates a new sequence relation
1997-04-02 05:51:23 +02:00
*/
void
DefineSequence(CreateSeqStmt *seq)
1997-04-02 05:51:23 +02:00
{
FormData_pg_sequence new;
CreateStmt *stmt = makeNode(CreateStmt);
ColumnDef *coldef;
TypeName *typnam;
Relation rel;
Buffer buf;
PageHeader page;
sequence_magic *sm;
HeapTuple tuple;
TupleDesc tupDesc;
Datum value[SEQ_COL_LASTCOL];
char null[SEQ_COL_LASTCOL];
int i;
NameData name;
/* Check and set values */
init_params(seq, &new);
/*
1998-09-01 05:29:17 +02:00
* Create relation (and fill *null & *value)
*/
stmt->tableElts = NIL;
for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
1997-04-02 05:51:23 +02:00
{
typnam = makeNode(TypeName);
typnam->setof = FALSE;
typnam->arrayBounds = NULL;
1998-02-07 07:11:56 +01:00
typnam->typmod = -1;
coldef = makeNode(ColumnDef);
coldef->typename = typnam;
coldef->raw_default = NULL;
coldef->cooked_default = NULL;
coldef->is_not_null = false;
null[i - 1] = ' ';
switch (i)
{
case SEQ_COL_NAME:
typnam->name = "name";
coldef->colname = "sequence_name";
namestrcpy(&name, seq->seqname);
value[i - 1] = NameGetDatum(&name);
break;
case SEQ_COL_LASTVAL:
typnam->name = "int8";
coldef->colname = "last_value";
value[i - 1] = Int64GetDatumFast(new.last_value);
break;
case SEQ_COL_INCBY:
typnam->name = "int8";
coldef->colname = "increment_by";
value[i - 1] = Int64GetDatumFast(new.increment_by);
break;
case SEQ_COL_MAXVALUE:
typnam->name = "int8";
coldef->colname = "max_value";
value[i - 1] = Int64GetDatumFast(new.max_value);
break;
case SEQ_COL_MINVALUE:
typnam->name = "int8";
coldef->colname = "min_value";
value[i - 1] = Int64GetDatumFast(new.min_value);
break;
case SEQ_COL_CACHE:
typnam->name = "int8";
coldef->colname = "cache_value";
value[i - 1] = Int64GetDatumFast(new.cache_value);
break;
case SEQ_COL_LOG:
typnam->name = "int8";
coldef->colname = "log_cnt";
value[i - 1] = Int64GetDatum((int64) 1);
break;
case SEQ_COL_CYCLE:
typnam->name = "bool";
coldef->colname = "is_cycled";
value[i - 1] = BoolGetDatum(new.is_cycled);
break;
case SEQ_COL_CALLED:
typnam->name = "bool";
coldef->colname = "is_called";
value[i - 1] = BoolGetDatum(false);
break;
}
stmt->tableElts = lappend(stmt->tableElts, coldef);
}
stmt->relname = seq->seqname;
stmt->inhRelnames = NIL;
stmt->constraints = NIL;
stmt->istemp = seq->istemp;
stmt->hasoids = false;
DefineRelation(stmt, RELKIND_SEQUENCE);
rel = heap_openr(seq->seqname, AccessExclusiveLock);
1998-09-01 05:29:17 +02:00
tupDesc = RelationGetDescr(rel);
/* Initialize first page of relation with special magic number */
buf = ReadBuffer(rel, P_NEW);
if (!BufferIsValid(buf))
elog(ERROR, "DefineSequence: ReadBuffer failed");
Assert(BufferGetBlockNumber(buf) == 0);
page = (PageHeader) BufferGetPage(buf);
PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
sm = (sequence_magic *) PageGetSpecialPointer(page);
sm->magic = SEQ_MAGIC;
/* hack: ensure heap_insert will insert on the just-created page */
rel->rd_targblock = 0;
/* Now - form & insert sequence tuple */
tuple = heap_formtuple(tupDesc, value, null);
heap_insert(rel, tuple);
/*
* After crash REDO of heap_insert above would re-init page and our
* magic number would be lost. We have to log sequence creation. This
* means two log records instead of one -:(
*/
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
START_CRIT_SECTION();
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
/* We do not log first nextval call, so "advance" sequence here */
newseq->is_called = true;
newseq->log_cnt = 0;
xlrec.node = rel->rd_node;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) tuple->t_data;
rdata[1].len = tuple->t_len;
rdata[1].next = NULL;
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
END_CRIT_SECTION();
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
heap_close(rel, AccessExclusiveLock);
1997-04-02 05:51:23 +02:00
}
Datum
nextval(PG_FUNCTION_ARGS)
1997-04-02 05:51:23 +02:00
{
text *seqin = PG_GETARG_TEXT_P(0);
char *seqname = get_seq_name(seqin);
SeqTable elm;
Buffer buf;
1998-09-01 05:29:17 +02:00
Form_pg_sequence seq;
int64 incby,
maxv,
minv,
cache,
log,
fetch,
last;
int64 result,
next,
rescnt = 0;
bool logit = false;
if (pg_aclcheck(seqname, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
seqname, seqname);
/* open and AccessShareLock sequence */
elm = init_sequence("nextval", seqname);
pfree(seqname);
if (elm->last != elm->cached) /* some numbers were cached */
{
elm->last += elm->increment;
PG_RETURN_INT64(elm->last);
1997-04-02 05:51:23 +02:00
}
1999-05-25 18:15:34 +02:00
seq = read_info("nextval", elm, &buf); /* lock page' buffer and
* read tuple */
last = next = result = seq->last_value;
incby = seq->increment_by;
maxv = seq->max_value;
minv = seq->min_value;
fetch = cache = seq->cache_value;
log = seq->log_cnt;
if (!seq->is_called)
{
rescnt++; /* last_value if not called */
fetch--;
log--;
}
if (log < fetch)
{
fetch = log = fetch - log + SEQ_LOG_VALS;
logit = true;
}
2001-03-22 05:01:46 +01:00
while (fetch) /* try to fetch cache [+ log ] numbers */
1997-04-02 05:51:23 +02:00
{
/*
* Check MAXVALUE for ascending sequences and MINVALUE for
* descending sequences
*/
if (incby > 0)
{
/* ascending sequence */
if ((maxv >= 0 && next > maxv - incby) ||
(maxv < 0 && next + incby > maxv))
{
if (rescnt > 0)
break; /* stop fetching */
if (!seq->is_cycled)
elog(ERROR, "%s.nextval: reached MAXVALUE (" INT64_FORMAT ")",
elm->name, maxv);
next = minv;
}
else
next += incby;
}
else
{
/* descending sequence */
if ((minv < 0 && next < minv - incby) ||
(minv >= 0 && next + incby < minv))
{
if (rescnt > 0)
break; /* stop fetching */
if (!seq->is_cycled)
elog(ERROR, "%s.nextval: reached MINVALUE (" INT64_FORMAT ")",
elm->name, minv);
next = maxv;
}
else
next += incby;
}
fetch--;
if (rescnt < cache)
{
log--;
rescnt++;
last = next;
2001-03-22 05:01:46 +01:00
if (rescnt == 1) /* if it's first result - */
result = next; /* it's what to return */
}
}
/* save info in local cache */
elm->last = result; /* last returned number */
elm->cached = last; /* last fetched number */
START_CRIT_SECTION();
if (logit)
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
2001-03-22 05:01:46 +01:00
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
xlrec.node = elm->rel->rd_node;
rdata[0].buffer = InvalidBuffer;
2001-03-22 05:01:46 +01:00
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].next = &(rdata[1]);
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
rdata[1].buffer = InvalidBuffer;
2001-03-22 05:01:46 +01:00
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
rdata[1].next = NULL;
2001-03-22 05:01:46 +01:00
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
2001-03-22 05:01:46 +01:00
if (fetch) /* not all numbers were fetched */
log -= fetch;
}
/* update on-disk data */
seq->last_value = last; /* last fetched number */
seq->is_called = true;
Assert(log >= 0);
seq->log_cnt = log; /* how much is logged */
END_CRIT_SECTION();
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.nextval: WriteBuffer failed", elm->name);
PG_RETURN_INT64(result);
1997-04-02 05:51:23 +02:00
}
Datum
currval(PG_FUNCTION_ARGS)
1997-04-02 05:51:23 +02:00
{
text *seqin = PG_GETARG_TEXT_P(0);
char *seqname = get_seq_name(seqin);
SeqTable elm;
int64 result;
if (pg_aclcheck(seqname, GetUserId(), ACL_SELECT) != ACLCHECK_OK)
elog(ERROR, "%s.currval: you don't have permissions to read sequence %s",
seqname, seqname);
/* open and AccessShareLock sequence */
elm = init_sequence("currval", seqname);
if (elm->increment == 0) /* nextval/read_info were not called */
elog(ERROR, "%s.currval is not yet defined in this session",
seqname);
result = elm->last;
pfree(seqname);
PG_RETURN_INT64(result);
1997-04-02 05:51:23 +02:00
}
2001-03-22 05:01:46 +01:00
/*
* Main internal procedure that handles 2 & 3 arg forms of SETVAL.
*
* Note that the 3 arg version (which sets the is_called flag) is
* only for use in pg_dump, and setting the is_called flag may not
2001-03-22 05:01:46 +01:00
* work if multiple users are attached to the database and referencing
* the sequence (unlikely if pg_dump is restoring it).
*
2001-03-22 05:01:46 +01:00
* It is necessary to have the 3 arg version so that pg_dump can
* restore the state of a sequence exactly during data-only restores -
* it is the only way to clear the is_called flag in an existing
* sequence.
*/
2000-10-16 19:08:11 +02:00
static void
do_setval(char *seqname, int64 next, bool iscalled)
{
SeqTable elm;
Buffer buf;
1998-09-01 05:29:17 +02:00
Form_pg_sequence seq;
if (pg_aclcheck(seqname, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
elog(ERROR, "%s.setval: you don't have permissions to set sequence %s",
seqname, seqname);
/* open and AccessShareLock sequence */
elm = init_sequence("setval", seqname);
1999-05-25 18:15:34 +02:00
seq = read_info("setval", elm, &buf); /* lock page' buffer and
* read tuple */
if ((next < seq->min_value) || (next > seq->max_value))
elog(ERROR, "%s.setval: value " INT64_FORMAT " is out of bounds (" INT64_FORMAT "," INT64_FORMAT ")",
seqname, next, seq->min_value, seq->max_value);
/* save info in local cache */
elm->last = next; /* last returned number */
2001-03-22 05:01:46 +01:00
elm->cached = next; /* last cached number (forget cached
* values) */
START_CRIT_SECTION();
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
2001-03-22 05:01:46 +01:00
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
xlrec.node = elm->rel->rd_node;
rdata[0].buffer = InvalidBuffer;
2001-03-22 05:01:46 +01:00
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].next = &(rdata[1]);
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
rdata[1].buffer = InvalidBuffer;
2001-03-22 05:01:46 +01:00
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
rdata[1].next = NULL;
2001-03-22 05:01:46 +01:00
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled;
seq->log_cnt = (iscalled) ? 0 : 1;
END_CRIT_SECTION();
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
if (WriteBuffer(buf) == STATUS_ERROR)
elog(ERROR, "%s.setval: WriteBuffer failed", seqname);
pfree(seqname);
}
/*
* Implement the 2 arg setval procedure.
* See do_setval for discussion.
*/
Datum
setval(PG_FUNCTION_ARGS)
{
text *seqin = PG_GETARG_TEXT_P(0);
int64 next = PG_GETARG_INT64(1);
char *seqname = get_seq_name(seqin);
do_setval(seqname, next, true);
PG_RETURN_INT64(next);
}
/*
* Implement the 3 arg setval procedure.
* See do_setval for discussion.
*/
Datum
setval_and_iscalled(PG_FUNCTION_ARGS)
{
text *seqin = PG_GETARG_TEXT_P(0);
int64 next = PG_GETARG_INT64(1);
bool iscalled = PG_GETARG_BOOL(2);
char *seqname = get_seq_name(seqin);
do_setval(seqname, next, iscalled);
PG_RETURN_INT64(next);
}
/*
* Given a 'text' parameter to a sequence function, extract the actual
* sequence name. We downcase the name if it's not double-quoted,
* and truncate it if it's too long.
*
* This is a kluge, really --- should be able to write nextval(seqrel).
*/
static char *
get_seq_name(text *seqin)
{
char *rawname = DatumGetCString(DirectFunctionCall1(textout,
2001-03-22 05:01:46 +01:00
PointerGetDatum(seqin)));
int rawlen = strlen(rawname);
char *seqname;
if (rawlen >= 2 &&
rawname[0] == '\"' && rawname[rawlen - 1] == '\"')
{
/* strip off quotes, keep case */
rawname[rawlen - 1] = '\0';
seqname = pstrdup(rawname + 1);
pfree(rawname);
}
else
{
seqname = rawname;
2001-03-22 05:01:46 +01:00
/*
* It's important that this match the identifier downcasing code
* used by backend/parser/scan.l.
*/
for (; *rawname; rawname++)
{
if (isupper((unsigned char) *rawname))
*rawname = tolower((unsigned char) *rawname);
}
}
/* Truncate name if it's overlength; again, should match scan.l */
if (strlen(seqname) >= NAMEDATALEN)
{
#ifdef MULTIBYTE
int len;
len = pg_mbcliplen(seqname, rawlen, NAMEDATALEN - 1);
seqname[len] = '\0';
#else
seqname[NAMEDATALEN - 1] = '\0';
#endif
}
return seqname;
}
1998-09-01 05:29:17 +02:00
static Form_pg_sequence
1997-09-08 22:59:27 +02:00
read_info(char *caller, SeqTable elm, Buffer *buf)
1997-04-02 05:51:23 +02:00
{
1999-05-25 18:15:34 +02:00
PageHeader page;
ItemId lp;
HeapTupleData tuple;
sequence_magic *sm;
1999-05-25 18:15:34 +02:00
Form_pg_sequence seq;
if (elm->rel->rd_nblocks > 1)
elog(ERROR, "%s.%s: invalid number of blocks in sequence",
elm->name, caller);
*buf = ReadBuffer(elm->rel, 0);
if (!BufferIsValid(*buf))
elog(ERROR, "%s.%s: ReadBuffer failed", elm->name, caller);
LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
page = (PageHeader) BufferGetPage(*buf);
sm = (sequence_magic *) PageGetSpecialPointer(page);
if (sm->magic != SEQ_MAGIC)
elog(ERROR, "%s.%s: bad magic (%08X)", elm->name, caller, sm->magic);
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsUsed(lp));
1998-11-27 20:52:36 +01:00
tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
1998-11-27 20:52:36 +01:00
seq = (Form_pg_sequence) GETSTRUCT(&tuple);
elm->increment = seq->increment_by;
1998-09-01 05:29:17 +02:00
return seq;
1997-04-02 05:51:23 +02:00
}
static SeqTable
init_sequence(char *caller, char *name)
1997-04-02 05:51:23 +02:00
{
SeqTable elm,
prev = (SeqTable) NULL;
Relation seqrel;
/* Look to see if we already have a seqtable entry for name */
for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
{
if (strcmp(elm->name, name) == 0)
break;
prev = elm;
}
/* If so, and if it's already been opened in this xact, just return it */
if (elm != (SeqTable) NULL && elm->rel != (Relation) NULL)
return elm;
/* Else open and check it */
seqrel = heap_openr(name, AccessShareLock);
if (seqrel->rd_rel->relkind != RELKIND_SEQUENCE)
elog(ERROR, "%s.%s: %s is not a sequence", name, caller, name);
if (elm != (SeqTable) NULL)
{
/*
* We are using a seqtable entry left over from a previous xact;
* must check for relid change.
*/
elm->rel = seqrel;
if (RelationGetRelid(seqrel) != elm->relid)
{
elog(NOTICE, "%s.%s: sequence was re-created",
name, caller);
elm->relid = RelationGetRelid(seqrel);
elm->cached = elm->last = elm->increment = 0;
}
}
else
{
/*
* Time to make a new seqtable entry. These entries live as long
* as the backend does, so we use plain malloc for them.
*/
elm = (SeqTable) malloc(sizeof(SeqTableData));
2001-06-01 21:54:58 +02:00
if (elm == NULL)
elog(ERROR, "Memory exhausted in init_sequence");
elm->name = strdup(name);
if (elm->name == NULL)
elog(ERROR, "Memory exhausted in init_sequence");
elm->rel = seqrel;
elm->relid = RelationGetRelid(seqrel);
elm->cached = elm->last = elm->increment = 0;
elm->next = (SeqTable) NULL;
if (seqtab == (SeqTable) NULL)
seqtab = elm;
else
prev->next = elm;
}
1998-09-01 05:29:17 +02:00
return elm;
1997-04-02 05:51:23 +02:00
}
/*
1999-05-25 18:15:34 +02:00
* CloseSequences
* is called by xact mgr at commit/abort.
1997-04-02 05:51:23 +02:00
*/
void
CloseSequences(void)
1997-04-02 05:51:23 +02:00
{
SeqTable elm;
Relation rel;
for (elm = seqtab; elm != (SeqTable) NULL; elm = elm->next)
1997-04-02 05:51:23 +02:00
{
rel = elm->rel;
if (rel != (Relation) NULL) /* opened in current xact */
{
elm->rel = (Relation) NULL;
heap_close(rel, AccessShareLock);
}
}
1997-04-02 05:51:23 +02:00
}
static void
1998-09-01 05:29:17 +02:00
init_params(CreateSeqStmt *seq, Form_pg_sequence new)
1997-04-02 05:51:23 +02:00
{
DefElem *last_value = NULL;
DefElem *increment_by = NULL;
DefElem *max_value = NULL;
DefElem *min_value = NULL;
DefElem *cache_value = NULL;
List *option;
new->is_cycled = false;
foreach(option, seq->options)
{
DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "increment") == 0)
increment_by = defel;
else if (strcmp(defel->defname, "start") == 0)
last_value = defel;
else if (strcmp(defel->defname, "maxvalue") == 0)
max_value = defel;
else if (strcmp(defel->defname, "minvalue") == 0)
min_value = defel;
else if (strcmp(defel->defname, "cache") == 0)
cache_value = defel;
else if (strcmp(defel->defname, "cycle") == 0)
{
if (defel->arg != (Node *) NULL)
elog(ERROR, "DefineSequence: CYCLE ??");
new->is_cycled = true;
}
else
elog(ERROR, "DefineSequence: option \"%s\" not recognized",
defel->defname);
}
if (increment_by == (DefElem *) NULL) /* INCREMENT BY */
new->increment_by = 1;
else if ((new->increment_by = get_param(increment_by)) == 0)
elog(ERROR, "DefineSequence: can't INCREMENT by 0");
if (max_value == (DefElem *) NULL) /* MAXVALUE */
{
if (new->increment_by > 0)
new->max_value = SEQ_MAXVALUE; /* ascending seq */
else
new->max_value = -1; /* descending seq */
}
1997-04-02 05:51:23 +02:00
else
new->max_value = get_param(max_value);
1997-04-02 05:51:23 +02:00
if (min_value == (DefElem *) NULL) /* MINVALUE */
{
if (new->increment_by > 0)
new->min_value = 1; /* ascending seq */
else
new->min_value = SEQ_MINVALUE; /* descending seq */
}
1997-04-02 05:51:23 +02:00
else
new->min_value = get_param(min_value);
if (new->min_value >= new->max_value)
elog(ERROR, "DefineSequence: MINVALUE (" INT64_FORMAT ") can't be >= MAXVALUE (" INT64_FORMAT ")",
new->min_value, new->max_value);
if (last_value == (DefElem *) NULL) /* START WITH */
{
if (new->increment_by > 0)
new->last_value = new->min_value; /* ascending seq */
else
new->last_value = new->max_value; /* descending seq */
}
1997-04-02 05:51:23 +02:00
else
new->last_value = get_param(last_value);
if (new->last_value < new->min_value)
elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be < MINVALUE (" INT64_FORMAT ")",
new->last_value, new->min_value);
if (new->last_value > new->max_value)
elog(ERROR, "DefineSequence: START value (" INT64_FORMAT ") can't be > MAXVALUE (" INT64_FORMAT ")",
new->last_value, new->max_value);
if (cache_value == (DefElem *) NULL) /* CACHE */
new->cache_value = 1;
else if ((new->cache_value = get_param(cache_value)) <= 0)
elog(ERROR, "DefineSequence: CACHE (" INT64_FORMAT ") can't be <= 0",
new->cache_value);
1997-04-02 05:51:23 +02:00
}
static int64
get_param(DefElem *def)
1997-04-02 05:51:23 +02:00
{
if (def->arg == (Node *) NULL)
elog(ERROR, "DefineSequence: \"%s\" value unspecified", def->defname);
if (IsA(def->arg, Integer))
return (int64) intVal(def->arg);
/*
* Values too large for int4 will be represented as Float constants by
* the lexer. Accept these if they are valid int8 strings.
*/
if (IsA(def->arg, Float))
return DatumGetInt64(DirectFunctionCall1(int8in,
CStringGetDatum(strVal(def->arg))));
/* Shouldn't get here unless parser messed up */
elog(ERROR, "DefineSequence: \"%s\" value must be integer", def->defname);
return 0; /* not reached; keep compiler quiet */
1997-04-02 05:51:23 +02:00
}
2001-03-22 05:01:46 +01:00
void
seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
2001-03-22 05:01:46 +01:00
uint8 info = record->xl_info & ~XLR_INFO_MASK;
Relation reln;
Buffer buffer;
Page page;
char *item;
Size itemsz;
xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
sequence_magic *sm;
if (info != XLOG_SEQ_LOG)
elog(STOP, "seq_redo: unknown op code %u", info);
reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, 0);
if (!BufferIsValid(buffer))
2001-03-22 05:01:46 +01:00
elog(STOP, "seq_redo: can't read block of %u/%u",
xlrec->node.tblNode, xlrec->node.relNode);
page = (Page) BufferGetPage(buffer);
PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic));
sm = (sequence_magic *) PageGetSpecialPointer(page);
sm->magic = SEQ_MAGIC;
2001-03-22 05:01:46 +01:00
item = (char *) xlrec + sizeof(xl_seq_rec);
itemsz = record->xl_len - sizeof(xl_seq_rec);
itemsz = MAXALIGN(itemsz);
2001-03-22 05:01:46 +01:00
if (PageAddItem(page, (Item) item, itemsz,
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
elog(STOP, "seq_redo: failed to add item to page");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
2001-03-22 05:01:46 +01:00
void
seq_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
2001-03-22 05:01:46 +01:00
void
seq_desc(char *buf, uint8 xl_info, char *rec)
{
2001-03-22 05:01:46 +01:00
uint8 info = xl_info & ~XLR_INFO_MASK;
xl_seq_rec *xlrec = (xl_seq_rec *) rec;
if (info == XLOG_SEQ_LOG)
strcat(buf, "log: ");
else
{
strcat(buf, "UNKNOWN");
return;
}
sprintf(buf + strlen(buf), "node %u/%u",
2001-03-22 05:01:46 +01:00
xlrec->node.tblNode, xlrec->node.relNode);
}