Avoid needless large memcpys in libpq socket writing

Until now, when calling pq_putmessage to write new data to a libpq
socket, all writes are copied into a buffer and that buffer gets flushed
when full to avoid having to perform small writes to the socket.

There are cases where we must write large amounts of data to the socket,
sometimes larger than the size of the buffer.  In this case, it's
wasteful to memcpy this data into the buffer and flush it out, instead,
we can send it directly from the memory location that the data is already
stored in.

Here we adjust internal_putbytes() so that after having just flushed the
buffer to the socket, if the remaining bytes to send is as big or bigger
than the buffer size, we just send directly rather than needlessly
copying into the PqSendBuffer buffer first.

Examples of operations that write large amounts of data in one message
are; outputting large tuples with SELECT or COPY TO STDOUT and
pg_basebackup.

Author: Melih Mutlu
Reviewed-by: Heikki Linnakangas
Reviewed-by: Jelte Fennema-Nio
Reviewed-by: David Rowley
Reviewed-by: Ranier Vilela
Reviewed-by: Andres Freund
Discussion: https://postgr.es/m/CAGPVpCR15nosj0f6xe-c2h477zFR88q12e6WjEoEZc8ZYkTh3Q@mail.gmail.com
This commit is contained in:
David Rowley 2024-04-07 21:20:18 +12:00
parent a97bbe1f1d
commit c4ab7da606
1 changed files with 53 additions and 21 deletions

View File

@ -120,8 +120,8 @@ static List *sock_paths = NIL;
static char *PqSendBuffer;
static int PqSendBufferSize; /* Size send buffer */
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */
static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
@ -143,8 +143,10 @@ static int socket_flush_if_writable(void);
static bool socket_is_send_pending(void);
static int socket_putmessage(char msgtype, const char *s, size_t len);
static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
static int internal_putbytes(const char *s, size_t len);
static int internal_flush(void);
static inline int internal_putbytes(const char *s, size_t len);
static inline int internal_flush(void);
static pg_noinline int internal_flush_buffer(const char *buf, size_t *start,
size_t *end);
static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
static int Setup_AF_UNIX(const char *sock_path);
@ -1268,11 +1270,9 @@ pq_getmessage(StringInfo s, int maxlen)
}
static int
static inline int
internal_putbytes(const char *s, size_t len)
{
size_t amount;
while (len > 0)
{
/* If buffer is full, then flush it out */
@ -1282,14 +1282,33 @@ internal_putbytes(const char *s, size_t len)
if (internal_flush())
return EOF;
}
amount = PqSendBufferSize - PqSendPointer;
if (amount > len)
amount = len;
memcpy(PqSendBuffer + PqSendPointer, s, amount);
PqSendPointer += amount;
s += amount;
len -= amount;
/*
* If the buffer is empty and data length is larger than the buffer
* size, send it without buffering. Otherwise, copy as much data as
* possible into the buffer.
*/
if (len >= PqSendBufferSize && PqSendStart == PqSendPointer)
{
size_t start = 0;
socket_set_nonblocking(false);
if (internal_flush_buffer(s, &start, &len))
return EOF;
}
else
{
size_t amount = PqSendBufferSize - PqSendPointer;
if (amount > len)
amount = len;
memcpy(PqSendBuffer + PqSendPointer, s, amount);
PqSendPointer += amount;
s += amount;
len -= amount;
}
}
return 0;
}
@ -1321,19 +1340,32 @@ socket_flush(void)
* and the socket is in non-blocking mode), or EOF if trouble.
* --------------------------------
*/
static int
static inline int
internal_flush(void)
{
return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer);
}
/* --------------------------------
* internal_flush_buffer - flush the given buffer content
*
* Returns 0 if OK (meaning everything was sent, or operation would block
* and the socket is in non-blocking mode), or EOF if trouble.
* --------------------------------
*/
static pg_noinline int
internal_flush_buffer(const char *buf, size_t *start, size_t *end)
{
static int last_reported_send_errno = 0;
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
const char *bufptr = buf + *start;
const char *bufend = buf + *end;
while (bufptr < bufend)
{
int r;
r = secure_write(MyProcPort, bufptr, bufend - bufptr);
r = secure_write(MyProcPort, (char *) bufptr, bufend - bufptr);
if (r <= 0)
{
@ -1373,7 +1405,7 @@ internal_flush(void)
* flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
* the connection.
*/
PqSendStart = PqSendPointer = 0;
*start = *end = 0;
ClientConnectionLost = 1;
InterruptPending = 1;
return EOF;
@ -1381,10 +1413,10 @@ internal_flush(void)
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
PqSendStart += r;
*start += r;
}
PqSendStart = PqSendPointer = 0;
*start = *end = 0;
return 0;
}