diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index a603ab4a38..70e132f4e5 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -34,6 +34,7 @@ #include "lib/stringinfo.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "nodes/pg_list.h" #include "pgtime.h" #include "postmaster/fork_process.h" #include "postmaster/postmaster.h" @@ -93,11 +94,14 @@ static char *last_file_name = NULL; static char *last_csv_file_name = NULL; /* - * Buffers for saving partial messages from different backends. We don't expect - * that there will be very many outstanding at one time, so 20 seems plenty of - * leeway. If this array gets full we won't lose messages, but we will lose - * the protocol protection against them being partially written or interleaved. + * Buffers for saving partial messages from different backends. * + * Keep NBUFFER_LISTS lists of these, with the entry for a given source pid + * being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on + * the number of entries we have to examine for any one incoming message. + * There must never be more than one entry for the same source pid. + * + * An inactive buffer is not removed from its list, just held for re-use. * An inactive buffer has pid == 0 and undefined contents of data. */ typedef struct @@ -106,8 +110,8 @@ typedef struct StringInfoData data; /* accumulated data, as a StringInfo */ } save_buffer; -#define CHUNK_SLOTS 20 -static save_buffer saved_chunks[CHUNK_SLOTS]; +#define NBUFFER_LISTS 256 +static List *buffer_lists[NBUFFER_LISTS]; /* These must be exported for EXEC_BACKEND case ... annoying */ #ifndef WIN32 @@ -592,7 +596,7 @@ SysLogger_Start(void) * Now we are done with the write end of the pipe. * CloseHandle() must not be called because the preceding * close() closes the underlying handle. - */ + */ syslogPipe[1] = 0; #endif redirection_done = true; @@ -734,6 +738,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer) (p.is_last == 't' || p.is_last == 'f' || p.is_last == 'T' || p.is_last == 'F')) { + List *buffer_list; + ListCell *cell; + save_buffer *existing_slot = NULL, + *free_slot = NULL; + StringInfo str; + chunklen = PIPE_HEADER_SIZE + p.len; /* Fall out of loop if we don't have the whole chunk yet */ @@ -743,52 +753,53 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer) dest = (p.is_last == 'T' || p.is_last == 'F') ? LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR; + /* Locate any existing buffer for this source pid */ + buffer_list = buffer_lists[p.pid % NBUFFER_LISTS]; + foreach(cell, buffer_list) + { + save_buffer *buf = (save_buffer *) lfirst(cell); + + if (buf->pid == p.pid) + { + existing_slot = buf; + break; + } + if (buf->pid == 0 && free_slot == NULL) + free_slot = buf; + } + if (p.is_last == 'f' || p.is_last == 'F') { /* - * Save a complete non-final chunk in the per-pid buffer if - * possible - if not just write it out. + * Save a complete non-final chunk in a per-pid buffer */ - int free_slot = -1, - existing_slot = -1; - int i; - StringInfo str; - - for (i = 0; i < CHUNK_SLOTS; i++) + if (existing_slot != NULL) { - if (saved_chunks[i].pid == p.pid) - { - existing_slot = i; - break; - } - if (free_slot < 0 && saved_chunks[i].pid == 0) - free_slot = i; - } - if (existing_slot >= 0) - { - str = &(saved_chunks[existing_slot].data); - appendBinaryStringInfo(str, - cursor + PIPE_HEADER_SIZE, - p.len); - } - else if (free_slot >= 0) - { - saved_chunks[free_slot].pid = p.pid; - str = &(saved_chunks[free_slot].data); - initStringInfo(str); + /* Add chunk to data from preceding chunks */ + str = &(existing_slot->data); appendBinaryStringInfo(str, cursor + PIPE_HEADER_SIZE, p.len); } else { - /* - * If there is no free slot we'll just have to take our - * chances and write out a partial message and hope that - * it's not followed by something from another pid. - */ - write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len, - dest); + /* First chunk of message, save in a new buffer */ + if (free_slot == NULL) + { + /* + * Need a free slot, but there isn't one in the list, + * so create a new one and extend the list with it. + */ + free_slot = palloc(sizeof(save_buffer)); + buffer_list = lappend(buffer_list, free_slot); + buffer_lists[p.pid % NBUFFER_LISTS] = buffer_list; + } + free_slot->pid = p.pid; + str = &(free_slot->data); + initStringInfo(str); + appendBinaryStringInfo(str, + cursor + PIPE_HEADER_SIZE, + p.len); } } else @@ -797,26 +808,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer) * Final chunk --- add it to anything saved for that pid, and * either way write the whole thing out. */ - int existing_slot = -1; - int i; - StringInfo str; - - for (i = 0; i < CHUNK_SLOTS; i++) + if (existing_slot != NULL) { - if (saved_chunks[i].pid == p.pid) - { - existing_slot = i; - break; - } - } - if (existing_slot >= 0) - { - str = &(saved_chunks[existing_slot].data); + str = &(existing_slot->data); appendBinaryStringInfo(str, cursor + PIPE_HEADER_SIZE, p.len); write_syslogger_file(str->data, str->len, dest); - saved_chunks[existing_slot].pid = 0; + /* Mark the buffer unused, and reclaim string storage */ + existing_slot->pid = 0; pfree(str->data); } else @@ -872,17 +872,27 @@ static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer) { int i; - StringInfo str; /* Dump any incomplete protocol messages */ - for (i = 0; i < CHUNK_SLOTS; i++) + for (i = 0; i < NBUFFER_LISTS; i++) { - if (saved_chunks[i].pid != 0) + List *list = buffer_lists[i]; + ListCell *cell; + + foreach(cell, list) { - str = &(saved_chunks[i].data); - write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR); - saved_chunks[i].pid = 0; - pfree(str->data); + save_buffer *buf = (save_buffer *) lfirst(cell); + + if (buf->pid != 0) + { + StringInfo str = &(buf->data); + + write_syslogger_file(str->data, str->len, + LOG_DESTINATION_STDERR); + /* Mark the buffer unused, and reclaim string storage */ + buf->pid = 0; + pfree(str->data); + } } }