Skip to content

Commit

Permalink
Merge branch 'master' of github.com:bnb-chain/greenfield-storage-prov…
Browse files Browse the repository at this point in the history
…ider
  • Loading branch information
constwz committed Apr 29, 2024
2 parents 7b73865 + b425248 commit 396cca7
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 44 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Changelog

## v1.7.0
BUGFIXES
* [#1394](https://github.com/bnb-chain/greenfield-storage-provider/pull/1394) fix: pick new gvg when retry failed replicate piece task
* [#1391](https://github.com/bnb-chain/greenfield-storage-provider/pull/1391) fix: check if it is AgentUploadTask
* [#1390](https://github.com/bnb-chain/greenfield-storage-provider/pull/1390) fix: delegate upload param check
* [#1389](https://github.com/bnb-chain/greenfield-storage-provider/pull/1389) fix: delegate upload param check
* [#1387](https://github.com/bnb-chain/greenfield-storage-provider/pull/1387) fix: upgrade deps for fixing vulnerabilities
* [#1386](https://github.com/bnb-chain/greenfield-storage-provider/pull/1386) fix: check if BucketExtraInfo is nil
* [#1384](https://github.com/bnb-chain/greenfield-storage-provider/pull/1384) fix: fix db override

FEATURES
* [#1392](https://github.com/bnb-chain/greenfield-storage-provider/pull/1392) feat: provide recommended vgf

## v1.6.0

BUGFIXES
Expand Down
1 change: 1 addition & 0 deletions core/task/null_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,4 @@ func (t *NullTask) SetDelegateCreateObject(object *storagetypes.MsgDelegateCreat
func (t *NullTask) GetIsAgentUpload() bool { return false }
func (t *NullTask) InitReplicatePieceTask(object *storagetypes.ObjectInfo, params *storagetypes.Params, priority TPriority, timeout int64, retry int64, isAgentUpload bool) {
}
func (t *NullTask) GetIsAgentUploadTask() bool { return false }
2 changes: 2 additions & 0 deletions core/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ type ReceivePieceTask interface {
GetBucketMigration() bool
// SetBucketMigration sets the bucket migration
SetBucketMigration(bool)
// GetIsAgentUploadTask set the is agent upload flag
GetIsAgentUploadTask() bool
}

// SealObjectTask is an abstract interface to record the information for sealing object on Greenfield chain.
Expand Down
14 changes: 14 additions & 0 deletions core/task/task_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion modular/executor/execute_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ func (e *ExecuteModular) doReplicatePiece(ctx context.Context, waitGroup *sync.W
}
receive.SetSignature(signature)
replicateOnePieceTime := time.Now()

if err = retry.Do(func() error {
// timeout for single piece replication
ctxWithTimeout, cancel := context.WithTimeout(ctx, replicateTimeOut)
Expand Down
10 changes: 9 additions & 1 deletion modular/gater/admin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,20 @@ func (g *GateModular) replicateHandler(w http.ResponseWriter, r *http.Request) {
return
}

if receiveTask.GetObjectInfo() == nil || (!receiveTask.GetIsAgentUploadTask() && int(receiveTask.GetRedundancyIdx()) >= len(receiveTask.GetObjectInfo().GetChecksums())) {
log.CtxInfow(reqCtx.Context(), "debugInfo", "receiveTask", receiveTask)

if receiveTask.GetObjectInfo() == nil {
log.CtxErrorw(reqCtx.Context(), "receive task params error")
err = ErrInvalidHeader
return
}

if !receiveTask.GetIsAgentUploadTask() && int(receiveTask.GetRedundancyIdx()) >= len(receiveTask.GetObjectInfo().GetChecksums()) {
log.CtxErrorw(reqCtx.Context(), "receive task params error", "object_id", receiveTask.GetObjectInfo().Id, "object_name", receiveTask.GetObjectInfo().GetObjectName())
err = ErrInvalidHeader
return
}

readDataTime := time.Now()
data, err = io.ReadAll(r.Body)
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_read_piece_time").Observe(time.Since(readDataTime).Seconds())
Expand Down
61 changes: 61 additions & 0 deletions modular/gater/bucket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsperrors"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfspserver"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsptask"
coremodule "github.com/bnb-chain/greenfield-storage-provider/core/module"
modelgateway "github.com/bnb-chain/greenfield-storage-provider/model/gateway"
metadatatypes "github.com/bnb-chain/greenfield-storage-provider/modular/metadata/types"
Expand Down Expand Up @@ -444,3 +445,63 @@ func (g *GateModular) getBucketReadQuotaCountHandler(w http.ResponseWriter, r *h
}
log.CtxDebugw(ctx, "succeed to get bucket quota count", "xml_info", xmlInfo)
}

func (g *GateModular) getRecommendedVGFIDHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
reqCtx *RequestContext
)
startTime := time.Now()
defer func() {
reqCtx.Cancel()
if err != nil {
reqCtx.SetError(gfsperrors.MakeGfSpError(err))
reqCtx.SetHTTPCode(int(gfsperrors.MakeGfSpError(err).GetHttpStatusCode()))
modelgateway.MakeErrorResponse(w, gfsperrors.MakeGfSpError(err))
metrics.ReqCounter.WithLabelValues(GatewayTotalFailure).Inc()
metrics.ReqTime.WithLabelValues(GatewayTotalFailure).Observe(time.Since(startTime).Seconds())
} else {
reqCtx.SetHTTPCode(http.StatusOK)
metrics.ReqCounter.WithLabelValues(GatewayTotalSuccess).Inc()
metrics.ReqTime.WithLabelValues(GatewayTotalSuccess).Observe(time.Since(startTime).Seconds())
}
log.CtxDebugw(reqCtx.Context(), reqCtx.String())
}()

reqCtx, err = NewRequestContext(r, g)
if err != nil {
return
}

vgfID, err := g.baseApp.GfSpClient().PickVirtualGroupFamilyID(
reqCtx.Context(), &gfsptask.GfSpCreateBucketApprovalTask{
Task: &gfsptask.GfSpTask{},
CreateBucketInfo: &storagetypes.MsgCreateBucket{},
})
if err != nil {
log.CtxErrorw(reqCtx.Context(), "failed to get recommended virtual group family", "error", err)
return
}

var xmlInfo = struct {
XMLName xml.Name `xml:"VirtualGroupFamily"`
Id uint32 `xml:"Id"`
}{
Id: vgfID,
}

xmlBody, err := xml.Marshal(&xmlInfo)
if err != nil {
log.Errorw("failed to marshal xml", "error", err)
err = ErrEncodeResponseWithDetail("failed to marshal xml, error: " + err.Error())
return
}
w.Header().Set(ContentTypeHeader, ContentTypeXMLHeaderValue)

if _, err = w.Write(xmlBody); err != nil {
log.Errorw("failed to write body", "error", err)
err = ErrEncodeResponseWithDetail("failed to write body, error: " + err.Error())
return
}
log.CtxDebugw(reqCtx.Context(), "succeed to get recommended virtual group family", "xml_info", xmlInfo)
}
2 changes: 2 additions & 0 deletions modular/gater/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (

// GetApprovalPath defines get-approval path style suffix
GetApprovalPath = "/greenfield/admin/v1/get-approval"
// GetRecommendedVirtualGroupFamilyPath defines get-recommended-vgf path style suffix
GetRecommendedVirtualGroupFamilyPath = "/greenfield/admin/v1/get-recommended-vgf"
// ActionQuery defines get-approval's type, currently include create bucket and create object
ActionQuery = "action"
// UploadProgressQuery defines upload progress query, which is used to route request
Expand Down
14 changes: 10 additions & 4 deletions modular/gater/object_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ func (g *GateModular) delegatePutObjectHandler(w http.ResponseWriter, r *http.Re
err = ErrNoPermission
return
}
contentType = reqCtx.vars["content_type"]
contentType = r.Header.Get(ContentTypeHeader)
if contentType == "" {
contentType = ContentDefault
}
Expand Down Expand Up @@ -1154,10 +1154,12 @@ func (g *GateModular) delegatePutObjectHandler(w http.ResponseWriter, r *http.Re
if err != nil && !strings.Contains(err.Error(), "No such object") {
log.CtxErrorw(reqCtx.ctx, "failed to QueryObjectInfo", "error", err)
return
} else if objectInfo != nil && (objectInfo.ObjectStatus != storagetypes.OBJECT_STATUS_CREATED || (objectInfo.Creator != reqCtx.account && objectInfo.Owner != reqCtx.account) || objectInfo.PayloadSize != payloadSize) {
}
if objectInfo != nil && (objectInfo.ObjectStatus != storagetypes.OBJECT_STATUS_CREATED || (objectInfo.Creator != reqCtx.account && objectInfo.Owner != reqCtx.account) || objectInfo.PayloadSize != payloadSize) {
err = ErrInvalidQuery
return
} else {
}
if objectInfo == nil {
var visibilityInt int64
visibilityStr := queryParams.Get("visibility")
visibilityInt, err = strconv.ParseInt(visibilityStr, 10, 32)
Expand Down Expand Up @@ -1353,6 +1355,10 @@ func (g *GateModular) delegateResumablePutObjectHandler(w http.ResponseWriter, r
return
}
fingerprint = commonhash.GenerateChecksum(approvalMsg)
contentType = r.Header.Get(ContentTypeHeader)
if contentType == "" {
contentType = ContentDefault
}
startTime := time.Now()

if isUpdate {
Expand Down Expand Up @@ -1571,7 +1577,7 @@ func (g *GateModular) delegateCreateFolderHandler(w http.ResponseWriter, r *http
err = ErrNoPermission
return
}
contentType = reqCtx.vars["content_type"]
contentType = r.Header.Get(ContentTypeHeader)
if contentType == "" {
contentType = ContentDefault
}
Expand Down
4 changes: 4 additions & 0 deletions modular/gater/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
getSPMigratingBucketNumberRouterName = "GetSPMigratingBucketNumber"
verifyMigrateGVGPermissionRouterName = "VerifyMigrateGVGPermission"
getBucketSizeRouterName = "GetBucketSize"
getRecommendedVGFRouterName = "GetRecommendedVGF"
)

const (
Expand Down Expand Up @@ -120,6 +121,9 @@ func (g *GateModular) RegisterHandler(router *mux.Router) {
router.Path(GetApprovalPath).Name(approvalRouterName).Methods(http.MethodGet).HandlerFunc(g.getApprovalHandler).
Queries(ActionQuery, "{action}")

// Query recommended virtual group family for creating bucket
router.Path(GetRecommendedVirtualGroupFamilyPath).Name(getRecommendedVGFRouterName).Methods(http.MethodGet).HandlerFunc(g.getRecommendedVGFIDHandler)

// get challenge info
router.Path(GetChallengeInfoPath).Name(getChallengeInfoRouterName).Methods(http.MethodGet).HandlerFunc(g.getChallengeInfoHandler)
router.Path(GetChallengeInfoV2Path).Name(getChallengeInfoV2RouterName).Methods(http.MethodGet).HandlerFunc(g.getChallengeInfoV2Handler)
Expand Down
34 changes: 18 additions & 16 deletions modular/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,23 +508,25 @@ func (m *ManageModular) LoadTaskFromDB() error {
replicateTask.InitReplicatePieceTask(objectInfo, storageParams, m.baseApp.TaskPriority(replicateTask),
m.baseApp.TaskTimeout(replicateTask, objectInfo.GetPayloadSize()), m.baseApp.TaskMaxRetry(replicateTask), meta.IsAgentUpload)

if meta.GlobalVirtualGroupID == 0 {
bucketInfo, err := m.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err)
return err
}
gvgMeta, err := m.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams)
log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err)
if err != nil {
return err
}
replicateTask.GlobalVirtualGroupId = gvgMeta.ID
replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
} else {
replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID
replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints
//retrieve objects from the database that have not completed the replicate piece, reselect gvg, and then add them to the replicate queue
bucketInfo, err := m.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err)
return err
}
gvgMeta, err := m.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams)
log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err)
if err != nil {
return err
}
replicateTask.GlobalVirtualGroupId = gvgMeta.ID
replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
meta.GlobalVirtualGroupID = gvgMeta.ID
meta.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
if err = m.baseApp.GfSpDB().UpdateUploadProgress(meta); err != nil {
log.Errorw("failed to update object task state", "task_info", replicateTask.Info(), "error", err)
}

pushErr := m.replicateQueue.Push(replicateTask)
if pushErr != nil {
log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", pushErr)
Expand Down
5 changes: 4 additions & 1 deletion modular/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ func TestManageModular_LoadTaskFromDB(t *testing.T) {
m3.EXPECT().GetBucketByBucketName(gomock.Any(), gomock.Any(), gomock.Any()).Return(
&types.Bucket{BucketInfo: &types0.BucketInfo{
GlobalVirtualGroupFamilyId: 1,
}}, nil)
}}, nil).AnyTimes()

m1.EXPECT().UpdateUploadProgress(gomock.Any()).Return(
nil).AnyTimes()

vgm := vgmgr.NewMockVirtualGroupManager(ctrl)
manage.virtualGroupManager = vgm
Expand Down
39 changes: 18 additions & 21 deletions modular/manager/task_retry_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"time"

"github.com/bnb-chain/greenfield-common/go/hash"
"github.com/bnb-chain/greenfield-storage-provider/core/piecestore"

"github.com/bnb-chain/greenfield-storage-provider/base/gfspapp"
"github.com/bnb-chain/greenfield-storage-provider/base/gfsptqueue"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsptask"
"github.com/bnb-chain/greenfield-storage-provider/core/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/core/spdb"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"github.com/bnb-chain/greenfield-storage-provider/store/sqldb"
Expand Down Expand Up @@ -212,26 +211,24 @@ func (s *TaskRetryScheduler) retryReplicateTask(meta *spdb.UploadObjectMeta) err
replicateTask.InitReplicatePieceTask(objectInfo, storageParams, s.manager.baseApp.TaskPriority(replicateTask),
s.manager.baseApp.TaskTimeout(replicateTask, objectInfo.GetPayloadSize()), s.manager.baseApp.TaskMaxRetry(replicateTask), meta.IsAgentUpload)

// for objects that have been uploaded but not starting the replication yet, it doesn't have the GVG info the UploadObjectMeta,
// so it needs to pick one to start the replicate task.
if meta.GlobalVirtualGroupID == 0 {
bucketInfo, err := s.manager.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err)
return err
}
gvgMeta, err := s.manager.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams)
log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err)
if err != nil {
return err
}
replicateTask.GlobalVirtualGroupId = gvgMeta.ID
replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
} else {
replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID
replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints
//retrieve objects from the database that have not completed the replicate piece, reselect gvg, and then add them to the replicate queue
bucketInfo, err := s.manager.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err)
return err
}
gvgMeta, err := s.manager.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams)
log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err)
if err != nil {
return err
}
replicateTask.GlobalVirtualGroupId = gvgMeta.ID
replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
meta.GlobalVirtualGroupID = gvgMeta.ID
meta.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
if err = s.manager.baseApp.GfSpDB().UpdateUploadProgress(meta); err != nil {
log.Errorw("failed to update object task state", "task_info", replicateTask.Info(), "error", err)
}

err = s.manager.replicateQueue.Push(replicateTask)
if err != nil {
if errors.Is(err, gfsptqueue.ErrTaskQueueExceed) {
Expand Down

0 comments on commit 396cca7

Please sign in to comment.