diff --git a/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/SamplingResourceImpl.java b/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/SamplingResourceImpl.java index 07628da3f..3801843d7 100644 --- a/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/SamplingResourceImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/SamplingResourceImpl.java @@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.List; @Singleton @@ -46,6 +47,10 @@ public SamplingResourceImpl(final @NotNull SamplingService samplingService) { final List samples = samplingService.getSamples(topic); final ArrayList sampleArrayList = new ArrayList<>(); + + // we want LIFO, but the queue return FIFO + Collections.reverse(samples); + samples.forEach(sample -> sampleArrayList.add(new PayloadSample(Base64.getEncoder().encodeToString(sample)))); return Response.ok().entity(new PayloadSampleList(sampleArrayList)).build(); } @@ -53,7 +58,6 @@ public SamplingResourceImpl(final @NotNull SamplingService samplingService) { @Override public @NotNull Response startSamplingForTopic(@NotNull final String topicBase64) { final String topic = new String(Base64.getDecoder().decode(topicBase64), StandardCharsets.UTF_8); - System.err.println(topicBase64 + "->" + topic); samplingService.startSampling(topic); return Response.ok().build(); } diff --git a/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishDistributorImpl.java b/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishDistributorImpl.java index 534254aea..72db1db94 100644 --- a/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishDistributorImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/mqtt/services/PublishDistributorImpl.java @@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService; import static com.hivemq.mqtt.handler.publish.PublishStatus.*; +import static com.hivemq.sampling.SamplingService.SAMPLER_PREFIX; /** * @author Christoph Schäbel @@ -185,6 +186,8 @@ private ListenableFuture handlePublish( appliedQoS = 0; } } + }else if(client.startsWith(SAMPLER_PREFIX)){ + appliedQueueLimit = 10L; } return queuePublish(client, diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java index 3d3419ca7..7bb165916 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/clientqueue/ClientQueuePersistenceImpl.java @@ -53,6 +53,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.hivemq.persistence.clientsession.SharedSubscriptionServiceImpl.SharedSubscription; +import static com.hivemq.sampling.SamplingService.SAMPLER_PREFIX; @Singleton public class ClientQueuePersistenceImpl extends AbstractPersistence implements ClientQueuePersistence { @@ -108,12 +109,20 @@ public ListenableFuture add( return Futures.immediateFailedFuture(exception); } + return singleWriter.submit(queueId, (bucketIndex) -> { + MqttConfigurationService.QueuedMessagesStrategy queuedMessagesStrategy = + mqttConfigurationService.getQueuedMessagesStrategy(); + // TODO thats super ugly. + if(queueId.startsWith(SAMPLER_PREFIX)){ + queuedMessagesStrategy = MqttConfigurationService.QueuedMessagesStrategy.DISCARD_OLDEST; + } + localPersistence.add(queueId, shared, publish, queueLimit, - mqttConfigurationService.getQueuedMessagesStrategy(), + queuedMessagesStrategy, retained, bucketIndex); final int queueSize = localPersistence.size(queueId, shared, bucketIndex); @@ -145,11 +154,17 @@ public ListenableFuture add( return singleWriter.submit(queueId, (bucketIndex) -> { final boolean queueWasEmpty = localPersistence.size(queueId, shared, bucketIndex) == 0; + MqttConfigurationService.QueuedMessagesStrategy queuedMessagesStrategy = + mqttConfigurationService.getQueuedMessagesStrategy(); + // TODO thats super ugly. + if(queueId.startsWith(SAMPLER_PREFIX)){ + queuedMessagesStrategy = MqttConfigurationService.QueuedMessagesStrategy.DISCARD_OLDEST; + } + localPersistence.add(queueId, shared, publishes, - queueLimit, - mqttConfigurationService.getQueuedMessagesStrategy(), + queueLimit, queuedMessagesStrategy, retained, bucketIndex); if (queueWasEmpty) { diff --git a/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java b/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java index cebd77e9e..fa5891987 100644 --- a/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java +++ b/hivemq-edge/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java @@ -381,7 +381,6 @@ private void addQos0Publish( // check for expiration, but do not modify the queue if (!publishWithRetained.isExpired()) { - messageCount++; bytes += publishWithRetained.getEstimatedSizeInMemory(); // check if adding the message would exceed the byte limit if ((bytes > bytesLimit)) { diff --git a/hivemq-edge/src/main/java/com/hivemq/sampling/SamplingService.java b/hivemq-edge/src/main/java/com/hivemq/sampling/SamplingService.java index fdfd92ce9..27ddf5412 100644 --- a/hivemq-edge/src/main/java/com/hivemq/sampling/SamplingService.java +++ b/hivemq-edge/src/main/java/com/hivemq/sampling/SamplingService.java @@ -36,7 +36,7 @@ public class SamplingService { private final @NotNull LocalTopicTree localTopicTree; private final @NotNull ClientQueuePersistence clientQueuePersistence; - private static final @NotNull String SAMPLER_PREFIX = "Sampler#"; + public static final @NotNull String SAMPLER_PREFIX = "Sampler#"; @Inject @@ -62,7 +62,7 @@ public void stopSampling(final @NotNull String topic) { public @NotNull List getSamples(final @NotNull String topic) { final String clientId = SAMPLER_PREFIX + topic; final String queueId = clientId + "/" + topic; - final ListenableFuture> publishes = clientQueuePersistence.peek(queueId, true, 10_000, 10); + final ListenableFuture> publishes = clientQueuePersistence.peek(queueId, true, 100_000, 10); try { return publishes.get().stream().map(PUBLISH::getPayload).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) {