Skip to content

Commit

Permalink
DBZ-8265 fix journal processing loop when we paginate to last receive…
Browse files Browse the repository at this point in the history
…r before receiver offset reset
  • Loading branch information
msillence committed Sep 23, 2024
1 parent 437be80 commit b272abd
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,14 @@ static class RangeFinder {

public Optional<PositionRange> next(DetailedJournalReceiver nextReceiver) {
if (found) {
// if the next journal has wrapped use just go to the end
// if the next journal has wrapped use just go to the end of the previous one
if (lastReceiver != null && nextReceiver.start().compareTo(lastReceiver.end()) < 0) {
// we're at the end and we've processed it move start on to next receiver
if (startEqualsEndAndProcessed(startPosition, lastReceiver)) {
startPosition.setPosition(new JournalPosition(nextReceiver.start(), nextReceiver.info().receiver()), false);
}
else {
// the only way we can get here is if we have already checked for pagination
// when we found the start so we should never need to paginate
// it is inexpensive and safer to check
final Optional<PositionRange> paginated = rangeWithinCurrentPosition(lastReceiver,
startPosition.getOffset());
if (paginated.isPresent()) {
return paginated;
}
return Optional.of(new PositionRange(false, startPosition,
new JournalPosition(lastReceiver.end(), lastReceiver.info().receiver())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public RetrieveJournal(RetrieveConfig config, JournalInfoRetrieval journalRetrie
public boolean retrieveJournal(JournalProcessedPosition previousPosition) throws Exception {

final PositionRange range = journalReceivers.findRange(config.as400().connection(), previousPosition);
log.debug("retrieve journal start {} end {}", range.start(), range.end());

return retrieveJournal(previousPosition, range);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibm.as400.access.AS400;

Expand Down Expand Up @@ -503,4 +505,81 @@ void testStartEqualsEndProcessedResetReceiversPaginate() {
assertEquals(new JournalPosition(BigInteger.valueOf(5l), j2.info().receiver()), found.get().end());
assertFalse(found.get().startEqualsEnd());
}

private static final Logger log = LoggerFactory.getLogger(ReceiverPaginationTest.class);


@Test
void testStopBeforeJournalResetsPaginateOver() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j1, j2, j3);
final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5),
j1.info().receiver(), Instant.ofEpochSecond(0), true);
final Optional<PositionRange> position = jreceivers.findPosition(start, BigInteger.valueOf(16), list, j1);
assertTrue(position.isPresent());
assertEquals("j2", position.get().end().getReceiver().name());
assertEquals(j2.end(), position.get().end().getOffset());

}

@Test
void testStopBeforeJournalResetsPaginateExact() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j1, j2, j3);
final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5),
j1.info().receiver(), Instant.ofEpochSecond(0), true);
final Optional<PositionRange> position = jreceivers.findPosition(start, BigInteger.valueOf(15), list, j1);
assertTrue(position.isPresent());
assertEquals("j2", position.get().end().getReceiver().name());
assertEquals(j2.end(), position.get().end().getOffset());

}

@Test
void testStopOneBeforeJournalResetsPaginate() {
final ReceiverPagination jreceivers = new ReceiverPagination(journalInfoRetrieval, 20, journalInfo);
final DetailedJournalReceiver j1 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j1", "jlib"), new Date(1),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(10), Optional.of(new JournalReceiver("j2", "jlib")), 1, 1);
final DetailedJournalReceiver j2 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j2", "jlib"), new Date(2),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(11), BigInteger.valueOf(20), Optional.of(new JournalReceiver("j3", "jlib")), 1, 1);
final DetailedJournalReceiver j3 = new DetailedJournalReceiver(
new JournalReceiverInfo(new JournalReceiver("j3", "jlib"), new Date(4),
JournalStatus.OnlineSavedDetached, Optional.of(1)),
BigInteger.valueOf(1), BigInteger.valueOf(100), Optional.empty(), 1, 1);
final List<DetailedJournalReceiver> list = List.of(j1, j2, j3);
final JournalProcessedPosition start = new JournalProcessedPosition(BigInteger.valueOf(5),
j1.info().receiver(), Instant.ofEpochSecond(0), true);
final Optional<PositionRange> position = jreceivers.findPosition(start, BigInteger.valueOf(14), list, j1);
assertTrue(position.isPresent());
assertEquals("j2", position.get().end().getReceiver().name());
assertEquals(j2.end().subtract(BigInteger.ONE), position.get().end().getOffset());
}
}

0 comments on commit b272abd

Please sign in to comment.