Skip to content

Commit

Permalink
Add onKeyExchanged listener on PrometheusRSocketClient, improve closi…
Browse files Browse the repository at this point in the history
…ng of stale rsockets in proxy
  • Loading branch information
Jon Schneider committed Apr 7, 2020
1 parent f954f2d commit af4a01a
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,38 @@ public class PrometheusRSocketClient {
private final PrometheusMeterRegistry registry;
private final Disposable connection;
private AtomicReference<PublicKey> latestKey = new AtomicReference<>();

private final AbstractRSocket rsocket = new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
PublicKey key = decodePublicKey(payload.getData());
latestKey.set(key);
onKeyExchanged.run();
return Mono.just(scrapePayload(key));
}

@Override
public Mono<Void> fireAndForget(Payload payload) {
latestKey.set(decodePublicKey(payload.getData()));
onKeyExchanged.run();
return Mono.empty();
}
};

private boolean pushOnDisconnect = false;
private RSocket sendingSocket;
private Runnable onKeyExchanged;

public PrometheusRSocketClient(PrometheusMeterRegistry registry, ClientTransport transport,
UnaryOperator<Flux<Void>> customizeAndRetry) {
this(registry, transport, customizeAndRetry, () -> {});
}

public PrometheusRSocketClient(PrometheusMeterRegistry registry, ClientTransport transport,
UnaryOperator<Flux<Void>> customizeAndRetry,
Runnable onKeyExchanged) {
this.registry = registry;
this.onKeyExchanged = onKeyExchanged;
Counter attempts = Counter.builder("prometheus.connection.attempts")
.description("Attempts at making an outbound RSocket connection to the Prometheus proxy")
.baseUnit("attempts")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2019 Pivotal Software, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.prometheus.rsocket;

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Flux;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SampleClientThatClosesManually {
public static void main(String[] args) throws InterruptedException {
PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
meterRegistry.config().commonTags("process.id", ManagementFactory.getRuntimeMXBean().getName());

CountDownLatch keyExchanges = new CountDownLatch(1);

PrometheusRSocketClient client = new PrometheusRSocketClient(meterRegistry,
WebsocketClientTransport.create("localhost", 8081),
c -> c.retry(Long.MAX_VALUE),
keyExchanges::countDown);

Random r = new Random();

Counter counter = meterRegistry.counter("my.counter", "instance", Integer.toString(r.nextInt(10)));
counter.increment();

keyExchanges.await(10, TimeUnit.SECONDS);
client.pushAndClose();

Flux.interval(Duration.ofSeconds(1)).blockLast();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,28 @@ class PrometheusController {
private final DistributionSummary scrapePayload;
private final MicrometerRSocketInterceptor metricsInterceptor;
private PrometheusControllerProperties properties;

private Map<RSocket, ConnectionState> scrapableApps = new ConcurrentHashMap<>();

// keyed by the RSocket that listens for dying pushes, the value is the RSocket used for request/response scrapes.
// private Map<RSocket, RSocket> scrapableAppsByDyingPushListener = new ConcurrentHashMap<>();

PrometheusController(PrometheusMeterRegistry meterRegistry, PrometheusControllerProperties properties) {
this.meterRegistry = meterRegistry;
this.metricsInterceptor = new MicrometerRSocketInterceptor(meterRegistry);
this.properties = properties;
meterRegistry.gaugeMapSize("prometheus.proxy.scrape.active.connections", Tags.empty(), scrapableApps);

this.scrapeTimerSuccess = Timer.builder("prometheus.proxy.scrape")
.tag("outcome", "success")
.publishPercentileHistogram()
.register(meterRegistry);
.tag("outcome", "success")
.publishPercentileHistogram()
.register(meterRegistry);

this.scrapeTimerClosed = meterRegistry.timer("prometheus.proxy.scrape", "outcome", "closed");
this.scrapePayload = DistributionSummary.builder("prometheus.proxy.scrape.payload")
.publishPercentileHistogram()
.baseUnit("bytes")
.register(meterRegistry);
.publishPercentileHistogram()
.baseUnit("bytes")
.register(meterRegistry);

this.scrapeSocketsClosed = meterRegistry.counter("prometheus.proxy.scrape.sockets.closed");
}
Expand All @@ -93,32 +97,32 @@ public void connect() throws NoSuchAlgorithmException {
KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");

RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor((setup, sendingSocket) -> acceptRSocket(generator, sendingSocket))
.transport(TcpServerTransport.create(this.properties.getTcpPort()))
.start()
.subscribe();
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor((setup, sendingSocket) -> acceptRSocket(generator, sendingSocket))
.transport(TcpServerTransport.create(this.properties.getTcpPort()))
.start()
.subscribe();

RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor((setup, sendingSocket) -> acceptRSocket(generator, sendingSocket))
.transport(WebsocketServerTransport.create(this.properties.getWebsocketPort()))
.start()
.subscribe();
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor((setup, sendingSocket) -> acceptRSocket(generator, sendingSocket))
.transport(WebsocketServerTransport.create(this.properties.getWebsocketPort()))
.start()
.subscribe();
}

private Mono<RSocket> acceptRSocket(KeyPairGenerator generator, RSocket sendingSocket) {
// respond with Mono.error(..) to
RSocket metricsInterceptedSendingSocket = metricsInterceptor.apply(sendingSocket);

ConnectionState connectionState = new ConnectionState(generator.generateKeyPair());
scrapableApps.put(metricsInterceptor.apply(sendingSocket), connectionState);
scrapableApps.put(metricsInterceptedSendingSocket, connectionState);

// for use by the client to push metrics as it's dying if this happens before the first scrape
sendingSocket.fireAndForget(connectionState.createKeyPayload())
.subscribe();
// a key to be used by the client to push metrics as it's dying if this happens before the first scrape
metricsInterceptedSendingSocket.fireAndForget(connectionState.createKeyPayload()).subscribe();

// dispose this in order to disconnect the client
return Mono.just(new AbstractRSocket() {
RSocket dyingPushReceiver = new AbstractRSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
Expand All @@ -128,7 +132,9 @@ public Mono<Void> fireAndForget(Payload payload) {
}
return Mono.empty();
}
});
};

return Mono.just(dyingPushReceiver);
}

@GetMapping(value = "/metrics/proxy", produces = "text/plain")
Expand All @@ -139,29 +145,32 @@ public Mono<String> proxyMetrics() {
@GetMapping(value = "/metrics/connected", produces = "text/plain")
public Mono<String> prometheus() {
return Flux
.fromIterable(scrapableApps.entrySet())
.flatMap(socketAndState -> {
ConnectionState connectionState = socketAndState.getValue();
RSocket rsocket = socketAndState.getKey();
Timer.Sample sample = Timer.start();
return rsocket
.requestResponse(connectionState.createKeyPayload())
.map(payload -> connectionState.receiveScrapePayload(payload, sample))
.onErrorResume(throwable -> {
if (throwable instanceof ClosedChannelException) {
scrapeSocketsClosed.increment();
scrapableApps.remove(rsocket);
sample.stop(scrapeTimerClosed);
return connectionState.getDyingPush();
}

sample.stop(meterRegistry.timer("prometheus.proxy.scrape", "outcome", "error",
"exception", throwable.getClass().getName()));

return Mono.empty();
});
})
.collect(Collectors.joining("\n"));
.fromIterable(scrapableApps.entrySet())
.flatMap(socketAndState -> {
ConnectionState connectionState = socketAndState.getValue();
RSocket rsocket = socketAndState.getKey();
Timer.Sample sample = Timer.start();
return rsocket
.requestResponse(connectionState.createKeyPayload())
.map(payload -> connectionState.receiveScrapePayload(payload, sample))
.onErrorResume(throwable -> {
scrapeSocketsClosed.increment();

if (scrapableApps.remove(rsocket) != null) {
meterRegistry.counter("prometheus.proxy.unrecognized.rsocket").increment();
}

if (throwable instanceof ClosedChannelException) {
sample.stop(scrapeTimerClosed);
return connectionState.getDyingPush();
} else {
sample.stop(meterRegistry.timer("prometheus.proxy.scrape", "outcome", "error",
"exception", throwable.getClass().getName()));
return Mono.empty();
}
});
})
.collect(Collectors.joining("\n"));
}

class ConnectionState {
Expand All @@ -187,8 +196,8 @@ String receiveScrapePayload(Payload payload, Timer.Sample timing) {
ByteBuf sliceMetadata = payload.sliceMetadata();
ByteBuf sliceData = payload.sliceData();
byte[] decrypted = decrypt(keyPair,
ByteBufUtil.getBytes(sliceMetadata, sliceMetadata.readerIndex(), sliceMetadata.readableBytes(), false),
ByteBufUtil.getBytes(sliceData, sliceData.readerIndex(), sliceData.readableBytes(), false));
ByteBufUtil.getBytes(sliceMetadata, sliceMetadata.readerIndex(), sliceMetadata.readableBytes(), false),
ByteBufUtil.getBytes(sliceData, sliceData.readerIndex(), sliceData.readableBytes(), false));

String uncompressed = Snappy.uncompressString(decrypted);
scrapePayload.record(uncompressed.length());
Expand All @@ -206,7 +215,7 @@ String receiveScrapePayload(Payload payload, Timer.Sample timing) {
private byte[] decrypt(KeyPair keyPair, byte[] encryptedKey, byte[] data) {
try {
PrivateKey privateKey = KeyFactory.getInstance("RSA")
.generatePrivate(new PKCS8EncodedKeySpec(keyPair.getPrivate().getEncoded()));
.generatePrivate(new PKCS8EncodedKeySpec(keyPair.getPrivate().getEncoded()));

Cipher cipher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
cipher.init(Cipher.PRIVATE_KEY, privateKey);
Expand Down
16 changes: 8 additions & 8 deletions scripts/grafana-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,17 @@
"steppedLine": true,
"targets": [
{
"expr": "rate(rsocket_request_response_seconds_count{signal_type=\"ON_COMPLETE\"}[1m])",
"expr": "sum(rate(rsocket_request_response_seconds_count{signal_type=\"ON_COMPLETE\"}[1m]))",
"legendFormat": "complete",
"refId": "B"
},
{
"expr": "rate(rsocket_request_response_seconds_count{signal_type=\"CANCEL\"}[1m])",
"expr": "sum(rate(rsocket_request_response_seconds_count{signal_type=\"CANCEL\"}[1m]))",
"legendFormat": "cancel",
"refId": "C"
},
{
"expr": "rate(rsocket_request_response_seconds_count{signal_type=\"ON_ERROR\"}[1m])",
"expr": "sum(rate(rsocket_request_response_seconds_count{signal_type=\"ON_ERROR\"}[1m]))",
"legendFormat": "error",
"refId": "A"
}
Expand Down Expand Up @@ -379,17 +379,17 @@
"steppedLine": true,
"targets": [
{
"expr": "rate(prometheus_proxy_scrape_seconds_count{outcome=\"success\"}[1m])",
"expr": "sum(rate(prometheus_proxy_scrape_seconds_count{outcome=\"success\"}[1m]))",
"legendFormat": "success",
"refId": "A"
},
{
"expr": "rate(prometheus_proxy_scrape_seconds_count{outcome=\"closed\"}[1m])",
"expr": "sum(rate(prometheus_proxy_scrape_seconds_count{outcome=\"closed\"}[1m]))",
"legendFormat": "closed",
"refId": "B"
},
{
"expr": "rate(prometheus_proxy_scrape_seconds_count{outcome=\"error\"}[1m])",
"expr": "sum(rate(prometheus_proxy_scrape_seconds_count{outcome=\"error\"}[1m]))",
"legendFormat": "error",
"refId": "C"
}
Expand Down Expand Up @@ -444,7 +444,7 @@
"list": []
},
"time": {
"from": "now-3h",
"from": "now-15m",
"to": "now"
},
"timepicker": {
Expand All @@ -464,5 +464,5 @@
"timezone": "",
"title": "Prometheus Proxy",
"uid": "5sancpjZk",
"version": 12
"version": 13
}

0 comments on commit af4a01a

Please sign in to comment.