From 2b23d3e0dff48f951c4ced47220d44b9eeebb2d8 Mon Sep 17 00:00:00 2001 From: Hamado Dene Date: Tue, 20 Feb 2024 15:28:59 +0100 Subject: [PATCH] Fix listener close using channelGroup --- .../src/main/java/org/carapaceproxy/core/Listeners.java | 3 +++ .../server/config/NetworkListenerConfiguration.java | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/carapace-server/src/main/java/org/carapaceproxy/core/Listeners.java b/carapace-server/src/main/java/org/carapaceproxy/core/Listeners.java index dba64fa94..7f855ee00 100644 --- a/carapace-server/src/main/java/org/carapaceproxy/core/Listeners.java +++ b/carapace-server/src/main/java/org/carapaceproxy/core/Listeners.java @@ -79,6 +79,7 @@ import org.carapaceproxy.utils.CertificatesUtils; import org.carapaceproxy.utils.PrometheusUtils; import reactor.netty.DisposableServer; +import reactor.netty.FutureMono; import reactor.netty.NettyPipeline; import reactor.netty.http.server.HttpServer; @@ -148,6 +149,7 @@ private void stopListener(HostPort hostport) throws InterruptedException { ListeningChannel channel = listeningChannels.remove(hostport); if (channel != null) { channel.channel.disposeNow(Duration.ofSeconds(10)); + FutureMono.from(channel.getConfig().getGroup().close()).block(Duration.ofSeconds(30)); } } @@ -283,6 +285,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { .doOnConnection(conn -> { CURRENT_CONNECTED_CLIENTS_GAUGE.inc(); conn.channel().closeFuture().addListener(e -> CURRENT_CONNECTED_CLIENTS_GAUGE.dec()); + config.getGroup().add(conn.channel()); }) .httpRequestDecoder(option -> option.maxHeaderSize(currentConfiguration.getMaxHeaderSize())) .handle((request, response) -> { // Custom request-response handling diff --git a/carapace-server/src/main/java/org/carapaceproxy/server/config/NetworkListenerConfiguration.java b/carapace-server/src/main/java/org/carapaceproxy/server/config/NetworkListenerConfiguration.java index e050c780f..2ec12b5e5 100644 --- a/carapace-server/src/main/java/org/carapaceproxy/server/config/NetworkListenerConfiguration.java +++ b/carapace-server/src/main/java/org/carapaceproxy/server/config/NetworkListenerConfiguration.java @@ -21,6 +21,10 @@ import java.util.Collections; import java.util.Set; + +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.DefaultEventExecutor; import lombok.Data; /** @@ -44,6 +48,8 @@ public class NetworkListenerConfiguration { private int keepAliveCount; private int maxKeepAliveRequests; + private ChannelGroup group; + public HostPort getKey() { return new HostPort(host, port); } @@ -96,6 +102,7 @@ public NetworkListenerConfiguration(String host, this.keepAliveInterval = keepAliveInterval; this.keepAliveCount = keepAliveCount; this.maxKeepAliveRequests = maxKeepAliveRequests; + this.group = new DefaultChannelGroup(new DefaultEventExecutor()); } }