Skip to content

Commit

Permalink
improved ApplicationSessionCtxEntity serialize/deserialize, fixed bac…
Browse files Browse the repository at this point in the history
…kward compatibility for serialize/deserialize of PublishMsg and DevicePublishMsg
  • Loading branch information
dmytro-landiak committed Dec 4, 2024
1 parent f9e38b4 commit 415463f
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.mqtt.broker.service.mqtt;

import com.fasterxml.jackson.annotation.JsonAlias;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttProperties;
Expand All @@ -33,6 +34,7 @@ public class PublishMsg {
private final String topicName;
private final byte[] payload;
private final ByteBuf byteBuf;
@JsonAlias("qosLevel") // Map old "qosLevel" field during deserialization. Can be removed in the future releases
private final int qos;
private final boolean isRetained;
private final boolean isDup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@Builder
@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class ApplicationMsgInfo {

private long offset;
private int packetId;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Collection;

@Data
@ToString
@Builder(toBuilder = true)
@EqualsAndHashCode
public class ApplicationSessionCtx {

private String clientId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.mqtt.broker.common.data;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.netty.handler.codec.mqtt.MqttProperties;
Expand All @@ -38,6 +39,7 @@
public class DevicePublishMsg {

private String clientId;
@JsonAlias("topic") // Map old "topic" field during deserialization. Can be removed in the future releases
private String topicName;
private Long time;
private Integer qos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
Expand Down Expand Up @@ -90,6 +91,14 @@ public static ObjectNode newObjectNode() {
return OBJECT_MAPPER.createObjectNode();
}

public static ArrayNode newArrayNode() {
return newArrayNode(OBJECT_MAPPER);
}

public static ArrayNode newArrayNode(ObjectMapper mapper) {
return mapper.createArrayNode();
}

public static <T> T clone(T value) {
@SuppressWarnings("unchecked")
Class<T> valueClass = (Class<T>) value.getClass();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
@Component
@RequiredArgsConstructor
public class DefaultApplicationSessionCtxDao extends JpaAbstractDaoListeningExecutorService implements ApplicationSessionCtxDao {

private final ApplicationSessionCtxRepository applicationSessionCtxRepository;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public CompletionStage<List<DevicePublishMsg>> findPersistedMessages(String clie
throw new CompletionException(throwable);
}).thenApply(messages ->
messages.stream()
.map(messageStr -> JacksonUtil.fromBytes(messageStr, DevicePublishMsg.class))
.map(messageInBytes -> JacksonUtil.fromBytes(messageInBytes, DevicePublishMsg.class))
.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ApplicationSessionCtxEntity implements ToData<ApplicationSessionCtx
@Convert(converter = JsonConverter.class)
@Column(name = ModelConstants.APPLICATION_SESSION_CTX_PUBLISH_MSG_INFOS_PROPERTY)
private JsonNode publishMsgInfos;

@Convert(converter = JsonConverter.class)
@Column(name = ModelConstants.APPLICATION_SESSION_CTX_PUBREL_MSG_INFOS_PROPERTY)
private JsonNode pubRelMsgInfos;
Expand All @@ -58,8 +59,8 @@ public ApplicationSessionCtxEntity() {
public ApplicationSessionCtxEntity(ApplicationSessionCtx applicationSessionCtx) {
this.clientId = applicationSessionCtx.getClientId();
this.lastUpdatedTime = applicationSessionCtx.getLastUpdatedTime();
this.publishMsgInfos = JacksonUtil.toJsonNode(JacksonUtil.toString(applicationSessionCtx.getPublishMsgInfos()));
this.pubRelMsgInfos = JacksonUtil.toJsonNode(JacksonUtil.toString(applicationSessionCtx.getPubRelMsgInfos()));
this.publishMsgInfos = JacksonUtil.valueToTree(applicationSessionCtx.getPublishMsgInfos());
this.pubRelMsgInfos = JacksonUtil.valueToTree(applicationSessionCtx.getPubRelMsgInfos());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@

@RunWith(ClasspathSuite.class)
@ClassnameFilters({
"org.thingsboard.mqtt.broker.dao.service.*ServiceTest",
"org.thingsboard.mqtt.broker.dao.client.unauthorized.*Test",
"org.thingsboard.mqtt.broker.dao.data.*Test",
"org.thingsboard.mqtt.broker.dao.sqlts.sql.*Test",
"org.thingsboard.mqtt.broker.dao.util.protocol.*Test",
"org.thingsboard.mqtt.broker.dao.validation.*Test",
"org.thingsboard.mqtt.broker.dao.ws.*Test"
"org.thingsboard.mqtt.broker.dao.*Test"
})
public class DaoRedisClusterServiceTestSuite extends AbstractRedisClusterContainer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@

@RunWith(ClasspathSuite.class)
@ClassnameFilters({
"org.thingsboard.mqtt.broker.dao.service.*ServiceTest",
"org.thingsboard.mqtt.broker.dao.client.unauthorized.*Test",
"org.thingsboard.mqtt.broker.dao.data.*Test",
"org.thingsboard.mqtt.broker.dao.sqlts.sql.*Test",
"org.thingsboard.mqtt.broker.dao.util.protocol.*Test",
"org.thingsboard.mqtt.broker.dao.validation.*Test",
"org.thingsboard.mqtt.broker.dao.ws.*Test"
"org.thingsboard.mqtt.broker.dao.*Test"
})
public class DaoServiceTestSuite extends AbstractRedisContainer {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.dao.model;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Test;
import org.thingsboard.mqtt.broker.common.data.ApplicationMsgInfo;
import org.thingsboard.mqtt.broker.common.data.ApplicationSessionCtx;
import org.thingsboard.mqtt.broker.common.util.JacksonUtil;

import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class ApplicationSessionCtxEntityTest {

@Test
public void testConstructorAndToData() {
ObjectNode publish1 = JacksonUtil.newObjectNode();
publish1.put("offset", 2L);
publish1.put("packetId", 3);

ObjectNode publish2 = JacksonUtil.newObjectNode();
publish2.put("offset", 3L);
publish2.put("packetId", 4);

ArrayNode publishMsgs = JacksonUtil.newArrayNode();
publishMsgs.add(publish1);
publishMsgs.add(publish2);

ObjectNode pubrel1 = JacksonUtil.newObjectNode();
pubrel1.put("offset", 0L);
pubrel1.put("packetId", 1);

ObjectNode pubrel2 = JacksonUtil.newObjectNode();
pubrel2.put("offset", 1L);
pubrel2.put("packetId", 2);

ArrayNode pubrelMsgs = JacksonUtil.newArrayNode();
pubrelMsgs.add(pubrel1);
pubrelMsgs.add(pubrel2);

List<ApplicationMsgInfo> publishMsgInfos = Arrays.asList(
new ApplicationMsgInfo(2L, 3),
new ApplicationMsgInfo(3L, 4)
);
List<ApplicationMsgInfo> pubRelMsgInfos = Arrays.asList(
new ApplicationMsgInfo(0L, 1),
new ApplicationMsgInfo(1L, 2)
);
long ts = System.currentTimeMillis();
String clientId = "client";

ApplicationSessionCtx sessionCtx = ApplicationSessionCtx.builder()
.clientId(clientId)
.lastUpdatedTime(ts)
.publishMsgInfos(publishMsgInfos)
.pubRelMsgInfos(pubRelMsgInfos)
.build();

ApplicationSessionCtxEntity entity = new ApplicationSessionCtxEntity(sessionCtx);
ApplicationSessionCtx reconstructedCtx = entity.toData();

assertThat(entity.getClientId()).isEqualTo(clientId);
assertThat(entity.getLastUpdatedTime()).isEqualTo(ts);
assertThat(entity.getPublishMsgInfos().toPrettyString()).isEqualTo(publishMsgs.toPrettyString());
assertThat(entity.getPubRelMsgInfos().toPrettyString()).isEqualTo(pubrelMsgs.toPrettyString());

assertThat(reconstructedCtx.getClientId()).isEqualTo(clientId);
assertThat(reconstructedCtx.getLastUpdatedTime()).isEqualTo(ts);
assertThat(reconstructedCtx.getPublishMsgInfos()).isEqualTo(publishMsgInfos);
assertThat(reconstructedCtx.getPubRelMsgInfos()).isEqualTo(pubRelMsgInfos);
}
}

0 comments on commit 415463f

Please sign in to comment.