Refactor the syslogger pipe protocol to use a bitmask for its options

The previous protocol expected a set of matching characters to check if
a message sent was the last one or not, that changed depending on the
destination wanted:
- 't' and 'f' tracked the last message of a log sent to stderr.
- 'T' and 'F' tracked the last message of a log sent to csvlog.

This could be extended with more characters when introducing new
destinations, but using a bitmask is much more elegant.  This commit
changes the protocol so as a bitmask is used in the header of a log
chunk message sent to the syslogger, with the following options
available for now:
- log_destination as stderr.
- log_destination as csvlog.
- if a message is the last chunk of a message.

Sehrope found this issue in a patch set to introduce JSON as an option
for log_destination, but his patch made the size of the protocol header
larger.  This commit keeps the same size as the original, and adapts the
protocol as wanted.

Thanks also to Andrew Dunstan and Greg Stark for the discussion.

Author: Michael Paquier, Sehrope Sarkuni
Discussion: https://postgr.es/m/CAH7T-aqswBM6JWe4pDehi1uOiufqe06DJWaU5=X7dDLyqUExHg@mail.gmail.com
This commit is contained in:
Michael Paquier 2021-09-13 09:03:45 +09:00
parent e757080e04
commit 2d77d83540
3 changed files with 27 additions and 9 deletions

View File

@ -38,6 +38,7 @@
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "pgtime.h"
#include "port/pg_bitutils.h"
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
@ -885,14 +886,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
{
PipeProtoHeader p;
int chunklen;
bits8 dest_flags;
/* Do we have a valid header? */
memcpy(&p, cursor, offsetof(PipeProtoHeader, data));
dest_flags = p.flags & (PIPE_PROTO_DEST_STDERR | PIPE_PROTO_DEST_CSVLOG);
if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
p.pid != 0 &&
(p.is_last == 't' || p.is_last == 'f' ||
p.is_last == 'T' || p.is_last == 'F'))
pg_popcount((char *) &dest_flags, 1) == 1)
{
List *buffer_list;
ListCell *cell;
@ -906,8 +908,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
if (count < chunklen)
break;
dest = (p.is_last == 'T' || p.is_last == 'F') ?
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
if ((p.flags & PIPE_PROTO_DEST_STDERR) != 0)
dest = LOG_DESTINATION_STDERR;
else if ((p.flags & PIPE_PROTO_DEST_CSVLOG) != 0)
dest = LOG_DESTINATION_CSVLOG;
else
{
/* this should never happen as of the header validation */
Assert(false);
}
/* Locate any existing buffer for this source pid */
buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
@ -924,7 +933,7 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
free_slot = buf;
}
if (p.is_last == 'f' || p.is_last == 'F')
if ((p.flags & PIPE_PROTO_IS_LAST) == 0)
{
/*
* Save a complete non-final chunk in a per-pid buffer

View File

@ -3250,11 +3250,16 @@ write_pipe_chunks(char *data, int len, int dest)
p.proto.nuls[0] = p.proto.nuls[1] = '\0';
p.proto.pid = MyProcPid;
p.proto.flags = 0;
if (dest == LOG_DESTINATION_STDERR)
p.proto.flags |= PIPE_PROTO_DEST_STDERR;
else if (dest == LOG_DESTINATION_CSVLOG)
p.proto.flags |= PIPE_PROTO_DEST_CSVLOG;
/* write all but the last chunk */
while (len > PIPE_MAX_PAYLOAD)
{
p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'F' : 'f');
/* no need to set PIPE_PROTO_IS_LAST yet */
p.proto.len = PIPE_MAX_PAYLOAD;
memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
rc = write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
@ -3264,7 +3269,7 @@ write_pipe_chunks(char *data, int len, int dest)
}
/* write the last chunk */
p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'T' : 't');
p.proto.flags |= PIPE_PROTO_IS_LAST;
p.proto.len = len;
memcpy(p.proto.data, data, len);
rc = write(fd, &p, PIPE_HEADER_SIZE + len);

View File

@ -46,8 +46,7 @@ typedef struct
char nuls[2]; /* always \0\0 */
uint16 len; /* size of this chunk (counts data only) */
int32 pid; /* writer's pid */
char is_last; /* last chunk of message? 't' or 'f' ('T' or
* 'F' for CSV case) */
bits8 flags; /* bitmask of PIPE_PROTO_* */
char data[FLEXIBLE_ARRAY_MEMBER]; /* data payload starts here */
} PipeProtoHeader;
@ -60,6 +59,11 @@ typedef union
#define PIPE_HEADER_SIZE offsetof(PipeProtoHeader, data)
#define PIPE_MAX_PAYLOAD ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))
/* flag bits for PipeProtoHeader->flags */
#define PIPE_PROTO_IS_LAST 0x01 /* last chunk of message? */
/* log destinations */
#define PIPE_PROTO_DEST_STDERR 0x10
#define PIPE_PROTO_DEST_CSVLOG 0x20
/* GUC options */
extern bool Logging_collector;