Fix data race in bleve indexer (#16474) (#16509)

* Fix data race in bleve indexer

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
pull/16511/head
6543 2021-07-22 05:42:32 +02:00 committed by GitHub
parent 1f5011dff7
commit c4f3f5bdf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 6 deletions

View File

@ -0,0 +1,59 @@
// Copyright 2021 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bleve
import (
"github.com/blevesearch/bleve/v2"
)
// FlushingBatch is a batch of operations that automatically flushes to the
// underlying index once it reaches a certain size.
type FlushingBatch struct {
maxBatchSize int
batch *bleve.Batch
index bleve.Index
}
// NewFlushingBatch creates a new flushing batch for the specified index. Once
// the number of operations in the batch reaches the specified limit, the batch
// automatically flushes its operations to the index.
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
return &FlushingBatch{
maxBatchSize: maxBatchSize,
batch: index.NewBatch(),
index: index,
}
}
// Index add a new index to batch
func (b *FlushingBatch) Index(id string, data interface{}) error {
if err := b.batch.Index(id, data); err != nil {
return err
}
return b.flushIfFull()
}
// Delete add a delete index to batch
func (b *FlushingBatch) Delete(id string) error {
b.batch.Delete(id)
return b.flushIfFull()
}
func (b *FlushingBatch) flushIfFull() error {
if b.batch.Size() < b.maxBatchSize {
return nil
}
return b.Flush()
}
// Flush submit the batch and create a new one
func (b *FlushingBatch) Flush() error {
err := b.index.Batch(b.batch)
if err != nil {
return err
}
b.batch = b.index.NewBatch()
return nil
}

View File

@ -18,6 +18,7 @@ import (
"code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git"
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
@ -176,7 +177,8 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
return indexer, created, err return indexer, created, err
} }
func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string,
update fileUpdate, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
// Ignore vendored files in code search // Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
return nil return nil
@ -229,7 +231,7 @@ func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *
}) })
} }
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
id := filenameIndexerID(repo.ID, filename) id := filenameIndexerID(repo.ID, filename)
return batch.Delete(id) return batch.Delete(id)
} }
@ -267,7 +269,7 @@ func (b *BleveIndexer) Close() {
// Index indexes the data // Index indexes the data
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
if len(changes.Updates) > 0 { if len(changes.Updates) > 0 {
batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath()) batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
@ -296,7 +298,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {
if err != nil { if err != nil {
return err return err
} }
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
for _, hit := range result.Hits { for _, hit := range result.Hits {
if err = batch.Delete(hit.ID); err != nil { if err = batch.Delete(hit.ID); err != nil {
return err return err

View File

@ -9,8 +9,10 @@ import (
"os" "os"
"strconv" "strconv"
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom" "github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
"github.com/blevesearch/bleve/v2/analysis/token/lowercase" "github.com/blevesearch/bleve/v2/analysis/token/lowercase"
@ -197,7 +199,7 @@ func (b *BleveIndexer) Close() {
// Index will save the index data // Index will save the index data
func (b *BleveIndexer) Index(issues []*IndexerData) error { func (b *BleveIndexer) Index(issues []*IndexerData) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
for _, issue := range issues { for _, issue := range issues {
if err := batch.Index(indexerID(issue.ID), struct { if err := batch.Index(indexerID(issue.ID), struct {
RepoID int64 RepoID int64
@ -218,7 +220,7 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error {
// Delete deletes indexes by ids // Delete deletes indexes by ids
func (b *BleveIndexer) Delete(ids ...int64) error { func (b *BleveIndexer) Delete(ids ...int64) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
for _, id := range ids { for _, id := range ids {
if err := batch.Delete(indexerID(id)); err != nil { if err := batch.Delete(indexerID(id)); err != nil {
return err return err