Skip to content

Commit

Permalink
Add timeout for watch requests to member clusters to prevent request …
Browse files Browse the repository at this point in the history
…hang

Signed-off-by: xigang <wangxigang2014@gmail.com>
  • Loading branch information
xigang committed Oct 27, 2024
1 parent 331145f commit 21c10d3
Showing 1 changed file with 36 additions and 7 deletions.
43 changes: 36 additions & 7 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,44 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe
if cache == nil {
continue
}
w, err := cache.Watch(ctx, options)
if err != nil {

// The following logic adds a 30-second timeout to prevent watch requests to member clusters from hanging,
// which could cause the client watch to hang.
watchChan := make(chan watch.Interface, 1)
errChan := make(chan error, 1)

go func(cluster string) {
w, err := cache.Watch(ctx, options)
if err != nil {
select {
case errChan <- fmt.Errorf("failed to start watch for resource %v in cluster %q: %v", gvr.String(), cluster, err):
case <-ctx.Done():
}
return
}

select {
case watchChan <- w:
case <-ctx.Done():
w.Stop()
}
}(cluster)

select {
case w := <-watchChan:
mux.AddSource(w, func(e watch.Event) {
setObjectResourceVersionFunc(cluster, e.Object)
addCacheSourceAnnotation(e.Object, cluster)
})
case err := <-errChan:
// If the watch request fails, return the error, and the client will retry.
return nil, err
case <-time.After(30 * time.Second):
// If the watch request times out, return an error, and the client will retry.
return nil, fmt.Errorf("timeout waiting for watch for resource %v in cluster %q", gvr.String(), cluster)
case <-ctx.Done():
return nil, ctx.Err()
}

mux.AddSource(w, func(e watch.Event) {
setObjectResourceVersionFunc(cluster, e.Object)
addCacheSourceAnnotation(e.Object, cluster)
})
}
mux.Start()
return mux, nil
Expand Down

0 comments on commit 21c10d3

Please sign in to comment.