Skip to content

Commit

Permalink
Manual merge of pull request #2
Browse files Browse the repository at this point in the history
  • Loading branch information
brianhks committed Oct 13, 2022
1 parent 79bc5ac commit 99996cb
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/kairosdb/bigqueue/BigQueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void applyForEach(ItemIterator iterator) throws IOException {
}

long index = this.queueFrontIndex.get();
for (long i = index; i < this.innerArray.size(); i++) {
for (long i = index; i < this.innerArray.getHeadIndex(); i++) {
iterator.forEach(this.innerArray.get(i));
}
} finally {
Expand Down
91 changes: 75 additions & 16 deletions src/test/java/org/kairosdb/bigqueue/BigQueueUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,85 @@ public void testInvalidDataPageSize() throws IOException {
}


@Test
public void testApplyForEachDoNotChangeTheQueue() throws Exception {
bigQueue = new BigQueueImpl(testDir, "testApplyForEachDoNotChangeTheQueue", BigArrayImpl.MINIMUM_DATA_PAGE_SIZE);
bigQueue.enqueue("1".getBytes());
bigQueue.enqueue("2".getBytes());
bigQueue.enqueue("3".getBytes());
@Test
public void testApplyForEachDoNotChangeTheQueue() throws Exception {
bigQueue = new BigQueueImpl(testDir, "testApplyForEachDoNotChangeTheQueue", BigArrayImpl.MINIMUM_DATA_PAGE_SIZE);
bigQueue.enqueue("1".getBytes());
bigQueue.enqueue("2".getBytes());
bigQueue.enqueue("3".getBytes());

DefaultItemIterator dii = new DefaultItemIterator();
bigQueue.applyForEach(dii);
System.out.println("[" + dii.getCount() + "] " + dii.toString());
DefaultItemIterator dii = new DefaultItemIterator();
bigQueue.applyForEach(dii);
System.out.println("[" + dii.getCount() + "] " + dii.toString());

assertEquals(3, bigQueue.size());
assertEquals(bigQueue.size(), dii.getCount());
assertEquals(3, bigQueue.size());
assertEquals(bigQueue.size(), dii.getCount());
assertEquals("1, 2, 3, ", dii.toString());

assertArrayEquals("1".getBytes(), bigQueue.dequeue());
assertArrayEquals("2".getBytes(), bigQueue.dequeue());
assertArrayEquals("3".getBytes(), bigQueue.dequeue());
assertArrayEquals("1".getBytes(), bigQueue.dequeue());
assertArrayEquals("2".getBytes(), bigQueue.dequeue());
assertArrayEquals("3".getBytes(), bigQueue.dequeue());

assertEquals(0, bigQueue.size());
}
assertEquals(0, bigQueue.size());
}


/**
* This test a simple lifecycle test where we call gc as soon as we dequeue.
* First enqueue 3 messages onto the queue.
* Then we dequeue it
* Add one more messages
* Dequeue it.
*
* Throughout the lifecycle we test the appleForEach to make sure we still get back what is on the queue.
*
* This test the gc call to make sure when the this.arrayTailIndex.get() is reset to the position before the last
* dequeue position that we still can reach all the items in the queue.
* @throws Exception if something goes wrong
*/
@Test
public void testApplyForEachTestGCLifeCycle() throws Exception {
bigQueue = new BigQueueImpl(testDir, "testApplyForEachTestGC", BigArrayImpl.MINIMUM_DATA_PAGE_SIZE);
bigQueue.enqueue("1".getBytes());
bigQueue.enqueue("2".getBytes());
bigQueue.enqueue("3".getBytes());

DefaultItemIterator dii = new DefaultItemIterator();
bigQueue.applyForEach(dii);
System.out.println("[" + dii.getCount() + "] " + dii.toString());

assertEquals(3, bigQueue.size());
assertEquals(bigQueue.size(), dii.getCount());
assertEquals("1, 2, 3, ", dii.toString());

assertArrayEquals("1".getBytes(), bigQueue.dequeue());
bigQueue.gc();
assertArrayEquals("2".getBytes(), bigQueue.dequeue());
bigQueue.gc();
assertArrayEquals("3".getBytes(), bigQueue.dequeue());
bigQueue.gc();

assertEquals(0, bigQueue.size());

// Send one more data
DefaultItemIterator dii2 = new DefaultItemIterator();
bigQueue.enqueue("1".getBytes());
bigQueue.applyForEach(dii2);
System.out.println("[" + dii2.getCount() + "] " + dii2.toString());
assertEquals(1, bigQueue.size());
assertEquals(bigQueue.size(), dii2.getCount());
assertEquals("1, ", dii2.toString());
assertArrayEquals("1".getBytes(), bigQueue.dequeue());
bigQueue.gc();

// Check nothing on the applyForEach
DefaultItemIterator dii3 = new DefaultItemIterator();
bigQueue.applyForEach(dii3);
System.out.println("[" + dii3.getCount() + "] " + dii3.toString());
assertEquals(0, bigQueue.size());
assertEquals(bigQueue.size(), dii3.getCount());
assertEquals("", dii3.toString()); // nothing to return
}

@Test
public void concurrentApplyForEachTest() throws Exception {
Expand Down

0 comments on commit 99996cb

Please sign in to comment.