Skip to content

Commit

Permalink
Merge pull request #470 from MaterializeInc/remove_finalizer_cleanup_…
Browse files Browse the repository at this point in the history
…code

remove finalizer cleanup code
  • Loading branch information
alex-hunt-materialize authored Jun 24, 2024
2 parents 95e20f1 + 21b3d7a commit e8faa33
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 124 deletions.
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ Cilium (as of 1.12.0) does not seem to support configuring masquerade on a per-p

##### B. Run a privileged daemonset in the host network to inject ip rules for pods managed by the eip-operator.

You must set the `VPC_CIDR` environment variable to match the Cilium `ipv4NativeRoutingCIDR`. This lets the agent detect, from the existing Cilium-created rules, the appropriate table to forward to.

```yaml
apiVersion: apps/v1
kind: DaemonSet
Expand All @@ -193,13 +191,6 @@ spec:
env:
- name: RUST_LOG
value: INFO
- name: VPC_CIDR
value: 10.2.0.0/16
- name: NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
image: materialize/k8s-eip-operator
name: eip-operator
securityContext:
Expand Down
4 changes: 1 addition & 3 deletions cilium_eip_no_masquerade_agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cilium-eip-no-masquerade-agent"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
license = "Apache-2.0"

Expand All @@ -12,8 +12,6 @@ eip-operator-shared = { workspace = true }
futures = { workspace = true }
iptables = { workspace = true }
json-patch = { workspace = true }
k8s-openapi = { workspace = true }
kube = { workspace = true }
netlink-packet-route = { workspace = true }
rand = { workspace = true }
rtnetlink = { workspace = true }
Expand Down
111 changes: 2 additions & 109 deletions cilium_eip_no_masquerade_agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
use std::collections::HashSet;
use std::net::Ipv4Addr;

use futures::{future, TryStream, TryStreamExt};
use futures::{future, TryStreamExt};
use iptables::IPTables;
use json_patch::{PatchOperation, RemoveOperation, TestOperation};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{ListParams, Patch, PatchParams};
use kube::{Api, Client as KubeClient, ResourceExt};
use netlink_packet_route::rule::Nla;
use netlink_packet_route::RuleMessage;
use rtnetlink::{new_connection, IpVersion, RuleHandle};
use tokio::time::{sleep, Duration};
use tracing::{event, info, trace, Level};

use eip_operator_shared::{run_with_tracing, Error, MANAGE_EIP_LABEL};
use eip_operator_shared::{run_with_tracing, Error};

const FINALIZER_NAME: &str = "eip.materialize.cloud/cilium-no-masquerade-rule";
const TABLE: &str = "mangle";
const CHAIN: &str = "CILIUM_PRE_mangle";
const FW_MASK: u32 = 0xf;
Expand All @@ -24,27 +17,9 @@ const FIRST_SECONDARY_ENI_INDEX: u32 = 1;
// eth15; no AWS instance type supports more than 15 ENIs.
const LAST_SECONDARY_ENI_INDEX: u32 = 15;

/// Collects the rules from `rules` whose source selector is exactly `pod_ip`.
///
/// A rule is kept when its header has `src_len == 32` and its attribute list
/// contains an `Nla::Source` carrying the four octets of `pod_ip`.
///
/// # Errors
///
/// Propagates the `rtnetlink::Error` produced by the underlying stream.
async fn filter_pod_rules(
    rules: impl TryStream<Ok = RuleMessage, Error = rtnetlink::Error>,
    pod_ip: Ipv4Addr,
) -> Result<Vec<RuleMessage>, rtnetlink::Error> {
    // Build the attribute we are looking for once, instead of allocating a
    // fresh `Vec` for every rule inspected by the filter closure.
    let wanted_source = Nla::Source(pod_ip.octets().to_vec());
    rules
        .try_filter(|rule| {
            future::ready(rule.header.src_len == 32 && rule.nlas.contains(&wanted_source))
        })
        .try_collect()
        .await
}

/// Bundles the handles the agent uses to manage iptables and ip rules,
/// together with the Kubernetes client state used for pod cleanup.
struct RuleManager {
    /// Handle used for iptables operations such as checking that a chain exists.
    iptables: IPTables,
    /// Rule handle obtained from the rtnetlink connection; used to list and delete ip rules.
    ip_rule_handle: RuleHandle,
    /// Kubernetes client; cloned to build namespaced pod APIs.
    kube_client: KubeClient,
    /// Pod API spanning all namespaces, built from `kube_client`.
    global_pod_api: Api<Pod>,
    /// Value of the `NODE_NAME` environment variable (the node this agent runs on).
    node_name: String,
}

impl RuleManager {
Expand All @@ -55,82 +30,12 @@ impl RuleManager {
let ip_rule_handle = rtnetlink_handle.rule();
tokio::spawn(connection);

let kube_client = KubeClient::try_default().await.unwrap();
let global_pod_api: Api<Pod> = Api::all(kube_client.clone());

let node_name = std::env::var("NODE_NAME")
.expect("NODE_NAME env var must be set to the name of the kubernetes node this agent is running on.");

RuleManager {
iptables,
ip_rule_handle,
kube_client,
global_pod_api,
node_name,
}
}

/// Removes the legacy per-pod ip rule for `pod` (if any) and then drops this
/// agent's finalizer from the pod.
///
/// When the pod reports an IP, the IPv4 rules are listed, narrowed to those
/// whose source is that IP, and the first one with `dst_len == 0` and
/// `action == 1` is deleted. The finalizer is removed afterwards regardless of
/// whether a rule was found.
///
/// # Errors
///
/// Fails if the pod IP cannot be parsed as an `Ipv4Addr`, if listing or
/// deleting rules fails, or if removing the finalizer fails.
async fn cleanup_legacy_per_pod_rules(&self, pod: &Pod) -> Result<(), Error> {
    let pod_name = pod.name_unchecked();

    // Assuming that if it doesn't have an IP during cleanup, that it never had one.
    let reported_ip = pod
        .status
        .as_ref()
        .and_then(|status| status.pod_ip.as_deref());
    if let Some(ip_text) = reported_ip {
        let pod_ip: Ipv4Addr = ip_text.parse()?;
        let all_rules = self
            .ip_rule_handle
            .get(IpVersion::V4)
            .execute()
            .into_stream();
        let candidates = filter_pod_rules(all_rules, pod_ip).await?;
        for rule in candidates {
            // NOTE(review): action 1 is presumably the "lookup table" rule action — confirm.
            let is_legacy_rule = rule.header.dst_len == 0 && rule.header.action == 1;
            if !is_legacy_rule {
                continue;
            }
            event!(Level::INFO, pod_name = %pod_name, pod_ip = %pod_ip, rule = ?rule, "Deleting rule.");
            self.ip_rule_handle.del(rule).execute().await?;
            // Only the first matching rule is removed.
            break;
        }
    }

    self.remove_finalizer(pod, &pod_name).await
}

/// Removes `FINALIZER_NAME` from `pod`'s finalizer list via a JSON patch.
///
/// Does nothing (and returns `Ok`) when the pod does not carry the finalizer.
///
/// # Panics
///
/// Panics if `pod` has no namespace.
///
/// # Errors
///
/// Returns an error if the patch request fails, including when the `test`
/// operation finds a different value at the targeted index.
async fn remove_finalizer(&self, pod: &Pod, pod_name: &str) -> Result<(), Error> {
    // https://docs.rs/kube-runtime/latest/src/kube_runtime/finalizer.rs.html
    let finalizer_index = match pod
        .finalizers()
        .iter()
        .position(|finalizer| finalizer.as_str() == FINALIZER_NAME)
    {
        Some(index) => index,
        None => return Ok(()),
    };

    let finalizer_path = format!("/metadata/finalizers/{finalizer_index}");
    let operations = vec![
        // All finalizers run concurrently and we use an integer index
        // `Test` ensures that we fail instead of deleting someone else's finalizer
        PatchOperation::Test(TestOperation {
            path: finalizer_path.clone(),
            value: FINALIZER_NAME.into(),
        }),
        PatchOperation::Remove(RemoveOperation {
            path: finalizer_path,
        }),
    ];

    let pod_api: Api<Pod> = Api::namespaced(self.kube_client.clone(), &pod.namespace().unwrap());
    pod_api
        .patch::<Pod>(
            pod_name,
            &PatchParams::default(),
            &Patch::Json(json_patch::Patch(operations)),
        )
        .await?;
    Ok(())
}

async fn wait_for_chain_to_exist(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Waiting for {CHAIN} chain to exist");
while !self.iptables.chain_exists(TABLE, CHAIN)? {
Expand Down Expand Up @@ -228,18 +133,6 @@ async fn run() -> Result<(), Error> {
}
}

let pods = manager
.global_pod_api
.list(
&ListParams::default()
.labels(MANAGE_EIP_LABEL)
.fields(&format!("spec.nodeName={}", manager.node_name)),
)
.await?;
for pod in pods {
manager.cleanup_legacy_per_pod_rules(&pod).await?;
}

let delay_secs = 1;
info!("Done! Will recheck in {delay_secs} seconds");
sleep(Duration::from_secs(delay_secs)).await;
Expand Down

0 comments on commit e8faa33

Please sign in to comment.