From 4bcc1f8a9f496688058fa1cf3c8974830b2a4bb6 Mon Sep 17 00:00:00 2001 From: Hemant Date: Sat, 26 Oct 2024 03:15:04 +0530 Subject: [PATCH 1/2] Modified tests to check if the fqdn has been queued to dnsQueryQueue, when expected. Added test case to check where existingDNSCache is not impacted by a dns response. nit changes as shared in review. Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 69 ++++++++---------- .../controller/networkpolicy/fqdn_test.go | 73 +++++++++++++------ 2 files changed, 81 insertions(+), 61 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 1b4c8c88052..081523d9915 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -78,10 +78,10 @@ type dnsMeta struct { // Key for responseIPs is the string representation of the IP. // It helps to quickly identify IP address updates when a // new DNS response is received. - responseIPs map[string]ipWithTTL + responseIPs map[string]ipWithExpiration } -type ipWithTTL struct { +type ipWithExpiration struct { ip net.IP expirationTime time.Time } @@ -408,10 +408,10 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error { func (f *fqdnController) onDNSResponse( fqdn string, - newIPWithTTLMap map[string]ipWithTTL, + newIPWithExpiration map[string]ipWithExpiration, waitCh chan error, ) { - if len(newIPWithTTLMap) == 0 { + if len(newIPWithExpiration) == 0 { klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn) if waitCh != nil { waitCh <- nil @@ -421,15 +421,15 @@ func (f *fqdnController) onDNSResponse( addressUpdate := false currentTime := time.Now() - ipWithTTLMap := make(map[string]ipWithTTL) + ipWithExpirationMap := make(map[string]ipWithExpiration) - //minTimeToRequery establishes a maximum reference time for tracking the minimum re-query time to DNS, as IPs expire. - minTimeToRequery := currentTime.Add(24 * time.Hour) + // timeToRequery establishes a maximum reference time for tracking the minimum re-query time to DNS, as IPs expire. + timeToRequery := currentTime.Add(24 * time.Hour) - addIPtoIPWithTTLMap := func(ip string, ipMeta ipWithTTL) { - ipWithTTLMap[ip] = ipMeta - if ipMeta.expirationTime.Before(minTimeToRequery) { - minTimeToRequery = ipMeta.expirationTime + updateMapWithIP := func(ip string, ipMeta ipWithExpiration) { + ipWithExpirationMap[ip] = ipMeta + if ipMeta.expirationTime.Before(timeToRequery) { + timeToRequery = ipMeta.expirationTime } } @@ -438,30 +438,28 @@ func (f *fqdnController) onDNSResponse( cachedDNSMeta, exist := f.dnsEntryCache[fqdn] if exist { // check for new IPs. - for newIPStr, newIPMeta := range newIPWithTTLMap { + for newIPStr, newIPMeta := range newIPWithExpiration { if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist { - addIPtoIPWithTTLMap(newIPStr, newIPMeta) - if !addressUpdate { - addressUpdate = true - } + updateMapWithIP(newIPStr, newIPMeta) + addressUpdate = true } } // check for presence of already cached IPs in the new response. for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs { - if newIPMeta, exist := newIPWithTTLMap[cachedIPStr]; !exist { + if newIPMeta, exist := newIPWithExpiration[cachedIPStr]; !exist { // An already cached IP has been found in new response. if cachedIPMeta.expirationTime.Before(currentTime) { // this IP is expired and stale, remove it by not including it but also signal an update to syncRules. addressUpdate = true } else { // It hasn't expired yet, so just retain it with its existing expirationTime. - addIPtoIPWithTTLMap(cachedIPStr, cachedIPMeta) + updateMapWithIP(cachedIPStr, cachedIPMeta) } } else { // This already cached IP is part of the current response, so update it with max time between received time and its old cached time. expTime := laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime) - addIPtoIPWithTTLMap(cachedIPStr, ipWithTTL{ + updateMapWithIP(cachedIPStr, ipWithExpiration{ ip: cachedIPMeta.ip, expirationTime: expTime, }) @@ -470,9 +468,7 @@ func (f *fqdnController) onDNSResponse( } else { // First time seeing this domain. - // check if this needs to be tracked, by checking its presence in the datapath rules. - // If a FQDN policy had been applied then there must be rule records but because it's - // not in cache hence its FQDN:SelectorItem mapping may not be present. + // check if this needs to be tracked, by checking if it matches any Antrea FQDN policy selectors. // iterate over current rules mapping addToCache := false @@ -481,26 +477,23 @@ func (f *fqdnController) onDNSResponse( if selectorItem.matches(fqdn) { // A FQDN can have multiple selectorItems mapped, hence we do not break the loop upon a match, but // keep iterating to create mapping of multiple selectorItems against same FQDN. - if !addToCache { - addToCache = true - } - + addToCache = true f.setFQDNMatchSelector(fqdn, selectorItem) } } if addToCache { - for ipStr, ipMeta := range newIPWithTTLMap { - addIPtoIPWithTTLMap(ipStr, ipMeta) + for ipStr, ipMeta := range newIPWithExpiration { + updateMapWithIP(ipStr, ipMeta) } addressUpdate = true } } - if len(ipWithTTLMap) > 0 { + if len(ipWithExpirationMap) > 0 { f.dnsEntryCache[fqdn] = dnsMeta{ - responseIPs: ipWithTTLMap, + responseIPs: ipWithExpirationMap, } - f.dnsQueryQueue.AddAfter(fqdn, minTimeToRequery.Sub(currentTime)) + f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) } f.syncDirtyRules(fqdn, waitCh, addressUpdate) @@ -632,18 +625,18 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) { } // parseDNSResponse returns the FQDN, IP query result and lowest applicable TTL of a DNS response. -func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithTTL, error) { +func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithExpiration, error) { if len(msg.Question) == 0 { return "", nil, fmt.Errorf("invalid DNS message") } fqdn := strings.ToLower(msg.Question[0].Name) - responseIPs := map[string]ipWithTTL{} + responseIPs := map[string]ipWithExpiration{} currentTime := time.Now() for _, ans := range msg.Answer { switch r := ans.(type) { case *dns.A: if f.ipv4Enabled { - responseIPs[r.A.String()] = ipWithTTL{ + responseIPs[r.A.String()] = ipWithExpiration{ ip: r.A, expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } @@ -651,7 +644,7 @@ func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWi } case *dns.AAAA: if f.ipv6Enabled { - responseIPs[r.AAAA.String()] = ipWithTTL{ + responseIPs[r.AAAA.String()] = ipWithExpiration{ ip: r.AAAA, expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } @@ -701,10 +694,10 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { var errs []error - makeResponseIPs := func(ips []net.IP) map[string]ipWithTTL { - responseIPs := make(map[string]ipWithTTL) + makeResponseIPs := func(ips []net.IP) map[string]ipWithExpiration { + responseIPs := make(map[string]ipWithExpiration) for _, ip := range ips { - responseIPs[ip.String()] = ipWithTTL{ + responseIPs[ip.String()] = ipWithExpiration{ ip: ip, expirationTime: time.Now().Add(time.Duration(defaultTTL) * time.Second), } diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 4f977cf2caa..72b5e0c89c9 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -17,6 +17,7 @@ package networkpolicy import ( "context" "fmt" + "k8s.io/client-go/util/workqueue" "net" "testing" "time" @@ -38,6 +39,12 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe if dnsServer != nil { dnsServerAddr = *dnsServer } + mockDnsQueryQueue := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 100*time.Millisecond), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "fqdn", + }, + ) f, err := newFQDNController( mockOFClient, newIDAllocator(testAsyncDeleteInterval), @@ -47,6 +54,7 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe false, config.DefaultHostGatewayOFPort, ) + f.dnsQueryQueue = mockDnsQueryQueue require.NoError(t, err) return f, mockOFClient } @@ -402,7 +410,7 @@ func TestGetIPsForFQDNSelectors(t *testing.T) { }, existingDNSCache: map[string]dnsMeta{ "test.antrea.io": { - responseIPs: map[string]ipWithTTL{ + responseIPs: map[string]ipWithExpiration{ "127.0.0.1": {net.ParseIP("127.0.0.1"), time.Now()}, "192.155.12.1": {net.ParseIP("192.155.12.1"), time.Now()}, "192.158.1.38": {net.ParseIP("192.158.1.38"), time.Now()}, @@ -586,24 +594,26 @@ func TestSyncDirtyRules(t *testing.T) { } func TestOnDNSResponse(t *testing.T) { + testFQDN := "fqdn-test-pod.lfx.test" currentTime := time.Now() tests := []struct { name string existingDNSCache map[string]dnsMeta - dnsResponseIPs map[string]ipWithTTL + dnsResponseIPs map[string]ipWithExpiration expectedIPs map[string]time.Time + expectedItem string }{ { name: "new IP added", existingDNSCache: map[string]dnsMeta{ "fqdn-test-pod.lfx.test": { - responseIPs: map[string]ipWithTTL{ + responseIPs: map[string]ipWithExpiration{ "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, }, }, }, - dnsResponseIPs: map[string]ipWithTTL{ + dnsResponseIPs: map[string]ipWithExpiration{ "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, }, expectedIPs: map[string]time.Time{ @@ -611,56 +621,74 @@ func TestOnDNSResponse(t *testing.T) { "192.1.1.2": currentTime.Add(6 * time.Second), "192.1.1.3": currentTime.Add(10 * time.Second), }, + expectedItem: testFQDN, + }, + { + name: "existing DNS cache not impacted", + existingDNSCache: map[string]dnsMeta{ + "fqdn-test-pod.lfx.test": { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{}, + expectedIPs: map[string]time.Time{ + "192.1.1.1": currentTime.Add(5 * time.Second), + "192.1.1.2": currentTime.Add(6 * time.Second), + }, }, { name: "old IP resent in DNS response is retained with an updated TTL fetched from response", existingDNSCache: map[string]dnsMeta{ "fqdn-test-pod.lfx.test": { - responseIPs: map[string]ipWithTTL{ + responseIPs: map[string]ipWithExpiration{ "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, }, }, }, - dnsResponseIPs: map[string]ipWithTTL{ + dnsResponseIPs: map[string]ipWithExpiration{ "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(10 * time.Second)}, }, expectedIPs: map[string]time.Time{ "192.1.1.1": currentTime.Add(10 * time.Second), }, }, - { name: "stale IP with expired TTL is evicted", existingDNSCache: map[string]dnsMeta{ "fqdn-test-pod.lfx.test": { - responseIPs: map[string]ipWithTTL{ + responseIPs: map[string]ipWithExpiration{ "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(-1 * time.Second)}, }, }, }, - dnsResponseIPs: map[string]ipWithTTL{ + dnsResponseIPs: map[string]ipWithExpiration{ "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, }, expectedIPs: map[string]time.Time{ "192.1.1.3": currentTime.Add(10 * time.Second), }, + expectedItem: testFQDN, }, { name: "stale IP with unexpired TTL are retained", existingDNSCache: map[string]dnsMeta{ "fqdn-test-pod.lfx.test": { - responseIPs: map[string]ipWithTTL{ + responseIPs: map[string]ipWithExpiration{ "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, }, }, }, - dnsResponseIPs: map[string]ipWithTTL{ + dnsResponseIPs: map[string]ipWithExpiration{ "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, }, expectedIPs: map[string]time.Time{ "192.1.1.2": currentTime.Add(5 * time.Second), "192.1.1.3": currentTime.Add(10 * time.Second), }, + expectedItem: testFQDN, }, } @@ -670,25 +698,24 @@ func TestOnDNSResponse(t *testing.T) { f, _ := newMockFQDNController(t, controller, nil) f.dnsEntryCache = tc.existingDNSCache - f.onDNSResponse("fqdn-test-pod.lfx.test", tc.dnsResponseIPs, nil) + f.onDNSResponse(testFQDN, tc.dnsResponseIPs, nil) - cachedDnsMetaData := f.dnsEntryCache["fqdn-test-pod.lfx.test"] + cachedDnsMetaData := f.dnsEntryCache[testFQDN] assert.Equal(t, len(cachedDnsMetaData.responseIPs), len(tc.expectedIPs), "Expected %d IPs in cache, got %d", len(tc.expectedIPs), len(cachedDnsMetaData.responseIPs)) for ipStr, expectedTTL := range tc.expectedIPs { cachedIPMeta, exists := cachedDnsMetaData.responseIPs[ipStr] - if !exists { - t.Errorf("Expected %s to be found in dns cache.", ipStr) - continue - } - if cachedIPMeta.expirationTime.Before(time.Now()) { - t.Errorf("Expected %s to be found with a valid TTL,but it has expired.", ipStr) - } + assert.True(t, exists, "Expected %s to be found in dns cache.", ipStr) + assert.False(t, cachedIPMeta.expirationTime.Before(time.Now()), "Expected %s to have a valid TTL, but it has expired.", ipStr) + assert.Equal(t, expectedTTL, cachedIPMeta.expirationTime, "Expected TTL for %s to be %v, got %v", ipStr, expectedTTL, cachedIPMeta.expirationTime) - if !cachedIPMeta.expirationTime.Equal(expectedTTL) { - t.Errorf("Expected TTL for %s to be %v, got %v", ipStr, expectedTTL, cachedIPMeta.expirationTime) - } + } + + if tc.expectedItem == testFQDN { + actualItem, _ := f.dnsQueryQueue.Get() + f.dnsQueryQueue.Done(actualItem) + assert.Equal(t, tc.expectedItem, actualItem) } }) } From 019e8586879c46cbad4ebdd665451e2b62438dfd Mon Sep 17 00:00:00 2001 From: Hemant Date: Tue, 29 Oct 2024 02:51:02 +0530 Subject: [PATCH 2/2] fixed golangci-lint error Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 72b5e0c89c9..c9ae31a913b 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -17,7 +17,6 @@ package networkpolicy import ( "context" "fmt" - "k8s.io/client-go/util/workqueue" "net" "testing" "time" @@ -26,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" "antrea.io/antrea/pkg/agent/config" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"