From 232d5829aa966202e382aa9257591aa6c800dc97 Mon Sep 17 00:00:00 2001 From: HEMANT KUMAR Date: Wed, 6 Nov 2024 02:25:57 +0530 Subject: [PATCH] Keep expiration times for individual IPs in FQDN cache (#6732) The FQDN cache is used to enforce NetworkPolicy rules with FQDN targets. This patch enables tracking of expiration times for individual IPs in the FQDN cache, based on the TTL received in DNS responses. Prior to this change, a "global" expiration time was used for all IPs corresponding to a given FQDN. Fixes #6722 Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 158 +++++++++------ .../controller/networkpolicy/fqdn_test.go | 180 +++++++++++++++++- .../networkpolicy/networkpolicy_controller.go | 3 +- .../networkpolicy/pod_reconciler_test.go | 2 +- 4 files changed, 276 insertions(+), 67 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 1a54290fc83..8c077c707fe 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -17,7 +17,6 @@ package networkpolicy import ( "context" "fmt" - "math" "net" "os" "regexp" @@ -32,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/types" @@ -76,11 +76,15 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool { // expirationTime of the records, which is the DNS response // receiving time plus lowest applicable TTL. type dnsMeta struct { - expirationTime time.Time // Key for responseIPs is the string representation of the IP. // It helps to quickly identify IP address updates when a // new DNS response is received. - responseIPs map[string]net.IP + responseIPs map[string]ipWithExpiration +} + +type ipWithExpiration struct { + ip net.IP + expirationTime time.Time } // subscriber is a entity that subsribes for datapath rule realization @@ -152,9 +156,11 @@ type fqdnController struct { ipv4Enabled bool ipv6Enabled bool gwPort uint32 + // clock allows injecting a custom (fake) clock in unit tests. + clock clock.Clock } -func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32) (*fqdnController, error) { +func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32, clock clock.WithTicker) (*fqdnController, error) { controller := &fqdnController{ ofClient: client, dirtyRuleHandler: dirtyRuleHandler, @@ -163,7 +169,8 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer dnsQueryQueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay), workqueue.TypedRateLimitingQueueConfig[string]{ - Name: "fqdn", + Name: "fqdn", + Clock: clock, }, ), dnsEntryCache: map[string]dnsMeta{}, @@ -174,6 +181,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer ipv4Enabled: v4Enabled, ipv6Enabled: v6Enabled, gwPort: gwPort, + clock: clock, } if controller.ofClient != nil { if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil { @@ -253,8 +261,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP { } for fqdn := range fqdnsMatched { if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok { - for _, ip := range dnsMeta.responseIPs { - matchedIPs = append(matchedIPs, ip) + for _, ipData := range dnsMeta.responseIPs { + matchedIPs = append(matchedIPs, ipData.ip) } } } @@ -405,72 +413,100 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error { func (f *fqdnController) onDNSResponse( fqdn string, - responseIPs map[string]net.IP, - lowestTTL uint32, + newIPsWithExpiration map[string]ipWithExpiration, waitCh chan error, ) { - if len(responseIPs) == 0 { + if len(newIPsWithExpiration) == 0 { klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn) if waitCh != nil { waitCh <- nil } return } - // mustCacheResponse is only true if the FQDN is already tracked by this - // controller, or it matches at least one fqdnSelectorItem from the policy rules. - // addressUpdate is only true if there has been an update in IP addresses - // corresponded with the FQDN. - mustCacheResponse, addressUpdate := false, false - recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second) + + addressUpdate := false + currentTime := f.clock.Now() + ipWithExpirationMap := make(map[string]ipWithExpiration) + + // timeToRequery sets the interval for sending a new DNS query for the FQDN, + // based on the shortest expiration time of cached IPs. + var timeToRequery *time.Time + + updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) { + ipWithExpirationMap[ip] = ipMeta + if timeToRequery == nil || ipMeta.expirationTime.Before(*timeToRequery) { + timeToRequery = &ipMeta.expirationTime + } + } f.fqdnSelectorMutex.Lock() defer f.fqdnSelectorMutex.Unlock() - oldDNSMeta, exist := f.dnsEntryCache[fqdn] + cachedDNSMeta, exist := f.dnsEntryCache[fqdn] if exist { - mustCacheResponse = true - for ipStr := range responseIPs { - if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok { + // check for new IPs. + for newIPStr, newIPMeta := range newIPsWithExpiration { + if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist { + updateIPWithExpiration(newIPStr, newIPMeta) addressUpdate = true - break } } - for oldIPStr, oldIP := range oldDNSMeta.responseIPs { - if _, ok := responseIPs[oldIPStr]; !ok { - if oldDNSMeta.expirationTime.Before(time.Now()) { - // This IP entry has already expired and not seen in the latest DNS response. - // It should be removed from the cache. + + // check for presence of already cached IPs in the new response. + for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs { + if newIPMeta, exist := newIPsWithExpiration[cachedIPStr]; !exist { + // The IP was not found in current response. + if cachedIPMeta.expirationTime.Before(currentTime) { + // this IP is expired and stale, remove it by not including it but also signal an update to syncRules. addressUpdate = true } else { - // Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed. - responseIPs[oldIPStr] = oldIP - if oldDNSMeta.expirationTime.Before(recordTTL) { - recordTTL = oldDNSMeta.expirationTime - } + // It hasn't expired yet, so just retain it with its existing expirationTime. + updateIPWithExpiration(cachedIPStr, cachedIPMeta) } + } else { + // The cached IP is included in the current response; update its expiration time to the later of the new and existing values. + updateIPWithExpiration(cachedIPStr, ipWithExpiration{ + ip: cachedIPMeta.ip, + expirationTime: laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime), + }) } } + } else { + // This domain is being encountered for the first time. + // Check if it should be tracked by matching it against existing selectorItemToRuleIDs. + + addToCache := false for selectorItem := range f.selectorItemToRuleIDs { // Only track the FQDN if there is at least one fqdnSelectorItem matching it. if selectorItem.matches(fqdn) { - mustCacheResponse, addressUpdate = true, true + // A FQDN can have multiple selectorItems mapped, hence we do not break the loop upon a match, but + // keep iterating to create mapping of multiple selectorItems against same FQDN. + addToCache = true f.setFQDNMatchSelector(fqdn, selectorItem) } } + if addToCache { + for ipStr, ipMeta := range newIPsWithExpiration { + updateIPWithExpiration(ipStr, ipMeta) + } + addressUpdate = true + } } - if mustCacheResponse { + + // ipWithExpirationMap remains empty and timeToRequery is nil only when FQDN doesn't match any selector. + if len(ipWithExpirationMap) > 0 { f.dnsEntryCache[fqdn] = dnsMeta{ - expirationTime: recordTTL, - responseIPs: responseIPs, + responseIPs: ipWithExpirationMap, } - f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now())) + f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) } + f.syncDirtyRules(fqdn, waitCh, addressUpdate) } // onDNSResponseMsg handles a DNS response message intercepted. func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { - fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg) + fqdn, responseIPs, err := f.parseDNSResponse(dnsMsg) if err != nil { klog.V(2).InfoS("Failed to parse DNS response") if waitCh != nil { @@ -478,7 +514,7 @@ func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { } return } - f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh) + f.onDNSResponse(fqdn, responseIPs, waitCh) } // syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response @@ -594,38 +630,39 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) { } // parseDNSResponse returns the FQDN, IP query result and lowest applicable TTL of a DNS response. -func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]net.IP, uint32, error) { +func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithExpiration, error) { if len(msg.Question) == 0 { - return "", nil, 0, fmt.Errorf("invalid DNS message") + return "", nil, fmt.Errorf("invalid DNS message") } fqdn := strings.ToLower(msg.Question[0].Name) - lowestTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs - responseIPs := map[string]net.IP{} + responseIPs := map[string]ipWithExpiration{} + currentTime := f.clock.Now() for _, ans := range msg.Answer { switch r := ans.(type) { case *dns.A: if f.ipv4Enabled { - responseIPs[r.A.String()] = r.A - if r.Header().Ttl < lowestTTL { - lowestTTL = r.Header().Ttl + responseIPs[r.A.String()] = ipWithExpiration{ + ip: r.A, + expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } + } case *dns.AAAA: if f.ipv6Enabled { - responseIPs[r.AAAA.String()] = r.AAAA - if r.Header().Ttl < lowestTTL { - lowestTTL = r.Header().Ttl + responseIPs[r.AAAA.String()] = ipWithExpiration{ + ip: r.AAAA, + expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } } } } if len(responseIPs) > 0 { - klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", lowestTTL) + klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs) } if strings.HasSuffix(fqdn, ".") { fqdn = fqdn[:len(fqdn)-1] } - return fqdn, responseIPs, lowestTTL, nil + return fqdn, responseIPs, nil } func (f *fqdnController) worker() { @@ -662,24 +699,27 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { var errs []error - makeResponseIPs := func(ips []net.IP) map[string]net.IP { - responseIPs := make(map[string]net.IP) + makeResponseIPs := func(ips []net.IP) map[string]ipWithExpiration { + responseIPs := make(map[string]ipWithExpiration) for _, ip := range ips { - responseIPs[ip.String()] = ip + responseIPs[ip.String()] = ipWithExpiration{ + ip: ip, + expirationTime: f.clock.Now().Add(time.Duration(defaultTTL) * time.Second), + } } return responseIPs } if f.ipv4Enabled { if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) } else { errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err)) } } if f.ipv6Enabled { if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) } else { errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err)) } @@ -818,3 +858,11 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error { return f.ofClient.ResumePausePacket(pktIn) } } + +// laterOf returns the later of the two given time.Time values. +func laterOf(t1, t2 time.Time) time.Time { + if t1.After(t2) { + return t1 + } + return t2 +} diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 46bcffa53b0..55e6a21a058 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -25,12 +25,14 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/clock" + "k8s.io/utils/ptr" "antrea.io/antrea/pkg/agent/config" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" ) -func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServer *string) (*fqdnController, *openflowtest.MockClient) { +func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServer *string, clockToInject clock.WithTicker) (*fqdnController, *openflowtest.MockClient) { mockOFClient := openflowtest.NewMockClient(controller) mockOFClient.EXPECT().NewDNSPacketInConjunction(gomock.Any()).Return(nil).AnyTimes() dirtyRuleHandler := func(rule string) {} @@ -38,6 +40,9 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe if dnsServer != nil { dnsServerAddr = *dnsServer } + if clockToInject == nil { + clockToInject = clock.RealClock{} + } f, err := newFQDNController( mockOFClient, newIDAllocator(testAsyncDeleteInterval), @@ -46,6 +51,7 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe true, false, config.DefaultHostGatewayOFPort, + clockToInject, ) require.NoError(t, err) return f, mockOFClient @@ -164,7 +170,7 @@ func TestAddFQDNRule(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { controller := gomock.NewController(t) - f, c := newMockFQDNController(t, controller, nil) + f, c := newMockFQDNController(t, controller, nil, nil) if tt.addressAdded { c.EXPECT().AddAddressToDNSConjunction(dnsInterceptRuleID, gomock.Any()).Times(1) } @@ -325,7 +331,7 @@ func TestDeleteFQDNRule(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { controller := gomock.NewController(t) - f, c := newMockFQDNController(t, controller, nil) + f, c := newMockFQDNController(t, controller, nil, nil) c.EXPECT().AddAddressToDNSConjunction(dnsInterceptRuleID, gomock.Any()).Times(len(tt.previouslyAddedRules)) f.dnsEntryCache = tt.existingDNSCache if tt.addressRemoved { @@ -344,7 +350,7 @@ func TestDeleteFQDNRule(t *testing.T) { func TestLookupIPFallback(t *testing.T) { controller := gomock.NewController(t) dnsServer := "" // force a fallback to local resolver - f, _ := newMockFQDNController(t, controller, &dnsServer) + f, _ := newMockFQDNController(t, controller, &dnsServer, nil) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // not ideal as a unit test because it requires the ability to resolve @@ -402,10 +408,10 @@ func TestGetIPsForFQDNSelectors(t *testing.T) { }, existingDNSCache: map[string]dnsMeta{ "test.antrea.io": { - responseIPs: map[string]net.IP{ - "127.0.0.1": net.ParseIP("127.0.0.1"), - "192.155.12.1": net.ParseIP("192.155.12.1"), - "192.158.1.38": net.ParseIP("192.158.1.38"), + responseIPs: map[string]ipWithExpiration{ + "127.0.0.1": {net.ParseIP("127.0.0.1"), time.Now()}, + "192.155.12.1": {net.ParseIP("192.155.12.1"), time.Now()}, + "192.158.1.38": {net.ParseIP("192.158.1.38"), time.Now()}, }, }, }, @@ -421,7 +427,7 @@ func TestGetIPsForFQDNSelectors(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) - f, _ := newMockFQDNController(t, controller, nil) + f, _ := newMockFQDNController(t, controller, nil, nil) if tc.existingSelectorItemToFQDN != nil { f.selectorItemToFQDN = tc.existingSelectorItemToFQDN } @@ -539,7 +545,7 @@ func TestSyncDirtyRules(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) - f, _ := newMockFQDNController(t, controller, nil) + f, _ := newMockFQDNController(t, controller, nil, nil) var dirtyRuleSyncCalls []string f.dirtyRuleHandler = func(s string) { dirtyRuleSyncCalls = append(dirtyRuleSyncCalls, s) @@ -584,3 +590,157 @@ func TestSyncDirtyRules(t *testing.T) { }) } } + +func TestOnDNSResponse(t *testing.T) { + testFQDN := "fqdn-test-pod.lfx.test" + selectorItem1 := fqdnSelectorItem{ + matchName: testFQDN, + } + selectorItem2 := fqdnSelectorItem{ + matchName: "random-domain.com", + } + currentTime := time.Now() + + tests := []struct { + name string + existingDNSCache map[string]dnsMeta + dnsResponseIPs map[string]ipWithExpiration + expectedIPs map[string]ipWithExpiration + expectedRequeryAfter *time.Duration + mockSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] + }{ + { + name: "new IP added", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + }, + { + name: "empty DNS response", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{}, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + { + name: "old IP present in DNS response is retained with an updated TTL fetched from response", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(1 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(1 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + }, + { + name: "stale IP with expired TTL is evicted", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(-1 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + }, + { + name: "existingDNSCache is empty, the new response matches a selector.", + existingDNSCache: map[string]dnsMeta{}, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1"), + }, + }, + { + name: "existingDNSCache is empty, the new response doesn't match any selector", + existingDNSCache: map[string]dnsMeta{}, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: nil, + mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem2: sets.New[string]("mockRule2"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fakeClock := newFakeClock(currentTime) + controller := gomock.NewController(t) + f, _ := newMockFQDNController(t, controller, nil, fakeClock) + f.dnsEntryCache = tc.existingDNSCache + if tc.mockSelectorToRuleIDs != nil { + f.selectorItemToRuleIDs = tc.mockSelectorToRuleIDs + } + require.Zero(t, fakeClock.TimersAdded()) + + f.onDNSResponse(testFQDN, tc.dnsResponseIPs, nil) + + cachedDnsMetaData, _ := f.dnsEntryCache[testFQDN] + + assert.Equal(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "FQDN cache doesn't match expected entries") + + if tc.expectedRequeryAfter != nil { + // Wait for the DelayingQueue to create the timer which signals that the + // DNS request for the FQDN item is ready to be sent. + require.Eventually(t, func() bool { return fakeClock.TimersAdded() > 0 }, 1*time.Second, 10*time.Millisecond) + + fakeClock.Step(*tc.expectedRequeryAfter) + // needed to avoid blocking on Get() in case of failure + require.Eventually(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 1*time.Second, 10*time.Millisecond) + item, _ := f.dnsQueryQueue.Get() + f.dnsQueryQueue.Done(item) + assert.Equal(t, testFQDN, item) + } else { + // make sure that there is no requery + assert.Never(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 100*time.Millisecond, 10*time.Millisecond) + } + }) + } +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index ae0afe124c4..9b5308e549f 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/config" @@ -226,7 +227,7 @@ func NewNetworkPolicyController(antreaClientGetter client.AntreaClientProvider, var err error if antreaPolicyEnabled { - if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort); err != nil { + if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort, clock.RealClock{}); err != nil { return nil, err } diff --git a/pkg/agent/controller/networkpolicy/pod_reconciler_test.go b/pkg/agent/controller/networkpolicy/pod_reconciler_test.go index ec31137a918..390be056682 100644 --- a/pkg/agent/controller/networkpolicy/pod_reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/pod_reconciler_test.go @@ -108,7 +108,7 @@ func newCIDR(cidrStr string) *net.IPNet { } func newTestReconciler(t *testing.T, controller *gomock.Controller, ifaceStore interfacestore.InterfaceStore, ofClient *openflowtest.MockClient, v4Enabled, v6Enabled bool) *podReconciler { - f, _ := newMockFQDNController(t, controller, nil) + f, _ := newMockFQDNController(t, controller, nil, nil) ch := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch)}