datapath: Extend LocalNodeConfiguration with devices and addresses
The loader and the config writer were accessing node IP address information
and devices either via the StateDB tables or via pkg/node globals. This made
these components hard to test and made reinitialization hard to reason about
when the data changed.

Clean this up by extending the LocalNodeConfiguration with additional fields
that capture the dynamic data that was previously accessed out-of-band.
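
For orientation, the added fields look roughly as follows. This is a sketch
reconstructed from how the diff below uses them; the exact types and the
package/import placement are assumptions, not the authoritative definition:

package types

import (
	"net"

	"github.com/cilium/cilium/pkg/cidr"
	"github.com/cilium/cilium/pkg/datapath/tables"
)

// Sketch only: the new dynamic data carried by LocalNodeConfiguration.
// Only the fields referenced in this diff are shown; existing fields are
// omitted.
type LocalNodeConfiguration struct {
	NodeIPv4           net.IP // was node.GetIPv4()
	NodeIPv6           net.IP // was node.GetIPv6()
	CiliumInternalIPv4 net.IP // was node.GetInternalIPv4Router()
	CiliumInternalIPv6 net.IP // was node.GetIPv6Router()
	LoopbackIPv4       net.IP // was node.GetIPv4Loopback()

	AllocCIDRIPv4 *cidr.CIDR // was node.GetIPv4AllocRange()

	Devices       []*tables.Device     // was tables.SelectedDevices(...)
	NodeAddresses []tables.NodeAddress // was iterating the NodeAddress table

	DeviceMTU      int    // was cfg.MtuConfig.GetDeviceMTU()
	HostEndpointID uint64 // was node.GetEndpointID()
}

// DeviceNames returns the names of the selected native devices.
func (cfg *LocalNodeConfiguration) DeviceNames() []string {
	return tables.DeviceNames(cfg.Devices)
}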

The creation of LocalNodeConfiguration is moved from NodeDiscovery into the
datapath orchestrator where it belongs.
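
Very roughly, the orchestrator now snapshots the relevant state and hands a
complete LocalNodeConfiguration to the loader and the config writer. The
sketch below is illustrative only; the function name, parameters and wiring
are assumptions, not the actual orchestrator code:

package orchestrator

import (
	"github.com/cilium/statedb"

	"github.com/cilium/cilium/pkg/datapath/tables"
	datapath "github.com/cilium/cilium/pkg/datapath/types"
	nodeTypes "github.com/cilium/cilium/pkg/node/types"
)

// buildLocalNodeConfig is a hypothetical helper: it snapshots the selected
// devices and node addresses from StateDB and the local node's addresses so
// that the writer and loader no longer reach into globals or tables.
func buildLocalNodeConfig(
	db *statedb.DB,
	devices statedb.Table[*tables.Device],
	addrs statedb.Table[tables.NodeAddress],
	localNode nodeTypes.Node,
	deviceMTU int,
) datapath.LocalNodeConfiguration {
	txn := db.ReadTxn()

	nativeDevices, _ := tables.SelectedDevices(devices, txn)

	var nodeAddrs []tables.NodeAddress
	iter, _ := addrs.All(txn)
	for addr, _, ok := iter.Next(); ok; addr, _, ok = iter.Next() {
		nodeAddrs = append(nodeAddrs, addr)
	}

	return datapath.LocalNodeConfiguration{
		NodeIPv4:           localNode.GetNodeIP(false),
		NodeIPv6:           localNode.GetNodeIP(true),
		CiliumInternalIPv4: localNode.GetCiliumInternalIP(false),
		CiliumInternalIPv6: localNode.GetCiliumInternalIP(true),
		Devices:            nativeDevices,
		NodeAddresses:      nodeAddrs,
		DeviceMTU:          deviceMTU,
	}
}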

Signed-off-by: Jussi Maki <jussi@isovalent.com>
joamaki committed May 31, 2024
1 parent a86ac24 commit cfb72f7
Showing 31 changed files with 570 additions and 777 deletions.
2 changes: 0 additions & 2 deletions daemon/cmd/daemon_test.go
@@ -22,7 +22,6 @@ import (
 	fakecni "github.com/cilium/cilium/daemon/cmd/cni/fake"
 	"github.com/cilium/cilium/pkg/controller"
 	fakeDatapath "github.com/cilium/cilium/pkg/datapath/fake"
-	"github.com/cilium/cilium/pkg/datapath/loader"
 	"github.com/cilium/cilium/pkg/datapath/prefilter"
 	datapath "github.com/cilium/cilium/pkg/datapath/types"
 	"github.com/cilium/cilium/pkg/endpoint"
@@ -130,7 +129,6 @@ func setupDaemonSuite(tb testing.TB) *DaemonSuite {
 		),
 		fakeDatapath.Cell,
 		prefilter.Cell,
-		loader.Cell,
 		monitorAgent.Cell,
 		ControlPlane,
 		metrics.Cell,
6 changes: 0 additions & 6 deletions daemon/cmd/datapath.go
@@ -36,12 +36,6 @@ import (
 	"github.com/cilium/cilium/pkg/option"
 )
 
-// LocalConfig returns the local configuration of the daemon's nodediscovery.
-func (d *Daemon) LocalConfig() *datapath.LocalNodeConfiguration {
-	d.nodeDiscovery.WaitForLocalNodeInit()
-	return &d.nodeDiscovery.LocalConfig
-}
-
 // listFilterIfs returns a map of interfaces based on the given filter.
 // The filter should take a link and, if found, return the index of that
 // interface, if not found return -1.
23 changes: 7 additions & 16 deletions daemon/cmd/status_test.go
@@ -15,7 +15,6 @@ import (
 	. "github.com/cilium/cilium/api/v1/server/restapi/daemon"
 	"github.com/cilium/cilium/daemon/cmd/cni/fake"
 	fakeTypes "github.com/cilium/cilium/pkg/datapath/fake/types"
-	"github.com/cilium/cilium/pkg/mtu"
 	"github.com/cilium/cilium/pkg/node/manager"
 	nodeTypes "github.com/cilium/cilium/pkg/node/types"
 	"github.com/cilium/cilium/pkg/nodediscovery"
@@ -27,7 +26,6 @@ type GetNodesSuite struct {
 }
 
 var (
-	mtuConfig = mtu.NewConfiguration(0, false, false, false, false, 0, nil)
 	fakeConfig = &option.DaemonConfig{
 		RoutingMode: option.RoutingModeTunnel,
 		EnableIPSec: true,
@@ -80,8 +78,7 @@ func Test_getNodesHandle(t *testing.T) {
 		{
 			name: "create a client ID and store it locally",
 			setupArgs: func() args {
-				lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
-				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{})
+				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{})
 				return args{
 					params: GetClusterNodesParams{
 						ClientID: &zero,
@@ -112,8 +109,7 @@ func Test_getNodesHandle(t *testing.T) {
 		{
 			name: "retrieve nodes diff from a client that was already present",
 			setupArgs: func() args {
-				lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
-				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{})
+				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{})
 				return args{
 					params: GetClusterNodesParams{
 						ClientID: &clientIDs[0],
@@ -166,8 +162,7 @@ func Test_getNodesHandle(t *testing.T) {
 		{
 			name: "retrieve nodes from an expired client, it should be ok because the clean up only happens when on insertion",
 			setupArgs: func() args {
-				lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
-				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{})
+				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{})
 				return args{
 					params: GetClusterNodesParams{
 						ClientID: &clientIDs[0],
@@ -221,8 +216,7 @@ func Test_getNodesHandle(t *testing.T) {
 		{
 			name: "retrieve nodes for a new client, the expired client should be deleted",
 			setupArgs: func() args {
-				lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
-				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{})
+				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{})
 				return args{
 					params: GetClusterNodesParams{
 						ClientID: &zero,
@@ -271,8 +265,7 @@ func Test_getNodesHandle(t *testing.T) {
 		{
 			name: "retrieve nodes for a new client, however the randomizer allocated an existing clientID, so we should return a empty clientID",
 			setupArgs: func() args {
-				lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
-				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{})
+				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{})
 				return args{
 					params: GetClusterNodesParams{
 						ClientID: &zero,
@@ -321,8 +314,7 @@ func Test_getNodesHandle(t *testing.T) {
 		{
 			name: "retrieve nodes for a client that does not want to have diffs, leave all other stored clients alone",
 			setupArgs: func() args {
-				lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
-				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{})
+				nodeDiscovery := nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{})
 				return args{
 					params: GetClusterNodesParams{},
 					daemon: &Daemon{
@@ -428,10 +420,9 @@ func Test_cleanupClients(t *testing.T) {
 			args := tt.setupArgs()
 			want := tt.setupWanted()
 			h := &getNodes{clients: args.clients}
-			lnc, _ := nodediscovery.NewLocalNodeConfig(&mtuConfig, option.Config)
 			h.cleanupClients(
 				&Daemon{
-					nodeDiscovery: nodediscovery.NewNodeDiscovery(g.nm, nil, nil, lnc, &fake.FakeCNIConfigManager{}),
+					nodeDiscovery: nodediscovery.NewNodeDiscovery(g.nm, nil, nil, &fake.FakeCNIConfigManager{}),
 				})
 			require.EqualValues(t, want.clients, h.clients)
 		}
10 changes: 7 additions & 3 deletions pkg/datapath/fake/types/datapath.go
@@ -72,12 +72,12 @@ func (f *FakeDatapath) WriteNetdevConfig(io.Writer, *option.IntOptions) error {
 }
 
 // WriteTemplateConfig pretends to write the endpoint configuration to a writer.
-func (f *FakeDatapath) WriteTemplateConfig(io.Writer, datapath.EndpointConfiguration) error {
+func (f *FakeDatapath) WriteTemplateConfig(io.Writer, *datapath.LocalNodeConfiguration, datapath.EndpointConfiguration) error {
 	return nil
 }
 
 // WriteEndpointConfig pretends to write the endpoint configuration to a writer.
-func (f *FakeDatapath) WriteEndpointConfig(io.Writer, datapath.EndpointConfiguration) error {
+func (f *FakeDatapath) WriteEndpointConfig(io.Writer, *datapath.LocalNodeConfiguration, datapath.EndpointConfiguration) error {
 	return nil
 }
 
@@ -154,7 +154,7 @@ func (f *FakeLoader) CustomCallsMapPath(id uint16) string {
 }
 
 // Reinitialize does nothing.
-func (f *FakeLoader) Reinitialize(ctx context.Context, tunnelConfig tunnel.Config, deviceMTU int, iptMgr datapath.IptablesManager, p datapath.Proxy) error {
+func (f *FakeLoader) Reinitialize(ctx context.Context, cfg datapath.LocalNodeConfiguration, tunnelConfig tunnel.Config, iptMgr datapath.IptablesManager, p datapath.Proxy) error {
 	return nil
 }
 
@@ -170,6 +170,10 @@ func (f *FakeLoader) DetachXDP(ifaceName string, bpffsBase, progName string) err
 	return nil
 }
 
+func (f *FakeLoader) WriteEndpointConfig(w io.Writer, e datapath.EndpointConfiguration) error {
+	return nil
+}
+
 type FakeOrchestrator struct{}
 
 func (f *FakeOrchestrator) Reinitialize(ctx context.Context) error {
5 changes: 0 additions & 5 deletions pkg/datapath/linux/config/cell.go
@@ -5,12 +5,10 @@ package config
 
 import (
 	"github.com/cilium/hive/cell"
-	"github.com/cilium/statedb"
 	"github.com/sirupsen/logrus"
 
 	dpdef "github.com/cilium/cilium/pkg/datapath/linux/config/defines"
 	"github.com/cilium/cilium/pkg/datapath/linux/sysctl"
-	"github.com/cilium/cilium/pkg/datapath/tables"
 	datapath "github.com/cilium/cilium/pkg/datapath/types"
 	"github.com/cilium/cilium/pkg/maps/nodemap"
 )
@@ -24,9 +22,6 @@ type WriterParams struct {
 	NodeExtraDefines []dpdef.Map `group:"header-node-defines"`
 	NodeExtraDefineFns []dpdef.Fn `group:"header-node-define-fns"`
 	Sysctl sysctl.Sysctl
-	DB *statedb.DB
-	Devices statedb.Table[*tables.Device]
-	NodeAddresses statedb.Table[tables.NodeAddress]
 }
 
 var Cell = cell.Module(
81 changes: 27 additions & 54 deletions pkg/datapath/linux/config/config.go
@@ -13,6 +13,7 @@ import (
 	"io"
 	"net"
 	"net/netip"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
@@ -21,8 +22,6 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/vishvananda/netlink"
 
-	"github.com/cilium/statedb"
-
 	"github.com/cilium/cilium/pkg/bpf"
 	"github.com/cilium/cilium/pkg/byteorder"
 	"github.com/cilium/cilium/pkg/cidr"
@@ -59,17 +58,13 @@ import (
 	"github.com/cilium/cilium/pkg/maps/vtep"
 	"github.com/cilium/cilium/pkg/maps/worldcidrsmap"
 	"github.com/cilium/cilium/pkg/netns"
-	"github.com/cilium/cilium/pkg/node"
 	"github.com/cilium/cilium/pkg/option"
 	wgtypes "github.com/cilium/cilium/pkg/wireguard/types"
 )
 
 // HeaderfileWriter is a wrapper type which implements datapath.ConfigWriter.
 // It manages writing of configuration of datapath program headerfiles.
 type HeaderfileWriter struct {
-	db *statedb.DB
-	devices statedb.Table[*tables.Device]
-	nodeAddrs statedb.Table[tables.NodeAddress]
 	log logrus.FieldLogger
 	nodeMap nodemap.MapV2
 	nodeAddressing datapath.NodeAddressing
@@ -87,9 +82,6 @@ func NewHeaderfileWriter(p WriterParams) (datapath.ConfigWriter, error) {
 	}
 	return &HeaderfileWriter{
 		nodeMap: p.NodeMap,
-		db: p.DB,
-		devices: p.Devices,
-		nodeAddrs: p.NodeAddresses,
 		nodeAddressing: p.NodeAddressing,
 		nodeExtraDefines: merged,
 		nodeExtraDefineFns: p.NodeExtraDefineFns,
@@ -104,53 +96,45 @@ func writeIncludes(w io.Writer) (int, error) {
 
 // WriteNodeConfig writes the local node configuration to the specified writer.
 func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeConfiguration) error {
-	txn := h.db.ReadTxn()
-
 	extraMacrosMap := make(dpdef.Map)
 	cDefinesMap := make(dpdef.Map)
 
-	var nativeDevices []*tables.Device
-	if h.db != nil && h.devices != nil {
-		txn := h.db.ReadTxn()
-		nativeDevices, _ = tables.SelectedDevices(h.devices, txn)
-	}
+	nativeDevices := cfg.Devices
 
 	fw := bufio.NewWriter(w)
 
 	writeIncludes(w)
 
-	routerIP := node.GetIPv6Router()
-	hostIP := node.GetIPv6()
+	routerIP := cfg.CiliumInternalIPv6
+	hostIP := cfg.NodeIPv6
 
 	var ipv4NodePortAddrs, ipv6NodePortAddrs []netip.Addr
-	iter, _ := h.nodeAddrs.All(txn)
-	for addr, _, ok := iter.Next(); ok; addr, _, ok = iter.Next() {
+	for _, addr := range cfg.NodeAddresses {
 		if !addr.NodePort {
 			continue
 		}
 		if addr.Addr.Is4() {
 			ipv4NodePortAddrs = append(ipv4NodePortAddrs, addr.Addr)
 		} else {
 			ipv6NodePortAddrs = append(ipv6NodePortAddrs, addr.Addr)
-
 		}
 	}
 
 	fmt.Fprintf(fw, "/*\n")
 	if option.Config.EnableIPv6 {
-		fmt.Fprintf(fw, " cilium.v6.external.str %s\n", node.GetIPv6().String())
-		fmt.Fprintf(fw, " cilium.v6.internal.str %s\n", node.GetIPv6Router().String())
+		fmt.Fprintf(fw, " cilium.v6.external.str %s\n", cfg.NodeIPv6.String())
+		fmt.Fprintf(fw, " cilium.v6.internal.str %s\n", cfg.CiliumInternalIPv6.String())
		fmt.Fprintf(fw, " cilium.v6.nodeport.str %v\n", ipv6NodePortAddrs)
 		fmt.Fprintf(fw, "\n")
 	}
-	fmt.Fprintf(fw, " cilium.v4.external.str %s\n", node.GetIPv4().String())
-	fmt.Fprintf(fw, " cilium.v4.internal.str %s\n", node.GetInternalIPv4Router().String())
+	fmt.Fprintf(fw, " cilium.v4.external.str %s\n", cfg.NodeIPv4.String())
+	fmt.Fprintf(fw, " cilium.v4.internal.str %s\n", cfg.CiliumInternalIPv4.String())
 	fmt.Fprintf(fw, " cilium.v4.nodeport.str %v\n", ipv4NodePortAddrs)
 	fmt.Fprintf(fw, "\n")
 	if option.Config.EnableIPv6 {
-		fw.WriteString(dumpRaw(defaults.RestoreV6Addr, node.GetIPv6Router()))
+		fw.WriteString(dumpRaw(defaults.RestoreV6Addr, cfg.CiliumInternalIPv6))
 	}
-	fw.WriteString(dumpRaw(defaults.RestoreV4Addr, node.GetInternalIPv4Router()))
+	fw.WriteString(dumpRaw(defaults.RestoreV4Addr, cfg.CiliumInternalIPv4))
 	fmt.Fprintf(fw, " */\n\n")
 
 	cDefinesMap["KERNEL_HZ"] = fmt.Sprintf("%d", option.Config.KernelHz)
@@ -161,9 +145,9 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC
 	}
 
 	if option.Config.EnableIPv4 {
-		ipv4GW := node.GetInternalIPv4Router()
-		loopbackIPv4 := node.GetIPv4Loopback()
-		ipv4Range := node.GetIPv4AllocRange()
+		ipv4GW := cfg.CiliumInternalIPv4
+		loopbackIPv4 := cfg.LoopbackIPv4
+		ipv4Range := cfg.AllocCIDRIPv4
 		cDefinesMap["IPV4_GATEWAY"] = fmt.Sprintf("%#x", byteorder.NetIPv4ToHost32(ipv4GW))
 		cDefinesMap["IPV4_LOOPBACK"] = fmt.Sprintf("%#x", byteorder.NetIPv4ToHost32(loopbackIPv4))
 		cDefinesMap["IPV4_MASK"] = fmt.Sprintf("%#x", byteorder.NetIPv4ToHost32(net.IP(ipv4Range.Mask)))
@@ -285,7 +269,7 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC
 	}
 
 	cDefinesMap["TRACE_PAYLOAD_LEN"] = fmt.Sprintf("%dULL", option.Config.TracePayloadlen)
-	cDefinesMap["MTU"] = fmt.Sprintf("%d", cfg.MtuConfig.GetDeviceMTU())
+	cDefinesMap["MTU"] = fmt.Sprintf("%d", cfg.DeviceMTU)
 
 	if option.Config.EnableIPv4 {
 		cDefinesMap["ENABLE_IPV4"] = "1"
@@ -344,11 +328,11 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC
 		cDefinesMap["STRICT_IPV4_NET"] = fmt.Sprintf("%#x", byteorder.NetIPAddrToHost32(option.Config.EncryptionStrictModeCIDR.Addr()))
 		cDefinesMap["STRICT_IPV4_NET_SIZE"] = fmt.Sprintf("%d", option.Config.EncryptionStrictModeCIDR.Bits())
 
-		cDefinesMap["IPV4_ENCRYPT_IFACE"] = fmt.Sprintf("%#x", byteorder.NetIPv4ToHost32(node.GetIPv4()))
+		cDefinesMap["IPV4_ENCRYPT_IFACE"] = fmt.Sprintf("%#x", byteorder.NetIPv4ToHost32(cfg.NodeIPv4))
 
-		ipv4Interface, ok := netip.AddrFromSlice(node.GetIPv4().To4())
+		ipv4Interface, ok := netip.AddrFromSlice(cfg.NodeIPv4.To4())
 		if !ok {
-			return fmt.Errorf("unable to parse node IPv4 address %s", node.GetIPv4())
+			return fmt.Errorf("unable to parse node IPv4 address %s", cfg.NodeIPv4)
 		}
 
 		if option.Config.EncryptionStrictModeCIDR.Contains(ipv4Interface) {
@@ -615,7 +599,7 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC
 	}
 
 	if option.Config.EnableIPSec {
-		nodeAddress := node.GetIPv4()
+		nodeAddress := cfg.NodeIPv4
 		if nodeAddress == nil {
 			return errors.New("external IPv4 node address is required when IPSec is enabled, but none found")
 		}
@@ -1053,30 +1037,23 @@ func (h *HeaderfileWriter) writeStaticData(devices []string, fw io.Writer, e dat
 }
 
 // WriteEndpointConfig writes the BPF configuration for the endpoint to a writer.
-func (h *HeaderfileWriter) WriteEndpointConfig(w io.Writer, e datapath.EndpointConfiguration) error {
+func (h *HeaderfileWriter) WriteEndpointConfig(w io.Writer, cfg *datapath.LocalNodeConfiguration, e datapath.EndpointConfiguration) error {
 	fw := bufio.NewWriter(w)
 
-	var (
-		nativeDevices []*tables.Device
-		deviceNames []string
-	)
-	if h.db != nil && h.devices != nil {
-		nativeDevices, _ = tables.SelectedDevices(h.devices, h.db.ReadTxn())
-		deviceNames = tables.DeviceNames(nativeDevices)
-	}
+	deviceNames := cfg.DeviceNames()
 
 	// Add cilium_wg0 if necessary.
 	if option.Config.NeedBPFHostOnWireGuardDevice() {
-		deviceNames = append(deviceNames, wgtypes.IfaceName)
+		deviceNames = append(slices.Clone(deviceNames), wgtypes.IfaceName)
 	}
 
 	writeIncludes(w)
 	h.writeStaticData(deviceNames, fw, e)
 
-	return h.writeTemplateConfig(fw, nativeDevices, e)
+	return h.writeTemplateConfig(fw, deviceNames, cfg.HostEndpointID, e)
 }
 
-func (h *HeaderfileWriter) writeTemplateConfig(fw *bufio.Writer, devices []*tables.Device, e datapath.EndpointConfiguration) error {
+func (h *HeaderfileWriter) writeTemplateConfig(fw *bufio.Writer, devices []string, hostEndpointID uint64, e datapath.EndpointConfiguration) error {
 	if e.RequireEgressProg() {
 		fmt.Fprintf(fw, "#define USE_BPF_PROG_FOR_INGRESS_POLICY 1\n")
 	}
@@ -1107,7 +1084,7 @@ func (h *HeaderfileWriter) writeTemplateConfig(fw *bufio.Writer, devices []*tabl
 		}
 	}
 
-	fmt.Fprintf(fw, "#define HOST_EP_ID %d\n", uint32(node.GetEndpointID()))
+	fmt.Fprintf(fw, "#define HOST_EP_ID %d\n", uint32(hostEndpointID))
 
 	if e.RequireARPPassthrough() {
 		fmt.Fprint(fw, "#define ENABLE_ARP_PASSTHROUGH 1\n")
@@ -1130,11 +1107,7 @@ func (h *HeaderfileWriter) writeTemplateConfig(fw *bufio.Writer, devices []*tabl
 }
 
 // WriteTemplateConfig writes the BPF configuration for the template to a writer.
-func (h *HeaderfileWriter) WriteTemplateConfig(w io.Writer, e datapath.EndpointConfiguration) error {
+func (h *HeaderfileWriter) WriteTemplateConfig(w io.Writer, cfg *datapath.LocalNodeConfiguration, e datapath.EndpointConfiguration) error {
 	fw := bufio.NewWriter(w)
-	var nativeDevices []*tables.Device
-	if h.db != nil && h.devices != nil {
-		nativeDevices, _ = tables.SelectedDevices(h.devices, h.db.ReadTxn())
-	}
-	return h.writeTemplateConfig(fw, nativeDevices, e)
+	return h.writeTemplateConfig(fw, cfg.DeviceNames(), cfg.HostEndpointID, e)
 }