Skip to content

Commit

Permalink
started to add API for sampling payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
DC2-DanielKrueger committed Oct 10, 2024
1 parent c46495a commit f0f9650
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;

@Singleton
Expand All @@ -46,14 +47,17 @@ public SamplingResourceImpl(final @NotNull SamplingService samplingService) {

final List<byte[]> samples = samplingService.getSamples(topic);
final ArrayList<PayloadSample> sampleArrayList = new ArrayList<>();

// we want LIFO, but the queue return FIFO
Collections.reverse(samples);

samples.forEach(sample -> sampleArrayList.add(new PayloadSample(Base64.getEncoder().encodeToString(sample))));
return Response.ok().entity(new PayloadSampleList(sampleArrayList)).build();
}

@Override
public @NotNull Response startSamplingForTopic(@NotNull final String topicBase64) {
final String topic = new String(Base64.getDecoder().decode(topicBase64), StandardCharsets.UTF_8);
System.err.println(topicBase64 + "->" + topic);
samplingService.startSampling(topic);
return Response.ok().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ExecutorService;

import static com.hivemq.mqtt.handler.publish.PublishStatus.*;
import static com.hivemq.sampling.SamplingService.SAMPLER_PREFIX;

/**
* @author Christoph Schäbel
Expand Down Expand Up @@ -185,6 +186,8 @@ private ListenableFuture<PublishStatus> handlePublish(
appliedQoS = 0;
}
}
}else if(client.startsWith(SAMPLER_PREFIX)){
appliedQueueLimit = 10L;
}

return queuePublish(client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.hivemq.persistence.clientsession.SharedSubscriptionServiceImpl.SharedSubscription;
import static com.hivemq.sampling.SamplingService.SAMPLER_PREFIX;

@Singleton
public class ClientQueuePersistenceImpl extends AbstractPersistence implements ClientQueuePersistence {
Expand Down Expand Up @@ -108,12 +109,20 @@ public ListenableFuture<Void> add(
return Futures.immediateFailedFuture(exception);
}


return singleWriter.submit(queueId, (bucketIndex) -> {
MqttConfigurationService.QueuedMessagesStrategy queuedMessagesStrategy =
mqttConfigurationService.getQueuedMessagesStrategy();
// TODO thats super ugly.
if(queueId.startsWith(SAMPLER_PREFIX)){
queuedMessagesStrategy = MqttConfigurationService.QueuedMessagesStrategy.DISCARD_OLDEST;
}

localPersistence.add(queueId,
shared,
publish,
queueLimit,
mqttConfigurationService.getQueuedMessagesStrategy(),
queuedMessagesStrategy,
retained,
bucketIndex);
final int queueSize = localPersistence.size(queueId, shared, bucketIndex);
Expand Down Expand Up @@ -145,11 +154,17 @@ public ListenableFuture<Void> add(

return singleWriter.submit(queueId, (bucketIndex) -> {
final boolean queueWasEmpty = localPersistence.size(queueId, shared, bucketIndex) == 0;
MqttConfigurationService.QueuedMessagesStrategy queuedMessagesStrategy =
mqttConfigurationService.getQueuedMessagesStrategy();
// TODO thats super ugly.
if(queueId.startsWith(SAMPLER_PREFIX)){
queuedMessagesStrategy = MqttConfigurationService.QueuedMessagesStrategy.DISCARD_OLDEST;
}

localPersistence.add(queueId,
shared,
publishes,
queueLimit,
mqttConfigurationService.getQueuedMessagesStrategy(),
queueLimit, queuedMessagesStrategy,
retained,
bucketIndex);
if (queueWasEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ private void addQos0Publish(

// check for expiration, but do not modify the queue
if (!publishWithRetained.isExpired()) {
messageCount++;
bytes += publishWithRetained.getEstimatedSizeInMemory();
// check if adding the message would exceed the byte limit
if ((bytes > bytesLimit)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SamplingService {

private final @NotNull LocalTopicTree localTopicTree;
private final @NotNull ClientQueuePersistence clientQueuePersistence;
private static final @NotNull String SAMPLER_PREFIX = "Sampler#";
public static final @NotNull String SAMPLER_PREFIX = "Sampler#";


@Inject
Expand All @@ -62,7 +62,7 @@ public void stopSampling(final @NotNull String topic) {
public @NotNull List<byte[]> getSamples(final @NotNull String topic) {
final String clientId = SAMPLER_PREFIX + topic;
final String queueId = clientId + "/" + topic;
final ListenableFuture<ImmutableList<PUBLISH>> publishes = clientQueuePersistence.peek(queueId, true, 10_000, 10);
final ListenableFuture<ImmutableList<PUBLISH>> publishes = clientQueuePersistence.peek(queueId, true, 100_000, 10);
try {
return publishes.get().stream().map(PUBLISH::getPayload).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
Expand Down

0 comments on commit f0f9650

Please sign in to comment.