Sample postgres_fdw tables remotely during ANALYZE

When collecting ANALYZE sample on foreign tables, postgres_fdw fetched
all rows and performed the sampling locally. For large tables this means
transferring and immediately discarding large amounts of data.

This commit allows the sampling to be performed on the remote server,
transferring only the much smaller sample. The sampling is performed
using the built-in TABLESAMPLE methods (system, bernoulli) or random()
function, depending on the remote server version.

Remote sampling can be enabled by analyze_sampling on the foreign server
and/or foreign table, with supported values 'off', 'auto', 'system',
'bernoulli' and 'random'. The default value is 'auto' which uses either
'bernoulli' (TABLESAMPLE method) or 'random' (for remote servers without
TABLESAMPLE support).
This commit is contained in:
Tomas Vondra 2022-12-30 23:14:53 +01:00
parent 02699bc1fd
commit 8ad51b5f44
7 changed files with 397 additions and 9 deletions

View File

@ -2367,14 +2367,57 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
appendStringInfo(buf, "::pg_catalog.regclass) / %d", BLCKSZ);
}
/*
 * Construct SELECT statement to acquire the number of rows of a relation.
 *
 * Note: we simply report the remote server's reltuples value, which might
 * be off a good deal, but it doesn't seem worth working harder. See
 * comments in postgresAcquireSampleRowsFunc.
 */
void
deparseAnalyzeTuplesSql(StringInfo buf, Relation rel)
{
	StringInfoData relname;

	/* Deparse the remote relation name; we need it as a string literal. */
	initStringInfo(&relname);
	deparseRelation(&relname, rel);

	/* Look up reltuples by casting the deparsed name to regclass. */
	appendStringInfoString(buf,
						   "SELECT reltuples FROM pg_catalog.pg_class WHERE oid = ");
	deparseStringLiteral(buf, relname.data);
	appendStringInfoString(buf, "::pg_catalog.regclass");
}
/*
* Construct SELECT statement to acquire sample rows of given relation.
*
* SELECT command is appended to buf, and list of columns retrieved
* is returned to *retrieved_attrs.
*
* We only support sampling methods we can decide based on server version.
* Allowing custom TSM modules (like tsm_system_rows) might be useful, but it
* would require detecting which extensions are installed, to allow automatic
* fall-back. Moreover, the methods may use different parameters like number
* of rows (and not sampling rate). So we leave this for future improvements.
*
* Using random() to sample rows on the remote server has the advantage that
* this works on all PostgreSQL versions (unlike TABLESAMPLE), and that it
* does the sampling on the remote side (without transferring everything and
* then discarding most rows).
*
* The disadvantage is that we still have to read all rows and evaluate the
* random(), while TABLESAMPLE (at least with the "system" method) may skip.
* It's not that different from the "bernoulli" method, though.
*
* We could also do "ORDER BY random() LIMIT x", which would always pick
* the expected number of rows, but it requires sorting so it may be much
more expensive (particularly on large tables, which is what the
* remote sampling is meant to improve).
*/
void
deparseAnalyzeSql(StringInfo buf, Relation rel,
				  PgFdwSamplingMethod sample_method, double sample_frac,
				  List **retrieved_attrs)
{
Oid relid = RelationGetRelid(rel);
TupleDesc tupdesc = RelationGetDescr(rel);
@ -2422,10 +2465,35 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
appendStringInfoString(buf, "NULL");
/*
 * Construct FROM clause, and perhaps WHERE clause too, depending on the
 * selected sampling method.
 */
appendStringInfoString(buf, " FROM ");
deparseRelation(buf, rel);
switch (sample_method)
{
case ANALYZE_SAMPLE_OFF:
/* nothing to do here */
break;
case ANALYZE_SAMPLE_RANDOM:
appendStringInfo(buf, " WHERE pg_catalog.random() < %f", sample_frac);
break;
case ANALYZE_SAMPLE_SYSTEM:
appendStringInfo(buf, " TABLESAMPLE SYSTEM(%f)", (100.0 * sample_frac));
break;
case ANALYZE_SAMPLE_BERNOULLI:
appendStringInfo(buf, " TABLESAMPLE BERNOULLI(%f)", (100.0 * sample_frac));
break;
case ANALYZE_SAMPLE_AUTO:
/* should have been resolved into actual method */
elog(ERROR, "unexpected sampling method");
break;
}
}
/*

View File

@ -11779,3 +11779,28 @@ SELECT * FROM prem2;
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
-- ===================================================================
-- test for ANALYZE sampling
-- ===================================================================
CREATE TABLE analyze_table (id int, a text, b bigint);
CREATE FOREIGN TABLE analyze_ftable (id int, a text, b bigint)
SERVER loopback OPTIONS (table_name 'analyze_rtable1');
INSERT INTO analyze_table (SELECT x FROM generate_series(1,1000) x);
ANALYZE analyze_table;
SET default_statistics_target = 10;
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (analyze_sampling 'invalid');
ERROR: invalid value for string option "analyze_sampling": invalid
ALTER SERVER loopback OPTIONS (analyze_sampling 'auto');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'system');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'bernoulli');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'random');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'off');
ANALYZE analyze_table;
-- cleanup
DROP FOREIGN TABLE analyze_ftable;
DROP TABLE analyze_table;

View File

@ -210,6 +210,23 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
errmsg("sslcert and sslkey are superuser-only"),
errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser.")));
}
else if (strcmp(def->defname, "analyze_sampling") == 0)
{
char *value;
value = defGetString(def);
/* we recognize off/auto/random/system/bernoulli */
if (strcmp(value, "off") != 0 &&
strcmp(value, "auto") != 0 &&
strcmp(value, "random") != 0 &&
strcmp(value, "system") != 0 &&
strcmp(value, "bernoulli") != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for string option \"%s\": %s",
def->defname, value)));
}
}
PG_RETURN_VOID();
@ -257,6 +274,10 @@ InitPgFdwOptions(void)
{"keep_connections", ForeignServerRelationId, false},
{"password_required", UserMappingRelationId, false},
/* sampling is available on both server and table */
{"analyze_sampling", ForeignServerRelationId, false},
{"analyze_sampling", ForeignTableRelationId, false},
/*
* sslcert and sslkey are in fact libpq options, but we repeat them
* here to allow them to appear in both foreign server context (when

View File

@ -4973,11 +4973,60 @@ postgresAnalyzeForeignTable(Relation relation,
return true;
}
/*
 * postgresCountTuplesForForeignTable
 * Count tuples in foreign table (just get pg_class.reltuples).
 *
 * Returns the remote pg_class.reltuples value.  Note this may be 0 or -1
 * if the remote table has never been vacuumed/analyzed; callers must be
 * prepared to handle that.
 */
static double
postgresCountTuplesForForeignTable(Relation relation)
{
ForeignTable *table;
UserMapping *user;
PGconn *conn;
StringInfoData sql;
PGresult *volatile res = NULL;
volatile double reltuples = -1;
/*
 * Get the connection to use. We do the remote access as the table's
 * owner, even if the ANALYZE was started by some other user.
 */
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
conn = GetConnection(user, false, NULL);
/*
 * Construct command to get the tuple count (pg_class.reltuples) for the
 * relation.
 */
initStringInfo(&sql);
deparseAnalyzeTuplesSql(&sql, relation);
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
/* The query must return exactly one value (a single reltuples). */
if (PQntuples(res) != 1 || PQnfields(res) != 1)
elog(ERROR, "unexpected result from deparseAnalyzeTuplesSql query");
reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
}
PG_FINALLY();
{
if (res)
PQclear(res);
}
PG_END_TRY();
ReleaseConnection(conn);
return reltuples;
}
/*
* Acquire a random sample of rows from foreign table managed by postgres_fdw.
*
* We fetch the whole table from the remote side and pick out some sample rows.
*
* Selected rows are returned in the caller-allocated array rows[],
* which must have at least targrows entries.
* The actual number of rows selected is returned as the function result.
@ -5000,9 +5049,14 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
ForeignServer *server;
UserMapping *user;
PGconn *conn;
int server_version_num;
PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
double sample_frac = -1.0;
double reltuples;
unsigned int cursor_number;
StringInfoData sql;
PGresult *volatile res = NULL;
ListCell *lc;
/* Initialize workspace state */
astate.rel = relation;
@ -5030,20 +5084,147 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
conn = GetConnection(user, false, NULL);
/* We'll need server version, so fetch it now. */
server_version_num = PQserverVersion(conn);
/*
* What sampling method should we use?
*/
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "analyze_sampling") == 0)
{
char *value = defGetString(def);
if (strcmp(value, "off") == 0)
method = ANALYZE_SAMPLE_OFF;
else if (strcmp(value, "auto") == 0)
method = ANALYZE_SAMPLE_AUTO;
else if (strcmp(value, "random") == 0)
method = ANALYZE_SAMPLE_RANDOM;
else if (strcmp(value, "system") == 0)
method = ANALYZE_SAMPLE_SYSTEM;
else if (strcmp(value, "bernoulli") == 0)
method = ANALYZE_SAMPLE_BERNOULLI;
break;
}
}
foreach(lc, table->options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "analyze_sampling") == 0)
{
char *value = defGetString(def);
if (strcmp(value, "off") == 0)
method = ANALYZE_SAMPLE_OFF;
else if (strcmp(value, "auto") == 0)
method = ANALYZE_SAMPLE_AUTO;
else if (strcmp(value, "random") == 0)
method = ANALYZE_SAMPLE_RANDOM;
else if (strcmp(value, "system") == 0)
method = ANALYZE_SAMPLE_SYSTEM;
else if (strcmp(value, "bernoulli") == 0)
method = ANALYZE_SAMPLE_BERNOULLI;
break;
}
}
/*
* Error-out if explicitly required one of the TABLESAMPLE methods, but
* the server does not support it.
*/
if ((server_version_num < 95000) &&
(method == ANALYZE_SAMPLE_SYSTEM ||
method == ANALYZE_SAMPLE_BERNOULLI))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("remote server does not support TABLESAMPLE feature")));
/*
* For "auto" method, pick the one we believe is best. For servers with
* TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
* random() to at least reduce network transfer.
*/
if (method == ANALYZE_SAMPLE_AUTO)
{
if (server_version_num < 95000)
method = ANALYZE_SAMPLE_RANDOM;
else
method = ANALYZE_SAMPLE_BERNOULLI;
}
/*
* If we've decided to do remote sampling, calculate the sampling rate. We
* need to get the number of tuples from the remote server, but skip that
* network round-trip if not needed.
*/
if (method != ANALYZE_SAMPLE_OFF)
{
reltuples = postgresCountTuplesForForeignTable(relation);
/*
* Remote's reltuples could be 0 or -1 if the table has never been
* vacuumed/analyzed. In that case, disable sampling after all.
*/
if ((reltuples <= 0) || (targrows >= reltuples))
method = ANALYZE_SAMPLE_OFF;
else
{
/*
* All supported sampling methods require sampling rate,
* not target rows directly, so we calculate that using
* the remote reltuples value. That's imperfect, because
* it might be off a good deal, but that's not something
* we can (or should) address here.
*
* If reltuples is too low (i.e. when table grew), we'll
* end up sampling more rows - but then we'll apply the
* local sampling, so we get the expected sample size.
* This is the same outcome as without remote sampling.
*
* If reltuples is too high (e.g. after bulk DELETE), we
* will end up sampling too few rows.
*
* We can't really do much better here - we could try
* sampling a bit more rows, but we don't know how off
* the reltuples value is so how much is "a bit more"?
*
* Furthermore, the targrows value for partitions is
* determined based on table size (relpages), which can
* be off in different ways too. Adjusting the sampling
* rate here might make the issue worse.
*/
sample_frac = targrows / reltuples;
/*
 * Ensure the sampling rate is between 0.0 and 1.0.  Given the checks
 * above (reltuples > 0 and targrows < reltuples) it should already be,
 * so clamping is just paranoia.
 */
sample_frac = Min(1.0, Max(0.0, sample_frac));
}
}
/*
* Construct cursor that retrieves whole rows from remote.
*/
cursor_number = GetCursorNumber(conn);
initStringInfo(&sql);
appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
char fetch_sql[64];
int fetch_size;
ListCell *lc;
res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@ -5130,8 +5311,15 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* We assume that we have no dead tuple. */
*totaldeadrows = 0.0;
/*
 * Without sampling, we've retrieved all living tuples from foreign
 * server, so report that as totalrows. Otherwise use the reltuples
 * estimate we got from the remote side.
 */
if (method == ANALYZE_SAMPLE_OFF)
*totalrows = astate.samplerows;
else
*totalrows = reltuples;
/*
* Emit some interesting relation info
@ -5139,7 +5327,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
ereport(elevel,
(errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
RelationGetRelationName(relation),
*totalrows, astate.numrows)));
return astate.numrows;
}

View File

@ -134,6 +134,18 @@ typedef struct PgFdwConnState
AsyncRequest *pendingAreq; /* pending async request */
} PgFdwConnState;
/*
 * Method used by ANALYZE to sample remote rows.
 *
 * ANALYZE_SAMPLE_AUTO is a placeholder only: it must be resolved to one of
 * the concrete methods (based on the remote server version) before the
 * query is deparsed; deparseAnalyzeSql errors out if it ever sees AUTO.
 */
typedef enum PgFdwSamplingMethod
{
ANALYZE_SAMPLE_OFF, /* no remote sampling */
ANALYZE_SAMPLE_AUTO, /* choose by server version */
ANALYZE_SAMPLE_RANDOM, /* remote random() */
ANALYZE_SAMPLE_SYSTEM, /* TABLESAMPLE system */
ANALYZE_SAMPLE_BERNOULLI /* TABLESAMPLE bernoulli */
} PgFdwSamplingMethod;
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
@ -211,7 +223,10 @@ extern void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
List *returningList,
List **retrieved_attrs);
extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel);
extern void deparseAnalyzeTuplesSql(StringInfo buf, Relation rel);
extern void deparseAnalyzeSql(StringInfo buf, Relation rel,
PgFdwSamplingMethod sample_method,
double sample_frac,
List **retrieved_attrs);
extern void deparseTruncateSql(StringInfo buf,
List *rels,

View File

@ -3938,3 +3938,39 @@ SELECT * FROM prem2;
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
-- ===================================================================
-- test for ANALYZE sampling
-- ===================================================================
CREATE TABLE analyze_table (id int, a text, b bigint);
CREATE FOREIGN TABLE analyze_ftable (id int, a text, b bigint)
SERVER loopback OPTIONS (table_name 'analyze_rtable1');
INSERT INTO analyze_table (SELECT x FROM generate_series(1,1000) x);
ANALYZE analyze_table;
SET default_statistics_target = 10;
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (analyze_sampling 'invalid');
ALTER SERVER loopback OPTIONS (analyze_sampling 'auto');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'system');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'bernoulli');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'random');
ANALYZE analyze_table;
ALTER SERVER loopback OPTIONS (SET analyze_sampling 'off');
ANALYZE analyze_table;
-- cleanup
DROP FOREIGN TABLE analyze_ftable;
DROP TABLE analyze_table;

View File

@ -326,6 +326,41 @@ OPTIONS (ADD password_required 'false');
frequently updated, the local statistics will soon be obsolete.
</para>
<para>
The following option controls how such an <command>ANALYZE</command>
operation behaves:
</para>
<variablelist>
<varlistentry>
<term><literal>analyze_sampling</literal> (<type>text</type>)</term>
<listitem>
<para>
This option, which can be specified for a foreign table or a foreign
server, determines if <command>ANALYZE</command> on a foreign table
samples the data on the remote side, or reads and transfers all data
and performs the sampling locally. The supported values
are <literal>off</literal>, <literal>random</literal>,
<literal>system</literal>, <literal>bernoulli</literal>
and <literal>auto</literal>. <literal>off</literal> disables remote
sampling, so all data are transferred and sampled locally.
<literal>random</literal> performs remote sampling using the
<literal>random()</literal> function to choose returned rows,
while <literal>system</literal> and <literal>bernoulli</literal> rely
on the built-in <literal>TABLESAMPLE</literal> methods of those
names. <literal>random</literal> works on all remote server versions,
while <literal>TABLESAMPLE</literal> is supported only since 9.5.
<literal>auto</literal> (the default) picks the recommended sampling
method automatically; currently it means
either <literal>bernoulli</literal> or <literal>random</literal>
depending on the remote server version.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect3>
<sect3>