diff --git a/service.go b/service.go index 6f1a1b0..5345835 100644 --- a/service.go +++ b/service.go @@ -31,6 +31,7 @@ import ( "github.com/filecoin-project/lotus/node/config" cbor "github.com/ipfs/go-ipld-cbor" "github.com/samber/lo" + "go.uber.org/zap" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -93,7 +94,7 @@ func NewService(ctx context.Context, db *gorm.DB, api API, maxWait time.Duration } func (s *Service) Shutdown(_ context.Context) error { - log.Infof("waiting for services to shutdown") + log.Info("waiting for services to shutdown") if err := s.watchingMessages.Range(func(k uint, wm *watchMessage) error { wm.Cancel() @@ -109,17 +110,39 @@ func (s *Service) Shutdown(_ context.Context) error { func (s *Service) runProcessor(ctx context.Context) { defer s.wg.Done() - log.Infof("starting processor") + log.Info("starting extend processor") tk := time.NewTicker(5 * time.Second) defer tk.Stop() + + processingRequests := NewSafeMap[uint, struct{}]() for { select { case <-ctx.Done(): - log.Infof("context done, stopping processor") + log.Info("context done, stopping extend processor") return case <-tk.C: - if err := s.processRequest(ctx); err != nil { - log.Errorf("failed to process request: %s", err) + var requests []*Request + if err := s.db.Preload(clause.Associations). + Where("status = ?", RequestStatusCreated). + Find(&requests).Error; err != nil { + log.Errorf("failed to get request: %s", err) + continue + } + if len(requests) == 0 { + continue + } + log.Debugf("found %d new created requests", len(requests)) + for _, request := range requests { + if processingRequests.Has(request.ID) { + continue + } + processingRequests.Set(request.ID, struct{}{}) + go func(request *Request) { + defer processingRequests.Delete(request.ID) + if err := s.processRequest(ctx, request); err != nil { + log.Errorf("failed to process request %d: %s", request.ID, err) + } + }(request) } } } @@ -127,31 +150,49 @@ func (s *Service) runProcessor(ctx context.Context) { func (s *Service) runMessageChecker(ctx context.Context) { defer s.wg.Done() - log.Infof("starting message checker") + log.Info("starting message checker") tk := time.NewTicker(builtin.EpochDurationSeconds * time.Second) // check interval is 30 seconds defer tk.Stop() + + checkingRequests := NewSafeMap[uint, struct{}]() for { select { case <-ctx.Done(): - log.Infof("context done, stopping message checker") + log.Info("context done, stopping message checker") return case <-tk.C: - if err := s.checkMessage(ctx); err != nil { - log.Errorf("failed to check messages: %s", err) + var requests []*Request + if err := s.db.Preload(clause.Associations). + Where("status = ?", RequestStatusPending). + Find(&requests).Error; err != nil { + log.Errorf("failed to get request: %s", err) + continue + } + if len(requests) == 0 { + continue + } + log.Debugf("found %d pending requests", len(requests)) + + for _, request := range requests { + if checkingRequests.Has(request.ID) { + continue + } + checkingRequests.Set(request.ID, struct{}{}) + go func(request *Request) { + defer checkingRequests.Delete(request.ID) + if err := s.checkMessage(ctx, request); err != nil { + log.Errorf("failed to process request %d: %s", request.ID, err) + } + }(request) } } } } -func (s *Service) processRequest(ctx context.Context) error { - var request Request - if err := s.db.First(&request, "status = ?", "created").Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil - } - return fmt.Errorf("failed to get request: %w", err) - } - log.Infow("processing request", "id", request.ID, +func (s *Service) processRequest(ctx context.Context, request *Request) error { + sLog := log.With("id", request.ID) + + sLog.Infow("processing request", "miner", request.Miner.Address, "from", request.From, "to", request.To, "extension", request.Extension, "new_expiration", request.NewExpiration, "tolerance", request.Tolerance, "dry_run", request.DryRun) @@ -161,7 +202,7 @@ func (s *Service) processRequest(ctx context.Context) error { start := time.Now() result, terr := s.extend(ctx, request.Miner.Address, fromEpoch, toEpoch, request.Extension, request.NewExpiration, request.Tolerance, - request.MaxSectors, request.DryRun) + request.MaxSectors, request.DryRun, sLog) if terr != nil { request.Error = terr.Error() @@ -170,7 +211,7 @@ func (s *Service) processRequest(ctx context.Context) error { if terr == nil { request.Status = RequestStatusSuccess // no sectors need to extend } else { - log.Errorf("processing request %d failed: %s, took: %s", request.ID, terr, time.Since(start)) + sLog.Errorf("processing request failed: %s, took: %s", terr, time.Since(start)) request.Status = RequestStatusFailed } } else { @@ -180,7 +221,7 @@ func (s *Service) processRequest(ctx context.Context) error { if request.DryRun { b, err := json.MarshalIndent(result.DryRuns, "", " ") if err != nil { - log.Errorf("failed to marshal dry runs: %s", err) + sLog.Errorf("failed to marshal dry runs: %s", err) } else { request.DryRunResult = string(b) } @@ -196,10 +237,10 @@ func (s *Service) processRequest(ctx context.Context) error { request.Messages = result.Messages } } - log.Infof("processing request %d, status: %s, took: %s", request.ID, request.Status, time.Since(start)) + sLog.Infow("processing request", "status", request.Status, "took", time.Since(start)) request.Took = time.Since(start).Seconds() if err := s.db.Save(&request).Error; err != nil { - log.Errorf("failed to save request: %s", err) + sLog.Errorf("failed to save request: %s", err) } return nil } @@ -260,7 +301,7 @@ func (s *Service) createRequest(ctx context.Context, minerAddr address.Address, func (s *Service) getRequest(_ context.Context, id uint) (*Request, error) { var request Request - if err := s.db.Preload("Messages").First(&request, id).Error; err != nil { + if err := s.db.Preload(clause.Associations).First(&request, id).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, fmt.Errorf("request not found") } @@ -271,13 +312,13 @@ func (s *Service) getRequest(_ context.Context, id uint) (*Request, error) { func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi.ChainEpoch, extension *abi.ChainEpoch, newExpiration *abi.ChainEpoch, tolerance abi.ChainEpoch, - maxSectors int, dryRun bool) (*extendResult, error) { + maxSectors int, dryRun bool, sLog *zap.SugaredLogger) (*extendResult, error) { if extension == nil && newExpiration == nil { return nil, fmt.Errorf("either extension or new expiration must be set") } - log.Infow("extending sectors", "miner", addr, "from", from, "to", to, "extension", extension, "new_expiration", newExpiration, "dry_run", dryRun) + sLog.Infow("extending sectors", "miner", addr, "from", from, "to", to, "extension", extension, "new_expiration", newExpiration, "dry_run", dryRun) head, err := s.api.ChainHead(ctx) if err != nil { @@ -307,7 +348,7 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi if err != nil { return nil, fmt.Errorf("failed to get active set: %w", err) } - log.Infof("got active set with %d sectors, took: %s", len(activeSet), time.Since(time1)) + sLog.Infow("got active set", "took", time.Since(time1), "sectors", len(activeSet)) var sectors []abi.SectorNumber activeSectorsInfo := make(map[abi.SectorNumber]*miner.SectorOnChainInfo, len(activeSet)) for _, info := range activeSet { @@ -316,9 +357,9 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi sectors = append(sectors, info.SectorNumber) } } - log.Infof("found %d sectors to extend", len(sectors)) + sLog.Infof("found %d sectors to extend", len(sectors)) if len(sectors) == 0 { - log.Infof("nothing to extend, break") + sLog.Info("nothing to extend, break") return &extendResult{}, nil } @@ -349,7 +390,7 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi }); err != nil { return nil, err } - log.Infof("got active sectors location, took: %s", time.Since(time2)) + sLog.Infof("got active sectors location, took: %s", time.Since(time2)) maxExtension, err := policy.GetMaxSectorExpirationExtension(nv) if err != nil { @@ -381,10 +422,10 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi if !found { return nil, fmt.Errorf("location for sector %d not found", si.SectorNumber) } - log.Debugf("extending sector %d from %d to %d", si.SectorNumber, si.Expiration, newExp) + sLog.Debugf("extending sector %d from %d to %d", si.SectorNumber, si.Expiration, newExp) es, found := extensions[*l] if !found { - log.Debugw(si.SectorNumber.String(), "found", found, "exp", si.Expiration, "newExp", newExp) + sLog.Debugw(si.SectorNumber.String(), "found", found, "exp", si.Expiration, "newExp", newExp) ne := make(map[abi.ChainEpoch][]abi.SectorNumber) ne[newExp] = []abi.SectorNumber{si.SectorNumber} extensions[*l] = ne @@ -394,12 +435,12 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi if withinTolerance(tolerance)(newExp, exp) { es[exp] = append(es[exp], si.SectorNumber) added = true - log.Debugw(si.SectorNumber.String(), "tolerance", tolerance, "exp", si.Expiration, "newExp", exp) + sLog.Debugw(si.SectorNumber.String(), "tolerance", tolerance, "exp", si.Expiration, "newExp", exp) break } } if !added { - log.Debugw(si.SectorNumber.String(), "exp", si.Expiration, "newExp", newExp) + sLog.Debugw(si.SectorNumber.String(), "exp", si.Expiration, "newExp", newExp) es[newExp] = []abi.SectorNumber{si.SectorNumber} } } @@ -436,7 +477,7 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi for l, exts := range extensions { for newExp, numbers := range exts { d1 += len(numbers) - log.Debugf("extending sectors for partition %d-%d, extend %d sectors to %d", l.Deadline, l.Partition, len(numbers), newExp) + sLog.Debugf("extending sectors for partition %d-%d, extend %d sectors to %d", l.Deadline, l.Partition, len(numbers), newExp) for len(numbers) > addrSectors { var currentNumbers []abi.SectorNumber currentNumbers, numbers = numbers[:addrSectors], numbers[addrSectors:] @@ -469,13 +510,13 @@ func (s *Service) extend(ctx context.Context, addr address.Address, from, to abi } } } - log.Infof("total %d sectors to extend, %d cannot extend", d1, len(cannotExtendSectors)) + sLog.Infof("total %d sectors to extend, %d cannot extend", d1, len(cannotExtendSectors)) // if we have any sectors, then one last append is needed here if scount != 0 { params = append(params, p) } if len(params) == 0 { - log.Info("nothing to extend") + sLog.Info("nothing to extend") return &extendResult{ TotalSectors: len(sectors), }, nil @@ -501,7 +542,7 @@ loopParams: } scount += int(count) } - log.Infof("extending %d sectors in message %d", scount, i) + sLog.Infof("extending %d sectors on message index: %d", scount, i) stotal += scount if dryRun { @@ -529,15 +570,15 @@ loopParams: Params: sp, }, nil) if err != nil { - log.Errorf("failed to push message %d: %s", i, err) + sLog.Errorf("failed to push message %d: %s", i, err) errMsgs = append(errMsgs, fmt.Errorf("mpool push message: %w", err).Error()) continue } published += scount - log.Infow("pushed extend message", "cid", smsg.Cid(), "to", addr, "from", mi.Worker, "sectors", scount) + sLog.Infow("pushed extend message", "cid", smsg.Cid(), "to", addr, "from", mi.Worker, "sectors", scount) exts, err := NewExtension2FromParams(params[i]) if err != nil { - log.Errorf("creating extension2 from params: %s", err) + sLog.Errorf("creating extension2 from params: %s", err) } msg := &Message{ @@ -549,14 +590,14 @@ loopParams: messages = append(messages, msg) } if stotal != len(sectors) { - log.Warnw("not all sectors are build to extend", "total", len(sectors), "extended", stotal) + sLog.Warnw("not all sectors are build to extend", "total", len(sectors), "extended", stotal) } else { - log.Infof("all sectors are build to extend: %d", stotal) + sLog.Infof("all sectors are build to extend: %d", stotal) } if published != len(sectors) { - log.Warnw("not all sectors are published", "total", len(sectors), "published", published) + sLog.Warnw("not all sectors are published", "total", len(sectors), "published", published) } else { - log.Infof("all sectors are published: %d", published) + sLog.Infof("all sectors are published: %d", published) } result := &extendResult{ @@ -660,15 +701,9 @@ func (s *Service) speedupRequest(ctx context.Context, id uint, mss *api.MessageS return nil } -func (s *Service) checkMessage(ctx context.Context) error { - var request Request - if err := s.db.Preload("Messages").First(&request, "status = ?", RequestStatusPending).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil - } - return fmt.Errorf("failed to get request: %w", err) - } - log.Debugw("check request messages", "id", request.ID, +func (s *Service) checkMessage(ctx context.Context, request *Request) error { + sLog := log.With("request", request.ID) + sLog.Debugw("check request messages", "miner", request.Miner.Address, "message count", len(request.MessageCids())) var allOnChain = true @@ -696,10 +731,10 @@ func (s *Service) checkMessage(ctx context.Context) error { if allSuccess { if len(request.Error) == 0 { request.Status = RequestStatusSuccess - log.Infof("request [%d] all messages on chain, status: %s", request.ID, request.Status) + sLog.Infof("request all messages on chain, status: %s", request.Status) } else { request.Status = RequestStatusPartfailed - log.Infof("request [%d] all messages on chain, but got preceding error: %s", request.ID, request.Error) + sLog.Infof("request all messages on chain, but got preceding error: %s", request.Error) } } else { if len(errorMsgs) == len(request.Messages) { @@ -712,7 +747,7 @@ func (s *Service) checkMessage(ctx context.Context) error { errorMsgs = append([]string{request.Error}, errorMsgs...) } request.Error = strings.Join(errorMsgs, ";") - log.Infof("request %d messages on chain, status: %s, failed: %d", request.ID, request.Status, len(errorMsgs)) + sLog.Infof("messages on chain, status: %s, failed: %d", request.Status, len(errorMsgs)) } if err := s.db.Save(&request).Error; err != nil { return fmt.Errorf("failed to save request: %w", err) @@ -725,25 +760,28 @@ func (s *Service) watchMessage(ctx context.Context, id uint) { ctx, cancel := context.WithCancel(ctx) defer cancel() + sLog := log.With("message", id) + var msg Message if err := s.db.First(&msg, id).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - log.Errorf("message not found: %d", id) + sLog.Error("message not found") } else { - log.Errorf("failed to get message: %s", err) + sLog.Errorf("failed to get message: %s", err) } return } + sLog = sLog.With("cid", msg.Cid.String(), "request", msg.RequestID) if s.watchingMessages.Has(msg.ID) { - log.Warnf("message [%d]%s is already watching", msg.ID, msg.Cid.String()) + sLog.Warn("message is already watching") return } wm := newWatchMessage(&msg) s.watchingMessages.Set(msg.ID, wm) defer s.watchingMessages.Delete(msg.ID) - log.Infow("watching message", "id", msg.ID, "request", msg.RequestID, "cid", msg.Cid) + sLog.Info("watching message") resultChan := make(chan *api.MsgLookup, 1) errorChan := make(chan error, 1) @@ -759,32 +797,32 @@ func (s *Service) watchMessage(ctx context.Context, id uint) { select { case receipt := <-resultChan: - log.Infof("message [%d]%s on chain", msg.ID, msg.Cid) + sLog.Info("message on chain") msg.OnChain = true msg.ExitCode = receipt.Receipt.ExitCode msg.Return = receipt.Receipt.Return msg.GasUsed = receipt.Receipt.GasUsed if err := s.db.Save(&msg).Error; err != nil { - log.Errorf("failed to save message: %s", err) + sLog.Errorf("failed to save message: %s", err) } case err := <-errorChan: - log.Errorf("failed to wait message: %s", err) + sLog.Errorf("failed to wait message: %s", err) case <-ctx.Done(): - log.Infof("context done, stopping watching message") + sLog.Info("context done, stopping watching message") case <-wm.cancelCh: - log.Infof("cancel watching message [%d]%s", msg.ID, msg.Cid) + sLog.Info("cancel watching message") } } func (s *Service) runPendingChecker(ctx context.Context) { defer s.wg.Done() - log.Infof("starting pending checker") + log.Info("starting pending checker") tk := time.NewTicker(builtin.EpochDurationSeconds * time.Second) // check interval is 30 seconds defer tk.Stop() for { select { case <-ctx.Done(): - log.Infof("context done, stopping pending checker") + log.Info("context done, stopping pending checker") return case <-tk.C: func() { @@ -825,8 +863,8 @@ func (s *Service) replaceMessage(ctx context.Context, id uint, mss *api.MessageS } return fmt.Errorf("failed to get request: %w", err) } - - log.Infow("replacing message", "id", id, "cid", m.Cid.String()) + sLog := log.With("request", m.RequestID, "id", id, "cid", m.Cid.String()) + sLog.Info("replacing message") // get the message from the chain cm, err := s.api.ChainGetMessage(ctx, m.Cid.Cid) @@ -904,7 +942,7 @@ func (s *Service) replaceMessage(ctx context.Context, id uint, mss *api.MessageS if err := tx.Create(newMsg).Error; err != nil { return err } - log.Infow("replaced message", "old id", id, "new id", newMsg.ID, "old cid", m.Cid, "new cid", newID) + sLog.Infow("replaced message", "new id", newMsg.ID, "new cid", newID) // remove old watching message if owm, ok := s.watchingMessages.Get(id); ok { @@ -1005,7 +1043,7 @@ func warpActiveSectors(ctx context.Context, api API, addr address.Address, cache if cache { v, err := activeSetFromCache(addr) if err == nil { - log.Warn("using cached active set") + log.Warnf("%s using cached active set", addr) return v, nil } }