postgresql/src/include/libpq/libpq-be-fe-helpers.h

458 lines
12 KiB
C

/*-------------------------------------------------------------------------
*
* libpq-be-fe-helpers.h
* Helper functions for using libpq in extensions
*
* Code built directly into the backend is not allowed to link to libpq
* directly. Extension code is allowed to use libpq however. However, libpq
* used in extensions has to be careful not to block inside libpq, otherwise
* interrupts will not be processed, leading to issues like unresolvable
* deadlocks. Backend code also needs to take care to acquire/release an
* external fd for the connection, otherwise fd.c's accounting of fd's is
* broken.
*
* This file provides helper functions to make it easier to comply with these
* rules. It is a header only library as it needs to be linked into each
* extension using libpq, and it seems too small to be worth adding a
* dedicated static library for.
*
* TODO: For historical reasons the connections established here are not put
* into non-blocking mode. That can lead to blocking even when only the async
* libpq functions are used. This should be fixed.
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/libpq/libpq-be-fe-helpers.h
*
*-------------------------------------------------------------------------
*/
#ifndef LIBPQ_BE_FE_HELPERS_H
#define LIBPQ_BE_FE_HELPERS_H
/*
* Despite the name, BUILDING_DLL is set only when building code directly part
* of the backend. Which also is where libpq isn't allowed to be
* used. Obviously this doesn't protect against libpq-fe.h getting included
* otherwise, but perhaps still protects against a few mistakes...
*/
#ifdef BUILDING_DLL
#error "libpq may not be used code directly built into the backend"
#endif
#include "libpq-fe.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
static inline void libpqsrv_connect_prepare(void);
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
/*
* PQconnectdb() wrapper that reserves a file descriptor and processes
* interrupts during connection establishment.
*
* Throws an error if AcquireExternalFD() fails, but does not throw if
* connection establishment itself fails. Callers need to use PQstatus() to
* check if connection establishment succeeded.
*/
static inline PGconn *
libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
{
PGconn *conn = NULL;
libpqsrv_connect_prepare();
conn = PQconnectStart(conninfo);
libpqsrv_connect_internal(conn, wait_event_info);
return conn;
}
/*
* Like libpqsrv_connect(), except that this is a wrapper for
* PQconnectdbParams().
*/
static inline PGconn *
libpqsrv_connect_params(const char *const *keywords,
const char *const *values,
int expand_dbname,
uint32 wait_event_info)
{
PGconn *conn = NULL;
libpqsrv_connect_prepare();
conn = PQconnectStartParams(keywords, values, expand_dbname);
libpqsrv_connect_internal(conn, wait_event_info);
return conn;
}
/*
* PQfinish() wrapper that additionally releases the reserved file descriptor.
*
* It is allowed to call this with a NULL pgconn iff NULL was returned by
* libpqsrv_connect*.
*/
static inline void
libpqsrv_disconnect(PGconn *conn)
{
/*
* If no connection was established, we haven't reserved an FD for it (or
* already released it). This rule makes it easier to write PG_CATCH()
* handlers for this facility's users.
*
* See also libpqsrv_connect_internal().
*/
if (conn == NULL)
return;
ReleaseExternalFD();
PQfinish(conn);
}
/* internal helper functions follow */
/*
* Helper function for all connection establishment functions.
*/
static inline void
libpqsrv_connect_prepare(void)
{
/*
* We must obey fd.c's limit on non-virtual file descriptors. Assume that
* a PGconn represents one long-lived FD. (Doing this here also ensures
* that VFDs are closed if needed to make room.)
*/
if (!AcquireExternalFD())
{
#ifndef WIN32 /* can't write #if within ereport() macro */
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg("could not establish connection"),
errdetail("There are too many open files on the local server."),
errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
#else
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg("could not establish connection"),
errdetail("There are too many open files on the local server."),
errhint("Raise the server's max_files_per_process setting.")));
#endif
}
}
/*
* Helper function for all connection establishment functions.
*/
static inline void
libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
{
/*
* With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
* that here.
*/
if (conn == NULL)
{
ReleaseExternalFD();
return;
}
/*
* Can't wait without a socket. Note that we don't want to close the libpq
* connection yet, so callers can emit a useful error.
*/
if (PQstatus(conn) == CONNECTION_BAD)
return;
/*
* WaitLatchOrSocket() can conceivably fail, handle that case here instead
* of requiring all callers to do so.
*/
PG_TRY();
{
PostgresPollingStatusType status;
/*
* Poll connection until we have OK or FAILED status.
*
* Per spec for PQconnectPoll, first wait till socket is write-ready.
*/
status = PGRES_POLLING_WRITING;
while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
{
int io_flag;
int rc;
if (status == PGRES_POLLING_READING)
io_flag = WL_SOCKET_READABLE;
#ifdef WIN32
/*
* Windows needs a different test while waiting for
* connection-made
*/
else if (PQstatus(conn) == CONNECTION_STARTED)
io_flag = WL_SOCKET_CONNECTED;
#endif
else
io_flag = WL_SOCKET_WRITEABLE;
rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
PQsocket(conn),
0,
wait_event_info);
/* Interrupted? */
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
/* If socket is ready, advance the libpq state machine */
if (rc & io_flag)
status = PQconnectPoll(conn);
}
}
PG_CATCH();
{
/*
* If an error is thrown here, the callers won't call
* libpqsrv_disconnect() with a conn, so release resources
* immediately.
*/
ReleaseExternalFD();
PQfinish(conn);
PG_RE_THROW();
}
PG_END_TRY();
}
/*
* PQexec() wrapper that processes interrupts.
*
* Unless PQsetnonblocking(conn, 1) is in effect, this can't process
* interrupts while pushing the query text to the server. Consider that
* setting if query strings can be long relative to TCP buffer size.
*
* This has the preconditions of PQsendQuery(), not those of PQexec(). Most
* notably, PQexec() would silently discard any prior query results.
*/
static inline PGresult *
libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
{
if (!PQsendQuery(conn, query))
return NULL;
return libpqsrv_get_result_last(conn, wait_event_info);
}
/*
* PQexecParams() wrapper that processes interrupts.
*
* See notes at libpqsrv_exec().
*/
static inline PGresult *
libpqsrv_exec_params(PGconn *conn,
const char *command,
int nParams,
const Oid *paramTypes,
const char *const *paramValues,
const int *paramLengths,
const int *paramFormats,
int resultFormat,
uint32 wait_event_info)
{
if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
paramLengths, paramFormats, resultFormat))
return NULL;
return libpqsrv_get_result_last(conn, wait_event_info);
}
/*
* Like PQexec(), loop over PQgetResult() until it returns NULL or another
* terminal state. Return the last non-NULL result or the terminal state.
*/
static inline PGresult *
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
{
PGresult *volatile lastResult = NULL;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
for (;;)
{
/* Wait for, and collect, the next PGresult. */
PGresult *result;
result = libpqsrv_get_result(conn, wait_event_info);
if (result == NULL)
break; /* query is complete, or failure */
/*
* Emulate PQexec()'s behavior of returning the last result when
* there are many.
*/
PQclear(lastResult);
lastResult = result;
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(conn) == CONNECTION_BAD)
break;
}
}
PG_CATCH();
{
PQclear(lastResult);
PG_RE_THROW();
}
PG_END_TRY();
return lastResult;
}
/*
* Perform the equivalent of PQgetResult(), but watch for interrupts.
*/
static inline PGresult *
libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
{
/*
* Collect data until PQgetResult is ready to get the result without
* blocking.
*/
while (PQisBusy(conn))
{
int rc;
rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
WL_SOCKET_READABLE,
PQsocket(conn),
0,
wait_event_info);
/* Interrupted? */
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
/* Consume whatever data is available from the socket */
if (PQconsumeInput(conn) == 0)
{
/* trouble; expect PQgetResult() to return NULL */
break;
}
}
/* Now we can collect and return the next PGresult */
return PQgetResult(conn);
}
/*
* Submit a cancel request to the given connection, waiting only until
* the given time.
*
* We sleep interruptibly until we receive confirmation that the cancel
* request has been accepted, and if it is, return NULL; if the cancel
* request fails, return an error message string (which is not to be
* freed).
*
* For other problems (to wit: OOM when strdup'ing an error message from
* libpq), this function can ereport(ERROR).
*
* Note: this function leaks a string's worth of memory when reporting
* libpq errors. Make sure to call it in a transient memory context.
*/
static inline char *
libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
{
PGcancelConn *cancel_conn;
char *error = NULL;
cancel_conn = PQcancelCreate(conn);
if (cancel_conn == NULL)
return _("out of memory");
/* In what follows, do not leak any PGcancelConn on any errors. */
PG_TRY();
{
if (!PQcancelStart(cancel_conn))
{
error = pchomp(PQcancelErrorMessage(cancel_conn));
goto exit;
}
for (;;)
{
PostgresPollingStatusType pollres;
TimestampTz now;
long cur_timeout;
int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
pollres = PQcancelPoll(cancel_conn);
if (pollres == PGRES_POLLING_OK)
break; /* success! */
/* If timeout has expired, give up, else get sleep time. */
now = GetCurrentTimestamp();
cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
if (cur_timeout <= 0)
{
error = _("cancel request timed out");
break;
}
switch (pollres)
{
case PGRES_POLLING_READING:
waitEvents |= WL_SOCKET_READABLE;
break;
case PGRES_POLLING_WRITING:
waitEvents |= WL_SOCKET_WRITEABLE;
break;
default:
error = pchomp(PQcancelErrorMessage(cancel_conn));
goto exit;
}
/* Sleep until there's something to do */
WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
cur_timeout, PG_WAIT_CLIENT);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
exit: ;
}
PG_FINALLY();
{
PQcancelFinish(cancel_conn);
}
PG_END_TRY();
return error;
}
#endif /* LIBPQ_BE_FE_HELPERS_H */