Skip to content

Commit

Permalink
Merge pull request #760 from komamitsu/fix-potential-data-loss
Browse files Browse the repository at this point in the history
Fix potential data loss
  • Loading branch information
komamitsu authored Dec 17, 2023
2 parents 5376c37 + a118c11 commit 58f2197
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ private RetentionBuffer prepareBuffer(String tag, int writeSize)

RetentionBuffer newBuffer = new RetentionBuffer(acquiredBuffer, System.currentTimeMillis());
if (retentionBuffer != null) {
retentionBuffer.getByteBuffer().flip();
newBuffer.getByteBuffer().put(retentionBuffer.getByteBuffer());
ByteBuffer buf = retentionBuffer.getByteBuffer().duplicate();
buf.flip();
newBuffer.getByteBuffer().put(buf);
bufferPool.returnBuffer(retentionBuffer.getByteBuffer());
}
LOG.trace("prepareBuffer(): allocate a new buffer. tag={}, buffer={}", tag, newBuffer);
Expand Down Expand Up @@ -334,8 +335,9 @@ private void moveRetentionBufferToFlushable(String tag, RetentionBuffer buffer)
{
try {
LOG.trace("moveRetentionBufferToFlushable(): tag={}, buffer={}", tag, buffer);
buffer.getByteBuffer().flip();
flushableBuffers.put(new TaggableBuffer(tag, buffer.getByteBuffer()));
ByteBuffer buf = buffer.getByteBuffer().duplicate();
buf.flip();
flushableBuffers.put(new TaggableBuffer(tag, buf));
retentionBuffers.put(tag, null);
}
catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.komamitsu.fluency.recordformat.RecordFormatter;
import org.komamitsu.fluency.util.Tuple;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +38,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -192,6 +192,43 @@ void flush()
verify(ingester, times(1)).ingest(eq("foodb.bartbl"), any(ByteBuffer.class));
}

@Test
void flushWithInterrupt()
throws IOException
{
Buffer buffer = new Buffer(bufferConfig, recordFormatter);

Ingester ingester = mock(Ingester.class);
AtomicReference<ByteBuffer> receivedBuffer = new AtomicReference<>();
doAnswer((Answer<Void>) invocation -> {
receivedBuffer.set(((ByteBuffer)invocation.getArgument(1)).duplicate());
return null;
}).when(ingester).ingest(anyString(), any(ByteBuffer.class));

Map<String, Object> data = new HashMap<>();
data.put("name", "komamitsu");
buffer.append("foodb.bartbl", 1420070400, data);

try {
Thread.currentThread().interrupt();
buffer.flush(ingester, true);
fail();
}
catch (IOException e) {
// Caused by interruption. This is expected.
}

// Retry
buffer.flush(ingester, true);

verify(ingester, times(1)).ingest(eq("foodb.bartbl"), any(ByteBuffer.class));
ObjectMapper objectMapper = new ObjectMapper();
byte[] receivedData = new byte[receivedBuffer.get().remaining()];
receivedBuffer.get().get(receivedData);
Map<String, Object> deserialized = objectMapper.readValue(receivedData, new TypeReference<Map<String, Object>>() {});
System.out.println(deserialized);
}

@Test
void testFileBackup()
throws IOException
Expand Down

0 comments on commit 58f2197

Please sign in to comment.