From cfe6d3fc6d3cc4f93ef703360b674a6a50b01550 Mon Sep 17 00:00:00 2001 From: constwz Date: Tue, 23 Apr 2024 16:31:48 +0800 Subject: [PATCH] fix: delegate upload content type --- .github/workflows/docker-develop.yml | 2 +- base/gfspclient/gater.go | 3 --- modular/executor/execute_replicate.go | 5 ----- modular/gater/object_handler.go | 6 +++++- modular/manager/manage_task.go | 3 --- modular/manager/manager.go | 2 -- modular/manager/task_retry_scheduler.go | 1 - 7 files changed, 6 insertions(+), 16 deletions(-) diff --git a/.github/workflows/docker-develop.yml b/.github/workflows/docker-develop.yml index 506d21070..34dc7e8c8 100644 --- a/.github/workflows/docker-develop.yml +++ b/.github/workflows/docker-develop.yml @@ -2,7 +2,7 @@ name: Docker-CI on: push: - branches: [ develop, master,fix-v1.6.0] + branches: [ develop, master] env: IMAGE_NAME: ghcr.io/bnb-chain/greenfield-storage-provider-invisible diff --git a/base/gfspclient/gater.go b/base/gfspclient/gater.go index e6f925364..898fa48e4 100644 --- a/base/gfspclient/gater.go +++ b/base/gfspclient/gater.go @@ -80,8 +80,6 @@ func (s *GfSpClient) ReplicatePieceToSecondary(ctx context.Context, endpoint str } receiveTask := receive.(*gfsptask.GfSpReceivePieceTask) - log.CtxInfow(ctx, "gateway debug info", "receiveTask", receiveTask) - log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "IsAgentUpload", receive.GetIsAgentUploadTask()) receiveMsg, err := json.Marshal(receiveTask) if err != nil { log.CtxErrorw(ctx, "failed to replicate piece to secondary sp due to marshal error", "error", err) @@ -89,7 +87,6 @@ func (s *GfSpClient) ReplicatePieceToSecondary(ctx context.Context, endpoint str } receiveHeader := hex.EncodeToString(receiveMsg) req.Header.Add(GnfdReceiveMsgHeader, receiveHeader) - log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "header", receiveHeader) resp, err := s.HTTPClient(ctx).Do(req) if err != nil { return err diff --git a/modular/executor/execute_replicate.go b/modular/executor/execute_replicate.go index 536d2430f..99c147e3f 100644 --- a/modular/executor/execute_replicate.go +++ b/modular/executor/execute_replicate.go @@ -38,7 +38,6 @@ func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task core objectInfo *storagetypes.ObjectInfo ) startReplicateTime := time.Now() - log.Debugw("Debug Info", "object_id", task.GetObjectInfo().Id, "IsAgentUpload", task.GetIsAgentUpload()) defer func() { task.SetError(err) metrics.PerfPutObjectTime.WithLabelValues("background_replicate_cost").Observe(time.Since(startReplicateTime).Seconds()) @@ -129,7 +128,6 @@ func (e *ExecuteModular) handleReplicatePiece(ctx context.Context, rTask coretas ) log.Debugw("replicate task info", "task_sps", rTask.GetSecondaryEndpoints()) - log.Debugw("Debug Info", "object_id", rTask.GetObjectInfo().Id, "IsAgentUpload", rTask.GetIsAgentUpload()) doReplicateECPiece := func(ctx context.Context, segIdx uint32, data [][]byte, errChan chan error) { log.Debug("start to replicate ec piece") @@ -274,7 +272,6 @@ func (e *ExecuteModular) doReplicatePiece(ctx context.Context, waitGroup *sync.W log.CtxErrorw(ctx, "ReplicatePieceTask object info is empty") return ErrInvalidReplicatePieceTask } - log.Debugw("Debug Info", "object_id", rTask.GetObjectInfo().Id, "IsAgentUpload", rTask.GetIsAgentUpload()) rTask.AppendLog(fmt.Sprintf("executor-begin-replicate-piece-sIdx:%d-rIdx-%d", segmentIdx, redundancyIdx)) startTime := time.Now() defer func() { @@ -320,12 +317,10 @@ func (e *ExecuteModular) doReplicatePiece(ctx context.Context, waitGroup *sync.W } receive.SetSignature(signature) replicateOnePieceTime := time.Now() - log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "IsAgentUpload", receive.GetIsAgentUploadTask()) if err = retry.Do(func() error { // timeout for single piece replication ctxWithTimeout, cancel := context.WithTimeout(ctx, replicateTimeOut) defer cancel() - log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "IsAgentUpload", receive.GetIsAgentUploadTask()) return e.baseApp.GfSpClient().ReplicatePieceToSecondary(ctxWithTimeout, spEndpoint, receive, data) }, RtyAttem, RtyDelay, diff --git a/modular/gater/object_handler.go b/modular/gater/object_handler.go index a1a283a16..6328897cf 100644 --- a/modular/gater/object_handler.go +++ b/modular/gater/object_handler.go @@ -1355,6 +1355,10 @@ func (g *GateModular) delegateResumablePutObjectHandler(w http.ResponseWriter, r return } fingerprint = commonhash.GenerateChecksum(approvalMsg) + contentType = r.Header.Get(ContentTypeHeader) + if contentType == "" { + contentType = ContentDefault + } startTime := time.Now() if isUpdate { @@ -1573,7 +1577,7 @@ func (g *GateModular) delegateCreateFolderHandler(w http.ResponseWriter, r *http err = ErrNoPermission return } - contentType = reqCtx.vars["content_type"] + contentType = r.Header.Get(ContentTypeHeader) if contentType == "" { contentType = ContentDefault } diff --git a/modular/manager/manage_task.go b/modular/manager/manage_task.go index 1cad41790..6938aff06 100644 --- a/modular/manager/manage_task.go +++ b/modular/manager/manage_task.go @@ -159,7 +159,6 @@ func (m *ManageModular) pickGVGAndReplicate(ctx context.Context, vgfID uint32, t replicateTask.SetLogs(task.GetLogs()) replicateTask.SetRetry(task.GetRetry()) replicateTask.AppendLog("manager-create-replicate-task") - log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUploadTask()) err = m.replicateQueue.Push(replicateTask) if err != nil { log.CtxErrorw(ctx, "failed to push replicate piece task to queue", "error", err) @@ -268,7 +267,6 @@ func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, replicateTask.GlobalVirtualGroupId = gvgMeta.ID replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints log.Debugw("replicate task info", "task", replicateTask, "gvg_meta", gvgMeta) - log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUploadTask()) err = m.replicateQueue.Push(replicateTask) if err != nil { log.CtxErrorw(ctx, "failed to push replicate piece task to queue", "error", err) @@ -451,7 +449,6 @@ func (m *ManageModular) handleFailedReplicatePieceTask(ctx context.Context, hand "excludedGVGs", shouldFreezeGVGs, "error", rePickAndReplicateErr) return rePickAndReplicateErr } else { - log.Debugw("Debug Info", "object_id", handleTask.GetObjectInfo().Id, "IsAgentUpload", handleTask.GetIsAgentUpload()) pushErr := m.replicateQueue.Push(handleTask) log.CtxDebugw(ctx, "push task again to retry", "task_info", handleTask.Info(), "error", pushErr) return pushErr diff --git a/modular/manager/manager.go b/modular/manager/manager.go index 0e7f25b51..2a072aed1 100644 --- a/modular/manager/manager.go +++ b/modular/manager/manager.go @@ -525,7 +525,6 @@ func (m *ManageModular) LoadTaskFromDB() error { replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints } - log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUpload()) pushErr := m.replicateQueue.Push(replicateTask) if pushErr != nil { log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", pushErr) @@ -999,7 +998,6 @@ func (m *ManageModular) backUpTask() { func (m *ManageModular) repushTask(reserved task.Task) { switch t := reserved.(type) { case *gfsptask.GfSpReplicatePieceTask: - log.Debugw("Debug Info", "object_id", t.GetObjectInfo().Id, "IsAgentUpload", t.GetIsAgentUpload()) err := m.replicateQueue.Push(t) log.Infow("retry push replicate task to queue after dispatching", "error", err) case *gfsptask.GfSpSealObjectTask: diff --git a/modular/manager/task_retry_scheduler.go b/modular/manager/task_retry_scheduler.go index 16b263300..d0cd8c31c 100644 --- a/modular/manager/task_retry_scheduler.go +++ b/modular/manager/task_retry_scheduler.go @@ -231,7 +231,6 @@ func (s *TaskRetryScheduler) retryReplicateTask(meta *spdb.UploadObjectMeta) err replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints } - log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUpload()) err = s.manager.replicateQueue.Push(replicateTask) if err != nil { if errors.Is(err, gfsptqueue.ErrTaskQueueExceed) {