Skip to content

Commit

Permalink
Replace fabric8 watchers with informers
Browse files Browse the repository at this point in the history
  • Loading branch information
tgeens committed Jul 28, 2023
1 parent b953be2 commit c757f7d
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ KubernetesResourceWatcherBinding kubernetesApplicationConfigMapWatcher(

@Bean
ApplicationRunner k8sWatchSecrets(KubernetesResourceWatcherBinding watcherBinding) {
return args -> watcherBinding.watch(KubernetesClient::secrets, new Fabric8SecretMapper());
return args -> watcherBinding.inform(KubernetesClient::secrets, new Fabric8SecretMapper());
}

@Bean
ApplicationRunner k8sWatchConfigMaps(KubernetesResourceWatcherBinding watcherBinding) {
return args -> watcherBinding.watch(KubernetesClient::configMaps, new Fabric8ConfigMapMapper());
return args -> watcherBinding.inform(KubernetesClient::configMaps, new Fabric8ConfigMapMapper());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import java.io.Closeable;
import java.io.IOException;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
Expand All @@ -29,60 +27,98 @@ public class KubernetesResourceWatcherBinding implements AutoCloseable {
.addToMatchLabels(KubernetesLabels.CONTENTGRID_SERVICETYPE, "gateway")
.build();

private final Set<Closeable> watches = new HashSet<>();
private final Set<AutoCloseable> closeables = new HashSet<>();

public <T extends HasMetadata> void watch(Function<KubernetesClient, MixedOperation<T, ?, ?>> resourceSelector,
public <T extends HasMetadata> void inform(Function<KubernetesClient, MixedOperation<T, ?, ?>> resourceSelector,
KubernetesResourceMapper<T> mapper) {

var watch = resourceSelector.apply(client)
var informer = resourceSelector.apply(client)
.inNamespace(namespace)
.withLabelSelector(selector)
.watch(new ApplicationConfigResourceWatcher<>(this.appConfigRepository, mapper));
this.watches.add(watch);
.inform(new ApplicationConfigResourceHandler<>(this.appConfigRepository, mapper));
this.closeables.add(informer);
}

@Override
public void close() {
var closables = Set.copyOf(this.watches);
this.watches.clear();
var closables = Set.copyOf(this.closeables);
this.closeables.clear();

for (var closable : closables) {
try {
if (closable != null) {
closable.close();
}
} catch (IOException e) {
} catch (Exception e) {
log.warn("Closing watch failed", e);
}
}
}

@Slf4j
@RequiredArgsConstructor
static class ApplicationConfigResourceWatcher<T extends HasMetadata> implements Watcher<T> {
static class ApplicationConfigResourceHandler<T extends HasMetadata> implements ResourceEventHandler<T> {

private final ComposableApplicationConfigurationRepository appConfigRepository;

private final KubernetesResourceMapper<T> resourceMapper;

@Override
public void eventReceived(Action action, T resource) {
public void onNothing() {
ResourceEventHandler.super.onNothing();
}

@Override
public void onAdd(T resource) {
resourceMapper.apply(resource).ifPresent(fragment -> {
log.info("{} {} {} [app-id:{}]", action, resource.getKind().toLowerCase(),
resource.getMetadata().getName(), fragment.getApplicationId());
log.info("informer: on-add {} {} [app-id:{}]",
resource.getKind().toLowerCase(),
resource.getMetadata().getName(),
fragment.getApplicationId());

switch (action) {
case ADDED, MODIFIED -> appConfigRepository.merge(fragment);
case DELETED -> appConfigRepository.revoke(fragment);
default -> log.warn("Unknown action {} on secret {}", action, resource);
}
appConfigRepository.merge(fragment);
});
}

@Override
public void onClose(WatcherException cause) {
log.info("Closed configmap watcher", cause);
public void onUpdate(T oldResource, T newResource) {
var oldFragment = resourceMapper.apply(oldResource);
var newFragment = resourceMapper.apply(newResource);

if (Objects.equals(oldFragment, newFragment)) {
log.info("informer: on-update {} {} - data has not changed, skipping",
newResource.getKind().toLowerCase(),
newResource.getMetadata().getName());

return;
}

newFragment.ifPresent(fragment -> {
log.info("informer: on-update {} {} [app-id:{}]",
newResource.getKind().toLowerCase(),
newResource.getMetadata().getName(),
fragment.getApplicationId());
appConfigRepository.merge(fragment);
});
}
}

@Override
public void onDelete(T resource, boolean deletedFinalStateUnknown) {
resourceMapper.apply(resource).ifPresent(fragment -> {

if (deletedFinalStateUnknown) {
log.warn("MISSED DELETE EVENT - informer: on-delete {} {} [app-id:{}]",
resource.getKind().toLowerCase(),
resource.getMetadata().getName(),
fragment.getApplicationId());
} else {
log.info("informer: on-delete {} {} [app-id:{}]",
resource.getKind().toLowerCase(),
resource.getMetadata().getName(),
fragment.getApplicationId());
}
appConfigRepository.revoke(fragment);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.contentgrid.gateway.runtime.application.ApplicationId;
import com.contentgrid.gateway.runtime.config.ApplicationConfiguration.Keys;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;

class ApplicationConfigurationFragmentTest {
Expand All @@ -27,7 +30,6 @@ class ApplicationConfigurationFragmentTest {
));



@Test
void getRoutingDomains() {
assertThat(SIMPLE.getDomains()).singleElement().isEqualTo("my-app.contentgrid.cloud");
Expand All @@ -46,4 +48,27 @@ void getCorsOrigins() {
assertThat(MISSING.getCorsOrigins()).isNotNull().isEmpty();
}

@Test
void testEquals() {
var configId = SIMPLE.getConfigurationId();
var appId = SIMPLE.getApplicationId();

var props1 = new LinkedHashMap<String, String>();
props1.put(Keys.ROUTING_DOMAINS, "my-app.contentgrid.cloud");
props1.put(Keys.CORS_ORIGINS, "https://my-app.contentgrid.app");

var props2 = new LinkedHashMap<String, String>();
props2.put(Keys.CORS_ORIGINS, "https://my-app.contentgrid.app");
props2.put(Keys.ROUTING_DOMAINS, "my-app.contentgrid.cloud");

assertThat(SIMPLE).isEqualTo(new ApplicationConfigurationFragment(configId, appId, props1));
assertThat(SIMPLE).isEqualTo(new ApplicationConfigurationFragment(configId, appId, props2));

assertThat(new ApplicationConfigurationFragment(configId, appId, props1)).isEqualTo(SIMPLE);
assertThat(new ApplicationConfigurationFragment(configId, appId, props2)).isEqualTo(SIMPLE);

assertThat(new ApplicationConfigurationFragment(configId, appId, props1))
.isEqualTo(new ApplicationConfigurationFragment(configId, appId, props2));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import com.contentgrid.gateway.runtime.application.ApplicationId;
import com.contentgrid.gateway.runtime.config.ApplicationConfigurationFragment;
import com.contentgrid.gateway.runtime.config.ComposableApplicationConfigurationRepository;
import com.contentgrid.gateway.runtime.config.kubernetes.KubernetesResourceWatcherBinding.ApplicationConfigResourceWatcher;
import com.contentgrid.gateway.runtime.config.kubernetes.KubernetesResourceWatcherBinding.ApplicationConfigResourceHandler;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.fabric8.kubernetes.client.WatcherException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand All @@ -23,12 +24,12 @@ class KubernetesApplicationSecretsWatcherTest {
@Test
void secret_added() {
var configs = new ComposableApplicationConfigurationRepository();
var watcher = new ApplicationConfigResourceWatcher<>(configs, new Fabric8SecretMapper());
var handler = new ApplicationConfigResourceHandler<>(configs, new Fabric8SecretMapper());

var appId = ApplicationId.random();
var secret = createSecret(appId, UUID.randomUUID().toString());

watcher.eventReceived(Action.ADDED, secret);
handler.onAdd(secret);

assertThat(configs.getApplicationConfiguration(appId))
.isNotNull()
Expand All @@ -38,10 +39,38 @@ void secret_added() {
});
}

@Test
void secret_updated() {
var configs = new ComposableApplicationConfigurationRepository();
var handler = new ApplicationConfigResourceHandler<>(configs, new Fabric8SecretMapper());

var appId = ApplicationId.random();
var fragmentId = UUID.randomUUID().toString();

var oldSecret = createSecret(appId, fragmentId);
handler.onAdd(oldSecret);

// verify the starting condition
assertThat(configs.getApplicationConfiguration(appId)).isNotNull();

// create new secret with same app-id and fragment-id, but different properties
handler.onUpdate(oldSecret, createSecret(appId, fragmentId, Map.of("foo", "baz")));

// verify config has been updated
assertThat(configs.getApplicationConfiguration(appId)).isNotNull();

assertThat(configs.getApplicationConfiguration(appId))
.isNotNull()
.satisfies(appConfig -> {
assertThat(appConfig.getApplicationId()).isEqualTo(appId);
assertThat(appConfig.stream()).singleElement().isEqualTo(entry("foo", "baz"));
});
}

@Test
void secret_removed() {
var configs = new ComposableApplicationConfigurationRepository();
var watcher = new ApplicationConfigResourceWatcher<>(configs, new Fabric8SecretMapper());
var handler = new ApplicationConfigResourceHandler<>(configs, new Fabric8SecretMapper());

var appId = ApplicationId.random();
var fragmentId = UUID.randomUUID().toString();
Expand All @@ -54,7 +83,7 @@ void secret_removed() {
assertThat(configs.getApplicationConfiguration(appId)).isNotNull();

// process DELETED event of secret with UUID='fragmentId'
watcher.eventReceived(Action.DELETED, secret);
handler.onDelete(secret, false);

// verify config has been deleted
assertThat(configs.getApplicationConfiguration(appId)).isNull();
Expand All @@ -63,23 +92,21 @@ void secret_removed() {
@Test
void secret_unknown_operation() {
var configs = Mockito.mock(ComposableApplicationConfigurationRepository.class);
var watcher = new ApplicationConfigResourceWatcher<>(configs, new Fabric8SecretMapper());
var handler = new ApplicationConfigResourceHandler<>(configs, new Fabric8SecretMapper());

// unknown events should be ignored
watcher.eventReceived(Action.BOOKMARK, createSecret(ApplicationId.random(), UUID.randomUUID().toString()));
handler.onNothing();

Mockito.verifyNoMoreInteractions(configs);
}

@Test
void onClose() {
var configs = new ComposableApplicationConfigurationRepository();
var watcher = new ApplicationConfigResourceWatcher<>(configs, new Fabric8SecretMapper());
watcher.onClose(new WatcherException("making sonatype happy"));
@NonNull
private static Secret createSecret(ApplicationId appId, String fragmentId) {
return createSecret(appId, fragmentId, Map.of("foo", "bar"));
}

@NonNull
private static Secret createSecret(ApplicationId appId, String fragmentId) {
private static Secret createSecret(ApplicationId appId, String fragmentId, Map<String, String> properties) {
var encoder = Base64.getEncoder();
var metadata = new ObjectMeta();
metadata.setName("my-secret-name");
Expand All @@ -92,7 +119,10 @@ private static Secret createSecret(ApplicationId appId, String fragmentId) {

var secret = new Secret();
secret.setMetadata(metadata);
secret.setData(Map.of("foo", encoder.encodeToString("bar".getBytes())));
var data = properties.entrySet().stream()
.map(entry -> entry(entry.getKey(), encoder.encodeToString(entry.getValue().getBytes(StandardCharsets.UTF_8))))
.collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue));
secret.setData(data);
return secret;
}
}

0 comments on commit c757f7d

Please sign in to comment.