Skip to content

Commit

Permalink
[FLINK-36201][Coordination] Don't use StateLocalitySlotAssigner when …
Browse files Browse the repository at this point in the history
…local recovery is disabled
  • Loading branch information
1996fanrui authored and dmvk committed Sep 4, 2024
1 parent 3f39a7b commit 57bc169
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
Expand Down Expand Up @@ -103,7 +104,9 @@ public SchedulerNG createInstance(
jobGraph.getJobID());

final SlotSharingSlotAllocator slotAllocator =
createSlotSharingSlotAllocator(declarativeSlotPool);
createSlotSharingSlotAllocator(
declarativeSlotPool,
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY));

final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
Expand Down Expand Up @@ -146,10 +149,11 @@ public JobManagerOptions.SchedulerType getSchedulerType() {
}

public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
DeclarativeSlotPool declarativeSlotPool) {
DeclarativeSlotPool declarativeSlotPool, boolean localRecoveryEnabled) {
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot,
declarativeSlotPool::containsFreeSlot);
declarativeSlotPool::containsFreeSlot,
localRecoveryEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,29 @@ public class SlotSharingSlotAllocator implements SlotAllocator {
private final ReserveSlotFunction reserveSlotFunction;
private final FreeSlotFunction freeSlotFunction;
private final IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction;
private final boolean localRecoveryEnabled;

private SlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
boolean localRecoveryEnabled) {
this.reserveSlotFunction = reserveSlot;
this.freeSlotFunction = freeSlotFunction;
this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
this.localRecoveryEnabled = localRecoveryEnabled;
}

public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
ReserveSlotFunction reserveSlot,
FreeSlotFunction freeSlotFunction,
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
boolean localRecoveryEnabled) {
return new SlotSharingSlotAllocator(
reserveSlot, freeSlotFunction, isSlotAvailableAndFreeFunction);
reserveSlot,
freeSlotFunction,
isSlotAvailableAndFreeFunction,
localRecoveryEnabled);
}

@Override
Expand Down Expand Up @@ -131,9 +138,9 @@ public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
.map(
parallelism -> {
SlotAssigner slotAssigner =
jobAllocationsInformation.isEmpty()
? new DefaultSlotAssigner()
: new StateLocalitySlotAssigner();
localRecoveryEnabled && !jobAllocationsInformation.isEmpty()
? new StateLocalitySlotAssigner()
: new DefaultSlotAssigner();
return new JobSchedulingPlan(
parallelism,
slotAssigner.assignSlots(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
Expand Down Expand Up @@ -269,7 +270,8 @@ public AdaptiveScheduler build() throws Exception {
declarativeSlotPool,
slotAllocator == null
? AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
declarativeSlotPool)
declarativeSlotPool,
jobMasterConfiguration.get(StateRecoveryOptions.LOCAL_RECOVERY))
: slotAllocator,
executorService,
userCodeLoader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class SlotSharingSlotAllocatorTest {
.build();
private static final IsSlotAvailableAndFreeFunction TEST_IS_SLOT_FREE_FUNCTION =
ignored -> true;
private static final boolean DISABLE_LOCAL_RECOVERY = false;

private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
Expand All @@ -77,7 +78,8 @@ void testCalculateRequiredSlots() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);

final ResourceCounter resourceCounter =
slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
Expand All @@ -95,7 +97,8 @@ void testDetermineParallelismWithMinimumSlots() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);

final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
Expand All @@ -114,7 +117,8 @@ void testDetermineParallelismWithManySlots() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);

final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
Expand All @@ -136,7 +140,8 @@ void testDetermineParallelismWithVariedParallelism() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);
final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
final JobInformation.VertexInformation vertex11 =
new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
Expand Down Expand Up @@ -169,7 +174,8 @@ void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);

final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
Expand All @@ -186,7 +192,8 @@ void testDetermineParallelismWithPartiallyEqualLowerUpperBound() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 1, 8, new SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
Expand Down Expand Up @@ -214,7 +221,8 @@ void testDetermineParallelismWithLowerBoundsInsufficientSlots() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
Expand All @@ -235,7 +243,8 @@ void testDetermineParallelismWithLowerBoundsInsufficientSlotsForPartialVertices(
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 2, 2, slotSharingGroup);
Expand All @@ -256,7 +265,8 @@ void testDetermineParallelismWithAllEqualLowerUpperBoundFreSlots() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 10, new SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
Expand Down Expand Up @@ -286,7 +296,8 @@ void testDetermineParallelismWithAllEqualLowerUpperBoundManySlots() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);
final JobInformation.VertexInformation vertex1 =
new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
final JobInformation.VertexInformation vertex2 =
Expand Down Expand Up @@ -316,7 +327,8 @@ void testReserveAvailableResources() {
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
TEST_IS_SLOT_FREE_FUNCTION);
TEST_IS_SLOT_FREE_FUNCTION,
DISABLE_LOCAL_RECOVERY);

final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
Expand Down Expand Up @@ -357,7 +369,10 @@ jobInformation, getSlots(50), JobAllocationsInformation.empty())
void testReserveUnavailableResources() {
final SlotSharingSlotAllocator slotSharingSlotAllocator =
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, ignored -> false);
TEST_RESERVE_SLOT_FUNCTION,
TEST_FREE_SLOT_FUNCTION,
ignored -> false,
DISABLE_LOCAL_RECOVERY);

final JobInformation jobInformation =
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
Expand Down Expand Up @@ -422,7 +437,8 @@ void testStickyAllocation() {
(allocationId, resourceProfile) ->
TestingPhysicalSlot.builder().build(),
(allocationID, cause, ts) -> {},
id -> false)
id -> false,
true)
.determineParallelismAndCalculateAssignment(
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)),
freeSlots,
Expand Down

0 comments on commit 57bc169

Please sign in to comment.