From ab665ea6975f75b2eee1b9756eb1482d2193f60e Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Fri, 11 Aug 2023 08:49:06 +0000 Subject: [PATCH] [TH2-4912] migrate all requests to group requests (#58) --- build.gradle | 16 ++- .../java/com/exactpro/th2/FixHandler.java | 24 ++-- .../th2/conn/dirty/fix/FixProtocolMangler.kt | 16 ++- .../th2/conn/dirty/fix/MessageLoader.kt | 104 ++++++++++-------- .../java/com/exactpro/th2/RecoveryTest.java | 32 +++--- .../th2/conn/dirty/fix/MessageSearcher.kt | 7 +- 6 files changed, 110 insertions(+), 89 deletions(-) diff --git a/build.gradle b/build.gradle index dc83f6d..c19ed9f 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,5 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.tasks.KaptGenerateStubs import com.github.jk1.license.filter.LicenseBundleNormalizer import com.github.jk1.license.render.JsonReportRenderer @@ -94,15 +96,21 @@ docker { copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) } +tasks.withType(KaptGenerateStubs).configureEach { + compilerOptions { + jvmTarget.set(JvmTarget.JVM_11) + } +} + compileKotlin { - kotlinOptions { - jvmTarget = '11' + compilerOptions { + jvmTarget.set(JvmTarget.JVM_11) } } compileTestKotlin { - kotlinOptions { - jvmTarget = '11' + compilerOptions { + jvmTarget.set(JvmTarget.JVM_11) } } diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 4a4e37c..d1131c4 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -236,7 +236,7 @@ public FixHandler(IHandlerContext context) { public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); if(settings.isLoadSequencesFromCradle()) { - SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias()); + SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionGroup(), channel.getSessionAlias()); LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq()); msgSeqNum.set(sequences.getClientSeq()); serverMsgSeqNum.set(sequences.getServerSeq()); @@ -267,14 +267,12 @@ private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map result = CompletableFuture.completedFuture(null); try { recoveryLock.lock(); - result = channel.send(body, properties, eventID, SendMode.HANDLE_AND_MANGLE); + return channel.send(body, properties, eventID, SendMode.HANDLE_AND_MANGLE); } finally { recoveryLock.unlock(); } - return result; } @NotNull @@ -291,7 +289,7 @@ public CompletableFuture send(@NotNull com.exactpro.th2.common.schema } @Override - public ByteBuf onReceive(IChannel channel, ByteBuf buffer) { + public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) { int offset = buffer.readerIndex(); if (offset == buffer.writerIndex()) return null; @@ -355,7 +353,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField possDup = findField(message, POSS_DUP_TAG); boolean isDup = false; if(possDup != null) { - isDup = possDup.getValue().equals(IS_POSS_DUP); + isDup = Objects.equals(possDup.getValue(), IS_POSS_DUP); } String msgTypeValue = requireNonNull(msgType.getValue()); @@ -487,7 +485,7 @@ private void resetSequence(ByteBuf message) { FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG); if(seqNumValue != null) { - if(gapFillMode == null || gapFillMode.getValue().equals("N")) { + if(gapFillMode == null || Objects.equals(gapFillMode.getValue(), "N")) { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue()))); } else { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1); @@ -567,7 +565,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { || msgTypeField == null || msgTypeField.getValue() == null) { return true; } - Integer sequence = Integer.parseInt(seqNum.getValue()); + int sequence = Integer.parseInt(seqNum.getValue()); String msgType = msgTypeField.getValue(); if(sequence < beginSeqNo) return true; @@ -578,9 +576,8 @@ private void recovery(int beginSeqNo, int endSeqNo) { if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; if(sequence - 1 != lastProcessedSequence.get() ) { - int newSeqNo = sequence; StringBuilder sequenceReset = - createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), newSeqNo); + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); resetHeartbeatTask(); } @@ -598,9 +595,8 @@ private void recovery(int beginSeqNo, int endSeqNo) { }; messageLoader.processMessagesInRange( - Direction.SECOND, - channel.getSessionAlias(), - beginSeqNo, + channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND, + beginSeqNo, processMessage ); @@ -716,7 +712,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map().run { results.forEach { result -> diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt index 2ff5fb5..9f7261a 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -24,7 +24,7 @@ import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG import com.exactpro.th2.constants.Constants.POSS_DUP_TAG import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse -import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse import com.exactpro.th2.dataprovider.lw.grpc.MessageStream import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation @@ -103,56 +103,42 @@ class MessageLoader( } } - fun loadInitialSequences(sessionAlias: String): SequenceHolder = searchLock.withLock { - val serverSeq = ProviderCall.withCancellation { - searchMessage( - dataProvider.searchMessages( - createSearchRequest( - Instant.now().toTimestamp(), - Direction.FIRST, - sessionAlias - ) - ) - ) { _, seqNum -> seqNum?.toInt() ?: 0 } - } - val clientSeq = ProviderCall.withCancellation { - searchMessage( - dataProvider.searchMessages( - createSearchRequest( - Instant.now().toTimestamp(), - Direction.SECOND, - sessionAlias - ) - ), - true - ) { _, seqNum -> seqNum?.toInt() ?: 0 } - } + fun loadInitialSequences(sessionGroup: String, sessionAlias: String): SequenceHolder = searchLock.withLock { + val serverSeq = searchMessage(sessionGroup, sessionAlias, Direction.FIRST, false) + val clientSeq = searchMessage(sessionGroup, sessionAlias, Direction.SECOND, true) K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" } return SequenceHolder(clientSeq, serverSeq) } fun processMessagesInRange( - direction: Direction, + sessionGroup: String, sessionAlias: String, + direction: Direction, fromSequence: Long, processMessage: (ByteBuf) -> Boolean ) = searchLock.withLock { - processMessagesInRangeInternal(direction, sessionAlias, fromSequence, processMessage) + processMessagesInRangeInternal(sessionGroup, sessionAlias, direction, fromSequence, processMessage) } - fun processMessagesInRangeInternal( - direction: Direction, + private fun processMessagesInRangeInternal( + sessionGroup: String, sessionAlias: String, + direction: Direction, fromSequence: Long, processMessage: (ByteBuf) -> Boolean ) { var timestamp: Timestamp? = null ProviderCall.withCancellation { - val backwardIterator = dataProvider.searchMessages( - createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias) + val backwardIterator = dataProvider.searchMessageGroups( + createSearchGroupRequest( + from = Instant.now().toTimestamp(), + sessionGroup = sessionGroup, + sessionAlias = sessionAlias, + direction = direction + ) ) - var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation + val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation var messagesToSkip = firstValidMessage.payloadSequence - fromSequence @@ -198,13 +184,14 @@ class MessageLoader( ProviderCall.withCancellation { - val iterator = dataProvider.searchMessages( - createSearchRequest( - startSearchTimestamp, - direction, - sessionAlias, - TimeRelation.NEXT, - Instant.now().toTimestamp() + val iterator = dataProvider.searchMessageGroups( + createSearchGroupRequest( + from = startSearchTimestamp, + to = Instant.now().toTimestamp(), + sessionGroup = sessionGroup, + sessionAlias = sessionAlias, + direction = direction, + timeRelation = TimeRelation.NEXT, ) ) @@ -215,6 +202,25 @@ class MessageLoader( } } + private fun searchMessage( + sessionGroup: String, + sessionAlias: String, + direction: Direction, + checkPossFlag: Boolean + ) = ProviderCall.withCancellation { + searchMessage( + dataProvider.searchMessageGroups( + createSearchGroupRequest( + from = Instant.now().toTimestamp(), + sessionGroup = sessionGroup, + sessionAlias = sessionAlias, + direction = direction + ) + ), + checkPossFlag + ) { _, seqNum -> seqNum?.toInt() ?: 0 } + } + private fun searchMessage( iterator: Iterator, checkPossFlag: Boolean = false, @@ -245,23 +251,25 @@ class MessageLoader( MessageDetails(seqNum.toInt(), message.messageId.sequence, message.messageId.timestamp) } - private fun createSearchRequest( - timestamp: Timestamp, - direction: Direction, + private fun createSearchGroupRequest( + from: Timestamp, + to: Timestamp = previousDaySessionStart, + sessionGroup: String, sessionAlias: String, - searchDirection: TimeRelation = TimeRelation.PREVIOUS, - endTimestamp: Timestamp = previousDaySessionStart - ) = MessageSearchRequest.newBuilder().apply { - startTimestamp = timestamp - this.endTimestamp = endTimestamp + direction: Direction, + timeRelation: TimeRelation = TimeRelation.PREVIOUS, + ) = MessageGroupsSearchRequest.newBuilder().apply { + startTimestamp = from + endTimestamp = to addResponseFormats(BASE64_FORMAT) addStream( MessageStream.newBuilder() .setName(sessionAlias) .setDirection(direction) ) + addMessageGroup(MessageGroupsSearchRequest.Group.newBuilder().setName(sessionGroup)) bookIdBuilder.name = bookName - this.searchDirection = searchDirection + searchDirection = timeRelation }.build() private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP diff --git a/src/test/java/com/exactpro/th2/RecoveryTest.java b/src/test/java/com/exactpro/th2/RecoveryTest.java index f0618dd..8372737 100644 --- a/src/test/java/com/exactpro/th2/RecoveryTest.java +++ b/src/test/java/com/exactpro/th2/RecoveryTest.java @@ -19,17 +19,18 @@ import com.exactpro.th2.conn.dirty.fix.MessageSearcher; import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse; -import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest; +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest; import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse; import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import static com.exactpro.th2.FixHandlerTest.createHandlerSettings; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; @@ -40,6 +41,7 @@ import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static org.junit.jupiter.api.Assertions.assertEquals; +@SuppressWarnings("DataFlowIssue") public class RecoveryTest { private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); @@ -59,8 +61,8 @@ void testSequenceResetInRange() { messageSearchResponse(5) ) ); - Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageGroupsSearchRequest.class)) ); channel = new Channel(settings, dataProviderService); fixHandler = channel.getFixHandler(); @@ -90,8 +92,8 @@ void testSequenceResetInsideRange() { messageSearchResponse(5) ) ); - Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageGroupsSearchRequest.class)) ); channel = new Channel(settings, dataProviderService); fixHandler = channel.getFixHandler(); @@ -150,8 +152,8 @@ void testSequenceResetOutOfRange() { messageSearchResponse(6) ) ); - Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageGroupsSearchRequest.class)) ); channel = new Channel(settings, dataProviderService); fixHandler = channel.getFixHandler(); @@ -182,8 +184,8 @@ void testSequenceResetAdminMessages() { messageSearchResponseAdmin(6) ) ); - Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageGroupsSearchRequest.class)) ); channel = new Channel(settings, dataProviderService); fixHandler = channel.getFixHandler(); @@ -225,9 +227,7 @@ void allMessagesMissed() { FixHandlerSettings settings = createHandlerSettings(); settings.setLoadMissedMessagesFromCradle(true); DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenReturn( - new ArrayList().iterator() - ); + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenReturn(Collections.emptyIterator()); channel = new Channel(settings, dataProviderService); fixHandler = channel.getFixHandler(); fixHandler.onOpen(channel); @@ -272,7 +272,7 @@ private String message(Integer sequence) { } private String messageWithoutSeqNum() { - return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"); + return "8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"; } private String adminMessage(Integer sequence) { diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt index 8d7553b..44210da 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt @@ -15,19 +15,18 @@ */ package com.exactpro.th2.conn.dirty.fix -import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation import com.google.protobuf.util.Timestamps class MessageSearcher(private val messages: List) { - fun searchMessages(request: MessageSearchRequest): Iterator { - val limit = request.resultCountLimit.value + fun searchMessages(request: MessageGroupsSearchRequest): Iterator { val startTimestamp = request.startTimestamp val searchDirection = request.searchDirection - var filteredMessages = if (searchDirection == TimeRelation.NEXT) { + val filteredMessages = if (searchDirection == TimeRelation.NEXT) { messages.filter { Timestamps.compare(it.message.messageId.timestamp, startTimestamp) >= 0 } } else {