Skip to content

Commit

Permalink
Flag to force sending out deal (#393)
Browse files Browse the repository at this point in the history
resolves #364
  • Loading branch information
xinaxu authored Oct 19, 2023
1 parent 2775d73 commit 4077d58
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 16 deletions.
3 changes: 3 additions & 0 deletions client/swagger/models/model_schedule.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/swagger/models/schedule_create_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/swagger/models/schedule_update_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions cmd/deal/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ var CreateCmd = &cli.Command{
Usage: "Max total deal number for this request, i.e. 1000",
DefaultText: "Unlimited",
},
&cli.BoolFlag{
Name: "force",
Category: "Restrictions",
Usage: "Force to send out deals regardless of replication restriction",
},
&cli.StringFlag{
Name: "schedule-deal-size",
Category: "Scheduling",
Expand Down Expand Up @@ -233,6 +238,7 @@ var CreateCmd = &cli.Command{
MaxPendingDealSize: c.String("max-pending-deal-size"),
MaxPendingDealNumber: c.Int("max-pending-deal-number"),
AllowedPieceCIDs: allowedPieceCIDs,
Force: c.Bool("force"),
}
lotusClient := util.NewLotusClient(c.String("lotus-api"), c.String("lotus-token"))
schedule, err := schedule.Default.CreateHandler(c.Context, db, lotusClient, request)
Expand Down
8 changes: 8 additions & 0 deletions cmd/deal/schedule/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ var UpdateCmd = &cli.Command{
Usage: "Whether to announce the deal to IPNI",
Value: true,
},
&cli.BoolFlag{
Name: "force",
Category: "Restrictions",
Usage: "Force to send out deals regardless of replication restriction",
},
&cli.BoolFlag{
Name: "keep-unsealed",
Category: "Deal Proposal",
Expand Down Expand Up @@ -239,6 +244,9 @@ var UpdateCmd = &cli.Command{
if c.IsSet("max-pending-deal-number") {
request.MaxPendingDealNumber = ptr.Of(c.Int("max-pending-deal-number"))
}
if c.IsSet("force") {
request.Force = ptr.Of(c.Bool("force"))
}

id, err := strconv.ParseUint(c.Args().Get(0), 10, 32)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions cmd/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func TestScheduleCreateHandler(t *testing.T) {
err := os.WriteFile(filepath.Join(tmp, "cid.txt"), []byte(testutil.TestCid.String()), 0644)
require.NoError(t, err)
mockHandler.On("CreateHandler", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&testSchedule, nil)
_, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule create --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider",
_, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule create --force --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider",
testutil.EscapePath(filepath.Join(tmp, "cid.txt")),
testutil.TestCid.String()))
require.NoError(t, err)

_, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule create --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider",
_, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule create --force --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider",
testutil.EscapePath(filepath.Join(tmp, "cid.txt")),
testutil.TestCid.String()))
require.NoError(t, err)
Expand All @@ -143,12 +143,12 @@ func TestScheduleUpdateHandler(t *testing.T) {
err := os.WriteFile(filepath.Join(tmp, "cid.txt"), []byte(testutil.TestCid.String()), 0644)
require.NoError(t, err)
mockHandler.On("UpdateHandler", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&testSchedule, nil)
_, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule update -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1",
_, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule update --force -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1",
testutil.EscapePath(filepath.Join(tmp, "cid.txt")),
testutil.TestCid.String()))
require.NoError(t, err)

_, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule update -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1",
_, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule update --force -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1",
testutil.EscapePath(filepath.Join(tmp, "cid.txt")),
testutil.TestCid.String()))
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions docs/en/cli-reference/deal/schedule/create.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/en/cli-reference/deal/schedule/update.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions docs/swagger/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions docs/swagger/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions docs/swagger/swagger.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions handler/deal/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type CreateRequest struct {
MaxPendingDealNumber int `json:"maxPendingDealNumber"` // Max pending deal number
//nolint:tagliatelle
AllowedPieceCIDs []string `json:"allowedPieceCids"` // Allowed piece CIDs in this schedule
Force bool `json:"force"` // Force to send out deals regardless of replication restriction
}

func argToDuration(s string) (time.Duration, error) {
Expand Down Expand Up @@ -204,6 +205,7 @@ func (DefaultHandler) CreateHandler(
PricePerGB: request.PricePerGB,
PricePerDeal: request.PricePerDeal,
ScheduleCronPerpetual: request.ScheduleCronPerpetual,
Force: request.Force,
}

if err := database.DoRetry(ctx, func() error {
Expand Down
2 changes: 2 additions & 0 deletions handler/deal/schedule/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var createRequest = CreateRequest{
MaxPendingDealNumber: 100,
AllowedPieceCIDs: []string{"baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq"},
ScheduleCronPerpetual: true,
Force: true,
}

func TestCreateHandler_DatasetNotFound(t *testing.T) {
Expand Down Expand Up @@ -259,6 +260,7 @@ func TestCreateHandler_Success(t *testing.T) {
schedule, err := Default.CreateHandler(ctx, db, getMockLotusClient(), createRequest)
require.NoError(t, err)
require.NotNil(t, schedule)
require.True(t, createRequest.Force)
})
})
}
Expand Down
5 changes: 5 additions & 0 deletions handler/deal/schedule/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type UpdateRequest struct {
MaxPendingDealNumber *int `json:"maxPendingDealNumber"` // Max pending deal number
//nolint:tagliatelle
AllowedPieceCIDs []string `json:"allowedPieceCids"` // Allowed piece CIDs in this schedule
Force *bool `json:"force"` // Force to send out deals regardless of replication restriction
}

// UpdateHandler modifies an existing schedule record based on the provided update request.
Expand Down Expand Up @@ -231,6 +232,10 @@ func (DefaultHandler) UpdateHandler(
updates["max_pending_deal_size"] = maxPendingDealSize
}

if request.Force != nil {
updates["force"] = *request.Force
}

err = db.Model(&schedule).Updates(updates).Error
if err != nil {
return nil, errors.WithStack(err)
Expand Down
2 changes: 2 additions & 0 deletions handler/deal/schedule/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var updateRequest = UpdateRequest{
MaxPendingDealNumber: ptr.Of(100),
AllowedPieceCIDs: []string{"baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq"},
ScheduleCronPerpetual: ptr.Of(true),
Force: ptr.Of(true),
}

func TestUpdateHandler_DatasetNotFound(t *testing.T) {
Expand Down Expand Up @@ -202,6 +203,7 @@ func TestUpdateHandler_Success(t *testing.T) {
schedule, err := Default.UpdateHandler(ctx, db, 1, updateRequest)
require.NoError(t, err)
require.NotNil(t, schedule)
require.True(t, schedule.Force)
})
}

Expand Down
1 change: 1 addition & 0 deletions model/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type Schedule struct {
Notes string `json:"notes"`
ErrorMessage string `json:"errorMessage" table:"verbose"`
AllowedPieceCIDs StringSlice `gorm:"type:JSON;column:allowed_piece_cids" json:"allowedPieceCids" table:"verbose"`
Force bool `json:"force"`

// Associations
PreparationID PreparationID `json:"preparationId"`
Expand Down
23 changes: 11 additions & 12 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,26 +294,25 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)
return "", nil
}

existingPieceCIDQuery := db.Table("deals").Select("piece_cid").
Where("provider = ? AND state IN (?)",
schedule.Provider,
[]model.DealState{
model.DealProposed, model.DealPublished, model.DealActive,
})
if schedule.Force {
existingPieceCIDQuery = db.Table("deals").Select("piece_cid").Where("schedule_id = ?", schedule.ID)
}
if len(allowedPieceCIDs) == 0 {
err = db.Where("attachment_id IN ? AND piece_cid NOT IN (?)",
underscore.Map(attachments, func(a model.SourceAttachment) model.SourceAttachmentID { return a.ID }),
db.Table("deals").Select("piece_cid").
Where("provider = ? AND state IN (?)",
schedule.Provider,
[]model.DealState{
model.DealProposed, model.DealPublished, model.DealActive,
})).First(&car).Error
existingPieceCIDQuery).First(&car).Error
} else {
pieceCIDChunks := util.ChunkSlice(allowedPieceCIDs, util.BatchSize)
for _, pieceCIDChunk := range pieceCIDChunks {
err = db.Where("attachment_id IN ? AND piece_cid NOT IN (?) AND piece_cid IN ?",
underscore.Map(attachments, func(a model.SourceAttachment) model.SourceAttachmentID { return a.ID }),
db.Table("deals").Select("piece_cid").
Where("provider = ? AND state IN (?)",
schedule.Provider,
[]model.DealState{
model.DealProposed, model.DealPublished, model.DealActive,
}), pieceCIDChunk).First(&car).Error
existingPieceCIDQuery, pieceCIDChunk).First(&car).Error
if err == nil {
break
}
Expand Down
59 changes: 59 additions & 0 deletions service/dealpusher/dealpusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,65 @@ func TestDealMakerService_ScheduleWithConstraints(t *testing.T) {
})
}

func TestDealmakerService_Force(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
require.NoError(t, err)
mockDealmaker := new(MockDealMaker)
service.dealMaker = mockDealmaker
pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
provider := "f0miner"
client := "f0client"
schedule := model.Schedule{
Preparation: &model.Preparation{
Wallets: []model.Wallet{
{
ID: client, Address: "f0xx",
},
},
SourceStorages: []model.Storage{{}},
},
State: model.ScheduleActive,
Provider: provider,
Force: true,
}
err = db.Create(&schedule).Error
require.NoError(t, err)
mockDealmaker.On("MakeDeal", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Deal{
ScheduleID: &schedule.ID,
}, nil)

err = db.Create([]model.Car{
{
AttachmentID: ptr.Of(model.SourceAttachmentID(1)),
PreparationID: 1,
PieceCID: pieceCID,
PieceSize: 1024,
StoragePath: "0",
},
}).Error
require.NoError(t, err)
err = db.Create([]model.Deal{
{
Provider: provider,
ClientID: client,
PieceCID: pieceCID,
PieceSize: 1024,
State: model.DealProposed,
},
}).Error
require.NoError(t, err)
service.runOnce(ctx)
time.Sleep(time.Second)
var deals []model.Deal
err = db.Find(&deals).Error
require.NoError(t, err)
require.Len(t, deals, 2)
})
}

func TestDealMakerService_NewScheduleOneOff(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)
Expand Down

0 comments on commit 4077d58

Please sign in to comment.