Skip to content

Commit

Permalink
Update internal/client/v1/client/discoverer/discover.go
Browse files Browse the repository at this point in the history
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Signed-off-by: Yusuke Kato <kpango@vdaas.org>
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango and coderabbitai[bot] committed Nov 8, 2024
1 parent 3f18b2b commit 7b9ac32
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
15 changes: 15 additions & 0 deletions internal/client/v1/client/discoverer/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,24 @@ func (c *client) discover(ctx context.Context) (err error) {
}
return nil, false, err
}
log.Debugf("discovered with backoff connected IPs: %v", connected)
return nil, false, nil
})
} else {
connected, err = c.updateDiscoveryInfo(ctx)
log.Debugf("discovered and updated connected IPs: %v", connected)
}
if err != nil {
log.Warnf("failed to discover addrs from discoverer API, error: %v,\ttrying to dns discovery from %s...", err, c.dns)
connected, err = c.dnsDiscovery(ctx)
if err != nil {
return err
}
log.Debugf("discovered with dns connected IPs: %v", connected)
}

oldAddrs := c.GetAddrs(ctx)
log.Debugf("discovered connected IPs: %v", connected)
c.addrs.Store(&connected)
return c.disconnectOldAddrs(ctx, oldAddrs, connected)
}
Expand Down Expand Up @@ -358,11 +362,16 @@ func (c *client) discoverNodes(ctx context.Context) (nodes *payload.Info_Nodes,
return nil, err
}
slices.SortFunc(nodes.Nodes, func(left, right *payload.Info_Node) int {
if left.GetMemory() == nil || right.GetMemory() == nil {
return 0 // Default comparison value; adjust as needed.
}
return cmp.Compare(left.GetMemory().GetUsage(), right.GetMemory().GetUsage())
})
return nodes, nil
}

var dId atomic.Uint64

func (c *client) discoverAddrs(
ctx context.Context, nodes *payload.Info_Nodes,
) (addrs []string, err error) {
Expand All @@ -379,6 +388,9 @@ func (c *client) discoverAddrs(
maxPodLen = l
}
slices.SortFunc(nodes.Nodes[i].Pods.Pods, func(left, right *payload.Info_Pod) int {
if left.GetMemory() == nil || right.GetMemory() == nil {
return 0 // Default comparison value; adjust as needed.
}
return cmp.Compare(left.GetMemory().GetUsage(), right.GetMemory().GetUsage())
})
}
Expand All @@ -387,6 +399,7 @@ func (c *client) discoverAddrs(
if err == nil && nbody != nil {
log.Debug(string(nbody))
}
id := dId.Add(0)
addrs = make([]string, 0, podLength)
for i := 0; i < maxPodLen; i++ {
for _, node := range nodes.GetNodes() {
Expand All @@ -403,13 +416,15 @@ func (c *client) discoverAddrs(
log.Debugf("resource based discovery connect from discoverer API for addr = %s failed %v", addr, errors.ErrAddrCouldNotDiscover(err, addr))
err = nil
} else {
log.Debugf("%d:\tdiscovered healthy pods IP: %s, Node Usage: %f, Pod Usage: %f", id, addr, node.GetMemory().GetUsage(), node.GetPods().GetPods()[i].GetMemory().GetUsage())
addrs = append(addrs, addr)
}

}
}
}
}
log.Debugf("%d:\tdiscovered healthy pods IPs: %v", id, addrs)
return addrs, nil
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/index/job/correction/usecase/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
discoverer.WithDiscoverDuration(cfg.Corrector.Discoverer.Duration),
discoverer.WithOptions(acOpts...),
discoverer.WithNodeName(cfg.Corrector.NodeName),
discoverer.WithOnDiscoverFunc(func(_ context.Context, _ discoverer.Client, addrs []string) error {
slices.Reverse(addrs)
return nil
}),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 7b9ac32

Please sign in to comment.