Skip to content

Commit

Permalink
Use an unique ID allocator for OpenFlow groups (#5188)
Browse files Browse the repository at this point in the history
OpenFlow Group IDs are Node global resources. Not only IPv4 and IPv6
specific OpenFlow rules could use Groups, but also IP family independent
rules could use it. Dividing the whole ID pool into two pools makes such
use cases have to choose either IPv4 or IPv6 ID pool, which looks weird.
Besides, there seems no good reason why two ID allocators should be
created to manage the same pool.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Jul 3, 2023
1 parent f32f796 commit 753dd21
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 41 deletions.
10 changes: 5 additions & 5 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,14 @@ func run(o *Options) error {

var groupCounters []proxytypes.GroupCounter
groupIDUpdates := make(chan string, 100)
v4GroupIDAllocator := openflow.NewGroupAllocator(false)
v4GroupCounter := proxytypes.NewGroupCounter(v4GroupIDAllocator, groupIDUpdates)
groupIDAllocator := openflow.NewGroupAllocator()
var v4GroupCounter, v6GroupCounter proxytypes.GroupCounter
if v4Enabled {
v4GroupCounter = proxytypes.NewGroupCounter(groupIDAllocator, groupIDUpdates)
groupCounters = append(groupCounters, v4GroupCounter)
}
v6GroupIDAllocator := openflow.NewGroupAllocator(true)
v6GroupCounter := proxytypes.NewGroupCounter(v6GroupIDAllocator, groupIDUpdates)
if v6Enabled {
v6GroupCounter = proxytypes.NewGroupCounter(groupIDAllocator, groupIDUpdates)
groupCounters = append(groupCounters, v6GroupCounter)
}

Expand Down Expand Up @@ -758,7 +758,7 @@ func run(o *Options) error {
}
mcastController = multicast.NewMulticastController(
ofClient,
v4GroupIDAllocator,
groupIDAllocator,
nodeConfig,
ifaceStore,
multicastSocket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}
podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)
ch2 := make(chan string, 100)
groupIDAllocator := openflow.NewGroupAllocator(false)
groupIDAllocator := openflow.NewGroupAllocator()
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{})
reconciler := newMockReconciler()
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newCIDR(cidrStr string) *net.IPNet {
func newTestReconciler(t *testing.T, controller *gomock.Controller, ifaceStore interfacestore.InterfaceStore, ofClient *openflowtest.MockClient, v4Enabled, v6Enabled bool) *reconciler {
f, _ := newMockFQDNController(t, controller, nil)
ch := make(chan string, 100)
groupIDAllocator := openflow.NewGroupAllocator(v6Enabled)
groupIDAllocator := openflow.NewGroupAllocator()
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch)}
r := newReconciler(ofClient, ifaceStore, newIDAllocator(testAsyncDeleteInterval), f, groupCounters, v4Enabled, v6Enabled, true, false)
return r
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ func newMockMulticastController(t *testing.T, isEncap bool) *Controller {
addr := &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}
nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addr}
mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any()).Times(1)
groupAllocator := openflow.NewGroupAllocator(false)
groupAllocator := openflow.NewGroupAllocator()
podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100)

clientset = fake.NewSimpleClientset()
Expand Down
8 changes: 2 additions & 6 deletions pkg/agent/openflow/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ func (a *groupAllocator) Release(id binding.GroupIDType) {
a.recycled = append(a.recycled, id)
}

func NewGroupAllocator(isIPv6 bool) GroupAllocator {
var groupIDCounter binding.GroupIDType
if isIPv6 {
groupIDCounter = 0x10000000
}
return &groupAllocator{groupIDCounter: groupIDCounter}
func NewGroupAllocator() GroupAllocator {
return &groupAllocator{}
}
54 changes: 27 additions & 27 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func testClusterIPAdd(t *testing.T,
endpointSliceEnabled bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
options := []proxyOptionsFn{withProxyAll}
if !endpointSliceEnabled {
options = append(options, withoutEndpointSlice)
Expand Down Expand Up @@ -498,7 +498,7 @@ func testLoadBalancerAdd(t *testing.T,
endpointSliceEnabled bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
options := []proxyOptionsFn{withProxyAll}
if !proxyLoadBalancerIPs {
options = append(options, withoutProxyLoadBalancerIPs)
Expand Down Expand Up @@ -635,7 +635,7 @@ func testNodePortAdd(t *testing.T,
endpointSliceEnabled bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
options := []proxyOptionsFn{withProxyAll}
if !endpointSliceEnabled {
options = append(options, withoutEndpointSlice)
Expand Down Expand Up @@ -865,7 +865,7 @@ func TestLoadBalancerAdd(t *testing.T) {
func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(false)
groupAllocator := openflow.NewGroupAllocator()
nodePortAddresses := []net.IP{net.ParseIP("0.0.0.0")}
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, false, withProxyAll)

Expand Down Expand Up @@ -1106,8 +1106,8 @@ func TestClusterSkipServices(t *testing.T) {
func TestDualStackService(t *testing.T) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
ipv4GroupAllocator := openflow.NewGroupAllocator(false)
ipv6GroupAllocator := openflow.NewGroupAllocator(true)
ipv4GroupAllocator := openflow.NewGroupAllocator()
ipv6GroupAllocator := openflow.NewGroupAllocator()
fpv4 := newFakeProxier(mockRouteClient, mockOFClient, nil, ipv4GroupAllocator, false)
fpv6 := newFakeProxier(mockRouteClient, mockOFClient, nil, ipv6GroupAllocator, true)
metaProxier := k8sproxy.NewMetaProxier(fpv4, fpv6)
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func TestDualStackService(t *testing.T) {
func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bool, nodeLocalInternal, endpointSliceEnabled bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
options := []proxyOptionsFn{withProxyAll, withSupportNestedService}
if !endpointSliceEnabled {
options = append(options, withoutEndpointSlice)
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bo
func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP net.IP, isIPv6 bool, endpointSliceEnabled bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
options := []proxyOptionsFn{withProxyAll}
if !endpointSliceEnabled {
options = append(options, withoutEndpointSlice)
Expand Down Expand Up @@ -1323,7 +1323,7 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa
func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP, loadBalancerIP net.IP, isIPv6 bool, endpointSliceEnabled bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
options := []proxyOptionsFn{withProxyAll}
if !endpointSliceEnabled {
options = append(options, withoutEndpointSlice)
Expand Down Expand Up @@ -1489,7 +1489,7 @@ func TestLoadBalancerRemove(t *testing.T) {
func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)

svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
Expand Down Expand Up @@ -1521,7 +1521,7 @@ func TestClusterIPNoEndpoint(t *testing.T) {
func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

svc := makeTestNodePortService(&svcPortName,
Expand Down Expand Up @@ -1581,7 +1581,7 @@ func TestNodePortNoEndpoint(t *testing.T) {
func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP net.IP, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster
Expand Down Expand Up @@ -1653,7 +1653,7 @@ func TestLoadBalancerNoEndpoint(t *testing.T) {
func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)

svcPortNameTCP := makeSvcPortName("ns", "svc-tcp", strconv.Itoa(svcPort), corev1.ProtocolTCP)
Expand Down Expand Up @@ -1710,7 +1710,7 @@ func TestClusterIPRemoveSamePortEndpoint(t *testing.T) {
func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)

svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil)
Expand Down Expand Up @@ -1758,7 +1758,7 @@ func TestClusterIPRemoveEndpoints(t *testing.T) {
func testSessionAffinity(t *testing.T, svcIP net.IP, epIP net.IP, affinitySeconds int32, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)

svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
Expand Down Expand Up @@ -1821,7 +1821,7 @@ func TestSessionAffinityOverflow(t *testing.T) {
func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6)

timeoutSeconds := corev1.DefaultClientIPServiceAffinitySeconds
Expand Down Expand Up @@ -1871,7 +1871,7 @@ func testServiceClusterIPUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var svc, updatedSvc *corev1.Service
Expand Down Expand Up @@ -1972,7 +1972,7 @@ func testServicePortUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var svc, updatedSvc *corev1.Service
Expand Down Expand Up @@ -2074,7 +2074,7 @@ func testServiceNodePortUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var svc, updatedSvc *corev1.Service
Expand Down Expand Up @@ -2159,7 +2159,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var svc, updatedSvc *corev1.Service
Expand Down Expand Up @@ -2264,7 +2264,7 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6, withProxyAll)

internalTrafficPolicyCluster := corev1.ServiceInternalTrafficPolicyCluster
Expand Down Expand Up @@ -2350,7 +2350,7 @@ func testServiceIngressIPsUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var loadBalancerIPStrs, updatedLoadBalancerIPStrs []string
Expand Down Expand Up @@ -2437,7 +2437,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var svc, updatedSvc *corev1.Service
Expand Down Expand Up @@ -2536,7 +2536,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T,
isIPv6 bool) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(isIPv6)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll)

var svc, updatedSvc *corev1.Service
Expand Down Expand Up @@ -2632,7 +2632,7 @@ func TestServiceSessionAffinityTypeUpdate(t *testing.T) {
func TestServicesWithSameEndpoints(t *testing.T) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(false)
groupAllocator := openflow.NewGroupAllocator()
fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, false)

svcPortName1 := makeSvcPortName("ns", "svc1", strconv.Itoa(svcPort), corev1.ProtocolTCP)
Expand Down Expand Up @@ -2740,7 +2740,7 @@ func TestMetrics(t *testing.T) {
func TestGetServiceFlowKeys(t *testing.T) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(false)
groupAllocator := openflow.NewGroupAllocator()
svc := makeTestNodePortService(&svcPortName,
svc1IPv4,
nil,
Expand Down Expand Up @@ -2828,7 +2828,7 @@ func TestGetServiceFlowKeys(t *testing.T) {
func TestServiceLabelSelector(t *testing.T) {
ctrl := gomock.NewController(t)
mockOFClient, mockRouteClient := getMockClients(ctrl)
groupAllocator := openflow.NewGroupAllocator(false)
groupAllocator := openflow.NewGroupAllocator()
svcPortName1 := makeSvcPortName("ns", "svc1", strconv.Itoa(svcPort), corev1.ProtocolTCP)
svcPortName2 := makeSvcPortName("ns", "svc2", strconv.Itoa(svcPort), corev1.ProtocolTCP)
svcPortName3 := makeSvcPortName("ns", "svc3", strconv.Itoa(svcPort), corev1.ProtocolTCP)
Expand Down

0 comments on commit 753dd21

Please sign in to comment.