diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java index d4d2b8b8ff..63272a4e67 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java @@ -20,10 +20,11 @@ import io.vertx.core.Vertx; import java.io.File; import java.io.IOException; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; @@ -35,15 +36,17 @@ public class OIDCDiscoveryConfigListener implements AutoCloseable { private final Vertx vertx; private final FileWatcher configFeaturesWatcher; private final int timeoutSeconds; - private final CopyOnWriteArrayList> callbacks; + private final ConcurrentHashMap> callbacks; private final AtomicReference oidcDiscoveryConfig; + private final AtomicInteger callbackIdGenerator; public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException { this.featuresConfigPath = featuresConfigPath; this.vertx = vertx; this.timeoutSeconds = timeoutSeconds; this.oidcDiscoveryConfig = new AtomicReference<>(); - this.callbacks = new CopyOnWriteArrayList<>(); + this.callbacks = new ConcurrentHashMap<>(); + this.callbackIdGenerator = new AtomicInteger(0); this.buildFeaturesAndOIDCDiscoveryConfig(); @@ -53,7 +56,7 @@ public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int t this.buildFeaturesAndOIDCDiscoveryConfig(); OIDCDiscoveryConfig config = this.oidcDiscoveryConfig.get(); if (config != null) { - this.callbacks.forEach(callback -> callback.accept(config)); + this.callbacks.values().forEach(callback -> callback.accept(config)); } } }); @@ -66,12 +69,13 @@ public OIDCDiscoveryConfig getOidcDiscoveryConfig() { } public int registerCallback(Consumer callback) { - this.callbacks.add(callback); - return this.callbacks.size() - 1; + int id = callbackIdGenerator.incrementAndGet(); + this.callbacks.put(id, callback); + return id; } public void deregisterCallback(int callbackId) { - this.callbacks.set(callbackId, null); + this.callbacks.remove(callbackId); } private void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException, TimeoutException {