diff --git a/client/swagger/models/model_schedule.go b/client/swagger/models/model_schedule.go index 0710593a..2451a3f0 100644 --- a/client/swagger/models/model_schedule.go +++ b/client/swagger/models/model_schedule.go @@ -33,6 +33,9 @@ type ModelSchedule struct { // error message ErrorMessage string `json:"errorMessage,omitempty"` + // force + Force bool `json:"force,omitempty"` + // http headers HTTPHeaders ModelConfigMap `json:"httpHeaders,omitempty"` diff --git a/client/swagger/models/schedule_create_request.go b/client/swagger/models/schedule_create_request.go index a3906687..7e2ff099 100644 --- a/client/swagger/models/schedule_create_request.go +++ b/client/swagger/models/schedule_create_request.go @@ -23,6 +23,9 @@ type ScheduleCreateRequest struct { // Duration in epoch or in duration format, i.e. 1500000, 2400h Duration *string `json:"duration,omitempty"` + // Force to send out deals regardless of replication restriction + Force bool `json:"force,omitempty"` + // http headers to be passed with the request (i.e. key=value) HTTPHeaders []string `json:"httpHeaders"` diff --git a/client/swagger/models/schedule_update_request.go b/client/swagger/models/schedule_update_request.go index 148c7ef8..38158dd0 100644 --- a/client/swagger/models/schedule_update_request.go +++ b/client/swagger/models/schedule_update_request.go @@ -23,6 +23,9 @@ type ScheduleUpdateRequest struct { // Duration in epoch or in duration format, i.e. 1500000, 2400h Duration *string `json:"duration,omitempty"` + // Force to send out deals regardless of replication restriction + Force bool `json:"force,omitempty"` + // http headers to be passed with the request (i.e. key=value) HTTPHeaders []string `json:"httpHeaders"` diff --git a/cmd/deal/schedule/create.go b/cmd/deal/schedule/create.go index 7a5d929b..37b3c331 100644 --- a/cmd/deal/schedule/create.go +++ b/cmd/deal/schedule/create.go @@ -145,6 +145,11 @@ var CreateCmd = &cli.Command{ Usage: "Max total deal number for this request, i.e. 1000", DefaultText: "Unlimited", }, + &cli.BoolFlag{ + Name: "force", + Category: "Restrictions", + Usage: "Force to send out deals regardless of replication restriction", + }, &cli.StringFlag{ Name: "schedule-deal-size", Category: "Scheduling", @@ -233,6 +238,7 @@ var CreateCmd = &cli.Command{ MaxPendingDealSize: c.String("max-pending-deal-size"), MaxPendingDealNumber: c.Int("max-pending-deal-number"), AllowedPieceCIDs: allowedPieceCIDs, + Force: c.Bool("force"), } lotusClient := util.NewLotusClient(c.String("lotus-api"), c.String("lotus-token")) schedule, err := schedule.Default.CreateHandler(c.Context, db, lotusClient, request) diff --git a/cmd/deal/schedule/update.go b/cmd/deal/schedule/update.go index 4a8b7431..64326e1e 100644 --- a/cmd/deal/schedule/update.go +++ b/cmd/deal/schedule/update.go @@ -89,6 +89,11 @@ var UpdateCmd = &cli.Command{ Usage: "Whether to announce the deal to IPNI", Value: true, }, + &cli.BoolFlag{ + Name: "force", + Category: "Restrictions", + Usage: "Force to send out deals regardless of replication restriction", + }, &cli.BoolFlag{ Name: "keep-unsealed", Category: "Deal Proposal", @@ -239,6 +244,9 @@ var UpdateCmd = &cli.Command{ if c.IsSet("max-pending-deal-number") { request.MaxPendingDealNumber = ptr.Of(c.Int("max-pending-deal-number")) } + if c.IsSet("force") { + request.Force = ptr.Of(c.Bool("force")) + } id, err := strconv.ParseUint(c.Args().Get(0), 10, 32) if err != nil { diff --git a/cmd/schedule_test.go b/cmd/schedule_test.go index 79c02cdd..f127c472 100644 --- a/cmd/schedule_test.go +++ b/cmd/schedule_test.go @@ -121,12 +121,12 @@ func TestScheduleCreateHandler(t *testing.T) { err := os.WriteFile(filepath.Join(tmp, "cid.txt"), []byte(testutil.TestCid.String()), 0644) require.NoError(t, err) mockHandler.On("CreateHandler", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&testSchedule, nil) - _, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule create --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider", + _, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule create --force --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider", testutil.EscapePath(filepath.Join(tmp, "cid.txt")), testutil.TestCid.String())) require.NoError(t, err) - _, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule create --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider", + _, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule create --force --allowed-piece-cid-file %s --allowed-piece-cid %s --preparation 5 --provider provider", testutil.EscapePath(filepath.Join(tmp, "cid.txt")), testutil.TestCid.String())) require.NoError(t, err) @@ -143,12 +143,12 @@ func TestScheduleUpdateHandler(t *testing.T) { err := os.WriteFile(filepath.Join(tmp, "cid.txt"), []byte(testutil.TestCid.String()), 0644) require.NoError(t, err) mockHandler.On("UpdateHandler", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&testSchedule, nil) - _, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule update -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1", + _, _, err = runner.Run(ctx, fmt.Sprintf("singularity deal schedule update --force -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1", testutil.EscapePath(filepath.Join(tmp, "cid.txt")), testutil.TestCid.String())) require.NoError(t, err) - _, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule update -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1", + _, _, err = runner.Run(ctx, fmt.Sprintf("singularity --verbose deal schedule update --force -H a=b --ipni --url-template \"http://127.0.0.1\" -d 2400h --keep-unsealed --price-per-deal 0 --price-per-gb 0 --price-per-gb-epoch 0 --start-delay 72h --verified --max-pending-deal-number 1 --max-pending-deal-size 1 --total-deal-number 1 --total-deal-size 1 --schedule-cron @daily --schedule-deal-number 1 --schedule-deal-size 1 --notes notes --allowed-piece-cid-file %s --allowed-piece-cid %s 1", testutil.EscapePath(filepath.Join(tmp, "cid.txt")), testutil.TestCid.String())) require.NoError(t, err) diff --git a/docs/en/cli-reference/deal/schedule/create.md b/docs/en/cli-reference/deal/schedule/create.md index 6ba95389..035436a4 100644 --- a/docs/en/cli-reference/deal/schedule/create.md +++ b/docs/en/cli-reference/deal/schedule/create.md @@ -64,6 +64,7 @@ OPTIONS: --allowed-piece-cid value, --piece-cid value [ --allowed-piece-cid value, --piece-cid value ] List of allowed piece CIDs in this schedule (default: Any) --allowed-piece-cid-file value, --piece-cid-file value [ --allowed-piece-cid-file value, --piece-cid-file value ] List of files that contains a list of piece CIDs to allow + --force Force to send out deals regardless of replication restriction (default: false) --max-pending-deal-number value, --pending-number value Max pending deal number overall for this request, i.e. 100TiB (default: Unlimited) --max-pending-deal-size value, --pending-size value Max pending deal sizes overall for this request, i.e. 1000 (default: Unlimited) --total-deal-number value, --total-number value Max total deal number for this request, i.e. 1000 (default: Unlimited) diff --git a/docs/en/cli-reference/deal/schedule/update.md b/docs/en/cli-reference/deal/schedule/update.md index 1297b5d6..a882c1c6 100644 --- a/docs/en/cli-reference/deal/schedule/update.md +++ b/docs/en/cli-reference/deal/schedule/update.md @@ -62,6 +62,7 @@ OPTIONS: --allowed-piece-cid value, --piece-cid value [ --allowed-piece-cid value, --piece-cid value ] List of allowed piece CIDs in this schedule. Append only. --allowed-piece-cid-file value, --piece-cid-file value [ --allowed-piece-cid-file value, --piece-cid-file value ] List of files that contains a list of piece CIDs to allow. Append only. + --force Force to send out deals regardless of replication restriction (default: false) --max-pending-deal-number value, --pending-number value Max pending deal number overall for this request, i.e. 100TiB (default: 0) --max-pending-deal-size value, --pending-size value Max pending deal sizes overall for this request, i.e. 1000 --total-deal-number value, --total-number value Max total deal number for this request, i.e. 1000 (default: 0) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 8c8179b8..c4b6d69c 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -6235,6 +6235,9 @@ const docTemplate = `{ "errorMessage": { "type": "string" }, + "force": { + "type": "boolean" + }, "httpHeaders": { "$ref": "#/definitions/model.ConfigMap" }, @@ -6404,6 +6407,10 @@ const docTemplate = `{ "type": "string", "default": "12840h" }, + "force": { + "description": "Force to send out deals regardless of replication restriction", + "type": "boolean" + }, "httpHeaders": { "description": "http headers to be passed with the request (i.e. key=value)", "type": "array", @@ -6511,6 +6518,10 @@ const docTemplate = `{ "type": "string", "default": "12840h" }, + "force": { + "description": "Force to send out deals regardless of replication restriction", + "type": "boolean" + }, "httpHeaders": { "description": "http headers to be passed with the request (i.e. key=value)", "type": "array", diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 3c5de7ad..942bb1bf 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -6228,6 +6228,9 @@ "errorMessage": { "type": "string" }, + "force": { + "type": "boolean" + }, "httpHeaders": { "$ref": "#/definitions/model.ConfigMap" }, @@ -6397,6 +6400,10 @@ "type": "string", "default": "12840h" }, + "force": { + "description": "Force to send out deals regardless of replication restriction", + "type": "boolean" + }, "httpHeaders": { "description": "http headers to be passed with the request (i.e. key=value)", "type": "array", @@ -6504,6 +6511,10 @@ "type": "string", "default": "12840h" }, + "force": { + "description": "Force to send out deals regardless of replication restriction", + "type": "boolean" + }, "httpHeaders": { "description": "http headers to be passed with the request (i.e. key=value)", "type": "array", diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 8eb24953..5cb6a26b 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -548,6 +548,8 @@ definitions: type: integer errorMessage: type: string + force: + type: boolean httpHeaders: $ref: '#/definitions/model.ConfigMap' id: @@ -663,6 +665,9 @@ definitions: default: 12840h description: Duration in epoch or in duration format, i.e. 1500000, 2400h type: string + force: + description: Force to send out deals regardless of replication restriction + type: boolean httpHeaders: description: http headers to be passed with the request (i.e. key=value) items: @@ -746,6 +751,9 @@ definitions: default: 12840h description: Duration in epoch or in duration format, i.e. 1500000, 2400h type: string + force: + description: Force to send out deals regardless of replication restriction + type: boolean httpHeaders: description: http headers to be passed with the request (i.e. key=value) items: diff --git a/handler/deal/schedule/create.go b/handler/deal/schedule/create.go index 33e5a866..5dae3611 100644 --- a/handler/deal/schedule/create.go +++ b/handler/deal/schedule/create.go @@ -45,6 +45,7 @@ type CreateRequest struct { MaxPendingDealNumber int `json:"maxPendingDealNumber"` // Max pending deal number //nolint:tagliatelle AllowedPieceCIDs []string `json:"allowedPieceCids"` // Allowed piece CIDs in this schedule + Force bool `json:"force"` // Force to send out deals regardless of replication restriction } func argToDuration(s string) (time.Duration, error) { @@ -204,6 +205,7 @@ func (DefaultHandler) CreateHandler( PricePerGB: request.PricePerGB, PricePerDeal: request.PricePerDeal, ScheduleCronPerpetual: request.ScheduleCronPerpetual, + Force: request.Force, } if err := database.DoRetry(ctx, func() error { diff --git a/handler/deal/schedule/create_test.go b/handler/deal/schedule/create_test.go index 40d36092..4e1da85f 100644 --- a/handler/deal/schedule/create_test.go +++ b/handler/deal/schedule/create_test.go @@ -73,6 +73,7 @@ var createRequest = CreateRequest{ MaxPendingDealNumber: 100, AllowedPieceCIDs: []string{"baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq"}, ScheduleCronPerpetual: true, + Force: true, } func TestCreateHandler_DatasetNotFound(t *testing.T) { @@ -259,6 +260,7 @@ func TestCreateHandler_Success(t *testing.T) { schedule, err := Default.CreateHandler(ctx, db, getMockLotusClient(), createRequest) require.NoError(t, err) require.NotNil(t, schedule) + require.True(t, createRequest.Force) }) }) } diff --git a/handler/deal/schedule/update.go b/handler/deal/schedule/update.go index 0e893c8c..bada5105 100644 --- a/handler/deal/schedule/update.go +++ b/handler/deal/schedule/update.go @@ -38,6 +38,7 @@ type UpdateRequest struct { MaxPendingDealNumber *int `json:"maxPendingDealNumber"` // Max pending deal number //nolint:tagliatelle AllowedPieceCIDs []string `json:"allowedPieceCids"` // Allowed piece CIDs in this schedule + Force *bool `json:"force"` // Force to send out deals regardless of replication restriction } // UpdateHandler modifies an existing schedule record based on the provided update request. @@ -231,6 +232,10 @@ func (DefaultHandler) UpdateHandler( updates["max_pending_deal_size"] = maxPendingDealSize } + if request.Force != nil { + updates["force"] = *request.Force + } + err = db.Model(&schedule).Updates(updates).Error if err != nil { return nil, errors.WithStack(err) diff --git a/handler/deal/schedule/update_test.go b/handler/deal/schedule/update_test.go index d80037dd..2492ed1a 100644 --- a/handler/deal/schedule/update_test.go +++ b/handler/deal/schedule/update_test.go @@ -33,6 +33,7 @@ var updateRequest = UpdateRequest{ MaxPendingDealNumber: ptr.Of(100), AllowedPieceCIDs: []string{"baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq"}, ScheduleCronPerpetual: ptr.Of(true), + Force: ptr.Of(true), } func TestUpdateHandler_DatasetNotFound(t *testing.T) { @@ -202,6 +203,7 @@ func TestUpdateHandler_Success(t *testing.T) { schedule, err := Default.UpdateHandler(ctx, db, 1, updateRequest) require.NoError(t, err) require.NotNil(t, schedule) + require.True(t, schedule.Force) }) } diff --git a/model/replication.go b/model/replication.go index ab559b4b..4bcada94 100644 --- a/model/replication.go +++ b/model/replication.go @@ -134,6 +134,7 @@ type Schedule struct { Notes string `json:"notes"` ErrorMessage string `json:"errorMessage" table:"verbose"` AllowedPieceCIDs StringSlice `gorm:"type:JSON;column:allowed_piece_cids" json:"allowedPieceCids" table:"verbose"` + Force bool `json:"force"` // Associations PreparationID PreparationID `json:"preparationId"` diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go index 6c193db6..9eabcd4a 100644 --- a/service/dealpusher/dealpusher.go +++ b/service/dealpusher/dealpusher.go @@ -294,26 +294,25 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) return "", nil } + existingPieceCIDQuery := db.Table("deals").Select("piece_cid"). + Where("provider = ? AND state IN (?)", + schedule.Provider, + []model.DealState{ + model.DealProposed, model.DealPublished, model.DealActive, + }) + if schedule.Force { + existingPieceCIDQuery = db.Table("deals").Select("piece_cid").Where("schedule_id = ?", schedule.ID) + } if len(allowedPieceCIDs) == 0 { err = db.Where("attachment_id IN ? AND piece_cid NOT IN (?)", underscore.Map(attachments, func(a model.SourceAttachment) model.SourceAttachmentID { return a.ID }), - db.Table("deals").Select("piece_cid"). - Where("provider = ? AND state IN (?)", - schedule.Provider, - []model.DealState{ - model.DealProposed, model.DealPublished, model.DealActive, - })).First(&car).Error + existingPieceCIDQuery).First(&car).Error } else { pieceCIDChunks := util.ChunkSlice(allowedPieceCIDs, util.BatchSize) for _, pieceCIDChunk := range pieceCIDChunks { err = db.Where("attachment_id IN ? AND piece_cid NOT IN (?) AND piece_cid IN ?", underscore.Map(attachments, func(a model.SourceAttachment) model.SourceAttachmentID { return a.ID }), - db.Table("deals").Select("piece_cid"). - Where("provider = ? AND state IN (?)", - schedule.Provider, - []model.DealState{ - model.DealProposed, model.DealPublished, model.DealActive, - }), pieceCIDChunk).First(&car).Error + existingPieceCIDQuery, pieceCIDChunk).First(&car).Error if err == nil { break } diff --git a/service/dealpusher/dealpusher_test.go b/service/dealpusher/dealpusher_test.go index c771ef29..96e3559a 100644 --- a/service/dealpusher/dealpusher_test.go +++ b/service/dealpusher/dealpusher_test.go @@ -362,6 +362,65 @@ func TestDealMakerService_ScheduleWithConstraints(t *testing.T) { }) } +func TestDealmakerService_Force(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1) + require.NoError(t, err) + mockDealmaker := new(MockDealMaker) + service.dealMaker = mockDealmaker + pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024)) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + provider := "f0miner" + client := "f0client" + schedule := model.Schedule{ + Preparation: &model.Preparation{ + Wallets: []model.Wallet{ + { + ID: client, Address: "f0xx", + }, + }, + SourceStorages: []model.Storage{{}}, + }, + State: model.ScheduleActive, + Provider: provider, + Force: true, + } + err = db.Create(&schedule).Error + require.NoError(t, err) + mockDealmaker.On("MakeDeal", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Deal{ + ScheduleID: &schedule.ID, + }, nil) + + err = db.Create([]model.Car{ + { + AttachmentID: ptr.Of(model.SourceAttachmentID(1)), + PreparationID: 1, + PieceCID: pieceCID, + PieceSize: 1024, + StoragePath: "0", + }, + }).Error + require.NoError(t, err) + err = db.Create([]model.Deal{ + { + Provider: provider, + ClientID: client, + PieceCID: pieceCID, + PieceSize: 1024, + State: model.DealProposed, + }, + }).Error + require.NoError(t, err) + service.runOnce(ctx) + time.Sleep(time.Second) + var deals []model.Deal + err = db.Find(&deals).Error + require.NoError(t, err) + require.Len(t, deals, 2) + }) +} + func TestDealMakerService_NewScheduleOneOff(t *testing.T) { testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { service, err := NewDealPusher(db, "https://api.node.glif.io", "", 1)