diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala deleted file mode 100644 index a06adfb2e531..000000000000 --- a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala +++ /dev/null @@ -1,279 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log - -import java.io.File -import kafka.utils.TestUtils -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.storage.internals.log.{LogSegment, LogSegments} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} -import org.mockito.Mockito.{mock, when} - -import java.util.Arrays.asList -import java.util.Optional -import java.util.OptionalLong -import scala.jdk.CollectionConverters._ - -class LogSegmentsTest { - - val topicPartition = new TopicPartition("topic", 0) - var logDir: File = _ - - /* create a segment with the given base offset */ - private def createSegment(offset: Long, - indexIntervalBytes: Int = 10, - time: Time = Time.SYSTEM): LogSegment = { - LogTestUtils.createSegment(offset, logDir, indexIntervalBytes, time) - } - - @BeforeEach - def setup(): Unit = { - logDir = TestUtils.tempDir() - } - - @AfterEach - def teardown(): Unit = { - Utils.delete(logDir) - } - - private def assertEntry(segment: LogSegment, tested: java.util.Map.Entry[java.lang.Long, LogSegment]): Unit = { - assertEquals(segment.baseOffset, tested.getKey) - assertEquals(segment, tested.getValue) - } - - @Test - def testBasicOperations(): Unit = { - val segments = new LogSegments(topicPartition) - assertTrue(segments.isEmpty) - assertFalse(segments.nonEmpty) - - val offset1 = 40 - val seg1 = createSegment(offset1) - val offset2 = 80 - val seg2 = createSegment(offset2) - val seg3 = createSegment(offset1) - - // Add seg1 - segments.add(seg1) - assertFalse(segments.isEmpty) - assertTrue(segments.nonEmpty) - assertEquals(1, segments.numberOfSegments) - assertTrue(segments.contains(offset1)) - assertEquals(Optional.of(seg1), segments.get(offset1)) - - // Add seg2 - segments.add(seg2) - assertFalse(segments.isEmpty) - assertTrue(segments.nonEmpty) - assertEquals(2, segments.numberOfSegments) - assertTrue(segments.contains(offset2)) - assertEquals(Optional.of(seg2), segments.get(offset2)) - - // Replace seg1 with seg3 - segments.add(seg3) - assertFalse(segments.isEmpty) - assertTrue(segments.nonEmpty) - assertEquals(2, segments.numberOfSegments) - assertTrue(segments.contains(offset1)) - assertEquals(Optional.of(seg3), segments.get(offset1)) - - // Remove seg2 - segments.remove(offset2) - assertFalse(segments.isEmpty) - assertTrue(segments.nonEmpty) - assertEquals(1, segments.numberOfSegments) - assertFalse(segments.contains(offset2)) - - // Clear all segments including seg3 - segments.clear() - assertTrue(segments.isEmpty) - assertFalse(segments.nonEmpty) - assertEquals(0, segments.numberOfSegments) - assertFalse(segments.contains(offset1)) - - // since we do segments.clear() before, we have to close segments one by one - seg1.close() - seg2.close() - seg3.close() - } - - @Test - def testSegmentAccess(): Unit = { - val segments = new LogSegments(topicPartition) - - val offset1 = 1 - val seg1 = createSegment(offset1) - val offset2 = 2 - val seg2 = createSegment(offset2) - val offset3 = 3 - val seg3 = createSegment(offset3) - val offset4 = 4 - val seg4 = createSegment(offset4) - - // Test firstEntry, lastEntry - List(seg1, seg2, seg3, seg4).foreach { seg => - segments.add(seg) - assertEntry(seg1, segments.firstEntry.get) - assertEquals(Optional.of(seg1), segments.firstSegment) - assertEquals(OptionalLong.of(1), segments.firstSegmentBaseOffset()) - assertEntry(seg, segments.lastEntry.get) - assertEquals(Optional.of(seg), segments.lastSegment) - } - - // Test baseOffsets - assertEquals(Seq(offset1, offset2, offset3, offset4), segments.baseOffsets.asScala.toSeq) - - // Test values - assertEquals(Seq(seg1, seg2, seg3, seg4), segments.values.asScala.toSeq) - - // Test values(to, from) - assertThrows(classOf[IllegalArgumentException], () => segments.values(2, 1)) - assertEquals(Seq(), segments.values(1, 1).asScala.toSeq) - assertEquals(Seq(seg1), segments.values(1, 2).asScala.toSeq) - assertEquals(Seq(seg1, seg2), segments.values(1, 3).asScala.toSeq) - assertEquals(Seq(seg1, seg2, seg3), segments.values(1, 4).asScala.toSeq) - assertEquals(Seq(seg2, seg3), segments.values(2, 4).asScala.toSeq) - assertEquals(Seq(seg3), segments.values(3, 4).asScala.toSeq) - assertEquals(Seq(), segments.values(4, 4).asScala.toSeq) - assertEquals(Seq(seg4), segments.values(4, 5).asScala.toSeq) - - // Test activeSegment - assertEquals(seg4, segments.activeSegment()) - - // Test nonActiveLogSegmentsFrom - assertEquals(Seq(seg2, seg3), segments.nonActiveLogSegmentsFrom(2).asScala.toSeq) - assertEquals(Seq(), segments.nonActiveLogSegmentsFrom(4).asScala.toSeq) - - segments.close() - } - - @Test - def testClosestMatchOperations(): Unit = { - val segments = new LogSegments(topicPartition) - - val seg1 = createSegment(1) - val seg2 = createSegment(3) - val seg3 = createSegment(5) - val seg4 = createSegment(7) - - List(seg1, seg2, seg3, seg4).foreach(segments.add) - - // Test floorSegment - assertEquals(Optional.of(seg1), segments.floorSegment(2)) - assertEquals(Optional.of(seg2), segments.floorSegment(3)) - - // Test lowerSegment - assertEquals(Optional.of(seg1), segments.lowerSegment(3)) - assertEquals(Optional.of(seg2), segments.lowerSegment(4)) - - // Test higherSegment, higherEntry - assertEquals(Optional.of(seg3), segments.higherSegment(4)) - assertEntry(seg3, segments.higherEntry(4).get) - assertEquals(Optional.of(seg4), segments.higherSegment(5)) - assertEntry(seg4, segments.higherEntry(5).get) - - segments.close() - } - - @Test - def testHigherSegments(): Unit = { - val segments = new LogSegments(topicPartition) - - val seg1 = createSegment(1) - val seg2 = createSegment(3) - val seg3 = createSegment(5) - val seg4 = createSegment(7) - val seg5 = createSegment(9) - - List(seg1, seg2, seg3, seg4, seg5).foreach(segments.add) - - // higherSegments(0) should return all segments in order - { - val iterator = segments.higherSegments(0).iterator - List(seg1, seg2, seg3, seg4, seg5).foreach { - segment => - assertTrue(iterator.hasNext) - assertEquals(segment, iterator.next()) - } - assertFalse(iterator.hasNext) - } - - // higherSegments(1) should return all segments in order except seg1 - { - val iterator = segments.higherSegments(1).iterator - List(seg2, seg3, seg4, seg5).foreach { - segment => - assertTrue(iterator.hasNext) - assertEquals(segment, iterator.next()) - } - assertFalse(iterator.hasNext) - } - - // higherSegments(8) should return only seg5 - { - val iterator = segments.higherSegments(8).iterator - assertTrue(iterator.hasNext) - assertEquals(seg5, iterator.next()) - assertFalse(iterator.hasNext) - } - - // higherSegments(9) should return no segments - { - val iterator = segments.higherSegments(9).iterator - assertFalse(iterator.hasNext) - } - - segments.close() - } - - @Test - def testSizeForLargeLogs(): Unit = { - val largeSize = Int.MaxValue.toLong * 2 - val logSegment: LogSegment = mock(classOf[LogSegment]) - - when(logSegment.size).thenReturn(Int.MaxValue) - - assertEquals(Int.MaxValue, LogSegments.sizeInBytes(asList(logSegment))) - assertEquals(largeSize, LogSegments.sizeInBytes(asList(logSegment, logSegment))) - assertTrue(UnifiedLog.sizeInBytes(asList(logSegment, logSegment)) > Int.MaxValue) - - val logSegments: LogSegments = new LogSegments(topicPartition) - logSegments.add(logSegment) - assertEquals(Int.MaxValue, logSegments.sizeInBytes()) - - logSegment.close() - } - - @Test - def testUpdateDir(): Unit = { - val seg1 = createSegment(1) - val segments = new LogSegments(topicPartition) - segments.add(seg1) - - val newDir: File = TestUtils.tempDir() - segments.updateParentDir(newDir) - assertEquals(newDir, seg1.log().file().getParentFile) - assertEquals(newDir, seg1.timeIndexFile().getParentFile) - assertEquals(newDir, seg1.offsetIndexFile().getParentFile) - assertEquals(newDir, seg1.txnIndex().file().getParentFile) - - seg1.close() - Utils.delete(newDir) - } -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java index 580199f98c63..586d40e90b37 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Collection; @@ -34,7 +35,7 @@ * This class encapsulates a thread-safe navigable map of LogSegment instances and provides the * required read and write behavior on the map. */ -public class LogSegments { +public class LogSegments implements Closeable { private final TopicPartition topicPartition; /* the segments of the log with key being LogSegment base offset and value being a LogSegment */ @@ -102,6 +103,7 @@ public void clear() { /** * Close all segments. */ + @Override public void close() throws IOException { for (LogSegment s : values()) s.close(); diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java new file mode 100644 index 000000000000..43da918b29d3 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class LogSegmentsTest { + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private static File logDir = null; + + /* create a segment with the given base offset */ + private static LogSegment createSegment(Long offset) throws IOException { + return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM); + } + + @BeforeEach + public void setup() { + logDir = TestUtils.tempDirectory(); + } + + @AfterEach + public void teardown() throws IOException { + Utils.delete(logDir); + } + + private void assertEntry(LogSegment segment, Map.Entry tested) { + assertEquals(segment.baseOffset(), tested.getKey()); + assertEquals(segment, tested.getValue()); + } + + @Test + public void testBasicOperations() throws IOException { + long offset1 = 40; + long offset2 = 80; + try (LogSegments segments = new LogSegments(topicPartition); + LogSegment seg1 = createSegment(offset1); + LogSegment seg2 = createSegment(offset2); + LogSegment seg3 = createSegment(offset1)) { + + assertTrue(segments.isEmpty()); + assertFalse(segments.nonEmpty()); + + // Add seg1 + segments.add(seg1); + assertFalse(segments.isEmpty()); + assertTrue(segments.nonEmpty()); + assertEquals(1, segments.numberOfSegments()); + assertTrue(segments.contains(offset1)); + assertEquals(Optional.of(seg1), segments.get(offset1)); + + // Add seg2 + segments.add(seg2); + assertFalse(segments.isEmpty()); + assertTrue(segments.nonEmpty()); + assertEquals(2, segments.numberOfSegments()); + assertTrue(segments.contains(offset2)); + assertEquals(Optional.of(seg2), segments.get(offset2)); + + // Replace seg1 with seg3 + segments.add(seg3); + assertFalse(segments.isEmpty()); + assertTrue(segments.nonEmpty()); + assertEquals(2, segments.numberOfSegments()); + assertTrue(segments.contains(offset1)); + assertEquals(Optional.of(seg3), segments.get(offset1)); + + // Remove seg2 + segments.remove(offset2); + assertFalse(segments.isEmpty()); + assertTrue(segments.nonEmpty()); + assertEquals(1, segments.numberOfSegments()); + assertFalse(segments.contains(offset2)); + + // Clear all segments including seg3 + segments.clear(); + assertTrue(segments.isEmpty()); + assertFalse(segments.nonEmpty()); + assertEquals(0, segments.numberOfSegments()); + assertFalse(segments.contains(offset1)); + } + } + + @Test + public void testSegmentAccess() throws IOException { + try (LogSegments segments = new LogSegments(topicPartition)) { + long offset1 = 1; + LogSegment seg1 = createSegment(offset1); + long offset2 = 2; + LogSegment seg2 = createSegment(offset2); + long offset3 = 3; + LogSegment seg3 = createSegment(offset3); + long offset4 = 4; + LogSegment seg4 = createSegment(offset4); + + // Test firstEntry, lastEntry + List segmentList = Arrays.asList(seg1, seg2, seg3, seg4); + for (LogSegment seg : segmentList) { + segments.add(seg); + assertEntry(seg1, segments.firstEntry().get()); + assertEquals(Optional.of(seg1), segments.firstSegment()); + assertEquals(OptionalLong.of(1), segments.firstSegmentBaseOffset()); + assertEntry(seg, segments.lastEntry().get()); + assertEquals(Optional.of(seg), segments.lastSegment()); + } + + // Test baseOffsets + assertEquals(Arrays.asList(offset1, offset2, offset3, offset4), segments.baseOffsets()); + + // Test values + assertEquals(Arrays.asList(seg1, seg2, seg3, seg4), new ArrayList<>(segments.values())); + + // Test values(to, from) + assertThrows(IllegalArgumentException.class, () -> segments.values(2, 1)); + assertEquals(Collections.emptyList(), segments.values(1, 1)); + assertEquals(Collections.singletonList(seg1), new ArrayList<>(segments.values(1, 2))); + assertEquals(Arrays.asList(seg1, seg2), new ArrayList<>(segments.values(1, 3))); + assertEquals(Arrays.asList(seg1, seg2, seg3), new ArrayList<>(segments.values(1, 4))); + assertEquals(Arrays.asList(seg2, seg3), new ArrayList<>(segments.values(2, 4))); + assertEquals(Collections.singletonList(seg3), new ArrayList<>(segments.values(3, 4))); + assertEquals(Collections.emptyList(), new ArrayList<>(segments.values(4, 4))); + assertEquals(Collections.singletonList(seg4), new ArrayList<>(segments.values(4, 5))); + + // Test activeSegment + assertEquals(seg4, segments.activeSegment()); + + // Test nonActiveLogSegmentsFrom + assertEquals(Arrays.asList(seg2, seg3), new ArrayList<>(segments.nonActiveLogSegmentsFrom(2))); + assertEquals(Collections.emptyList(), new ArrayList<>(segments.nonActiveLogSegmentsFrom(4))); + } + } + + @Test + public void testClosestMatchOperations() throws IOException { + try (LogSegments segments = new LogSegments(topicPartition)) { + LogSegment seg1 = createSegment(1L); + LogSegment seg2 = createSegment(3L); + LogSegment seg3 = createSegment(5L); + LogSegment seg4 = createSegment(7L); + + Arrays.asList(seg1, seg2, seg3, seg4).forEach(segments::add); + + // Test floorSegment + assertEquals(Optional.of(seg1), segments.floorSegment(2)); + assertEquals(Optional.of(seg2), segments.floorSegment(3)); + + // Test lowerSegment + assertEquals(Optional.of(seg1), segments.lowerSegment(3)); + assertEquals(Optional.of(seg2), segments.lowerSegment(4)); + + // Test higherSegment, higherEntry + assertEquals(Optional.of(seg3), segments.higherSegment(4)); + assertEntry(seg3, segments.higherEntry(4).get()); + assertEquals(Optional.of(seg4), segments.higherSegment(5)); + assertEntry(seg4, segments.higherEntry(5).get()); + } + } + + @Test + public void testHigherSegments() throws IOException { + try (LogSegments segments = new LogSegments(topicPartition)) { + LogSegment seg1 = createSegment(1L); + LogSegment seg2 = createSegment(3L); + LogSegment seg3 = createSegment(5L); + LogSegment seg4 = createSegment(7L); + LogSegment seg5 = createSegment(9L); + + Arrays.asList(seg1, seg2, seg3, seg4, seg5).forEach(segments::add); + + // higherSegments(0) should return all segments in order + { + final Iterator iterator = segments.higherSegments(0).iterator(); + Arrays.asList(seg1, seg2, seg3, seg4, seg5).forEach(segment -> { + assertTrue(iterator.hasNext()); + assertEquals(segment, iterator.next()); + }); + assertFalse(iterator.hasNext()); + } + + // higherSegments(1) should return all segments in order except seg1 + { + final Iterator iterator = segments.higherSegments(1).iterator(); + Arrays.asList(seg2, seg3, seg4, seg5).forEach(segment -> { + assertTrue(iterator.hasNext()); + assertEquals(segment, iterator.next()); + }); + assertFalse(iterator.hasNext()); + } + + // higherSegments(8) should return only seg5 + { + final Iterator iterator = segments.higherSegments(8).iterator(); + assertTrue(iterator.hasNext()); + assertEquals(seg5, iterator.next()); + assertFalse(iterator.hasNext()); + } + + // higherSegments(9) should return no segments + { + final Iterator iterator = segments.higherSegments(9).iterator(); + assertFalse(iterator.hasNext()); + } + } + } + + @Test + public void testSizeForLargeLogs() throws IOException { + try (LogSegment logSegment = mock(LogSegment.class)) { + long largeSize = (long) Integer.MAX_VALUE * 2; + + when(logSegment.size()).thenReturn(Integer.MAX_VALUE); + + assertEquals(Integer.MAX_VALUE, LogSegments.sizeInBytes(Collections.singletonList(logSegment))); + assertEquals(largeSize, LogSegments.sizeInBytes(Arrays.asList(logSegment, logSegment))); + assertTrue(LogSegments.sizeInBytes(Arrays.asList(logSegment, logSegment)) > Integer.MAX_VALUE); + + try (LogSegments logSegments = new LogSegments(topicPartition)) { + logSegments.add(logSegment); + assertEquals(Integer.MAX_VALUE, logSegments.sizeInBytes()); + } + } + } + + @Test + public void testUpdateDir() throws IOException { + try (LogSegment seg1 = createSegment(1L); + LogSegments segments = new LogSegments(topicPartition)) { + + segments.add(seg1); + File newDir = TestUtils.tempDirectory(); + segments.updateParentDir(newDir); + assertEquals(newDir, seg1.log().file().getParentFile()); + assertEquals(newDir, seg1.timeIndexFile().getParentFile()); + assertEquals(newDir, seg1.offsetIndexFile().getParentFile()); + assertEquals(newDir, seg1.txnIndex().file().getParentFile()); + + Utils.delete(newDir); + } + } + +} diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java new file mode 100644 index 000000000000..50b666a0fb1d --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.utils.Time; + +import java.io.File; +import java.io.IOException; + +public class LogTestUtils { + public static LogSegment createSegment(long offset, File logDir, int indexIntervalBytes, Time time) throws IOException { + // Create instances of the required components + FileRecords ms = FileRecords.open(LogFileUtils.logFile(logDir, offset)); + LazyIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(logDir, offset), offset, 1000); + LazyIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(logDir, offset), offset, 1500); + TransactionIndex txnIndex = new TransactionIndex(offset, LogFileUtils.transactionIndexFile(logDir, offset, "")); + + // Create and return the LogSegment instance + return new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time); + } +}