Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous replication #2896

Merged
merged 26 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ee0244b
adding logic in replicate method
Sep 5, 2024
557117e
Merge branch 'master' into continuous-replication-replicate
Sep 10, 2024
69a94aa
Adding logic for group generation and added comments and tests
Sep 12, 2024
daa03c7
modifying comments
Sep 12, 2024
8e3d070
termination logic for continuous replication
Sep 16, 2024
85a9888
changing language of comments to present tense
Sep 16, 2024
56f9929
Merge branch 'continuous-replication-replicate' into continuous-repli…
Sep 16, 2024
e2a431c
Adding comments for methods
Sep 16, 2024
51bd386
Merge branch 'master' into continuous-replication-termination-2
Sep 16, 2024
70ead19
Adding comments for group tracker
Sep 16, 2024
809c0f8
adding logic for group creation for continuous replication
Sep 16, 2024
f0be250
adding test for group generation
Sep 17, 2024
b3c502e
adding logic to recheck state of ACTIVE replicas too after finishing
Sep 17, 2024
eb25871
add dummy comment
Sep 17, 2024
9a5b353
adding logic for continuous replication
Sep 18, 2024
bc293bf
adding comments and logs to code
Sep 19, 2024
21812f3
making fillStandBy private
Sep 19, 2024
6926a5c
adding metric for replica level throttling
Sep 19, 2024
e682951
Merge branch 'master' into continuos-replicate
Sep 19, 2024
d078898
Merge branch 'master' into continuos-replicate
Sep 23, 2024
5a5f2d9
resolving comments
Sep 30, 2024
1c61e34
Merge branch 'master' into continuos-replicate
Sep 30, 2024
0056da0
breaking replicateContinuous method into parts
Oct 1, 2024
663f568
moving terminate condition in finally
Oct 1, 2024
fe953a3
Merge branch 'master' into continuos-replicate
Oct 9, 2024
25ad5d8
Merge branch 'master' into continuos-replicate
Oct 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ReplicationConfig {
public static final String REPLICA_TOKEN_FILE_NAME = "replicaTokens";
public static final String REPLICATION_MODEL_ACROSS_DATACENTERS = "replication.model.across.datacenters";
public static final String REPLICATION_ENABLE_CONTINUOUS_REPLICATION = "replication.enable.continuous.replication";
public static final String REPLICATION_GROUP_ITERATION_LIMIT = "replication.group.iteration.limit";
public static final String REPLICATION_STANDBY_WAIT_TIMEOUT_TO_TRIGGER_CROSS_COLO_FETCH_SECONDS =
"replication.standby.wait.timeout.to.trigger.cross.colo.fetch.seconds";

Expand Down Expand Up @@ -58,6 +59,17 @@ public class ReplicationConfig {
@Default("false")
public final boolean replicationEnableContinuousReplication;

/**
* Maximum number of iterations per remote replica group in a continuous replication cycle
* This is valid only when continuous replication is enabled.
* Its default value is 1, to make it compatible with old cyclic replication logic.
* When value is set to 1, each group does 1 iteration in a cycle (continuous replication call)
* as in the beginning of cycle, all groups will get created and all will reach limit of 1.
*/
@Config(REPLICATION_GROUP_ITERATION_LIMIT)
@Default("1")
manbearpig1996 marked this conversation as resolved.
Show resolved Hide resolved
public final int replicationGroupIterationLimit;

/**
* The number of replica threads on each server that runs the replication protocol for intra dc replication
*/
Expand Down Expand Up @@ -337,6 +349,7 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) {
DEFAULT_RECOVERY_TOKEN_FACTORY);
replicationEnableContinuousReplication =
verifiableProperties.getBoolean(REPLICATION_ENABLE_CONTINUOUS_REPLICATION, false);
replicationGroupIterationLimit = verifiableProperties.getInt(REPLICATION_GROUP_ITERATION_LIMIT, 1);
replicationNumOfIntraDCReplicaThreads =
verifiableProperties.getInt("replication.no.of.intra.dc.replica.threads", 1);
replicationNumOfInterDCReplicaThreads =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ protected void afterHandleReplicaMetadataResponse(RemoteReplicaGroup group) {
group.setState(ReplicaGroupReplicationState.DONE);
}

protected MessageInfo mapBlob(MessageInfo blob) {
protected MessageInfo mapBlob(MessageInfo blob, List<StoreKey> storeKeysConversionLog) {
manbearpig1996 marked this conversation as resolved.
Show resolved Hide resolved
StoreKey keyConvert = null;
try {
// Don't do batch-convert, if one replica in batch fails, then it affects handling others
keyConvert = storeKeyConverter.convert(Collections.singleton(blob.getStoreKey()))
keyConvert = convertStoreKeys(Collections.singleton(blob.getStoreKey()), storeKeysConversionLog)
.get(blob.getStoreKey());
} catch (Throwable e) {
metrics.backupIntegrityError.inc();
Expand Down Expand Up @@ -225,7 +225,7 @@ Set<BlobMatchStatus> recheck(RemoteReplicaInfo replica, MessageInfo serverBlob,
* @return
*/
List<ExchangeMetadataResponse> handleReplicaMetadataResponse(ReplicaMetadataResponse response,
List<RemoteReplicaInfo> replicas, DataNodeId server) {
List<RemoteReplicaInfo> replicas, DataNodeId server, List<StoreKey> storeKeysConversionLog) {
IntStream.range(0, response.getReplicaMetadataResponseInfoList().size())
.filter(i -> response.getReplicaMetadataResponseInfoList().get(i).getError() == ServerErrorCode.No_Error)
.forEach(i -> {
Expand All @@ -235,7 +235,7 @@ List<ExchangeMetadataResponse> handleReplicaMetadataResponse(ReplicaMetadataResp
metadata.getMessageInfoList().stream()
.map(serverBlob -> {
numBlobScanned.incrementAndGet();
return mapBlob(serverBlob);
return mapBlob(serverBlob, storeKeysConversionLog);
})
.filter(serverBlob -> serverBlob.getStoreKey() != null)
.map(serverBlob -> {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class ReplicationMetrics {
public final Counter interColoReplicaThreadIdleCount;
public final Counter intraColoReplicaThreadThrottleCount;
public final Counter interColoReplicaThreadThrottleCount;
public final Counter intraColoContinuousReplicationReplicaThrottleCount;
public final Counter interColoContinuousReplicationReplicaThrottleCount;
public final Counter remoteReplicaInfoRemoveError;
public final Counter remoteReplicaInfoAddError;
public final Counter allResponsedKeysExist;
Expand Down Expand Up @@ -282,6 +284,10 @@ public ReplicationMetrics(MetricRegistry registry, List<? extends ReplicaId> rep
registry.counter(MetricRegistry.name(ReplicaThread.class, "IntraColoReplicaThreadThrottleCount"));
interColoReplicaThreadThrottleCount =
registry.counter(MetricRegistry.name(ReplicaThread.class, "InterColoReplicaThreadThrottleCount"));
intraColoContinuousReplicationReplicaThrottleCount = registry.counter(
MetricRegistry.name(ReplicaThread.class, "IntraColoContinuousReplicationReplicaThrottleCount"));
interColoContinuousReplicationReplicaThrottleCount = registry.counter(
MetricRegistry.name(ReplicaThread.class, "InterColoContinuousReplicationReplicaThrottleCount"));
remoteReplicaInfoRemoveError =
registry.counter(MetricRegistry.name(ReplicaThread.class, "RemoteReplicaInfoRemoveError"));
remoteReplicaInfoAddError = registry.counter(MetricRegistry.name(ReplicaThread.class, "RemoteReplicaInfoAddError"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.github.ambry.replication.continuous;

import java.util.List;
import java.util.stream.Collectors;


/**
Expand All @@ -29,7 +30,18 @@ public ActiveGroupTracker(int groupId, List<ReplicaTracker> preAssignedReplicas)
this.preAssignedReplicas = preAssignedReplicas;
}

public List<ReplicaTracker> getPreAssignedReplica() {
public List<ReplicaTracker> getPreAssignedReplicas() {
manbearpig1996 marked this conversation as resolved.
Show resolved Hide resolved
return preAssignedReplicas;
}

/**
 * Filters the pre-assigned replicas of this group by replica status.
 * @param statuses statuses to match against
 * @return replica trackers whose current status is contained in {@code statuses}
 */
public List<ReplicaTracker> getPreAssignedReplicas(List<ReplicaStatus> statuses) {
  return preAssignedReplicas.stream()
      .filter(tracker -> statuses.contains(tracker.getReplicaStatus()))
      .collect(Collectors.toList());
}

@Override
public String toString() {
  // Same rendered format as before: "ActiveGroupTracker :[<base> <replicas>]"
  StringBuilder sb = new StringBuilder("ActiveGroupTracker :[");
  sb.append(super.toString()).append(' ').append(preAssignedReplicas.toString()).append(']');
  return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@

import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.replication.RemoteReplicaInfo;
import com.github.ambry.replication.ReplicaThread;
import com.github.ambry.utils.Time;
import com.github.ambry.utils.Utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -41,9 +44,11 @@ public class DataNodeTracker {
* @param remoteReplicas remote replicas for this data node
* @param maxActiveGroupSize maximum count of replicas in active groups
* @param startGroupId group id from which we can start and increment and generate unique group id for each group
* @param time Ambry time
* @param replicaThrottleDurationMs throttle duration for replicas
*/
public DataNodeTracker(DataNodeId dataNodeId, List<RemoteReplicaInfo> remoteReplicas, int maxActiveGroupSize,
int startGroupId) {
int startGroupId, Time time, long replicaThrottleDurationMs) {
manbearpig1996 marked this conversation as resolved.
Show resolved Hide resolved
this.dataNodeId = dataNodeId;
this.activeGroupTrackers = new ArrayList<>();

Expand All @@ -56,8 +61,9 @@ public DataNodeTracker(DataNodeId dataNodeId, List<RemoteReplicaInfo> remoteRepl

// for each of smaller array of remote replicas create active group trackers with consecutive group ids
for (List<RemoteReplicaInfo> remoteReplicaList : remoteReplicaSegregatedList) {
ActiveGroupTracker activeGroupTracker = new ActiveGroupTracker(currentGroupId,
remoteReplicaList.stream().map(ReplicaTracker::new).collect(Collectors.toList()));
ActiveGroupTracker activeGroupTracker = new ActiveGroupTracker(currentGroupId, remoteReplicaList.stream()
.map(remoteReplicaInfo -> new ReplicaTracker(remoteReplicaInfo, time, replicaThrottleDurationMs))
.collect(Collectors.toList()));
activeGroupTrackers.add(activeGroupTracker);
currentGroupId++;
}
Expand All @@ -77,11 +83,68 @@ public DataNodeId getDataNodeId() {
return dataNodeId;
}

/**
 * @return every group tracker for this data node: all active group trackers followed by the
 *     standby group tracker
 */
public List<GroupTracker> getGroupTrackers() {
  List<GroupTracker> allTrackers = new ArrayList<>();
  allTrackers.addAll(getActiveGroupTrackers());
  allTrackers.add(getStandByGroupTracker());
  return allTrackers;
}

/**
 * @return the active group trackers of this data node tracker
 */
public List<ActiveGroupTracker> getActiveGroupTrackers() {
return activeGroupTrackers;
}

/**
 * @return the standby group tracker of this data node tracker
 */
public StandByGroupTracker getStandByGroupTracker() {
return standByGroupTracker;
}

/**
 * Collects the remote replica groups that are currently being tracked (in flight) by any
 * group tracker of this data node tracker.
 * @return the in-flight remote replica groups
 */
public List<ReplicaThread.RemoteReplicaGroup> getInflightRemoteReplicaGroups() {
  List<ReplicaThread.RemoteReplicaGroup> inflightGroups = new ArrayList<>();
  for (GroupTracker tracker : getGroupTrackers()) {
    if (tracker.isInFlight()) {
      inflightGroups.add(tracker.getRemoteReplicaGroup());
    }
  }
  return inflightGroups;
}

/**
* Iterates over all group trackers and if remote replica group is getting tracked by group tracker is done,
* removes it from group tracker
* @return returns list of remote replica groups removed from tracking
*/
public List<ReplicaThread.RemoteReplicaGroup> processFinishedGroups() {
List<ReplicaThread.RemoteReplicaGroup> finishedGroups = new ArrayList<>();
getGroupTrackers().forEach(groupTracker -> {
if (!groupTracker.isGroupDone()) {
return;
}
finishedGroups.add(groupTracker.getRemoteReplicaGroup());
groupTracker.finishIteration();
manbearpig1996 marked this conversation as resolved.
Show resolved Hide resolved
});
return finishedGroups;
}

/**
 * Iterates over all group trackers and returns the maximum number of iterations done by any group.
 * @return the maximum iteration count across all groups, or 0 if there are no groups
 */
public int getMaxIterationAcrossGroups() {
  // mapToInt avoids boxing every iteration count into an Integer just to compare them
  return getGroupTrackers().stream().mapToInt(GroupTracker::getIterations).max().orElse(0);
}

/**
 * Iterates over all active group trackers and returns those that are not currently tracking
 * any remote replica group.
 * @return active group trackers with no in-flight remote replica group
 */
public List<ActiveGroupTracker> getFinishedActiveGroupTrackers() {
  List<ActiveGroupTracker> idleTrackers = new ArrayList<>();
  for (ActiveGroupTracker tracker : getActiveGroupTrackers()) {
    if (!tracker.isInFlight()) {
      idleTrackers.add(tracker);
    }
  }
  return idleTrackers;
}

@Override
public String toString() {
  // Same rendered format as before: "DataNodeTracker :[<node> <activeTrackers> <standbyTracker>]"
  StringBuilder sb = new StringBuilder("DataNodeTracker :[");
  sb.append(dataNodeId.toString()).append(' ').append(activeGroupTrackers.toString()).append(' ')
      .append(standByGroupTracker.toString()).append(']');
  return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,78 @@
*
* {@link #groupId} group id assigned to remote replica group
* {@link #remoteReplicaGroup} group for which we have created this tracker
* {@link #inflightReplicas} replicas that are currently present in {@link #remoteReplicaGroup}
* {@link #inflightReplicaTrackers} replicas that are currently present in {@link #remoteReplicaGroup}
* {@link #iterations} total number of iterations for the remote replica group with {@link #groupId} group id
*/
public abstract class GroupTracker {
private final int groupId;
private ReplicaThread.RemoteReplicaGroup remoteReplicaGroup;
private List<ReplicaTracker> inflightReplicas;
private List<ReplicaTracker> inflightReplicaTrackers;
private int iterations;

GroupTracker(int groupId) {
this.groupId = groupId;
this.remoteReplicaGroup = null;
this.inflightReplicas = new ArrayList<>();
this.inflightReplicaTrackers = new ArrayList<>();
this.iterations = 0;
}

public int getGroupId() {
return groupId;
}

public ReplicaThread.RemoteReplicaGroup getRemoteReplicaGroup() {
return remoteReplicaGroup;
}

public List<ReplicaTracker> getInflightReplicaTrackers() {
return inflightReplicaTrackers;
}

public int getIterations() {
return iterations;
}

/**
* This method should be called, when a new remote replica group
* @param remoteReplicaGroup Remote replica group created for this tracker
* @param inflightReplicas replica trackers for replicas included in the remoteReplicaGroup
*/
public void startIteration(ReplicaThread.RemoteReplicaGroup remoteReplicaGroup,
List<ReplicaTracker> inflightReplicas) {
this.remoteReplicaGroup = remoteReplicaGroup;
this.inflightReplicas = inflightReplicas;
this.inflightReplicaTrackers = inflightReplicas;
this.iterations++;
}

/**
* removes remote replica group from tracking, finishes iterations for inflight replicas
* and removes from tracking afterward
*/
public void finishIteration() {
remoteReplicaGroup = null;
inflightReplicas.clear();

inflightReplicaTrackers.forEach(ReplicaTracker::finishIteration);
inflightReplicaTrackers.clear();
}

/**
* If remoteReplicaGroup is null, only then we consider group is not getting tracked
* @return returns true if group is getting tracked, false otherwise
*/
public boolean isInFlight() {
return remoteReplicaGroup != null;
manbearpig1996 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @return checks and returns true if group is getting tracked and in done state, false otherwise
*/
public boolean isGroupDone() {
return isInFlight() && remoteReplicaGroup.isDone();
}

public boolean isAvailable() {
return remoteReplicaGroup == null;
@Override
public String toString() {
return "GroupTracker: [" + groupId + " " + inflightReplicaTrackers.toString() + " " + iterations + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
package com.github.ambry.replication.continuous;

/**
* These are states we are tracking for a continuous replication cycle
* These are statuses we are tracking for a continuous replication cycle
*
* UNKNOWN - We do not know the state
* UNKNOWN - We do not know the status
* OFFLINE - Replica is determined to be offline
* STANDBY - Replica is waiting for its data to come from intra colo replication, we should not pull data, valid in remote colo
* STANDBY_TIMED_OUT - Replica was in STANDBY, but data has not arrived for some time, so we need to pull data,
* ,valid in remote colo,
* STANDBY - Replica is waiting for its data to come from intra colo replication, does not make any replication calls until data arrives from intra colo thread,
* or it has timed out and moved to STANDBY_TIMED_OUT_ON_NO_PROGRESS status, valid in remote colo
* STANDBY_TIMED_OUT_ON_NO_PROGRESS - Replica was in STANDBY, but data has not arrived for some time, so we need to pull data,
* valid in remote colo,
* ACTIVE - We can pull data for this replica , valid for remote colo leader-leader pair and intra-colo replication.
*/
public enum ReplicaStatus {
  UNKNOWN, OFFLINE, STANDBY, STANDBY_TIMED_OUT_ON_NO_PROGRESS, ACTIVE
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,52 @@


/**
* This class tracks for a current state for continuous replication cycle.
* This class tracks the current status of a replica for a continuous replication cycle.
* It also tracks whether a replica is throttled or not in a continuous replication cycle.
*/
public class ReplicaTracker {
private final RemoteReplicaInfo remoteReplicaInfo;
private ReplicaState replicaState;
private ReplicaStatus replicaStatus;
private final Time time;
private final long throttleDurationMs;
private long throttledTill;

public ReplicaTracker(RemoteReplicaInfo remoteReplicaInfo) {
public ReplicaTracker(RemoteReplicaInfo remoteReplicaInfo, Time time, long throttleDurationMs) {
this.remoteReplicaInfo = remoteReplicaInfo;
this.replicaState = ReplicaState.UNKNOWN;
this.replicaStatus = ReplicaStatus.UNKNOWN;
this.time = time;
this.throttleDurationMs = throttleDurationMs;
this.throttledTill = 0;
}

public RemoteReplicaInfo getRemoteReplicaInfo() {
return remoteReplicaInfo;
}

public void setReplicaState(ReplicaState replicaState) {
this.replicaState = replicaState;
public void setReplicaStatus(ReplicaStatus replicaStatus) {
this.replicaStatus = replicaStatus;
}

public ReplicaState getReplicaState() {
return replicaState;
public ReplicaStatus getReplicaStatus() {
return replicaStatus;
}

public boolean isThrottled(Time time) {
return time.milliseconds() <= throttledTill;
public boolean isThrottled() {
return time.milliseconds() < throttledTill;
}

public void setThrottledTill(long throttledTill) {
this.throttledTill = throttledTill;
/**
* whenever a group is finished, this method gets called for corresponding replica trackers,
* replica status is moved to unknown and replica is throttled
*/
public void finishIteration() {
this.replicaStatus = ReplicaStatus.UNKNOWN;
this.throttledTill = time.milliseconds() + throttleDurationMs;
}

@Override
public String toString() {
return "ReplicaTracker: [" + remoteReplicaInfo.toString() + " " + replicaStatus.toString() + " " + throttledTill
+ "]";
}
}
Loading
Loading