Skip to content

Commit

Permalink
Keep expiration times for individual IPs in FQDN cache (#6732)
Browse files Browse the repository at this point in the history
The FQDN cache is used to enforce NetworkPolicy rules with FQDN targets.
This patch enables tracking of expiration times for individual IPs in the
FQDN cache, based on the TTL received in DNS responses. Prior to
this change, a "global" expiration time was used for all IPs corresponding
to a given FQDN.

Fixes #6722 

Signed-off-by: Hemant <hkbiet@gmail.com>
  • Loading branch information
hkiiita authored Nov 5, 2024
1 parent 92454fb commit 232d582
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 67 deletions.
158 changes: 103 additions & 55 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package networkpolicy
import (
"context"
"fmt"
"math"
"net"
"os"
"regexp"
Expand All @@ -32,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/types"
Expand Down Expand Up @@ -76,11 +76,15 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool {
// expirationTime of the records, which is the DNS response
// receiving time plus lowest applicable TTL.
type dnsMeta struct {
expirationTime time.Time
// Key for responseIPs is the string representation of the IP.
// It helps to quickly identify IP address updates when a
// new DNS response is received.
responseIPs map[string]net.IP
responseIPs map[string]ipWithExpiration
}

// ipWithExpiration pairs a single resolved IP address with the time at which
// that address expires, so that each IP cached for an FQDN can carry its own
// expiration time.
type ipWithExpiration struct {
// ip is the resolved address.
ip net.IP
// expirationTime is the instant after which this IP is treated as expired.
expirationTime time.Time
}

// subscriber is an entity that subscribes for datapath rule realization
Expand Down Expand Up @@ -152,9 +156,11 @@ type fqdnController struct {
ipv4Enabled bool
ipv6Enabled bool
gwPort uint32
// clock allows injecting a custom (fake) clock in unit tests.
clock clock.Clock
}

func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32) (*fqdnController, error) {
func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32, clock clock.WithTicker) (*fqdnController, error) {
controller := &fqdnController{
ofClient: client,
dirtyRuleHandler: dirtyRuleHandler,
Expand All @@ -163,7 +169,8 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer
dnsQueryQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "fqdn",
Name: "fqdn",
Clock: clock,
},
),
dnsEntryCache: map[string]dnsMeta{},
Expand All @@ -174,6 +181,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer
ipv4Enabled: v4Enabled,
ipv6Enabled: v6Enabled,
gwPort: gwPort,
clock: clock,
}
if controller.ofClient != nil {
if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil {
Expand Down Expand Up @@ -253,8 +261,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP {
}
for fqdn := range fqdnsMatched {
if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok {
for _, ip := range dnsMeta.responseIPs {
matchedIPs = append(matchedIPs, ip)
for _, ipData := range dnsMeta.responseIPs {
matchedIPs = append(matchedIPs, ipData.ip)
}
}
}
Expand Down Expand Up @@ -405,80 +413,108 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error {

func (f *fqdnController) onDNSResponse(
fqdn string,
responseIPs map[string]net.IP,
lowestTTL uint32,
newIPsWithExpiration map[string]ipWithExpiration,
waitCh chan error,
) {
if len(responseIPs) == 0 {
if len(newIPsWithExpiration) == 0 {
klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn)
if waitCh != nil {
waitCh <- nil
}
return
}
// mustCacheResponse is only true if the FQDN is already tracked by this
// controller, or it matches at least one fqdnSelectorItem from the policy rules.
// addressUpdate is only true if there has been an update in IP addresses
// corresponded with the FQDN.
mustCacheResponse, addressUpdate := false, false
recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second)

addressUpdate := false
currentTime := f.clock.Now()
ipWithExpirationMap := make(map[string]ipWithExpiration)

// timeToRequery sets the interval for sending a new DNS query for the FQDN,
// based on the shortest expiration time of cached IPs.
var timeToRequery *time.Time

updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) {
ipWithExpirationMap[ip] = ipMeta
if timeToRequery == nil || ipMeta.expirationTime.Before(*timeToRequery) {
timeToRequery = &ipMeta.expirationTime
}
}

f.fqdnSelectorMutex.Lock()
defer f.fqdnSelectorMutex.Unlock()
oldDNSMeta, exist := f.dnsEntryCache[fqdn]
cachedDNSMeta, exist := f.dnsEntryCache[fqdn]
if exist {
mustCacheResponse = true
for ipStr := range responseIPs {
if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok {
// check for new IPs.
for newIPStr, newIPMeta := range newIPsWithExpiration {
if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist {
updateIPWithExpiration(newIPStr, newIPMeta)
addressUpdate = true
break
}
}
for oldIPStr, oldIP := range oldDNSMeta.responseIPs {
if _, ok := responseIPs[oldIPStr]; !ok {
if oldDNSMeta.expirationTime.Before(time.Now()) {
// This IP entry has already expired and not seen in the latest DNS response.
// It should be removed from the cache.

// check for presence of already cached IPs in the new response.
for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs {
if newIPMeta, exist := newIPsWithExpiration[cachedIPStr]; !exist {
// The IP was not found in current response.
if cachedIPMeta.expirationTime.Before(currentTime) {
// this IP is expired and stale, remove it by not including it but also signal an update to syncRules.
addressUpdate = true
} else {
// Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed.
responseIPs[oldIPStr] = oldIP
if oldDNSMeta.expirationTime.Before(recordTTL) {
recordTTL = oldDNSMeta.expirationTime
}
// It hasn't expired yet, so just retain it with its existing expirationTime.
updateIPWithExpiration(cachedIPStr, cachedIPMeta)
}
} else {
// The cached IP is included in the current response; update its expiration time to the later of the new and existing values.
updateIPWithExpiration(cachedIPStr, ipWithExpiration{
ip: cachedIPMeta.ip,
expirationTime: laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime),
})
}
}

} else {
// This domain is being encountered for the first time.
// Check if it should be tracked by matching it against existing selectorItemToRuleIDs.

addToCache := false
for selectorItem := range f.selectorItemToRuleIDs {
// Only track the FQDN if there is at least one fqdnSelectorItem matching it.
if selectorItem.matches(fqdn) {
mustCacheResponse, addressUpdate = true, true
// A FQDN can have multiple selectorItems mapped, hence we do not break the loop upon a match, but
// keep iterating to create mapping of multiple selectorItems against same FQDN.
addToCache = true
f.setFQDNMatchSelector(fqdn, selectorItem)
}
}
if addToCache {
for ipStr, ipMeta := range newIPsWithExpiration {
updateIPWithExpiration(ipStr, ipMeta)
}
addressUpdate = true
}
}
if mustCacheResponse {

// ipWithExpirationMap remains empty and timeToRequery is nil only when FQDN doesn't match any selector.
if len(ipWithExpirationMap) > 0 {
f.dnsEntryCache[fqdn] = dnsMeta{
expirationTime: recordTTL,
responseIPs: responseIPs,
responseIPs: ipWithExpirationMap,
}
f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now()))
f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime))
}

f.syncDirtyRules(fqdn, waitCh, addressUpdate)
}

// onDNSResponseMsg handles a DNS response message intercepted.
func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) {
fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg)
fqdn, responseIPs, err := f.parseDNSResponse(dnsMsg)
if err != nil {
klog.V(2).InfoS("Failed to parse DNS response")
if waitCh != nil {
waitCh <- fmt.Errorf("failed to parse DNS response: %v", err)
}
return
}
f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh)
f.onDNSResponse(fqdn, responseIPs, waitCh)
}

// syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response
Expand Down Expand Up @@ -594,38 +630,39 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) {
}

// parseDNSResponse returns the FQDN and the IPs found in the answers of a DNS response, each with its own expiration time computed from the record's TTL.
func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]net.IP, uint32, error) {
func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithExpiration, error) {
if len(msg.Question) == 0 {
return "", nil, 0, fmt.Errorf("invalid DNS message")
return "", nil, fmt.Errorf("invalid DNS message")
}
fqdn := strings.ToLower(msg.Question[0].Name)
lowestTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs
responseIPs := map[string]net.IP{}
responseIPs := map[string]ipWithExpiration{}
currentTime := f.clock.Now()
for _, ans := range msg.Answer {
switch r := ans.(type) {
case *dns.A:
if f.ipv4Enabled {
responseIPs[r.A.String()] = r.A
if r.Header().Ttl < lowestTTL {
lowestTTL = r.Header().Ttl
responseIPs[r.A.String()] = ipWithExpiration{
ip: r.A,
expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second),
}

}
case *dns.AAAA:
if f.ipv6Enabled {
responseIPs[r.AAAA.String()] = r.AAAA
if r.Header().Ttl < lowestTTL {
lowestTTL = r.Header().Ttl
responseIPs[r.AAAA.String()] = ipWithExpiration{
ip: r.AAAA,
expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second),
}
}
}
}
if len(responseIPs) > 0 {
klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", lowestTTL)
klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs)
}
if strings.HasSuffix(fqdn, ".") {
fqdn = fqdn[:len(fqdn)-1]
}
return fqdn, responseIPs, lowestTTL, nil
return fqdn, responseIPs, nil
}

func (f *fqdnController) worker() {
Expand Down Expand Up @@ -662,24 +699,27 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error {

var errs []error

makeResponseIPs := func(ips []net.IP) map[string]net.IP {
responseIPs := make(map[string]net.IP)
makeResponseIPs := func(ips []net.IP) map[string]ipWithExpiration {
responseIPs := make(map[string]ipWithExpiration)
for _, ip := range ips {
responseIPs[ip.String()] = ip
responseIPs[ip.String()] = ipWithExpiration{
ip: ip,
expirationTime: f.clock.Now().Add(time.Duration(defaultTTL) * time.Second),
}
}
return responseIPs
}

if f.ipv4Enabled {
if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil {
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil)
f.onDNSResponse(fqdn, makeResponseIPs(ips), nil)
} else {
errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err))
}
}
if f.ipv6Enabled {
if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil {
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil)
f.onDNSResponse(fqdn, makeResponseIPs(ips), nil)
} else {
errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err))
}
Expand Down Expand Up @@ -818,3 +858,11 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
return f.ofClient.ResumePausePacket(pktIn)
}
}

// laterOf picks whichever of t1 and t2 denotes the more recent instant.
// When neither is strictly later than the other, t2 is returned.
func laterOf(t1, t2 time.Time) time.Time {
	latest := t2
	if t2.Before(t1) {
		latest = t1
	}
	return latest
}
Loading

0 comments on commit 232d582

Please sign in to comment.