Skip to content

Commit

Permalink
refactor(helpers): update "NewSubtaskStateManager" (#7861) (#7866)
Browse files Browse the repository at this point in the history
* refactor(helpers): update "NewSubtaskStateManager"

* fix(helper): fix unit test

* refactor(helpers): rename calculateStateManagerIncrementalMode

Co-authored-by: Lynwee <1507509064@qq.com>
  • Loading branch information
github-actions[bot] and d4x1 authored Aug 8, 2024
1 parent 7a805c1 commit 382c58d
Showing 1 changed file with 72 additions and 33 deletions.
105 changes: 72 additions & 33 deletions backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
plugin "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
)

Expand Down Expand Up @@ -75,57 +75,96 @@ type SubtaskStateManager struct {
// NewSubtaskStateManager create a new SubtaskStateManager
func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskStateManager, err errors.Error) {
db := args.GetDal()
syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
plugin := args.SubTaskContext.TaskContext().GetName()
subtask := args.SubTaskContext.GetName()
// load sync policy and make sure it is not nil
syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
if syncPolicy == nil {
syncPolicy = &models.SyncPolicy{}
}

plugin := args.SubTaskContext.TaskContext().GetName()
subtask := args.SubTaskContext.GetName()
params := args.GetRawDataParams()
// load the previous state from the database
state := &models.SubtaskState{}
err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
preState, err := loadPreviousState(db, plugin, subtask, params)
if err != nil {
if db.IsErrorNotFound(err) {
state = &models.SubtaskState{
Plugin: plugin,
Subtask: subtask,
Params: params,
}
err = nil
} else {
err = errors.Default.Wrap(err, "failed to load the previous subtask state")
return
}
return
}
// fullsync by default

isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))

now := time.Now()
stateManager = &SubtaskStateManager{
db: db,
state: state,
state: preState,
syncPolicy: syncPolicy,
isIncremental: false,
since: syncPolicy.TimeAfter,
isIncremental: isIncremental,
since: since,
until: &now,
config: utils.ToJsonString(args.SubtaskConfig),
}
// fallback to the previous timeAfter if no new value
if stateManager.since == nil {
stateManager.since = state.TimeAfter
stateManager.since = preState.TimeAfter
}
// if fullsync is set or no previous success start time, we are in the full sync mode
if syncPolicy.FullSync || state.PrevStartedAt == nil {
return
return
}

// loadPreviousState fetches the SubtaskState row matching the given plugin,
// subtask and params.
//
// When no such row exists, a fresh SubtaskState carrying only those three
// identifying fields is returned with a nil error. Any other lookup failure is
// wrapped and returned together with a nil state.
func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.SubtaskState, errors.Error) {
	stored := &models.SubtaskState{}
	lookupErr := db.First(stored, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))

	switch {
	case lookupErr == nil:
		// a previous state was persisted; hand it back untouched
		return stored, nil
	case db.IsErrorNotFound(lookupErr):
		// nothing persisted yet: start from a blank state for this key
		return &models.SubtaskState{
			Plugin:  plugin,
			Subtask: subtask,
			Params:  params,
		}, nil
	default:
		return nil, errors.Default.Wrap(lookupErr, "failed to load the previous subtask state")
	}
}

// calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time.
func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
if preState == nil || syncPolicy == nil {
panic("preState or syncPolicy is nil")
}

// User click 'Collect Data in Full Refresh Mode'
// No matter whether there is a successful pipeline.
if syncPolicy.FullSync {
return false, syncPolicy.TimeAfter
}
// if timeAfter is not set or NOT before the previous value, we are in the incremental mode
if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter)) &&
// and the previous config is the same as the current config
(state.PrevConfig == "" || state.PrevConfig == stateManager.config) {
stateManager.isIncremental = true
stateManager.since = state.PrevStartedAt
// No previous success state means this pipeline has never been executed.
if preState.PrevStartedAt == nil {
return false, syncPolicy.TimeAfter
}
return
// When subtask config has changed, state manager should NOT in incremental mode.
if subTaskConfigHasChanged(preState, newSubtaskConfig) {
return false, syncPolicy.TimeAfter
}
// There is a sync policy and sync policy is earlier than latest successful pipeline's timeAfter
if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil && syncPolicy.TimeAfter.Before(*preState.TimeAfter) {
return false, syncPolicy.TimeAfter
}

// No need to do a full refresh, run task incrementally.
// New state manager's start time is previous state's finished time.
// But there is no such field, so use previous state's PrevStartedAt time.
return true, preState.PrevStartedAt
}

// subTaskConfigHasChanged reports whether the config recorded on the previous
// state differs from newSubtaskConfig.
//
// A nil preState counts as changed. An empty PrevConfig counts as unchanged,
// as does a PrevConfig equal to newSubtaskConfig.
func subTaskConfigHasChanged(preState *models.SubtaskState, newSubtaskConfig string) bool {
	if preState == nil {
		return true
	}
	switch preState.PrevConfig {
	case "", newSubtaskConfig:
		// nothing recorded previously, or identical to the current config
		return false
	default:
		return true
	}
}

func (c *SubtaskStateManager) IsIncremental() bool {
Expand Down

0 comments on commit 382c58d

Please sign in to comment.