Skip to content

Commit

Permalink
harmony storage (#11647)
Browse files Browse the repository at this point in the history
* harmony storage

* clean-up in lotus-provider

* lints

* Do() api change

* rm Do() chg and storagemgr

* harmony: Address storage iface review

---------

Co-authored-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
snadrus and magik6k authored Feb 27, 2024
1 parent 066a9fd commit d3ca54d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
1 change: 1 addition & 0 deletions cmd/lotus-provider/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
}
}
}

return nil
}

Expand Down
8 changes: 4 additions & 4 deletions itests/harmonytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func withDbSetup(t *testing.T, f func(*kit.TestMiner)) {
f(miner)
}

func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
func (t *task1) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
if !stillOwned() {
return false, errors.New("Why not still owned?")
}
t.myPersonalTableLock.Lock()
defer t.myPersonalTableLock.Unlock()
t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID]))
t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[taskID]))
return true, nil
}
func (t *task1) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
Expand Down Expand Up @@ -104,8 +104,8 @@ type passthru struct {
adder func(add harmonytask.AddTaskFunc)
}

func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(tID, stillOwned)
func (t *passthru) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(taskID, stillOwned)
}
func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return t.canAccept(list, e)
Expand Down
25 changes: 25 additions & 0 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,34 @@ top:
return false
}

releaseStorage := func() {
}
if h.TaskTypeDetails.Cost.Storage != nil {
if err = h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)); err != nil {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err)
return false
}
releaseStorage = func() {
if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(int(*tID)); err != nil {
log.Errorw("Could not release storage", "error", err)
}
}
}

// if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover {
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)

releaseStorage()
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
releaseStorage()

var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
Expand Down Expand Up @@ -134,6 +152,7 @@ top:
}
h.Count.Add(-1)

releaseStorage()
h.recordCompletion(*tID, workStart, done, doErr)
if done {
for _, fs := range h.TaskEngine.follows[h.Name] { // Do we know of any follows for this task type?
Expand Down Expand Up @@ -247,5 +266,11 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
if r.Gpu-h.Cost.Gpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of available GPU")
}

if h.TaskTypeDetails.Cost.Storage != nil {
if !h.TaskTypeDetails.Cost.Storage.HasCapacity() {
return errors.New("Did not accept " + h.Name + " task: out of available Storage")
}
}
return nil
}
13 changes: 13 additions & 0 deletions lib/harmony/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ type Resources struct {
Gpu float64
Ram uint64
MachineID int
Storage
}

// Optional Storage management.
type Storage interface {
HasCapacity() bool

// This allows some other system to claim space for this task.
Claim(taskID int) error

// This allows some other system to consider the task done.
// It's up to the caller to remove the data, if that applies.
MarkComplete(taskID int) error
}
type Reg struct {
Resources
Expand Down

0 comments on commit d3ca54d

Please sign in to comment.