Skip to content

Commit

Permalink
data corruption test case
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuker committed Oct 15, 2017
1 parent b2b49d2 commit e06e81e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
23 changes: 19 additions & 4 deletions src/main/java/rtalk/RTalk.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rtalk;

import static java.util.UUID.randomUUID;
import static java.util.logging.Level.SEVERE;
import static java.util.stream.Collectors.toMap;

import java.io.PrintWriter;
Expand All @@ -11,12 +12,15 @@
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;

public class RTalk {
//java util logging is used here to introduce no external dependencies
private final static Logger log = Logger.getLogger(RTalk.class.getName());

public static final String KICKED = "KICKED";
public static final String DELETED = "DELETED";
Expand Down Expand Up @@ -212,7 +216,8 @@ protected Response on(Response response) {
}

private static final String fTube = "tube";
private static final String fState = "state";
//VisibleForTesting
static final String fState = "state";
private static final String fPriority = "pri";
private static final String fReserves = "reserves";
private static final String fCtime = "ctime";
Expand All @@ -225,7 +230,8 @@ protected Response on(Response response) {

private static final String fBuryReason = "error";

private String kJob(String id) {
//VisibleForTesting
String kJob(String id) {
return tube + "_" + id;
}

Expand Down Expand Up @@ -393,10 +399,19 @@ public synchronized Response reserve(long blockTimeoutMsec) {
Optional<Job> firstJob_ = ids
.stream()
.map(id -> _getJob(r, id))
.filter(j -> j != null && !Job.BURIED.equals(j.state))
.filter(j -> {
boolean ok = j != null && !Job.BURIED.equals(j.state);
if (j == null) {
log.log(SEVERE, "data corruption detected on tube " + tube + " job == null in ready queue "
+ kReadyQueue);
} else if (Job.BURIED.equals(j.state)) {
log.log(SEVERE, "data corruption detected on tube " + tube + " job " + j.id
+ " is BURIED but in ready queue " + kReadyQueue);
}
return ok;
})
.findFirst();
if (!firstJob_.isPresent() && (readyQueueSize != 0 || toLong(r.zcard(kReadyQueue)) != 0)) {
System.err.println("data corruption detected on tube " + tube);
ids = r.zrange(kReadyQueue, 0, -1);
firstJob_ = ids
.stream()
Expand Down
26 changes: 26 additions & 0 deletions src/test/java/rtalk/RTalkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.Before;
Expand Down Expand Up @@ -232,4 +233,29 @@ public void testTouchKeepsJobReserved() throws Exception {
Job statsJob2 = rt.statsJob(put.id);
assertEquals(Job.RESERVED, statsJob2.state);
}

@Test
public void testDataCorruption() throws Exception {
RTalk rt = new RTalk(jedisPool);
Response a = rt.put(0, 0, 1000, "a");
Response b = rt.put(1, 0, 1000, "b");

//emulate data corruption
jedisPool.getResource().del(rt.kJob(a.id));
Response reserve = rt.reserve();
assertEquals(b.id, reserve.id);
}

@Test
public void testDataCorruptionBuried() throws Exception {
RTalk rt = new RTalk(jedisPool);
Response a = rt.put(0, 0, 1000, "a");
Response b = rt.put(1, 0, 1000, "b");

//emulate data corruption
jedisPool.getResource().hset(rt.kJob(a.id), RTalk.fState, Job.BURIED);

Response reserve = rt.reserve();
assertEquals(b.id, reserve.id);
}
}

0 comments on commit e06e81e

Please sign in to comment.