Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: merge similar-purpose EndpointKey and HostPort classes #515

Merged
merged 9 commits into from
Nov 26, 2024
20 changes: 13 additions & 7 deletions carapace-server/src/main/java/org/carapaceproxy/EndpointStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,32 @@

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.ToString;
import org.carapaceproxy.client.EndpointKey;
import org.carapaceproxy.core.EndpointKey;

/**
* Stats about an endpoint
*
* @author enrico.olivelli
*/
@ToString
public class EndpointStats {

private final EndpointKey key;
@Getter
private final AtomicInteger totalRequests = new AtomicInteger();
@Getter
private final AtomicLong lastActivity = new AtomicLong();

public EndpointStats(EndpointKey key) {
public EndpointStats(final EndpointKey key) {
this.key = key;
}

public String toString() {
return "EndpointStats(key=" + this.key + ", totalRequests=" + this.totalRequests + ", lastActivity=" + this.lastActivity + ")";
}

public AtomicInteger getTotalRequests() {
return this.totalRequests;
}

public AtomicLong getLastActivity() {
return this.lastActivity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import javax.ws.rs.Produces;
import lombok.Data;
import org.carapaceproxy.EndpointStats;
import org.carapaceproxy.client.EndpointKey;
import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.core.HttpProxyServer;
import org.carapaceproxy.core.HttpProxyServer.ConnectionPoolStats;
import org.carapaceproxy.server.backends.BackendHealthCheck;
Expand Down Expand Up @@ -75,15 +75,14 @@ public BackendBean(String id, String host, int port) {
@GET
public Map<String, BackendBean> getAll() {
HttpProxyServer server = (HttpProxyServer) context.getAttribute("server");
Map<String, BackendHealthStatus> backendsSnapshot = server.getBackendHealthManager().getBackendsSnapshot();
Map<EndpointKey, BackendHealthStatus> backendsSnapshot = server.getBackendHealthManager().getBackendsSnapshot();
Map<String, BackendBean> res = new HashMap<>();

server.getMapper().getBackends().values().forEach(backendConf -> {
String id = backendConf.id();
String hostPort = backendConf.getHostPort();
BackendBean bean = new BackendBean(id, backendConf.host(), backendConf.port());
bean.lastProbePath = backendConf.probePath();
EndpointKey key = EndpointKey.make(hostPort);
EndpointKey key = backendConf.hostPort();
Map<String, ConnectionPoolStats> poolsStats = server.getConnectionPoolsStats().get(key);
if (poolsStats != null) {
bean.openConnections = poolsStats.values().stream()
Expand All @@ -95,7 +94,7 @@ public Map<String, BackendBean> getAll() {
bean.totalRequests = epstats.getTotalRequests().longValue();
bean.lastActivityTs = epstats.getLastActivity().longValue();
}
BackendHealthStatus bhs = backendsSnapshot.get(hostPort);
BackendHealthStatus bhs = backendsSnapshot.get(key);
if (bhs != null) {
bean.available = bhs.isAvailable();
bean.reportedAsUnreachable = bhs.isReportedAsUnreachable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import lombok.Data;
import org.carapaceproxy.client.EndpointKey;
import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.core.HttpProxyServer;
import org.carapaceproxy.server.config.NetworkListenerConfiguration;

Expand Down Expand Up @@ -72,7 +72,8 @@ public Map<String, ListenerBean> getAllListeners() {
config.getDefaultCertificate(),
(int) listener.getValue().getTotalRequests().get()
);
return Map.entry(EndpointKey.make(config.getHost(), port).getHostPort(), bean);
EndpointKey endpointKey = EndpointKey.make(config.getHost(), port);
return Map.entry(endpointKey.toString(), bean);
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.carapaceproxy.client;

import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.core.RuntimeServerConfiguration;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Map;
import org.carapaceproxy.EndpointStats;
import org.carapaceproxy.core.EndpointKey;

/**
* Stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.core.ProxyRequestsManager;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.carapaceproxy.core;

import org.carapaceproxy.client.EndpointKey;
import reactor.netty.http.HttpProtocol;

/**
Expand All @@ -11,6 +10,6 @@
*/
public record ConnectionKey(String host, HttpProtocol protocolVersion) {
public ConnectionKey(final EndpointKey key, final String id, final HttpProtocol protocolVersion) {
this(key.getHostPort() + "_" + id, protocolVersion);
this(key.toString() + "_" + id, protocolVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,34 @@
under the License.

*/
package org.carapaceproxy.client;
package org.carapaceproxy.core;

import lombok.Data;
import org.carapaceproxy.utils.StringUtils;

/**
* Identifier of an endpoint
*
* @author enrico.olivelli
*/
@Data
public final class EndpointKey {
public record EndpointKey(String host, int port) {

private final String host;
private final int port;
/**
* The minimum port value according to <a href="https://tools.ietf.org/html/rfc6335">RFC 6335</a>.
*/
public static final int MIN_PORT = 0;
/**
* The maximum port value according to <a href="https://tools.ietf.org/html/rfc6335">RFC 6335</a>.
*/
public static final int MAX_PORT = 0xffff;

public EndpointKey {
if (StringUtils.isBlank(host)) {
throw new IllegalArgumentException("Hostname cannot be blank");
}
if (port > MAX_PORT || port < MIN_PORT) {
throw new IllegalArgumentException("Invalid port: " + port);
}
}

public static EndpointKey make(String host, int port) {
return new EndpointKey(host, port);
Expand All @@ -39,20 +53,15 @@ public static EndpointKey make(String host, int port) {
public static EndpointKey make(String hostAndPort) {
int pos = hostAndPort.indexOf(':');
if (pos <= 0) {
return new EndpointKey(hostAndPort, 0);
return new EndpointKey(hostAndPort, MIN_PORT);
}
String host = hostAndPort.substring(0, pos);
int port = Integer.parseInt(hostAndPort.substring(pos + 1));
return new EndpointKey(host, port);
}

public EndpointKey(String host, int port) {
this.host = host;
this.port = port;
}

public String getHostPort() {
@Override
public String toString() {
return host + ":" + port;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.carapaceproxy.api.AuthAPIRequestsFilter;
import org.carapaceproxy.api.ConfigResource;
import org.carapaceproxy.api.ForceHeadersAPIRequestsFilter;
import org.carapaceproxy.client.EndpointKey;
import org.carapaceproxy.cluster.GroupMembershipHandler;
import org.carapaceproxy.cluster.impl.NullGroupMembershipHandler;
import org.carapaceproxy.cluster.impl.ZooKeeperGroupMembershipHandler;
Expand Down
33 changes: 16 additions & 17 deletions carapace-server/src/main/java/org/carapaceproxy/core/Listeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import jdk.net.ExtendedSocketOptions;
import lombok.Data;
import org.carapaceproxy.server.config.ConfigurationNotValidException;
import org.carapaceproxy.server.config.HostPort;
import org.carapaceproxy.server.config.NetworkListenerConfiguration;
import org.carapaceproxy.server.config.SSLCertificateConfiguration;
import org.carapaceproxy.utils.CarapaceLogger;
Expand Down Expand Up @@ -100,7 +99,7 @@ public class Listeners {

private final HttpProxyServer parent;
private final Map<String, SslContext> sslContexts = new ConcurrentHashMap<>();
private final Map<HostPort, ListeningChannel> listeningChannels = new ConcurrentHashMap<>();
private final Map<EndpointKey, ListeningChannel> listeningChannels = new ConcurrentHashMap<>();
private final File basePath;
private boolean started;

Expand All @@ -120,7 +119,7 @@ public int getLocalPort() {
return -1;
}

public Map<HostPort, ListeningChannel> getListeningChannels() {
public Map<EndpointKey, ListeningChannel> getListeningChannels() {
return listeningChannels;
}

Expand All @@ -130,7 +129,7 @@ public void start() throws InterruptedException, ConfigurationNotValidException
}

public void stop() {
for (HostPort key : listeningChannels.keySet()) {
for (EndpointKey key : listeningChannels.keySet()) {
try {
stopListener(key);
} catch (InterruptedException ex) {
Expand All @@ -140,7 +139,7 @@ public void stop() {
}
}

private void stopListener(HostPort hostport) throws InterruptedException {
private void stopListener(EndpointKey hostport) throws InterruptedException {
ListeningChannel channel = listeningChannels.remove(hostport);
if (channel != null) {
channel.channel.disposeNow(Duration.ofSeconds(10));
Expand Down Expand Up @@ -173,10 +172,10 @@ void reloadConfiguration(RuntimeServerConfiguration newConfiguration) throws Int
sslContexts.clear();

// stop dropped listeners, start new one
List<HostPort> listenersToStop = new ArrayList<>();
List<HostPort> listenersToRestart = new ArrayList<>();
for (Map.Entry<HostPort, ListeningChannel> channel : listeningChannels.entrySet()) {
HostPort key = channel.getKey();
List<EndpointKey> listenersToStop = new ArrayList<>();
List<EndpointKey> listenersToRestart = new ArrayList<>();
for (Map.Entry<EndpointKey, ListeningChannel> channel : listeningChannels.entrySet()) {
EndpointKey key = channel.getKey();
NetworkListenerConfiguration actualListenerConfig = currentConfiguration.getListener(key);
NetworkListenerConfiguration newConfigurationForListener = newConfiguration.getListener(key);
if (newConfigurationForListener == null) {
Expand All @@ -190,9 +189,9 @@ void reloadConfiguration(RuntimeServerConfiguration newConfiguration) throws Int
}
channel.getValue().clear();
}
List<HostPort> listenersToStart = new ArrayList<>();
List<EndpointKey> listenersToStart = new ArrayList<>();
for (NetworkListenerConfiguration config : newConfiguration.getListeners()) {
HostPort key = config.getKey();
EndpointKey key = config.getKey();
if (!listeningChannels.containsKey(key)) {
LOG.log(Level.INFO, "listener: {0} is to be started", key);
listenersToStart.add(key);
Expand All @@ -203,19 +202,19 @@ void reloadConfiguration(RuntimeServerConfiguration newConfiguration) throws Int
currentConfiguration = newConfiguration;

try {
for (HostPort hostport : listenersToStop) {
for (EndpointKey hostport : listenersToStop) {
LOG.log(Level.INFO, "Stopping {0}", hostport);
stopListener(hostport);
}

for (HostPort hostport : listenersToRestart) {
for (EndpointKey hostport : listenersToRestart) {
LOG.log(Level.INFO, "Restart {0}", hostport);
stopListener(hostport);
NetworkListenerConfiguration newConfigurationForListener = currentConfiguration.getListener(hostport);
bootListener(newConfigurationForListener);
}

for (HostPort hostport : listenersToStart) {
for (EndpointKey hostport : listenersToStart) {
LOG.log(Level.INFO, "Starting {0}", hostport);
NetworkListenerConfiguration newConfigurationForListener = currentConfiguration.getListener(hostport);
bootListener(newConfigurationForListener);
Expand All @@ -227,7 +226,7 @@ void reloadConfiguration(RuntimeServerConfiguration newConfiguration) throws Int
}

private void bootListener(NetworkListenerConfiguration config) throws InterruptedException {
HostPort hostPort = new HostPort(config.getHost(), config.getPort() + parent.getListenersOffsetPort());
EndpointKey hostPort = new EndpointKey(config.getHost(), config.getPort() + parent.getListenersOffsetPort());
ListeningChannel listeningChannel = new ListeningChannel(hostPort, config);
LOG.log(Level.INFO, "Starting listener at {0}:{1} ssl:{2}", new Object[]{hostPort.host(), String.valueOf(hostPort.port()), config.isSsl()});

Expand Down Expand Up @@ -331,13 +330,13 @@ protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocato
@Data
public final class ListeningChannel implements io.netty.util.AsyncMapping<String, SslContext> {

private final HostPort hostPort;
private final EndpointKey hostPort;
private final NetworkListenerConfiguration config;
private final Counter.Child totalRequests;
private final Map<String, SslContext> listenerSslContexts = new HashMap<>();
DisposableServer channel;

public ListeningChannel(HostPort hostPort, NetworkListenerConfiguration config) {
public ListeningChannel(EndpointKey hostPort, NetworkListenerConfiguration config) {
this.hostPort = hostPort;
this.config = config;
totalRequests = TOTAL_REQUESTS_PER_LISTENER_COUNTER.labels(hostPort.host() + "_" + hostPort.port());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import lombok.Data;
import org.carapaceproxy.server.config.HostPort;
import org.carapaceproxy.server.filters.UrlEncodedQueryString;
import org.carapaceproxy.server.mapper.MapResult;
import org.carapaceproxy.server.mapper.requestmatcher.MatchingContext;
Expand Down Expand Up @@ -69,7 +68,7 @@ public class ProxyRequest implements MatchingContext {
private final long id = REQUESTS_ID_GENERATOR.incrementAndGet();
private final HttpServerRequest request;
private final HttpServerResponse response;
private final HostPort listener;
private final EndpointKey listener;
private MapResult action;
private String userId;
private String sessionId;
Expand All @@ -83,7 +82,7 @@ public class ProxyRequest implements MatchingContext {
private boolean servedFromCache;
private HttpVersion httpProtocol;

public ProxyRequest(HttpServerRequest request, HttpServerResponse response, HostPort listener) {
public ProxyRequest(HttpServerRequest request, HttpServerResponse response, EndpointKey listener) {
this.request = request;
this.response = response;
this.listener = listener;
Expand Down
Loading
Loading