This commit is contained in:
Chongyi Zheng 2024-05-03 23:31:42 -07:00 committed by GitHub
commit c40345222f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 425 additions and 261 deletions

File diff suppressed because one or more lines are too long

7
go.mod
View File

@ -30,6 +30,7 @@ require (
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5
github.com/dustin/go-humanize v1.0.1
github.com/editorconfig/editorconfig-core-go/v2 v2.6.1
github.com/elastic/go-elasticsearch/v8 v8.13.1
github.com/emersion/go-imap v1.2.1
github.com/emirpasic/gods v1.18.1
github.com/ethantkoenig/rupture v1.0.1
@ -82,7 +83,6 @@ require (
github.com/msteinert/pam v1.2.0
github.com/nektos/act v0.2.52
github.com/niklasfasching/go-org v1.7.0
github.com/olivere/elastic/v7 v7.0.32
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
github.com/pkg/errors v0.9.1
@ -174,6 +174,7 @@ require (
github.com/davidmz/go-pageant v1.0.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
@ -184,6 +185,8 @@ require (
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.23.0 // indirect
github.com/go-openapi/errors v0.22.0 // indirect
github.com/go-openapi/inflect v0.21.0 // indirect
@ -263,6 +266,7 @@ require (
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/skeema/knownhosts v1.2.2 // indirect
github.com/smartystreets/assertions v1.1.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
@ -285,6 +289,7 @@ require (
go.etcd.io/bbolt v1.3.9 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect

17
go.sum
View File

@ -236,6 +236,10 @@ github.com/dvyukov/go-fuzz v0.0.0-20210429054444-fca39067bc72/go.mod h1:11Gm+ccJ
github.com/editorconfig/editorconfig-core-go/v2 v2.6.1 h1:iPCqofzMO41WVbcS/B5Ym7AwHQg9cyQ7Ie/R2XU5L3A=
github.com/editorconfig/editorconfig-core-go/v2 v2.6.1/go.mod h1:VY4oyqUnpULFB3SCRpl24GFDIN1PmfiQIvN/G4ScSNg=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0=
github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg=
github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI=
github.com/elazarl/go-bindata-assetfs v1.0.1/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
@ -256,8 +260,6 @@ github.com/felixge/fgprof v0.9.4 h1:ocDNwMFlnA0NU0zSB3I52xkO4sFXk80VK9lXjLClu88=
github.com/felixge/fgprof v0.9.4/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@ -306,6 +308,11 @@ github.com/go-git/go-git/v5 v5.11.0 h1:XIZc1p+8YzypNr34itUfSvYJcv+eYdTnTvOZ2vD3c
github.com/go-git/go-git/v5 v5.11.0/go.mod h1:6GFcX2P3NM7FPBfpePbpLd21XxsgdAt+lKqXmCUiUCY=
github.com/go-ldap/ldap/v3 v3.4.6 h1:ert95MdbiG7aWo/oPYp9btL3KJlMPKnP58r09rI8T+A=
github.com/go-ldap/ldap/v3 v3.4.6/go.mod h1:IGMQANNtxpsOzj7uUAMjpGBaOVTC4DYyIy8VsTdxmtc=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/analysis v0.23.0 h1:aGday7OWupfMs+LbmLZG4k0MYXIANxcuBTYUC03zFCU=
github.com/go-openapi/analysis v0.23.0/go.mod h1:9mz9ZWaSlV8TvjQHLl2mUW2PbZtemkE8yA5v22ohupo=
github.com/go-openapi/errors v0.22.0 h1:c4xY/OLxUBSTiepAg3j/MHuAv5mJhnf53LLMWFB+u/w=
@ -609,8 +616,6 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@ -835,6 +840,10 @@ go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=

View File

@ -13,7 +13,6 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/indexer/code/internal"
indexer_internal "code.gitea.io/gitea/modules/indexer/internal"
@ -24,16 +23,19 @@ import (
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/typesniffer"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/some"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/highlightertype"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/termvectoroption"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype"
"github.com/go-enry/go-enry/v2"
"github.com/olivere/elastic/v7"
)
const (
esRepoIndexerLatestVersion = 1
// multi-match-types, currently only 2 types are used
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
esMultiMatchTypeBestFields = "best_fields"
esMultiMatchTypePhrasePrefix = "phrase_prefix"
)
var _ internal.Indexer = &Indexer{}
@ -54,40 +56,25 @@ func NewIndexer(url, indexerName string) *Indexer {
return indexer
}
const (
defaultMapping = `{
"mappings": {
"properties": {
"repo_id": {
"type": "long",
"index": true
},
"content": {
"type": "text",
"term_vector": "with_positions_offsets",
"index": true
},
"commit_id": {
"type": "keyword",
"index": true
},
"language": {
"type": "keyword",
"index": true
},
"updated_at": {
"type": "long",
"index": true
}
}
}
}`
)
var defaultMapping = &types.TypeMapping{
Properties: map[string]types.Property{
"repo_id": types.NewLongNumberProperty(),
"content": &types.TextProperty{
Fields: make(map[string]types.Property, 0),
Meta: make(map[string]string, 0),
Properties: make(map[string]types.Property, 0),
TermVector: &termvectoroption.Withpositions,
},
"commit_id": types.NewKeywordProperty(),
"language": types.NewKeywordProperty(),
"updated_at": types.NewLongNumberProperty(),
},
}
func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update internal.FileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) {
func (b *Indexer) addUpdate(ctx context.Context, blk *bulk.Bulk, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update internal.FileUpdate, repo *repo_model.Repository) error {
// Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
return nil, nil
return nil
}
size := update.Size
@ -96,63 +83,68 @@ func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserErro
var stdout string
stdout, _, err = git.NewCommand(ctx, "cat-file", "-s").AddDynamicArguments(update.BlobSha).RunStdString(&git.RunOpts{Dir: repo.RepoPath()})
if err != nil {
return nil, err
return err
}
if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil {
return nil, fmt.Errorf("misformatted git cat-file output: %w", err)
return fmt.Errorf("misformatted git cat-file output: %w", err)
}
}
if size > setting.Indexer.MaxIndexerFileSize {
return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
return b.addDelete(blk, update.Filename, repo)
}
if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
return nil, err
return err
}
_, _, size, err = git.ReadBatchLine(batchReader)
if err != nil {
return nil, err
return err
}
fileContents, err := io.ReadAll(io.LimitReader(batchReader, size))
if err != nil {
return nil, err
return err
} else if !typesniffer.DetectContentType(fileContents).IsText() {
// FIXME: UTF-16 files will probably fail here
return nil, nil
return nil
}
if _, err = batchReader.Discard(1); err != nil {
return nil, err
return err
}
id := internal.FilenameIndexerID(repo.ID, update.Filename)
return []elastic.BulkableRequest{
elastic.NewBulkIndexRequest().
Index(b.inner.VersionedIndexName()).
Id(id).
Doc(map[string]any{
"repo_id": repo.ID,
"content": string(charset.ToUTF8DropErrors(fileContents, charset.ConvertOpts{})),
"commit_id": sha,
"language": analyze.GetCodeLanguage(update.Filename, fileContents),
"updated_at": timeutil.TimeStampNow(),
}),
}, nil
return blk.IndexOp(types.IndexOperation{
Index_: some.String(b.inner.VersionedIndexName()),
Id_: some.String(id),
}, map[string]any{
"id": id,
"repo_id": repo.ID,
"content": string(fileContents),
"commit_id": sha,
"language": analyze.GetCodeLanguage(update.Filename, fileContents),
"updated_at": timeutil.TimeStampNow(),
})
}
func (b *Indexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest {
func (b *Indexer) addDelete(blk *bulk.Bulk, filename string, repo *repo_model.Repository) error {
id := internal.FilenameIndexerID(repo.ID, filename)
return elastic.NewBulkDeleteRequest().
Index(b.inner.VersionedIndexName()).
Id(id)
return blk.DeleteOp(types.DeleteOperation{
Index_: some.String(b.inner.VersionedIndexName()),
Id_: some.String(id),
})
}
// Index will save the index data
func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error {
reqs := make([]elastic.BulkableRequest, 0)
if len(changes.Updates) == 0 && len(changes.RemovedFilenames) == 0 {
return nil
}
blk := b.inner.Client.Bulk().Index(b.inner.VersionedIndexName())
if len(changes.Updates) > 0 {
// Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first!
if err := git.EnsureValidGitRepository(ctx, repo.RepoPath()); err != nil {
@ -164,41 +156,34 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st
defer cancel()
for _, update := range changes.Updates {
updateReqs, err := b.addUpdate(ctx, batchWriter, batchReader, sha, update, repo)
err := b.addUpdate(ctx, blk, batchWriter, batchReader, sha, update, repo)
if err != nil {
return err
}
if len(updateReqs) > 0 {
reqs = append(reqs, updateReqs...)
}
}
cancel()
}
for _, filename := range changes.RemovedFilenames {
reqs = append(reqs, b.addDelete(filename, repo))
}
if len(reqs) > 0 {
esBatchSize := 50
for i := 0; i < len(reqs); i += esBatchSize {
_, err := b.inner.Client.Bulk().
Index(b.inner.VersionedIndexName()).
Add(reqs[i:min(i+esBatchSize, len(reqs))]...).
Do(ctx)
if err != nil {
return err
}
err := b.addDelete(blk, filename, repo)
if err != nil {
return err
}
}
return nil
_, err := blk.Do(ctx)
return err
}
// Delete deletes indexes by ids
func (b *Indexer) Delete(ctx context.Context, repoID int64) error {
_, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()).
Query(elastic.NewTermsQuery("repo_id", repoID)).
Query(&types.Query{
Term: map[string]types.TermQuery{
"repo_id": {Value: repoID},
},
}).
// Query(elastic.NewTermsQuery("repo_id", repoID)).
Do(ctx)
return err
}
@ -219,7 +204,7 @@ func indexPos(content, start, end string) (int, int) {
return startIdx, startIdx + len(start) + endIdx + len(end)
}
func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) {
func convertResult(searchResult *search.Response, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) {
hits := make([]*internal.SearchResult, 0, pageSize)
for _, hit := range searchResult.Hits.Hits {
// FIXME: There is no way to get the position the keyword on the content currently on the same request.
@ -239,9 +224,9 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int)
panic(fmt.Sprintf("2===%#v", hit.Highlight))
}
repoID, fileName := internal.ParseIndexerID(hit.Id)
repoID, fileName := internal.ParseIndexerID(hit.Id_)
res := make(map[string]any)
if err := json.Unmarshal(hit.Source, &res); err != nil {
if err := json.Unmarshal(hit.Source_, &res); err != nil {
return 0, nil, nil, err
}
@ -260,16 +245,18 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int)
})
}
return searchResult.TotalHits(), hits, extractAggs(searchResult), nil
return searchResult.Hits.Total.Value, hits, extractAggregates(searchResult), nil
}
func extractAggs(searchResult *elastic.SearchResult) []*internal.SearchResultLanguages {
func extractAggregates(searchResult *search.Response) []*internal.SearchResultLanguages {
var searchResultLanguages []*internal.SearchResultLanguages
agg, found := searchResult.Aggregations.Terms("language")
agg, found := searchResult.Aggregations["language"]
if found {
searchResultLanguages = make([]*internal.SearchResultLanguages, 0, 10)
for _, bucket := range agg.Buckets {
languageAgg := agg.(*types.StringTermsAggregate)
buckets := languageAgg.Buckets.([]types.StringTermsBucket)
for _, bucket := range buckets {
searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{
Language: bucket.Key.(string),
Color: enry.GetColor(bucket.Key.(string)),
@ -282,39 +269,69 @@ func extractAggs(searchResult *elastic.SearchResult) []*internal.SearchResultLan
// Search searches for codes and language stats by given conditions.
func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) {
searchType := esMultiMatchTypePhrasePrefix
// searchType := esMultiMatchTypePhrasePrefix
searchType := &textquerytype.Phraseprefix
if opts.IsKeywordFuzzy {
searchType = esMultiMatchTypeBestFields
searchType = &textquerytype.Bestfields
}
kwQuery := elastic.NewMultiMatchQuery(opts.Keyword, "content").Type(searchType)
query := elastic.NewBoolQuery()
query = query.Must(kwQuery)
kwQuery := types.Query{
MultiMatch: &types.MultiMatchQuery{
Query: opts.Keyword,
Fields: []string{"content"},
Type: searchType,
},
}
query := &types.Query{
Bool: &types.BoolQuery{
Must: []types.Query{kwQuery},
},
}
if len(opts.RepoIDs) > 0 {
repoStrs := make([]any, 0, len(opts.RepoIDs))
repoIDs := make([]types.FieldValue, 0, len(opts.RepoIDs))
for _, repoID := range opts.RepoIDs {
repoStrs = append(repoStrs, repoID)
repoIDs = append(repoIDs, types.FieldValue(repoID))
}
repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
query = query.Must(repoQuery)
repoQuery := types.Query{
Terms: &types.TermsQuery{
TermsQuery: map[string]types.TermsQueryField{
"repo_id": repoIDs,
},
},
}
query.Bool.Must = append(query.Bool.Must, repoQuery)
}
var (
start, pageSize = opts.GetSkipTake()
kw = "<em>" + opts.Keyword + "</em>"
aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc()
aggregation = map[string]types.Aggregations{
"language": {
Terms: &types.TermsAggregation{
Field: some.String("language"),
Size: some.Int(10),
Order: map[string]sortorder.SortOrder{
"_count": sortorder.Desc,
},
},
},
}
)
if len(opts.Language) == 0 {
searchResult, err := b.inner.Client.Search().
Index(b.inner.VersionedIndexName()).
Aggregation("language", aggregation).
Aggregations(aggregation).
Query(query).
Highlight(
elastic.NewHighlight().
Field("content").
NumOfFragments(0). // return all highting content on fragments
HighlighterType("fvh"),
&types.Highlight{
Fields: map[string]types.HighlightField{
"content": {
NumberOfFragments: some.Int(0), // return all highting content on fragments
Type: &highlightertype.Fvh,
},
},
},
).
Sort("repo_id", true).
From(start).Size(pageSize).
@ -326,10 +343,16 @@ func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int
return convertResult(searchResult, kw, pageSize)
}
langQuery := elastic.NewMatchQuery("language", opts.Language)
langQuery := types.Query{
Match: map[string]types.MatchQuery{
"language": {
Query: opts.Language,
},
},
}
countResult, err := b.inner.Client.Search().
Index(b.inner.VersionedIndexName()).
Aggregation("language", aggregation).
Aggregations(aggregation).
Query(query).
Size(0). // We only need stats information
Do(ctx)
@ -337,15 +360,19 @@ func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int
return 0, nil, nil, err
}
query = query.Must(langQuery)
query.Bool.Must = append(query.Bool.Must, langQuery)
searchResult, err := b.inner.Client.Search().
Index(b.inner.VersionedIndexName()).
Query(query).
Highlight(
elastic.NewHighlight().
Field("content").
NumOfFragments(0). // return all highting content on fragments
HighlighterType("fvh"),
&types.Highlight{
Fields: map[string]types.HighlightField{
"content": {
NumberOfFragments: some.Int(0), // return all highting content on fragments
Type: &highlightertype.Fvh,
},
},
},
).
Sort("repo_id", true).
From(start).Size(pageSize).
@ -356,5 +383,5 @@ func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int
total, hits, _, err := convertResult(searchResult, kw, pageSize)
return total, hits, extractAggs(countResult), err
return total, hits, extractAggregates(countResult), err
}

View File

@ -9,22 +9,23 @@ import (
"code.gitea.io/gitea/modules/indexer/internal"
"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)
var _ internal.Indexer = &Indexer{}
// Indexer represents a basic elasticsearch indexer implementation
type Indexer struct {
Client *elastic.Client
Client *elasticsearch.TypedClient
url string
indexName string
version int
mapping string
mapping *types.TypeMapping
}
func NewIndexer(url, indexName string, version int, mapping string) *Indexer {
func NewIndexer(url, indexName string, version int, mapping *types.TypeMapping) *Indexer {
return &Indexer{
url: url,
indexName: indexName,
@ -48,7 +49,7 @@ func (i *Indexer) Init(ctx context.Context) (bool, error) {
}
i.Client = client
exists, err := i.Client.IndexExists(i.VersionedIndexName()).Do(ctx)
exists, err := i.Client.Indices.Exists(i.VersionedIndexName()).Do(ctx)
if err != nil {
return false, err
}
@ -72,11 +73,11 @@ func (i *Indexer) Ping(ctx context.Context) error {
return fmt.Errorf("indexer is not initialized")
}
resp, err := i.Client.ClusterHealth().Do(ctx)
resp, err := i.Client.Cluster.Health().Do(ctx)
if err != nil {
return err
}
if resp.Status != "green" && resp.Status != "yellow" {
if resp.Status.Name != "green" && resp.Status.Name != "yellow" {
// It's healthy if the status is green, and it's available if the status is yellow,
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html
return fmt.Errorf("status of elasticsearch cluster is %s", resp.Status)

View File

@ -6,11 +6,11 @@ package elasticsearch
import (
"context"
"fmt"
"time"
"code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/indices/create"
)
// VersionedIndexName returns the full index name with version
@ -27,12 +27,14 @@ func versionedIndexName(indexName string, version int) string {
}
func (i *Indexer) createIndex(ctx context.Context) error {
createIndex, err := i.Client.CreateIndex(i.VersionedIndexName()).BodyString(i.mapping).Do(ctx)
createIndex, err := i.Client.Indices.Create(i.VersionedIndexName()).Request(&create.Request{
Mappings: i.mapping,
}).Do(ctx)
if err != nil {
return err
}
if !createIndex.Acknowledged {
return fmt.Errorf("create index %s with %s failed", i.VersionedIndexName(), i.mapping)
return fmt.Errorf("create index %s failed", i.VersionedIndexName())
}
i.checkOldIndexes(ctx)
@ -40,27 +42,24 @@ func (i *Indexer) createIndex(ctx context.Context) error {
return nil
}
func (i *Indexer) initClient() (*elastic.Client, error) {
opts := []elastic.ClientOptionFunc{
elastic.SetURL(i.url),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10 * time.Second),
elastic.SetGzip(false),
func (i *Indexer) initClient() (*elasticsearch.TypedClient, error) {
cfg := elasticsearch.Config{
Addresses: []string{i.url},
}
logger := log.GetLogger(log.DEFAULT)
// logger := log.GetLogger(log.DEFAULT)
opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace}))
opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info}))
opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error}))
// opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace}))
// opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info}))
// opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error}))
return elastic.NewClient(opts...)
return elasticsearch.NewTypedClient(cfg)
}
func (i *Indexer) checkOldIndexes(ctx context.Context) {
for v := 0; v < i.version; v++ {
indexName := versionedIndexName(i.indexName, v)
exists, err := i.Client.IndexExists(indexName).Do(ctx)
exists, err := i.Client.Indices.Exists(indexName).Do(ctx)
if err == nil && exists {
log.Warn("Found older elasticsearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName)
}

View File

@ -4,6 +4,7 @@
package elasticsearch
import (
"bytes"
"context"
"fmt"
"strconv"
@ -13,16 +14,19 @@ import (
indexer_internal "code.gitea.io/gitea/modules/indexer/internal"
inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
"code.gitea.io/gitea/modules/indexer/issues/internal"
"code.gitea.io/gitea/modules/json"
"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
"github.com/elastic/go-elasticsearch/v8/typedapi/some"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype"
)
const (
issueIndexerLatestVersion = 1
// multi-match-types, currently only 2 types are used
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
esMultiMatchTypeBestFields = "best_fields"
esMultiMatchTypePhrasePrefix = "phrase_prefix"
)
var _ internal.Indexer = &Indexer{}
@ -43,42 +47,35 @@ func NewIndexer(url, indexerName string) *Indexer {
return indexer
}
const (
defaultMapping = `
{
"mappings": {
"properties": {
"id": { "type": "integer", "index": true },
"repo_id": { "type": "integer", "index": true },
"is_public": { "type": "boolean", "index": true },
var defaultMapping = &types.TypeMapping{
Properties: map[string]types.Property{
"id": types.NewIntegerNumberProperty(),
"repo_id": types.NewIntegerNumberProperty(),
"is_public": types.NewBooleanProperty(),
"title": { "type": "text", "index": true },
"content": { "type": "text", "index": true },
"comments": { "type" : "text", "index": true },
"title": types.NewTextProperty(),
"content": types.NewTextProperty(),
"comments": types.NewTextProperty(),
"is_pull": { "type": "boolean", "index": true },
"is_closed": { "type": "boolean", "index": true },
"label_ids": { "type": "integer", "index": true },
"no_label": { "type": "boolean", "index": true },
"milestone_id": { "type": "integer", "index": true },
"project_id": { "type": "integer", "index": true },
"project_board_id": { "type": "integer", "index": true },
"poster_id": { "type": "integer", "index": true },
"assignee_id": { "type": "integer", "index": true },
"mention_ids": { "type": "integer", "index": true },
"reviewed_ids": { "type": "integer", "index": true },
"review_requested_ids": { "type": "integer", "index": true },
"subscriber_ids": { "type": "integer", "index": true },
"updated_unix": { "type": "integer", "index": true },
"created_unix": { "type": "integer", "index": true },
"deadline_unix": { "type": "integer", "index": true },
"comment_count": { "type": "integer", "index": true }
}
}
"is_pull": types.NewBooleanProperty(),
"is_closed": types.NewBooleanProperty(),
"label_ids": types.NewIntegerNumberProperty(),
"no_label": types.NewBooleanProperty(),
"milestone_id": types.NewIntegerNumberProperty(),
"project_id": types.NewIntegerNumberProperty(),
"project_board_id": types.NewIntegerNumberProperty(),
"poster_id": types.NewIntegerNumberProperty(),
"assignee_id": types.NewIntegerNumberProperty(),
"mention_ids": types.NewIntegerNumberProperty(),
"reviewed_ids": types.NewIntegerNumberProperty(),
"review_requested_ids": types.NewIntegerNumberProperty(),
"subscriber_ids": types.NewIntegerNumberProperty(),
"updated_unix": types.NewIntegerNumberProperty(),
"created_unix": types.NewIntegerNumberProperty(),
"deadline_unix": types.NewIntegerNumberProperty(),
"comment_count": types.NewIntegerNumberProperty(),
},
}
`
)
// Index will save the index data
func (b *Indexer) Index(ctx context.Context, issues ...*internal.IndexerData) error {
@ -86,27 +83,27 @@ func (b *Indexer) Index(ctx context.Context, issues ...*internal.IndexerData) er
return nil
} else if len(issues) == 1 {
issue := issues[0]
_, err := b.inner.Client.Index().
Index(b.inner.VersionedIndexName()).
raw, err := json.Marshal(issue)
if err != nil {
return err
}
_, err = b.inner.Client.Index(b.inner.VersionedIndexName()).
Id(fmt.Sprintf("%d", issue.ID)).
BodyJson(issue).
Raw(bytes.NewBuffer(raw)).
Do(ctx)
return err
}
reqs := make([]elastic.BulkableRequest, 0)
reqs := make(bulk.Request, 0)
for _, issue := range issues {
reqs = append(reqs,
elastic.NewBulkIndexRequest().
Index(b.inner.VersionedIndexName()).
Id(fmt.Sprintf("%d", issue.ID)).
Doc(issue),
)
reqs = append(reqs, issue)
}
_, err := b.inner.Client.Bulk().
Index(b.inner.VersionedIndexName()).
Add(reqs...).
Request(&reqs).
Do(graceful.GetManager().HammerContext())
return err
}
@ -115,132 +112,240 @@ func (b *Indexer) Index(ctx context.Context, issues ...*internal.IndexerData) er
func (b *Indexer) Delete(ctx context.Context, ids ...int64) error {
if len(ids) == 0 {
return nil
} else if len(ids) == 1 {
_, err := b.inner.Client.Delete().
Index(b.inner.VersionedIndexName()).
Id(fmt.Sprintf("%d", ids[0])).
Do(ctx)
}
if len(ids) == 1 {
_, err := b.inner.Client.Delete(
b.inner.VersionedIndexName(),
fmt.Sprintf("%d", ids[0]),
).Do(ctx)
return err
}
reqs := make([]elastic.BulkableRequest, 0)
for _, id := range ids {
reqs = append(reqs,
elastic.NewBulkDeleteRequest().
Index(b.inner.VersionedIndexName()).
Id(fmt.Sprintf("%d", id)),
)
bulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: &elasticsearch.Client{
BaseClient: elasticsearch.BaseClient{
Transport: b.inner.Client.Transport,
},
},
Index: b.inner.VersionedIndexName(),
})
if err != nil {
return err
}
_, err := b.inner.Client.Bulk().
Index(b.inner.VersionedIndexName()).
Add(reqs...).
Do(graceful.GetManager().HammerContext())
return err
for _, id := range ids {
err = bulkIndexer.Add(ctx, esutil.BulkIndexerItem{
Action: "delete",
Index: b.inner.VersionedIndexName(),
DocumentID: fmt.Sprintf("%d", id),
})
if err != nil {
return err
}
}
return bulkIndexer.Close(ctx)
}
// Search searches for issues by given conditions.
// Returns the matching issue IDs.
//
// NOTE(review): unresolved diff residue — this body interleaves the removed
// olivere/elastic implementation with its go-elasticsearch/v8 replacement
// (duplicate declarations of query, searchType, q, and sortBy/sort). Only the
// types.Query-based lines belong in the final code; every elastic.* line must
// be deleted before this compiles.
func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) {
	query := elastic.NewBoolQuery()
	query := &types.Query{
		Bool: &types.BoolQuery{
			Must: make([]types.Query, 0),
		},
	}
	// Full-text keyword search over title/content/comments; phrase-prefix by
	// default, best-fields when fuzzy matching is requested.
	if options.Keyword != "" {
		searchType := esMultiMatchTypePhrasePrefix
		searchType := &textquerytype.Phraseprefix
		if options.IsFuzzyKeyword {
			searchType = esMultiMatchTypeBestFields
			searchType = &textquerytype.Bestfields
		}
		query.Must(elastic.NewMultiMatchQuery(options.Keyword, "title", "content", "comments").Type(searchType))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			MultiMatch: &types.MultiMatchQuery{
				Query:  options.Keyword,
				Fields: []string{"title", "content", "comments"},
				Type:   searchType,
			},
		})
	}
	// Repository filter; AllPublic additionally matches any public repo.
	if len(options.RepoIDs) > 0 {
		q := elastic.NewBoolQuery()
		q.Should(elastic.NewTermsQuery("repo_id", toAnySlice(options.RepoIDs)...))
		if options.AllPublic {
			q.Should(elastic.NewTermQuery("is_public", true))
		q := types.Query{
			Bool: &types.BoolQuery{
				Should: make([]types.Query, 0),
			},
		}
		query.Must(q)
		// NOTE(review): the new code appends the repo_id terms clause to
		// q.Bool.Should in an elided line; only the is_public branch is
		// visible here — confirm against the final file.
		if options.AllPublic {
			q.Bool.Should = append(q.Bool.Should, types.Query{
				Term: map[string]types.TermQuery{
					"is_public": {Value: true},
				},
			})
		}
		query.Bool.Must = append(query.Bool.Must, q)
	}
	if options.IsPull.Has() {
		query.Must(elastic.NewTermQuery("is_pull", options.IsPull.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"is_pull": {Value: options.IsPull.Value()},
			},
		})
	}
	if options.IsClosed.Has() {
		query.Must(elastic.NewTermQuery("is_closed", options.IsClosed.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"is_closed": {Value: options.IsClosed.Value()},
			},
		})
	}
	// Label filtering: either "no labels at all", or include/exclude sets.
	if options.NoLabelOnly {
		query.Must(elastic.NewTermQuery("no_label", true))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"no_label": {Value: true},
			},
		})
	} else {
		// IncludedLabelIDs requires ALL listed labels (Must); falls back to
		// IncludedAnyLabelIDs which requires ANY of them (terms query).
		if len(options.IncludedLabelIDs) > 0 {
			q := elastic.NewBoolQuery()
			for _, labelID := range options.IncludedLabelIDs {
				q.Must(elastic.NewTermQuery("label_ids", labelID))
			q := types.Query{
				Bool: &types.BoolQuery{
					Must: make([]types.Query, 0),
				},
			}
			query.Must(q)
			for _, labelID := range options.IncludedLabelIDs {
				q.Bool.Must = append(q.Bool.Must, types.Query{
					Term: map[string]types.TermQuery{
						"label_ids": {Value: labelID},
					},
				})
			}
			query.Bool.Must = append(query.Bool.Must, q)
		} else if len(options.IncludedAnyLabelIDs) > 0 {
			query.Must(elastic.NewTermsQuery("label_ids", toAnySlice(options.IncludedAnyLabelIDs)...))
			query.Bool.Must = append(query.Bool.Must, types.Query{
				Terms: &types.TermsQuery{
					TermsQuery: map[string]types.TermsQueryField{
						"label_ids": toAnySlice(options.IncludedAnyLabelIDs),
					},
				},
			})
		}
		// ExcludedLabelIDs: every listed label becomes a MustNot term.
		if len(options.ExcludedLabelIDs) > 0 {
			q := elastic.NewBoolQuery()
			for _, labelID := range options.ExcludedLabelIDs {
				q.MustNot(elastic.NewTermQuery("label_ids", labelID))
			q := types.Query{
				Bool: &types.BoolQuery{
					MustNot: make([]types.Query, 0),
				},
			}
			query.Must(q)
			for _, labelID := range options.ExcludedLabelIDs {
				q.Bool.MustNot = append(q.Bool.MustNot, types.Query{
					Term: map[string]types.TermQuery{
						"label_ids": {Value: labelID},
					},
				})
			}
			query.Bool.Must = append(query.Bool.Must, q)
		}
	}
	if len(options.MilestoneIDs) > 0 {
		query.Must(elastic.NewTermsQuery("milestone_id", toAnySlice(options.MilestoneIDs)...))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Terms: &types.TermsQuery{
				TermsQuery: map[string]types.TermsQueryField{
					"milestone_id": toAnySlice(options.MilestoneIDs),
				},
			},
		})
	}
	if options.ProjectID.Has() {
		query.Must(elastic.NewTermQuery("project_id", options.ProjectID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"project_id": {Value: options.ProjectID.Value()},
			},
		})
	}
	if options.ProjectBoardID.Has() {
		query.Must(elastic.NewTermQuery("project_board_id", options.ProjectBoardID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"project_board_id": {Value: options.ProjectBoardID.Value()},
			},
		})
	}
	if options.PosterID.Has() {
		query.Must(elastic.NewTermQuery("poster_id", options.PosterID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"poster_id": {Value: options.PosterID.Value()},
			},
		})
	}
	if options.AssigneeID.Has() {
		query.Must(elastic.NewTermQuery("assignee_id", options.AssigneeID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"assignee_id": {Value: options.AssigneeID.Value()},
			},
		})
	}
	if options.MentionID.Has() {
		query.Must(elastic.NewTermQuery("mention_ids", options.MentionID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"mention_ids": {Value: options.MentionID.Value()},
			},
		})
	}
	if options.ReviewedID.Has() {
		query.Must(elastic.NewTermQuery("reviewed_ids", options.ReviewedID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"reviewed_ids": {Value: options.ReviewedID.Value()},
			},
		})
	}
	if options.ReviewRequestedID.Has() {
		query.Must(elastic.NewTermQuery("review_requested_ids", options.ReviewRequestedID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"review_requested_ids": {Value: options.ReviewRequestedID.Value()},
			},
		})
	}
	if options.SubscriberID.Has() {
		query.Must(elastic.NewTermQuery("subscriber_ids", options.SubscriberID.Value()))
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Term: map[string]types.TermQuery{
				"subscriber_ids": {Value: options.SubscriberID.Value()},
			},
		})
	}
	// Open-ended updated_unix range; the typed API takes float64 bounds.
	if options.UpdatedAfterUnix.Has() || options.UpdatedBeforeUnix.Has() {
		q := elastic.NewRangeQuery("updated_unix")
		rangeQuery := types.NumberRangeQuery{}
		if options.UpdatedAfterUnix.Has() {
			q.Gte(options.UpdatedAfterUnix.Value())
			rangeQuery.Gte = some.Float64(float64(options.UpdatedAfterUnix.Value()))
		}
		if options.UpdatedBeforeUnix.Has() {
			q.Lte(options.UpdatedBeforeUnix.Value())
			rangeQuery.Lte = some.Float64(float64(options.UpdatedBeforeUnix.Value()))
		}
		query.Must(q)
		query.Bool.Must = append(query.Bool.Must, types.Query{
			Range: map[string]types.RangeQuery{
				"updated_unix": rangeQuery,
			},
		})
	}
	if options.SortBy == "" {
		options.SortBy = internal.SortByCreatedAsc
	}
	// Primary sort from options, with descending id as a deterministic
	// tie-breaker.
	sortBy := []elastic.Sorter{
		parseSortBy(options.SortBy),
		elastic.NewFieldSort("id").Desc(),
	field, fieldSort := parseSortBy(options.SortBy)
	sort := []types.SortCombinations{
		&types.SortOptions{SortOptions: map[string]types.FieldSort{
			field: fieldSort,
			"id":  {Order: &sortorder.Desc},
		}},
	}
	// See https://stackoverflow.com/questions/35206409/elasticsearch-2-1-result-window-is-too-large-index-max-result-window/35221900
// NOTE(review): diff hunk marker below — the unchanged lines between hunks
// (which compute skip and limit from the paginator) are elided from this view.
@ -251,7 +356,7 @@ func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (
	searchResult, err := b.inner.Client.Search().
		Index(b.inner.VersionedIndexName()).
		Query(query).
		SortBy(sortBy...).
		Sort(sort...).
		From(skip).Size(limit).
		Do(ctx)
	if err != nil {
// NOTE(review): diff hunk marker below — the elided unchanged lines include
// the error return closing the if above.
@ -260,14 +365,14 @@ func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (
	hits := make([]internal.Match, 0, limit)
	for _, hit := range searchResult.Hits.Hits {
		// The typed API renames the hit ID field: Id -> Id_.
		id, _ := strconv.ParseInt(hit.Id, 10, 64)
		id, _ := strconv.ParseInt(hit.Id_, 10, 64)
		hits = append(hits, internal.Match{
			ID: id,
		})
	}
	return &internal.SearchResult{
		Total: searchResult.TotalHits(),
		Total: searchResult.Hits.Total.Value,
		Hits:  hits,
	}, nil
}
@ -280,11 +385,14 @@ func toAnySlice[T any](s []T) []any {
return ret
}
func parseSortBy(sortBy internal.SortBy) elastic.Sorter {
func parseSortBy(sortBy internal.SortBy) (string, types.FieldSort) {
field := strings.TrimPrefix(string(sortBy), "-")
ret := elastic.NewFieldSort(field)
if strings.HasPrefix(string(sortBy), "-") {
ret.Desc()
sort := types.FieldSort{
Order: &sortorder.Asc,
}
return ret
if strings.HasPrefix(string(sortBy), "-") {
sort.Order = &sortorder.Desc
}
return field, sort
}