Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert a rate limit for cold backends #512

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ public Map<String, BackendBean> getAll() {
}
BackendHealthStatus bhs = backendsSnapshot.get(key);
if (bhs != null) {
bean.available = bhs.isAvailable();
bean.reportedAsUnreachable = bhs.isReportedAsUnreachable();
bean.reportedAsUnreachableTs = bhs.getReportedAsUnreachableTs();
bean.available = bhs.getStatus() != BackendHealthStatus.Status.DOWN;
bean.reportedAsUnreachable = bhs.getStatus() == BackendHealthStatus.Status.DOWN;
bean.reportedAsUnreachableTs = bhs.getUnreachableSince();
BackendHealthCheck lastProbe = bhs.getLastProbe();
if (lastProbe != null) {
bean.lastProbeTs = lastProbe.endTs();
bean.lastProbeSuccess = lastProbe.isOk();
bean.lastProbeSuccess = lastProbe.ok();
bean.httpResponse = lastProbe.httpResponse();
bean.httpBody = lastProbe.httpBody();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ public List<String> getBackends() {

@GET
public List<DirectorBean> getAll() {
final List<DirectorBean> directors = new ArrayList();
final List<DirectorBean> directors = new ArrayList<>();
HttpProxyServer server = (HttpProxyServer) context.getAttribute("server");
server.getMapper().getDirectors().forEach(director -> {
directors.add(new DirectorBean(director.getId(), director.getBackends()));
});
server.getMapper().getDirectors()
.forEach((directorId, director) -> directors.add(new DirectorBean(directorId, director.getBackends())));

return directors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public class HttpProxyServer implements AutoCloseable {
*/
private final ReentrantLock configurationLock = new ReentrantLock();

private Server adminserver;
private Server adminServer;
private String adminAccessLogPath = "admin.access.log";
private String adminAccessLogTimezone = "GMT";
private int adminLogRetentionDays = 90;
Expand All @@ -226,48 +226,52 @@ public class HttpProxyServer implements AutoCloseable {
@Getter
private int listenersOffsetPort = 0;

public static HttpProxyServer buildForTests(String host, int port, EndpointMapper mapper, File baseDir) throws ConfigurationNotValidException {
HttpProxyServer res = new HttpProxyServer(mapper, baseDir.getAbsoluteFile());
res.currentConfiguration.addListener(new NetworkListenerConfiguration(host, port));
res.proxyRequestsManager.reloadConfiguration(res.currentConfiguration, mapper.getBackends().values());

return res;
}

public HttpProxyServer(EndpointMapper mapper, File basePath) {
@VisibleForTesting
public static HttpProxyServer buildForTests(
final String host,
final int port,
final EndpointMapper.Factory mapperFactory,
final File baseDir
) throws ConfigurationNotValidException {
final HttpProxyServer server = new HttpProxyServer(mapperFactory, baseDir.getAbsoluteFile());
final EndpointMapper mapper = server.getMapper();
server.currentConfiguration.addListener(new NetworkListenerConfiguration(host, port));
server.proxyRequestsManager.reloadConfiguration(server.currentConfiguration, mapper.getBackends().values());
return server;
}

public HttpProxyServer(final EndpointMapper.Factory mapperFactory, File basePath) throws ConfigurationNotValidException {
// metrics
statsProvider = new PrometheusMetricsProvider();
mainLogger = statsProvider.getStatsLogger("");
prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
Metrics.globalRegistry.add(prometheusRegistry);
this.statsProvider = new PrometheusMetricsProvider();
this.mainLogger = this.statsProvider.getStatsLogger("");
this.prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
this.prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
Metrics.globalRegistry.add(this.prometheusRegistry);
Metrics.globalRegistry.config()
.meterFilter(MeterFilter.denyNameStartsWith(("reactor.netty.http.server.data"))) // spam
.meterFilter(MeterFilter.denyNameStartsWith(("reactor.netty.http.server.response"))) // spam
.meterFilter(MeterFilter.denyNameStartsWith(("reactor.netty.http.server.errors"))); // spam

this.mapper = mapper;
this.basePath = basePath;
this.filters = new ArrayList<>();
this.currentConfiguration = new RuntimeServerConfiguration();
this.backendHealthManager = new BackendHealthManager(currentConfiguration, mapper);
this.listeners = new Listeners(this);
this.cache = new ContentsCache(currentConfiguration);
this.requestsLogger = new RequestsLogger(currentConfiguration);
this.dynamicCertificatesManager = new DynamicCertificatesManager(this);
this.trustStoreManager = new TrustStoreManager(currentConfiguration, this);
this.ocspStaplingManager = new OcspStaplingManager(trustStoreManager);
this.proxyRequestsManager = new ProxyRequestsManager(this);
if (mapper != null) {
mapper.setParent(this);
this.proxyRequestsManager.reloadConfiguration(currentConfiguration, mapper.getBackends().values());
}
this.mapper = mapperFactory.build(this);
this.backendHealthManager = new BackendHealthManager(currentConfiguration, this.mapper);
this.proxyRequestsManager.reloadConfiguration(currentConfiguration, this.mapper.getBackends().values());

this.usePooledByteBufAllocator = Boolean.getBoolean("cache.allocator.usepooledbytebufallocator");
this.cachePoolAllocator = usePooledByteBufAllocator
? new PooledByteBufAllocator(true) : new UnpooledByteBufAllocator(true);
? new PooledByteBufAllocator(true)
: new UnpooledByteBufAllocator(true);
this.cacheByteBufMemoryUsageMetric = new CacheByteBufMemoryUsageMetric(this);
//Best practice is to reuse EventLoopGroup
// Best practice is to reuse EventLoopGroup
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#25.0
this.eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
}
Expand All @@ -283,13 +287,6 @@ public int getLocalPort() {
return listeners.getLocalPort();
}

@VisibleForTesting
public void setMapper(EndpointMapper mapper) {
Objects.requireNonNull(mapper);
mapper.setParent(this);
this.mapper = mapper;
}

public void startAdminInterface() throws Exception {
if (!adminServerEnabled) {
return;
Expand All @@ -299,17 +296,17 @@ public void startAdminInterface() throws Exception {
throw new RuntimeException("To enable admin interface at least one between http and https port must be set");
}

adminserver = new Server();
adminServer = new Server();

ServerConnector httpConnector = null;
if (adminServerHttpPort >= 0) {
LOG.info("Starting Admin UI over HTTP");

httpConnector = new ServerConnector(adminserver);
httpConnector = new ServerConnector(adminServer);
httpConnector.setPort(adminServerHttpPort);
httpConnector.setHost(adminServerHost);

adminserver.addConnector(httpConnector);
adminServer.addConnector(httpConnector);
}

ServerConnector httpsConnector = null;
Expand All @@ -333,17 +330,17 @@ public void startAdminInterface() throws Exception {
https.setSecurePort(adminServerHttpsPort);
https.addCustomizer(new SecureRequestCustomizer());

httpsConnector = new ServerConnector(adminserver,
httpsConnector = new ServerConnector(adminServer,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
new HttpConnectionFactory(https));
httpsConnector.setPort(adminServerHttpsPort);
httpsConnector.setHost(adminServerHost);

adminserver.addConnector(httpsConnector);
adminServer.addConnector(httpsConnector);
}

ContextHandlerCollection contexts = new ContextHandlerCollection();
adminserver.setHandler(constrainTraceMethod(contexts));
adminServer.setHandler(constrainTraceMethod(contexts));

File webUi = new File(basePath, "web/ui");
if (webUi.isDirectory()) {
Expand Down Expand Up @@ -379,7 +376,7 @@ public void startAdminInterface() throws Exception {

contexts.addHandler(requestLogHandler);

adminserver.start();
adminServer.start();

LOG.info("Admin UI started");

Expand Down Expand Up @@ -472,13 +469,13 @@ public void close() {
ocspStaplingManager.stop();
cacheByteBufMemoryUsageMetric.stop();

if (adminserver != null) {
if (adminServer != null) {
try {
adminserver.stop();
adminServer.stop();
} catch (Exception err) {
LOG.error("Error while stopping admin server", err);
} finally {
adminserver = null;
adminServer = null;
}
}
statsProvider.stop();
Expand All @@ -496,21 +493,20 @@ public void close() {
staticContentsManager.close();

if (dynamicConfigurationStore != null) {
// this will also shutdown embedded database
// this will also shut down embedded database
dynamicConfigurationStore.close();
}
}

private static EndpointMapper buildMapper(String className, ConfigurationStore properties) throws ConfigurationNotValidException {
private static EndpointMapper buildMapper(final String className, final HttpProxyServer parent, final ConfigurationStore properties) throws ConfigurationNotValidException {
try {
EndpointMapper res = (EndpointMapper) Class.forName(className).getConstructor().newInstance();
final Class<? extends EndpointMapper> mapperClass = Class.forName(className).asSubclass(EndpointMapper.class);
final EndpointMapper res = mapperClass.getConstructor(HttpProxyServer.class).newInstance(parent);
res.configure(properties);
return res;
} catch (ClassNotFoundException err) {
} catch (final ClassNotFoundException | ClassCastException | NoSuchMethodException err) {
throw new ConfigurationNotValidException(err);
} catch (IllegalAccessException | IllegalArgumentException
| InstantiationException | NoSuchMethodException
| SecurityException | InvocationTargetException err) {
} catch (final IllegalAccessException | IllegalArgumentException | InstantiationException | SecurityException | InvocationTargetException err) {
throw new RuntimeException(err);
}
}
Expand Down Expand Up @@ -654,7 +650,7 @@ public RuntimeServerConfiguration buildValidConfiguration(ConfigurationStore sim

// Try to perform a service configuration from the passed store.
newConfiguration.configure(simpleStore);
buildMapper(newConfiguration.getMapperClassname(), simpleStore);
buildMapper(newConfiguration.getMapperClassname(), this, simpleStore);
buildRealm(userRealmClassname, simpleStore);

return newConfiguration;
Expand Down Expand Up @@ -747,8 +743,7 @@ private void applyDynamicConfiguration(ConfigurationStore newConfigurationStore,
}
try {
RuntimeServerConfiguration newConfiguration = buildValidConfiguration(storeWithConfig);
EndpointMapper newMapper = buildMapper(newConfiguration.getMapperClassname(), storeWithConfig);
newMapper.setParent(this);
EndpointMapper newMapper = buildMapper(newConfiguration.getMapperClassname(), this, storeWithConfig);
UserRealm newRealm = buildRealm(userRealmClassname, storeWithConfig);

this.filters = buildFilters(newConfiguration);
Expand Down Expand Up @@ -876,7 +871,8 @@ public Map<EndpointKey, Map<String, ConnectionPoolStats>> getConnectionPoolsStat
if (!metric.getName().startsWith(CONNECTION_PROVIDER_PREFIX)) {
return;
}
EndpointKey key = EndpointKey.make(metric.getTag(REMOTE_ADDRESS));
final String remoteAddress = Objects.requireNonNull(metric.getTag(REMOTE_ADDRESS));
EndpointKey key = EndpointKey.make(remoteAddress);
Map<String, ConnectionPoolStats> pools = res.computeIfAbsent(key, k -> new HashMap<>());
String poolName = metric.getTag(NAME);
ConnectionPoolStats stats = pools.computeIfAbsent(poolName, k -> new ConnectionPoolStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocato
.handle((request, response) -> { // Custom request-response handling
if (LOG.isDebugEnabled()) {
LOG.debug(
"Receive request {} From {} Timestamp {}",
request.uri(),
request.remoteAddress(),
"Receive request {} From {} Timestamp {}", request.uri(), request.remoteAddress(),
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss.SSS"))
);
}
Expand All @@ -312,10 +310,7 @@ protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocato
// response compression
if (currentConfiguration.getResponseCompressionThreshold() >= 0) {
LOG.debug(
"Response compression enabled with min size = {} bytes for listener {}",
currentConfiguration.getResponseCompressionThreshold(),
hostPort
);
"Response compression enabled with min size = {} bytes for listener {}", currentConfiguration.getResponseCompressionThreshold(), hostPort);
httpServer = httpServer.compress(currentConfiguration.getResponseCompressionThreshold());
} else {
LOG.debug("Response compression disabled for listener {}", hostPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ public boolean isValidHostAndPort(final String hostAndPort) {
if (hostAndPort == null) {
return false;
}
HostAndPort parsed = HostAndPort.fromString(hostAndPort);
String host = parsed.getHost();
final HostAndPort parsed = HostAndPort.fromString(hostAndPort);
final String host = parsed.getHost();
if (parsed.hasPort()) {
return !host.isBlank()
&& (InternetDomainName.isValid(host) || InetAddresses.isInetAddress(host))
&& parsed.getPort() >= 0
&& parsed.getPort() <= 65535;
&& parsed.getPort() <= EndpointKey.MAX_PORT;
} else {
return !host.isBlank()
&& (InternetDomainName.isValid(host) || InetAddresses.isInetAddress(host));
Expand Down
Loading
Loading