diff --git a/scheduler/pkg/agent/server_test.go b/scheduler/pkg/agent/server_test.go index e95783e91a..75b6adf56b 100644 --- a/scheduler/pkg/agent/server_test.go +++ b/scheduler/pkg/agent/server_test.go @@ -12,6 +12,7 @@ package agent import ( "context" "fmt" + "sync" "testing" "time" @@ -1042,16 +1043,23 @@ func TestSubscribe(t *testing.T) { } time.Sleep(100 * time.Millisecond) + mu := sync.Mutex{} streams := make([]*grpc.ClientConn, 0) for _, a := range test.agents { go func(id uint32) { conn := getStream(id, context.Background(), port) + mu.Lock() streams = append(streams, conn) + mu.Unlock() }(a.id) } - time.Sleep(500 * time.Millisecond) - + maxCount := 10 + count := 0 + for len(server.agents) != test.expectedAgentsCount && count < maxCount { + time.Sleep(100 * time.Millisecond) + count++ + } g.Expect(len(server.agents)).To(Equal(test.expectedAgentsCount)) for idx, s := range streams { @@ -1062,8 +1070,11 @@ func TestSubscribe(t *testing.T) { }(idx, s) } - time.Sleep(10 * time.Second) - + count = 0 + for len(server.agents) != test.expectedAgentsCountAfterClose && count < maxCount { + time.Sleep(100 * time.Millisecond) + count++ + } g.Expect(len(server.agents)).To(Equal(test.expectedAgentsCountAfterClose)) server.StopAgentStreams() diff --git a/scheduler/pkg/kafka/dataflow/server_test.go b/scheduler/pkg/kafka/dataflow/server_test.go index ee862e53c1..e52780a9a1 100644 --- a/scheduler/pkg/kafka/dataflow/server_test.go +++ b/scheduler/pkg/kafka/dataflow/server_test.go @@ -13,6 +13,7 @@ import ( "context" "fmt" "os" + "sync" "testing" "time" @@ -639,7 +640,6 @@ func TestPipelineRebalance(t *testing.T) { func TestPipelineSubscribe(t *testing.T) { g := NewGomegaWithT(t) - type ag struct { id uint32 doClose bool @@ -729,16 +729,23 @@ func TestPipelineSubscribe(t *testing.T) { time.Sleep(100 * time.Millisecond) + mu := sync.Mutex{} streams := make([]*grpc.ClientConn, 0) for _, a := range test.agents { go func(id uint32) { conn := getStream(id, context.Background(), port) + mu.Lock() streams = append(streams, conn) + mu.Unlock() }(a.id) } - time.Sleep(700 * time.Millisecond) - + maxCount := 10 + count := 0 + for len(s.streams) != test.expectedAgentsCount && count < maxCount { + time.Sleep(100 * time.Millisecond) + count++ + } g.Expect(len(s.streams)).To(Equal(test.expectedAgentsCount)) for idx, s := range streams { @@ -749,8 +756,11 @@ func TestPipelineSubscribe(t *testing.T) { }(idx, s) } - time.Sleep(10 * time.Second) - + count = 0 + for len(s.streams) != test.expectedAgentsCountAfterClose && count < maxCount { + time.Sleep(100 * time.Millisecond) + count++ + } g.Expect(len(s.streams)).To(Equal(test.expectedAgentsCountAfterClose)) s.StopSendPipelineEvents()