Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix: scanner.next() won't automatically recover when server side not …
Browse files Browse the repository at this point in the history
…return ERR_OK (#156)
  • Loading branch information
foreverneverer authored May 8, 2021
1 parent fdcdc6f commit b8b01a0
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ private void onRecvRpcResponse(error_code err, scan_response response) {
}
} else { // rpc failed
_encounterError = true;
_rpcFailed = true;
_cause = new PException("scan failed with error: " + err.errno);
}
}
Expand All @@ -233,7 +234,17 @@ private void asyncNextInternal() {
p.setFailure(_cause);
}
_promises.clear();
// we don't reset the flag, just abandon this scan operation
if (_rpcFailed) { // reset _encounterError so that next loop will recall server
// for read, if error is equal with:
// - ERR_SESSION_RESET,ERR_OBJECT_NOT_FOUND,ERR_INVALID_STATE: the meta config must have
// been updated, next loop will use new config and try recover.
// - ERR_TIMEOUT or other error: meta config not be updated, next loop will only be retry.
// detail see TableHandler#onRpcReplay
_encounterError = false;
_rpcFailed = false;
}
// rpc succeed but still encounter unknown error in server side, not reset _encounterError and
// abandon the scanner
return;
}
while (!_promises.isEmpty()) {
Expand Down Expand Up @@ -288,6 +299,17 @@ private void contextReset() {
_contextId = CONTEXT_ID_NOT_EXIST;
}

protected void mockEncounterErrorForTest() {
_encounterError = true;
_cause = new PException("encounter unknown error");
}

protected void mockRpcErrorForTest() {
_encounterError = true;
_rpcFailed = true;
_cause = new PException("scan failed with error rpc");
}

private Table _table;
private blob _startKey;
private blob _stopKey;
Expand All @@ -308,7 +330,8 @@ private void contextReset() {
private Deque<DefaultPromise<Pair<Pair<byte[], byte[]>, byte[]>>> _promises;
private boolean _rpcRunning;
// mark whether scan operation encounter error
private boolean _encounterError;
protected boolean _encounterError; // set protect only for test class access
protected boolean _rpcFailed; // set protect only for test class access
Throwable _cause;

private boolean _needCheckHash;
Expand Down
67 changes: 67 additions & 0 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class TestScan {
static char[] CCH =
Expand Down Expand Up @@ -332,6 +333,72 @@ public void testHashKeyFilteringScan() throws PException {
compareSortMap(data, base.get(expectedHashKey), expectedHashKey);
}

@Test
public void testScanFailRecover() throws PException {
PegasusClientInterface client = PegasusClientFactory.getSingletonClient();
String tableName = "temp";
String hashKey = "scanHashKey";
int count = 0;
while (count++ < 100) {
client.set(tableName, hashKey.getBytes(), String.valueOf(count).getBytes(), "".getBytes());
}

ScanOptions options = new ScanOptions();
options.batchSize = 1;
PegasusScannerInterface scanner =
client.getScanner(tableName, hashKey.getBytes(), "".getBytes(), "".getBytes(), options);
Pair<Pair<byte[], byte[]>, byte[]> item;
List<Pair<Pair<byte[], byte[]>, byte[]>> items = new ArrayList<>();

// test encounter error
int loop = 0;
boolean encounterErrorMocked = false;
while (loop++ < 100) {
try {
if ((item = scanner.next()) != null) {
items.add(item);
if (!encounterErrorMocked) {
// only mock _encounterError = true, all the follow request will be failed
((PegasusScanner) scanner).mockEncounterErrorForTest();
encounterErrorMocked = true;
}
}
} catch (PException e) {
Assertions.assertTrue(e.getMessage().contains("encounter unknown error"));
}
}
Assertions.assertEquals(1, items.size());
Assertions.assertTrue(((PegasusScanner) scanner)._encounterError);
Assertions.assertFalse(((PegasusScanner) scanner)._rpcFailed);
((PegasusScanner) scanner)._encounterError = false;
items.clear();

// test encounter rpc error
boolean rpcErrorMocked = false;
scanner =
client.getScanner(tableName, hashKey.getBytes(), "".getBytes(), "".getBytes(), options);
while (true) {
try {
if ((item = scanner.next()) != null) {
items.add(item);
if (!rpcErrorMocked) {
// mock _encounterError = true and _rpcFailed = true, follow request will be recovered
// automatically
((PegasusScanner) scanner).mockRpcErrorForTest();
rpcErrorMocked = true;
}
} else {
break;
}
} catch (PException e) {
Assertions.assertTrue(e.getMessage().contains("scan failed with error rpc"));
}
}
Assertions.assertEquals(100, items.size());
Assertions.assertFalse(((PegasusScanner) scanner)._encounterError);
Assertions.assertFalse(((PegasusScanner) scanner)._rpcFailed);
}

private static void clearDatabase() throws PException {
ScanOptions options = new ScanOptions();
List<PegasusScannerInterface> scanners = client.getUnorderedScanners(tableName, 1, options);
Expand Down

0 comments on commit b8b01a0

Please sign in to comment.