Skip to content

Commit

Permalink
Merge branch 'master' of github.com:bnb-chain/greenfield-storage-provider
Browse files Browse the repository at this point in the history
  • Loading branch information
constwz committed Apr 30, 2024
2 parents 396cca7 + ec154a1 commit 0537c3c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 22 deletions.
2 changes: 1 addition & 1 deletion modular/executor/execute_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task core
if task.GetIsAgentUpload() {
expectCheckSums, makeErr := e.makeCheckSumsForAgentUpload(ctx, task.GetObjectInfo(), len(task.GetSecondaryEndpoints()), task.GetStorageParams())
if makeErr != nil {
log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", err)
log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", makeErr)
err = makeErr
return
}
Expand Down
33 changes: 27 additions & 6 deletions modular/receiver/receive_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,34 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta
log.CtxErrorw(ctx, "failed to write integrity meta to db", "task", task, "error", err)
return nil, ErrGfSpDBWithDetail("failed to write integrity meta to db, error: " + err.Error())
}

// only in the case when the secondary SP is also the primary SP in a GVG and the upload is delegated should the piece hash be kept at this moment (it will be deleted when sealing succeeds).
// in all other cases, piece hash should be deleted from DB
var skipDeleteChecksum bool
if task.GetIsAgentUploadTask() {
gvg, queryErr := r.baseApp.Consensus().QueryGlobalVirtualGroup(ctx, task.GetGlobalVirtualGroupId())
if queryErr != nil {
log.CtxErrorw(ctx, "failed to QueryGlobalVirtualGroup", "error", queryErr)
return nil, ErrGfSpDBWithDetail("failed to QueryGlobalVirtualGroup, error: " + queryErr.Error())
}
spID, idErr := r.getSPID()
if idErr != nil {
log.CtxErrorw(ctx, "failed to getSPID", "error", idErr)
return nil, ErrGfSpDBWithDetail("failed to getSPID, error: " + idErr.Error())
}
if spID == gvg.PrimarySpId {
skipDeleteChecksum = true
}
}
deletePieceHashTime := time.Now()
if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized(
task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil {
log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err)
// ignore the error,let the request go, the background task will gc the meta again later
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time").
Observe(time.Since(deletePieceHashTime).Seconds())
if !skipDeleteChecksum {
if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized(
task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil {
log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err)
// ignore the error, let the request go; the background task will gc the meta again later
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time").
Observe(time.Since(deletePieceHashTime).Seconds())
}
}

metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time").
Expand Down
14 changes: 14 additions & 0 deletions modular/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type ReceiveModular struct {
baseApp *gfspapp.GfSpBaseApp
scope rcmgr.ResourceScope
receiveQueue taskqueue.TQueueOnStrategy

spID uint32
}

func (r *ReceiveModular) Name() string {
Expand Down Expand Up @@ -50,3 +52,15 @@ func (r *ReceiveModular) ReserveResource(ctx context.Context, state *rcmgr.Scope
// ReleaseResource marks the given resource scope span as done, releasing
// whatever was reserved on it. The ctx parameter is accepted to satisfy the
// modular interface but is not used here.
func (r *ReceiveModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan) {
span.Done()
}

// getSPID returns this storage provider's on-chain ID. On first use it
// queries consensus by the operator address and caches the result on the
// receiver, so subsequent calls are answered from memory.
//
// NOTE(review): spID is read and written without synchronization; if this
// can be reached from concurrent request handlers, a duplicate consensus
// query is harmless but the unguarded write is technically racy — confirm
// the call sites.
//
// Receiver renamed e -> r for consistency with the other ReceiveModular
// methods in this file.
func (r *ReceiveModular) getSPID() (uint32, error) {
	// Fast path: a zero spID is treated as "not yet fetched" — presumably
	// valid SP IDs are non-zero; verify against the chain's ID scheme.
	if r.spID != 0 {
		return r.spID, nil
	}
	spInfo, err := r.baseApp.Consensus().QuerySP(context.Background(), r.baseApp.OperatorAddress())
	if err != nil {
		return 0, err
	}
	r.spID = spInfo.GetId()
	return r.spID, nil
}
18 changes: 7 additions & 11 deletions store/sqldb/object_integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-sql-driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"

corespdb "github.com/bnb-chain/greenfield-storage-provider/core/spdb"
"github.com/bnb-chain/greenfield-storage-provider/pkg/metrics"
Expand Down Expand Up @@ -408,17 +409,12 @@ func (s *SpDBImpl) SetReplicatePieceChecksum(objectID uint64, segmentIdx uint32,
PieceChecksum: hex.EncodeToString(checksum),
Version: version,
}
result = s.db.Create(insertPieceHash)
if result.Error != nil && MysqlErrCode(result.Error) == ErrDuplicateEntryCode {
// If all columns are identical to previous, the db.Save will also encounter ErrDuplicateEntryCode, then it should skip.
err = s.db.Save(insertPieceHash).Error
if MysqlErrCode(err) == ErrDuplicateEntryCode {
return nil
}
return err
}
if result.Error != nil || result.RowsAffected != 1 {
err = fmt.Errorf("failed to insert piece hash record: %s", result.Error)
result = s.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "object_id"}, {Name: "segment_index"}, {Name: "redundancy_index"}},
UpdateAll: true,
}).Create(insertPieceHash)
if result.Error != nil {
err = fmt.Errorf("failed to insert piece hash record: %v", result.Error)
return err
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions store/sqldb/object_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func TestSpDBImpl_SetReplicatePieceChecksumSuccess(t *testing.T) {
)
s, mock := setupDB(t)
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?)").
mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE `piece_checksum`=VALUES(`piece_checksum`),`version`=VALUES(`version`)").
WithArgs(objectID, segmentIdx, redundancyIdx, hex.EncodeToString(pieceChecksum), version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
err := s.SetReplicatePieceChecksum(objectID, segmentIdx, redundancyIdx, pieceChecksum, version)
Expand All @@ -435,7 +435,7 @@ func TestSpDBImpl_SetReplicatePieceChecksumFailure1(t *testing.T) {
)
s, mock := setupDB(t)
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?)").
mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE `piece_checksum`=VALUES(`piece_checksum`),`version`=VALUES(`version`)").
WithArgs(objectID, segmentIdx, redundancyIdx, hex.EncodeToString(pieceChecksum), version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
err := s.SetReplicatePieceChecksum(objectID, segmentIdx, redundancyIdx, pieceChecksum, version)
Expand All @@ -452,7 +452,7 @@ func TestSpDBImpl_SetReplicatePieceChecksumFailure2(t *testing.T) {
)
s, mock := setupDB(t)
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`) VALUES (?,?,?,?)").
mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `piece_checksum`=VALUES(`piece_checksum`),`version`=VALUES(`version`)").
WillReturnError(mockDBInternalError)
mock.ExpectRollback()
mock.ExpectCommit()
Expand Down
2 changes: 1 addition & 1 deletion store/sqldb/shadow_object_integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *SpDBImpl) ListShadowIntegrityMeta() ([]*corespdb.ShadowIntegrityMeta, e
err = s.db.Table(ShadowIntegrityMetaTableName).
Select("*").
Limit(ListShadowIntegrityMetaDefaultSize).
Order("object_id asc").
Order("object_id desc").
Find(&shadowIntegrityMetas).Error

for _, sim := range shadowIntegrityMetas {
Expand Down

0 comments on commit 0537c3c

Please sign in to comment.