Skip to content

Commit

Permalink
[TH2-4912] migrate all requests to group requests (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Aug 14, 2023
1 parent 8fce744 commit ab665ea
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 89 deletions.
16 changes: 12 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.tasks.KaptGenerateStubs
import com.github.jk1.license.filter.LicenseBundleNormalizer
import com.github.jk1.license.render.JsonReportRenderer

Expand Down Expand Up @@ -94,15 +96,21 @@ docker {
copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar"))
}

tasks.withType(KaptGenerateStubs).configureEach {
compilerOptions {
jvmTarget.set(JvmTarget.JVM_11)
}
}

compileKotlin {
kotlinOptions {
jvmTarget = '11'
compilerOptions {
jvmTarget.set(JvmTarget.JVM_11)
}
}

compileTestKotlin {
kotlinOptions {
jvmTarget = '11'
compilerOptions {
jvmTarget.set(JvmTarget.JVM_11)
}
}

Expand Down
24 changes: 10 additions & 14 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public FixHandler(IHandlerContext context) {
public void onStart() {
channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE);
if(settings.isLoadSequencesFromCradle()) {
SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias());
SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionGroup(), channel.getSessionAlias());
LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq());
msgSeqNum.set(sequences.getClientSeq());
serverMsgSeqNum.set(sequences.getServerSeq());
Expand Down Expand Up @@ -267,14 +267,12 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
}
}

CompletableFuture<MessageID> result = CompletableFuture.completedFuture(null);
try {
recoveryLock.lock();
result = channel.send(body, properties, eventID, SendMode.HANDLE_AND_MANGLE);
return channel.send(body, properties, eventID, SendMode.HANDLE_AND_MANGLE);
} finally {
recoveryLock.unlock();
}
return result;
}

@NotNull
Expand All @@ -291,7 +289,7 @@ public CompletableFuture<MessageID> send(@NotNull com.exactpro.th2.common.schema
}

@Override
public ByteBuf onReceive(IChannel channel, ByteBuf buffer) {
public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) {
int offset = buffer.readerIndex();
if (offset == buffer.writerIndex()) return null;

Expand Down Expand Up @@ -355,7 +353,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
FixField possDup = findField(message, POSS_DUP_TAG);
boolean isDup = false;
if(possDup != null) {
isDup = possDup.getValue().equals(IS_POSS_DUP);
isDup = Objects.equals(possDup.getValue(), IS_POSS_DUP);
}

String msgTypeValue = requireNonNull(msgType.getValue());
Expand Down Expand Up @@ -487,7 +485,7 @@ private void resetSequence(ByteBuf message) {
FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG);

if(seqNumValue != null) {
if(gapFillMode == null || gapFillMode.getValue().equals("N")) {
if(gapFillMode == null || Objects.equals(gapFillMode.getValue(), "N")) {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())));
} else {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1);
Expand Down Expand Up @@ -567,7 +565,7 @@ private void recovery(int beginSeqNo, int endSeqNo) {
|| msgTypeField == null || msgTypeField.getValue() == null) {
return true;
}
Integer sequence = Integer.parseInt(seqNum.getValue());
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();

if(sequence < beginSeqNo) return true;
Expand All @@ -578,9 +576,8 @@ private void recovery(int beginSeqNo, int endSeqNo) {
if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true;

if(sequence - 1 != lastProcessedSequence.get() ) {
int newSeqNo = sequence;
StringBuilder sequenceReset =
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), newSeqNo);
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE);
resetHeartbeatTask();
}
Expand All @@ -598,9 +595,8 @@ private void recovery(int beginSeqNo, int endSeqNo) {
};

messageLoader.processMessagesInRange(
Direction.SECOND,
channel.getSessionAlias(),
beginSeqNo,
channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND,
beginSeqNo,
processMessage
);

Expand Down Expand Up @@ -716,7 +712,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, S
FixField checksum = findLastField(message, CHECKSUM_TAG);

if (checksum == null) {
checksum = lastField(message).insertNext(CHECKSUM_TAG, STUBBING_VALUE); //stubbing until finish checking message
lastField(message).insertNext(CHECKSUM_TAG, STUBBING_VALUE); //stubbing until finish checking message
}

FixField msgSeqNum = findField(message, MSG_SEQ_NUM_TAG, US_ASCII, bodyLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerContext
import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerFactory
import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerSettings
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.kotlin.KotlinFeature
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.kotlin.readValue
import com.google.auto.service.AutoService
Expand All @@ -38,7 +39,16 @@ import mu.KotlinLogging
private val LOGGER = KotlinLogging.logger {}

private val MAPPER = JsonMapper.builder()
.addModule(KotlinModule(nullIsSameAsDefault = true))
.addModule(
KotlinModule.Builder()
.withReflectionCacheSize(512)
.configure(KotlinFeature.NullToEmptyCollection, false)
.configure(KotlinFeature.NullToEmptyMap, false)
.configure(KotlinFeature.NullIsSameAsDefault, enabled = true)
.configure(KotlinFeature.SingletonSupport, false)
.configure(KotlinFeature.StrictNullChecks, false)
.build()
)
.build()

private const val RULE_NAME_PROPERTY = "rule-name"
Expand All @@ -63,7 +73,7 @@ class FixProtocolMangler(context: IManglerContext) : IMangler {
}
}

val (name, results, message) = MessageTransformer.transform(message, rule, unconditionally) ?: return null
val (name, results, byteBuf) = MessageTransformer.transform(message, rule, unconditionally) ?: return null

return Event.start().apply {
type(MANGLE_EVENT_TYPE)
Expand All @@ -84,7 +94,7 @@ class FixProtocolMangler(context: IManglerContext) : IMangler {
}

bodyData(createMessageBean("Original message:"))
bodyData(createMessageBean(ByteBufUtil.prettyHexDump(message)))
bodyData(createMessageBean(ByteBufUtil.prettyHexDump(byteBuf)))

TableBuilder<ActionRow>().run {
results.forEach { result ->
Expand Down
104 changes: 56 additions & 48 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG
import com.exactpro.th2.constants.Constants.POSS_DUP_TAG
import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService
import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse
import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest
import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest
import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse
import com.exactpro.th2.dataprovider.lw.grpc.MessageStream
import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation
Expand Down Expand Up @@ -103,56 +103,42 @@ class MessageLoader(
}
}

fun loadInitialSequences(sessionAlias: String): SequenceHolder = searchLock.withLock {
val serverSeq = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessages(
createSearchRequest(
Instant.now().toTimestamp(),
Direction.FIRST,
sessionAlias
)
)
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}
val clientSeq = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessages(
createSearchRequest(
Instant.now().toTimestamp(),
Direction.SECOND,
sessionAlias
)
),
true
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}
fun loadInitialSequences(sessionGroup: String, sessionAlias: String): SequenceHolder = searchLock.withLock {
val serverSeq = searchMessage(sessionGroup, sessionAlias, Direction.FIRST, false)
val clientSeq = searchMessage(sessionGroup, sessionAlias, Direction.SECOND, true)
K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" }
return SequenceHolder(clientSeq, serverSeq)
}

fun processMessagesInRange(
direction: Direction,
sessionGroup: String,
sessionAlias: String,
direction: Direction,
fromSequence: Long,
processMessage: (ByteBuf) -> Boolean
) = searchLock.withLock {
processMessagesInRangeInternal(direction, sessionAlias, fromSequence, processMessage)
processMessagesInRangeInternal(sessionGroup, sessionAlias, direction, fromSequence, processMessage)
}

fun processMessagesInRangeInternal(
direction: Direction,
private fun processMessagesInRangeInternal(
sessionGroup: String,
sessionAlias: String,
direction: Direction,
fromSequence: Long,
processMessage: (ByteBuf) -> Boolean
) {
var timestamp: Timestamp? = null
ProviderCall.withCancellation {
val backwardIterator = dataProvider.searchMessages(
createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias)
val backwardIterator = dataProvider.searchMessageGroups(
createSearchGroupRequest(
from = Instant.now().toTimestamp(),
sessionGroup = sessionGroup,
sessionAlias = sessionAlias,
direction = direction
)
)

var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation
val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

Expand Down Expand Up @@ -198,13 +184,14 @@ class MessageLoader(

ProviderCall.withCancellation {

val iterator = dataProvider.searchMessages(
createSearchRequest(
startSearchTimestamp,
direction,
sessionAlias,
TimeRelation.NEXT,
Instant.now().toTimestamp()
val iterator = dataProvider.searchMessageGroups(
createSearchGroupRequest(
from = startSearchTimestamp,
to = Instant.now().toTimestamp(),
sessionGroup = sessionGroup,
sessionAlias = sessionAlias,
direction = direction,
timeRelation = TimeRelation.NEXT,
)
)

Expand All @@ -215,6 +202,25 @@ class MessageLoader(
}
}

private fun searchMessage(
sessionGroup: String,
sessionAlias: String,
direction: Direction,
checkPossFlag: Boolean
) = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessageGroups(
createSearchGroupRequest(
from = Instant.now().toTimestamp(),
sessionGroup = sessionGroup,
sessionAlias = sessionAlias,
direction = direction
)
),
checkPossFlag
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}

private fun <T> searchMessage(
iterator: Iterator<MessageSearchResponse>,
checkPossFlag: Boolean = false,
Expand Down Expand Up @@ -245,23 +251,25 @@ class MessageLoader(
MessageDetails(seqNum.toInt(), message.messageId.sequence, message.messageId.timestamp)
}

private fun createSearchRequest(
timestamp: Timestamp,
direction: Direction,
private fun createSearchGroupRequest(
from: Timestamp,
to: Timestamp = previousDaySessionStart,
sessionGroup: String,
sessionAlias: String,
searchDirection: TimeRelation = TimeRelation.PREVIOUS,
endTimestamp: Timestamp = previousDaySessionStart
) = MessageSearchRequest.newBuilder().apply {
startTimestamp = timestamp
this.endTimestamp = endTimestamp
direction: Direction,
timeRelation: TimeRelation = TimeRelation.PREVIOUS,
) = MessageGroupsSearchRequest.newBuilder().apply {
startTimestamp = from
endTimestamp = to
addResponseFormats(BASE64_FORMAT)
addStream(
MessageStream.newBuilder()
.setName(sessionAlias)
.setDirection(direction)
)
addMessageGroup(MessageGroupsSearchRequest.Group.newBuilder().setName(sessionGroup))
bookIdBuilder.name = bookName
this.searchDirection = searchDirection
searchDirection = timeRelation
}.build()

private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP
Expand Down
Loading

0 comments on commit ab665ea

Please sign in to comment.