pgbench: Add \syncpipeline

This change adds a new meta-command called \syncpipeline to pgbench,
able to send a sync message without flushing using the new libpq
function PQsendPipelineSync().

This meta-command is available within a block made of \startpipeline and
\endpipeline.

Author: Anthonin Bonnefoy
Discussion: https://postgr.es/m/CAO6_XqpcNhW6LZHLF-2NpPzdTbyMm4-RVkr3+AP5cOKSm9hrWA@mail.gmail.com
This commit is contained in:
Michael Paquier 2024-01-24 16:55:19 +09:00
parent faa2b953ba
commit 94edfe250c
3 changed files with 70 additions and 6 deletions

View File

@ -1386,13 +1386,19 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
<varlistentry id="pgbench-metacommand-pipeline">
<term><literal>\startpipeline</literal></term>
<term><literal>\syncpipeline</literal></term>
<term><literal>\endpipeline</literal></term>
<listitem>
<para>
These commands delimit the start and end of a pipeline of SQL
statements. In pipeline mode, statements are sent to the server
without waiting for the results of previous statements. See
This group of commands implements pipelining of SQL statements.
A pipeline must begin with a <command>\startpipeline</command>
and end with an <command>\endpipeline</command>. In between there
may be any number of <command>\syncpipeline</command> commands,
which sends a <link linkend="protocol-flow-ext-query">sync message</link>
without ending the ongoing pipeline and flushing the send buffer.
In pipeline mode, statements are sent to the server without waiting
for the results of previous statements. See
<xref linkend="libpq-pipeline-mode"/> for more details.
Pipeline mode requires the use of extended query protocol.
</para>

View File

@ -608,6 +608,7 @@ typedef struct
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
int num_syncs; /* number of ongoing sync commands */
/* client variables */
Variables variables;
@ -697,6 +698,7 @@ typedef enum MetaCommand
META_ELSE, /* \else */
META_ENDIF, /* \endif */
META_STARTPIPELINE, /* \startpipeline */
META_SYNCPIPELINE, /* \syncpipeline */
META_ENDPIPELINE, /* \endpipeline */
} MetaCommand;
@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd)
mc = META_ASET;
else if (pg_strcasecmp(cmd, "startpipeline") == 0)
mc = META_STARTPIPELINE;
else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
mc = META_SYNCPIPELINE;
else if (pg_strcasecmp(cmd, "endpipeline") == 0)
mc = META_ENDPIPELINE;
else
@ -3317,8 +3321,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
break;
case PGRES_PIPELINE_SYNC:
pg_log_debug("client %d pipeline ending", st->id);
if (PQexitPipelineMode(st->con) != 1)
pg_log_debug("client %d pipeline ending, ongoing syncs: %d",
st->id, st->num_syncs);
st->num_syncs--;
if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
PQerrorMessage(st->con));
break;
@ -4449,6 +4455,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED;
}
}
else if (command->meta == META_SYNCPIPELINE)
{
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
{
commandFailed(st, "syncpipeline", "not in pipeline mode");
return CSTATE_ABORTED;
}
if (PQsendPipelineSync(st->con) == 0)
{
commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
return CSTATE_ABORTED;
}
st->num_syncs++;
}
else if (command->meta == META_ENDPIPELINE)
{
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
@ -4461,6 +4481,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
commandFailed(st, "endpipeline", "failed to send a pipeline sync");
return CSTATE_ABORTED;
}
st->num_syncs++;
/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
/* collect pending results before getting out of pipeline mode */
return CSTATE_WAIT_RESULT;
@ -5794,7 +5815,8 @@ process_backslash_command(PsqlScanState sstate, const char *source)
}
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
my_command->meta == META_STARTPIPELINE ||
my_command->meta == META_ENDPIPELINE)
my_command->meta == META_ENDPIPELINE ||
my_command->meta == META_SYNCPIPELINE)
{
if (my_command->argc != 1)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],

View File

@ -814,6 +814,27 @@ $node->pgbench(
}
});
# Working \startpipeline with \syncpipeline
$node->pgbench(
'-t 1 -n -M extended',
0,
[ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
[],
'working \startpipeline with \syncpipeline',
{
'001_pgbench_pipeline_sync' => q{
-- test startpipeline
\startpipeline
select 1;
\syncpipeline
\syncpipeline
select 2;
\syncpipeline
select 3;
\endpipeline
}
});
# Working \startpipeline in prepared query mode
$node->pgbench(
'-t 1 -n -M prepared',
@ -904,6 +925,21 @@ $node->pgbench(
}
});
# Try \startpipeline with \syncpipeline without \endpipeline
$node->pgbench(
'-t 2 -n -M extended',
2,
[],
[qr{end of script reached with pipeline open}],
'error: call \startpipeline and \syncpipeline without \endpipeline',
{
'001_pgbench_pipeline_7' => q{
-- startpipeline with \syncpipeline only
\startpipeline
\syncpipeline
}
});
# Working \startpipeline in prepared query mode with serializable
$node->pgbench(
'-c4 -t 10 -n -M prepared',