Use single-byte Boyer-Moore-Horspool search even with multibyte encodings.

The old implementation first converted the input strings to arrays of
wchars, and performed the search on those. However, the conversion is
expensive, and for a large input string, consumes a lot of memory.
Allocating the large arrays also meant that these functions could not be
used on strings larger than 1 GB / pg_encoding_max_length() (256 MB for UTF-8).

Avoid the conversion, and instead use the single-byte algorithm even with
multibyte encodings. That can get fooled, if there is a matching byte
sequence in the middle of a multi-byte character, so to eliminate false
positives like that, we verify any matches by walking the string character
by character with pg_mblen(). Also, if the caller needs the position of
the match, as a character-offset, we also need to walk the string to count
the characters.

Performance testing shows that walking the whole string with pg_mblen() is
somewhat slower than converting the whole string to wchars. It's still
often a win, though, because we don't need to do it if there is no match,
and even when there is, we only need to walk up to the point where the
match is, not the whole string. Even in the worst case, there would be
room for optimization: Much of the CPU time in the current loop with
pg_mblen() is function call overhead, and could be improved by inlining
pg_mblen() and/or the encoding-specific mblen() functions. But I didn't
attempt to do that as part of this patch.

Most of the callers of text_position_setup/next functions were actually
not interested in the position of the match, counted in characters. To
cater for them, refactor the text_position_next() interface into two
parts: searching for the next match (text_position_next()), and returning
the current match's position as a pointer (text_position_get_match_ptr())
or as a character offset (text_position_get_match_pos()). Getting the
pointer to the match is a more convenient API for many callers, and with
UTF-8, it allows skipping the character-walking step altogether, because
UTF-8 can't have false matches even when treated like raw byte strings.

Reviewed-by: John Naylor
Discussion: https://www.postgresql.org/message-id/3173d989-bc1c-fc8a-3b69-f24246f73876%40iki.fi
This commit is contained in:
Heikki Linnakangas 2019-01-25 16:25:05 +02:00
parent a5be6e9a1d
commit 9556aa01c6
1 changed files with 266 additions and 235 deletions

View File

@ -43,18 +43,33 @@ int bytea_output = BYTEA_OUTPUT_HEX;
typedef struct varlena unknown;
typedef struct varlena VarString;
/*
* State for text_position_* functions.
*/
typedef struct
{
bool use_wchar; /* T if multibyte encoding */
char *str1; /* use these if not use_wchar */
char *str2; /* note: these point to original texts */
pg_wchar *wstr1; /* use these if use_wchar */
pg_wchar *wstr2; /* note: these are palloc'd */
int len1; /* string lengths in logical characters */
bool is_multibyte; /* T if multibyte encoding */
bool is_multibyte_char_in_char;
char *str1; /* haystack string */
char *str2; /* needle string */
int len1; /* string lengths in bytes */
int len2;
/* Skip table for Boyer-Moore-Horspool search algorithm: */
int skiptablemask; /* mask for ANDing with skiptable subscripts */
int skiptable[256]; /* skip distance for given mismatched char */
char *last_match; /* pointer to last match in 'str1' */
/*
* Sometimes we need to convert the byte position of a match to a
* character position. These store the last position that was converted,
* so that on the next call, we can continue from that point, rather than
* count characters from the very beginning.
*/
char *refpoint; /* pointer within original haystack string */
int refpos; /* 0-based character offset of the same point */
} TextPositionState;
typedef struct
@ -109,7 +124,10 @@ static text *text_substring(Datum str,
static text *text_overlay(text *t1, text *t2, int sp, int sl);
static int text_position(text *t1, text *t2);
static void text_position_setup(text *t1, text *t2, TextPositionState *state);
static int text_position_next(int start_pos, TextPositionState *state);
static bool text_position_next(TextPositionState *state);
static char *text_position_next_internal(char *start_ptr, TextPositionState *state);
static char *text_position_get_match_ptr(TextPositionState *state);
static int text_position_get_match_pos(TextPositionState *state);
static void text_position_cleanup(TextPositionState *state);
static int text_cmp(text *arg1, text *arg2, Oid collid);
static bytea *bytea_catenate(bytea *t1, bytea *t2);
@ -1099,8 +1117,14 @@ text_position(text *t1, text *t2)
TextPositionState state;
int result;
if (VARSIZE_ANY_EXHDR(t1) < 1 || VARSIZE_ANY_EXHDR(t2) < 1)
return 0;
text_position_setup(t1, t2, &state);
result = text_position_next(1, &state);
if (!text_position_next(&state))
result = 0;
else
result = text_position_get_match_pos(&state);
text_position_cleanup(&state);
return result;
}
@ -1112,9 +1136,14 @@ text_position(text *t1, text *t2)
*
* These are broken out so that a string can be efficiently searched for
* multiple occurrences of the same pattern. text_position_next may be
* called multiple times with increasing values of start_pos, which is
* the 1-based character position to start the search from. The "state"
* variable is normally just a local variable in the caller.
* called multiple times, and it advances to the next match on each call.
* text_position_get_match_ptr() and text_position_get_match_pos() return
* a pointer or 1-based character position of the last match, respectively.
*
* The "state" variable is normally just a local variable in the caller.
*
* NOTE: text_position_next skips over the matched portion. For example,
* searching for "xx" in "xxx" returns only one match, not two.
*/
static void
@ -1123,33 +1152,42 @@ text_position_setup(text *t1, text *t2, TextPositionState *state)
int len1 = VARSIZE_ANY_EXHDR(t1);
int len2 = VARSIZE_ANY_EXHDR(t2);
Assert(len1 > 0);
Assert(len2 > 0);
/*
* Even with a multi-byte encoding, we perform the search using the raw
* byte sequence, ignoring multibyte issues. For UTF-8, that works fine,
* because in UTF-8 the byte sequence of one character cannot contain
* another character. For other multi-byte encodings, we do the search
* initially as a simple byte search, ignoring multibyte issues, but
* verify afterwards that the match we found is at a character boundary,
* and continue the search if it was a false match.
*/
if (pg_database_encoding_max_length() == 1)
{
/* simple case - single byte encoding */
state->use_wchar = false;
state->str1 = VARDATA_ANY(t1);
state->str2 = VARDATA_ANY(t2);
state->len1 = len1;
state->len2 = len2;
state->is_multibyte = false;
state->is_multibyte_char_in_char = false;
}
else if (GetDatabaseEncoding() == PG_UTF8)
{
state->is_multibyte = true;
state->is_multibyte_char_in_char = false;
}
else
{
/* not as simple - multibyte encoding */
pg_wchar *p1,
*p2;
p1 = (pg_wchar *) palloc((len1 + 1) * sizeof(pg_wchar));
len1 = pg_mb2wchar_with_len(VARDATA_ANY(t1), p1, len1);
p2 = (pg_wchar *) palloc((len2 + 1) * sizeof(pg_wchar));
len2 = pg_mb2wchar_with_len(VARDATA_ANY(t2), p2, len2);
state->use_wchar = true;
state->wstr1 = p1;
state->wstr2 = p2;
state->len1 = len1;
state->len2 = len2;
state->is_multibyte = true;
state->is_multibyte_char_in_char = true;
}
state->str1 = VARDATA_ANY(t1);
state->str2 = VARDATA_ANY(t2);
state->len1 = len1;
state->len2 = len2;
state->last_match = NULL;
state->refpoint = state->str1;
state->refpos = 0;
/*
* Prepare the skip table for Boyer-Moore-Horspool searching. In these
* notes we use the terminology that the "haystack" is the string to be
@ -1166,6 +1204,7 @@ text_position_setup(text *t1, text *t2, TextPositionState *state)
int skiptablemask;
int last;
int i;
const char *str2 = state->str2;
/*
* First we must determine how much of the skip table to use. The
@ -1212,165 +1251,182 @@ text_position_setup(text *t1, text *t2, TextPositionState *state)
*/
last = len2 - 1;
if (!state->use_wchar)
{
const char *str2 = state->str2;
for (i = 0; i < last; i++)
state->skiptable[(unsigned char) str2[i] & skiptablemask] = last - i;
}
else
{
const pg_wchar *wstr2 = state->wstr2;
for (i = 0; i < last; i++)
state->skiptable[wstr2[i] & skiptablemask] = last - i;
}
for (i = 0; i < last; i++)
state->skiptable[(unsigned char) str2[i] & skiptablemask] = last - i;
}
}
static int
text_position_next(int start_pos, TextPositionState *state)
/*
 * text_position_next
 *
 * Move the search forward to the next occurrence of the needle.  The scan
 * resumes right after the previously reported match, or at the start of the
 * haystack if nothing has been reported yet.
 *
 * Returns true and records the match in state->last_match when one is
 * found; returns false when there are no more matches (or when the needle
 * is empty).
 */
static bool
text_position_next(TextPositionState *state)
{
	int			pattern_len = state->len2;
	char	   *search_from;
	char	   *candidate;

	if (pattern_len <= 0)
		return false;			/* result for empty pattern */

	/* Resume just past the previous match, if there was one. */
	search_from = state->last_match ?
		state->last_match + pattern_len : state->str1;

	for (;;)
	{
		candidate = text_position_next_internal(search_from, state);
		if (candidate == NULL)
			return false;

		/*
		 * A raw byte match is final unless this encoding allows one
		 * character's bytes to show up inside a longer multi-byte character.
		 */
		if (!state->is_multibyte_char_in_char)
			break;

		/* The search must never move backwards past the reference point. */
		Assert(state->refpoint <= candidate);

		/*
		 * Advance the reference point one character at a time until it
		 * reaches or passes the candidate, keeping the character count in
		 * step with it.
		 */
		while (state->refpoint < candidate)
		{
			state->refpoint += pg_mblen(state->refpoint);
			state->refpos++;
		}

		/* Landed exactly on the candidate: it sits on a character boundary. */
		if (state->refpoint == candidate)
			break;

		/*
		 * We stepped over the candidate's first byte, so the byte sequence
		 * was found in the middle of a multi-byte character.  Discard it and
		 * search again from the next character boundary.
		 */
		search_from = state->refpoint;
	}

	state->last_match = candidate;
	return true;
}
/*
* Subroutine of text_position_next(). This searches for the raw byte
* sequence, ignoring any multi-byte encoding issues. Returns the first
* match starting at 'start_ptr', or NULL if no match is found.
*/
static char *
text_position_next_internal(char *start_ptr, TextPositionState *state)
{
int haystack_len = state->len1;
int needle_len = state->len2;
int skiptablemask = state->skiptablemask;
const char *haystack = state->str1;
const char *needle = state->str2;
const char *haystack_end = &haystack[haystack_len];
const char *hptr;
Assert(start_pos > 0); /* else caller error */
Assert(start_ptr >= haystack && start_ptr <= haystack_end);
if (needle_len <= 0)
return start_pos; /* result for empty pattern */
start_pos--; /* adjust for zero based arrays */
/* Done if the needle can't possibly fit */
if (haystack_len < start_pos + needle_len)
return 0;
if (!state->use_wchar)
if (needle_len == 1)
{
/* simple case - single byte encoding */
const char *haystack = state->str1;
const char *needle = state->str2;
const char *haystack_end = &haystack[haystack_len];
const char *hptr;
/* No point in using B-M-H for a one-character needle */
char nchar = *needle;
if (needle_len == 1)
hptr = start_ptr;
while (hptr < haystack_end)
{
/* No point in using B-M-H for a one-character needle */
char nchar = *needle;
hptr = &haystack[start_pos];
while (hptr < haystack_end)
{
if (*hptr == nchar)
return hptr - haystack + 1;
hptr++;
}
}
else
{
const char *needle_last = &needle[needle_len - 1];
/* Start at startpos plus the length of the needle */
hptr = &haystack[start_pos + needle_len - 1];
while (hptr < haystack_end)
{
/* Match the needle scanning *backward* */
const char *nptr;
const char *p;
nptr = needle_last;
p = hptr;
while (*nptr == *p)
{
/* Matched it all? If so, return 1-based position */
if (nptr == needle)
return p - haystack + 1;
nptr--, p--;
}
/*
* No match, so use the haystack char at hptr to decide how
* far to advance. If the needle had any occurrence of that
* character (or more precisely, one sharing the same
* skiptable entry) before its last character, then we advance
* far enough to align the last such needle character with
* that haystack position. Otherwise we can advance by the
* whole needle length.
*/
hptr += state->skiptable[(unsigned char) *hptr & skiptablemask];
}
if (*hptr == nchar)
return (char *) hptr;
hptr++;
}
}
else
{
/* The multibyte char version. This works exactly the same way. */
const pg_wchar *haystack = state->wstr1;
const pg_wchar *needle = state->wstr2;
const pg_wchar *haystack_end = &haystack[haystack_len];
const pg_wchar *hptr;
const char *needle_last = &needle[needle_len - 1];
if (needle_len == 1)
/* Start at startpos plus the length of the needle */
hptr = start_ptr + needle_len - 1;
while (hptr < haystack_end)
{
/* No point in using B-M-H for a one-character needle */
pg_wchar nchar = *needle;
/* Match the needle scanning *backward* */
const char *nptr;
const char *p;
hptr = &haystack[start_pos];
while (hptr < haystack_end)
nptr = needle_last;
p = hptr;
while (*nptr == *p)
{
if (*hptr == nchar)
return hptr - haystack + 1;
hptr++;
/* Matched it all? If so, return a pointer to the match */
if (nptr == needle)
return (char *) p;
nptr--, p--;
}
}
else
{
const pg_wchar *needle_last = &needle[needle_len - 1];
/* Start at startpos plus the length of the needle */
hptr = &haystack[start_pos + needle_len - 1];
while (hptr < haystack_end)
{
/* Match the needle scanning *backward* */
const pg_wchar *nptr;
const pg_wchar *p;
nptr = needle_last;
p = hptr;
while (*nptr == *p)
{
/* Matched it all? If so, return 1-based position */
if (nptr == needle)
return p - haystack + 1;
nptr--, p--;
}
/*
* No match, so use the haystack char at hptr to decide how
* far to advance. If the needle had any occurrence of that
* character (or more precisely, one sharing the same
* skiptable entry) before its last character, then we advance
* far enough to align the last such needle character with
* that haystack position. Otherwise we can advance by the
* whole needle length.
*/
hptr += state->skiptable[*hptr & skiptablemask];
}
/*
* No match, so use the haystack char at hptr to decide how far to
* advance. If the needle had any occurrence of that character
* (or more precisely, one sharing the same skiptable entry)
* before its last character, then we advance far enough to align
* the last such needle character with that haystack position.
* Otherwise we can advance by the whole needle length.
*/
hptr += state->skiptable[(unsigned char) *hptr & skiptablemask];
}
}
return 0; /* not found */
}
/*
 * Return a pointer to the current match.
 *
 * The returned pointer is state->last_match, i.e. the position recorded by
 * the most recent successful text_position_next() call.  It points into the
 * original haystack string, not into a copy.  It is NULL if no match has
 * been recorded since text_position_setup().
 */
static char *
text_position_get_match_ptr(TextPositionState *state)
{
return state->last_match;
}
/*
 * text_position_get_match_pos
 *
 * Report where the current match starts, as a 1-based offset counted in
 * characters rather than bytes.
 *
 * For multibyte encodings the byte position is translated by walking the
 * haystack with pg_mblen().  The walk continues from the reference point
 * saved in 'state' and updates it, so repeated calls do not recount from
 * the beginning of the string.
 */
static int
text_position_get_match_pos(TextPositionState *state)
{
	char	   *match = state->last_match;

	/* Without a multibyte encoding, byte offset and char offset coincide. */
	if (!state->is_multibyte)
		return match - state->str1 + 1;

	/* Count characters from the saved reference point up to the match. */
	while (state->refpoint < match)
	{
		state->refpoint += pg_mblen(state->refpoint);
		state->refpos++;
	}
	Assert(state->refpoint == match);

	return state->refpos + 1;
}
static void
text_position_cleanup(TextPositionState *state)
{
if (state->use_wchar)
{
pfree(state->wstr1);
pfree(state->wstr2);
}
/* no cleanup needed */
}
/* varstr_cmp()
@ -4050,39 +4106,32 @@ replace_text(PG_FUNCTION_ARGS)
int from_sub_text_len;
TextPositionState state;
text *ret_text;
int start_posn;
int curr_posn;
int chunk_len;
char *curr_ptr;
char *start_ptr;
StringInfoData str;
bool found;
text_position_setup(src_text, from_sub_text, &state);
/*
* Note: we check the converted string length, not the original, because
* they could be different if the input contained invalid encoding.
*/
src_text_len = state.len1;
from_sub_text_len = state.len2;
src_text_len = VARSIZE_ANY_EXHDR(src_text);
from_sub_text_len = VARSIZE_ANY_EXHDR(from_sub_text);
/* Return unmodified source string if empty source or pattern */
if (src_text_len < 1 || from_sub_text_len < 1)
{
text_position_cleanup(&state);
PG_RETURN_TEXT_P(src_text);
}
start_posn = 1;
curr_posn = text_position_next(1, &state);
text_position_setup(src_text, from_sub_text, &state);
found = text_position_next(&state);
/* When the from_sub_text is not found, there is nothing to do. */
if (curr_posn == 0)
if (!found)
{
text_position_cleanup(&state);
PG_RETURN_TEXT_P(src_text);
}
/* start_ptr points to the start_posn'th character of src_text */
curr_ptr = text_position_get_match_ptr(&state);
start_ptr = VARDATA_ANY(src_text);
initStringInfo(&str);
@ -4092,19 +4141,18 @@ replace_text(PG_FUNCTION_ARGS)
CHECK_FOR_INTERRUPTS();
/* copy the data skipped over by last text_position_next() */
chunk_len = charlen_to_bytelen(start_ptr, curr_posn - start_posn);
chunk_len = curr_ptr - start_ptr;
appendBinaryStringInfo(&str, start_ptr, chunk_len);
appendStringInfoText(&str, to_sub_text);
start_posn = curr_posn;
start_ptr += chunk_len;
start_posn += from_sub_text_len;
start_ptr += charlen_to_bytelen(start_ptr, from_sub_text_len);
start_ptr = curr_ptr + from_sub_text_len;
curr_posn = text_position_next(start_posn, &state);
found = text_position_next(&state);
if (found)
curr_ptr = text_position_get_match_ptr(&state);
}
while (curr_posn > 0);
while (found);
/* copy trailing data */
chunk_len = ((char *) src_text + VARSIZE_ANY(src_text)) - start_ptr;
@ -4405,9 +4453,10 @@ split_text(PG_FUNCTION_ARGS)
int inputstring_len;
int fldsep_len;
TextPositionState state;
int start_posn;
int end_posn;
char *start_ptr;
char *end_ptr;
text *result_text;
bool found;
/* field number is 1 based */
if (fldnum < 1)
@ -4415,21 +4464,12 @@ split_text(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("field position must be greater than zero")));
text_position_setup(inputstring, fldsep, &state);
/*
* Note: we check the converted string length, not the original, because
* they could be different if the input contained invalid encoding.
*/
inputstring_len = state.len1;
fldsep_len = state.len2;
inputstring_len = VARSIZE_ANY_EXHDR(inputstring);
fldsep_len = VARSIZE_ANY_EXHDR(fldsep);
/* return empty string for empty input string */
if (inputstring_len < 1)
{
text_position_cleanup(&state);
PG_RETURN_TEXT_P(cstring_to_text(""));
}
/* empty field separator */
if (fldsep_len < 1)
@ -4442,12 +4482,14 @@ split_text(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(cstring_to_text(""));
}
text_position_setup(inputstring, fldsep, &state);
/* identify bounds of first field */
start_posn = 1;
end_posn = text_position_next(1, &state);
start_ptr = VARDATA_ANY(inputstring);
found = text_position_next(&state);
/* special case if fldsep not found at all */
if (end_posn == 0)
if (!found)
{
text_position_cleanup(&state);
/* if field 1 requested, return input string, else empty string */
@ -4456,12 +4498,15 @@ split_text(PG_FUNCTION_ARGS)
else
PG_RETURN_TEXT_P(cstring_to_text(""));
}
end_ptr = text_position_get_match_ptr(&state);
while (end_posn > 0 && --fldnum > 0)
while (found && --fldnum > 0)
{
/* identify bounds of next field */
start_posn = end_posn + fldsep_len;
end_posn = text_position_next(start_posn, &state);
start_ptr = end_ptr + fldsep_len;
found = text_position_next(&state);
if (found)
end_ptr = text_position_get_match_ptr(&state);
}
text_position_cleanup(&state);
@ -4471,20 +4516,19 @@ split_text(PG_FUNCTION_ARGS)
/* N'th field separator not found */
/* if last field requested, return it, else empty string */
if (fldnum == 1)
result_text = text_substring(PointerGetDatum(inputstring),
start_posn,
-1,
true);
{
int last_len = start_ptr - VARDATA_ANY(inputstring);
result_text = cstring_to_text_with_len(start_ptr,
inputstring_len - last_len);
}
else
result_text = cstring_to_text("");
}
else
{
/* non-last field requested */
result_text = text_substring(PointerGetDatum(inputstring),
start_posn,
end_posn - start_posn,
false);
result_text = cstring_to_text_with_len(start_ptr, end_ptr - start_ptr);
}
PG_RETURN_TEXT_P(result_text);
@ -4570,26 +4614,14 @@ text_to_array_internal(PG_FUNCTION_ARGS)
*/
TextPositionState state;
int fldnum;
int start_posn;
int end_posn;
int chunk_len;
text_position_setup(inputstring, fldsep, &state);
/*
* Note: we check the converted string length, not the original,
* because they could be different if the input contained invalid
* encoding.
*/
inputstring_len = state.len1;
fldsep_len = state.len2;
inputstring_len = VARSIZE_ANY_EXHDR(inputstring);
fldsep_len = VARSIZE_ANY_EXHDR(fldsep);
/* return empty array for empty input string */
if (inputstring_len < 1)
{
text_position_cleanup(&state);
PG_RETURN_ARRAYTYPE_P(construct_empty_array(TEXTOID));
}
/*
* empty field separator: return the input string as a one-element
@ -4602,7 +4634,6 @@ text_to_array_internal(PG_FUNCTION_ARGS)
int dims[1];
int lbs[1];
text_position_cleanup(&state);
/* single element can be a NULL too */
is_null = null_string ? text_isequal(inputstring, null_string) : false;
@ -4616,17 +4647,19 @@ text_to_array_internal(PG_FUNCTION_ARGS)
TEXTOID, -1, false, 'i'));
}
start_posn = 1;
/* start_ptr points to the start_posn'th character of inputstring */
text_position_setup(inputstring, fldsep, &state);
start_ptr = VARDATA_ANY(inputstring);
for (fldnum = 1;; fldnum++) /* field number is 1 based */
{
bool found;
char *end_ptr;
CHECK_FOR_INTERRUPTS();
end_posn = text_position_next(start_posn, &state);
if (end_posn == 0)
found = text_position_next(&state);
if (!found)
{
/* fetch last field */
chunk_len = ((char *) inputstring + VARSIZE_ANY(inputstring)) - start_ptr;
@ -4634,7 +4667,8 @@ text_to_array_internal(PG_FUNCTION_ARGS)
else
{
/* fetch non-last field */
chunk_len = charlen_to_bytelen(start_ptr, end_posn - start_posn);
end_ptr = text_position_get_match_ptr(&state);
chunk_len = end_ptr - start_ptr;
}
/* must build a temp text datum to pass to accumArrayResult */
@ -4650,13 +4684,10 @@ text_to_array_internal(PG_FUNCTION_ARGS)
pfree(result_text);
if (end_posn == 0)
if (!found)
break;
start_posn = end_posn;
start_ptr += chunk_len;
start_posn += fldsep_len;
start_ptr += charlen_to_bytelen(start_ptr, fldsep_len);
start_ptr = end_ptr + fldsep_len;
}
text_position_cleanup(&state);