From a6e58504945cac5d2e47f63ed97ebfa8b55367ce Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 7 Oct 2023 20:58:25 +0800 Subject: [PATCH] [Improve][Test] Add test for ResourceManager to keep task will be deployed in different node (#5518) * [Improve][Test] Add test for ResourceManager to keep task will be deployed in different node --- .../AbstractResourceManager.java | 8 +- .../ResourceRequestHandler.java | 3 +- .../resourcemanager/worker/WorkerProfile.java | 2 + .../resourcemanager/FakeResourceManager.java | 93 +++++++++++++++++++ .../ResourceManagerFunctionTest.java | 56 +++++++++++ 5 files changed, 155 insertions(+), 7 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index cf4f4e6e822..8b7e0b18643 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -29,10 +29,8 @@ import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Member; import com.hazelcast.internal.services.MembershipServiceEvent; -import com.hazelcast.spi.impl.InternalCompletableFuture; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.Operation; -import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; @@ -75,7 +73,7 @@ private void initWorker() { .map(Member::getAddress) .collect(Collectors.toList()); log.info("initWorker live nodes: " + aliveWorker); - List> futures = + List> futures = aliveWorker.stream() .map( worker -> @@ -86,7 +84,7 @@ private void initWorker() { worker, (WorkerProfile) p); })) .collect(Collectors.toList()); - futures.forEach(InternalCompletableFuture::join); + futures.forEach(CompletableFuture::join); log.info("registerWorker: " + registerWorker); } @@ -155,7 +153,7 @@ public void close() { isRunning = false; } - protected InvocationFuture sendToMember(Operation operation, Address address) { + protected CompletableFuture sendToMember(Operation operation, Address address) { return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java index be5993997c1..680aa1c07c8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java @@ -27,7 +27,6 @@ import com.hazelcast.cluster.Address; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; -import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import java.util.ArrayList; import java.util.Arrays; @@ -139,7 +138,7 @@ private void addSlotToCacheMap(int index, SlotProfile slotProfile) { private CompletableFuture singleResourceRequestToMember( int i, ResourceProfile r, WorkerProfile workerProfile) { - InvocationFuture future = + CompletableFuture future = resourceManager.sendToMember( new RequestSlotOperation(jobId, r), workerProfile.getAddress()); return future.whenComplete( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java index e8a73d33838..836b25201e1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java @@ -25,6 +25,7 @@ import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import lombok.AllArgsConstructor; import lombok.Data; import java.io.IOException; @@ -33,6 +34,7 @@ * Used to describe the status of the current Worker, including address and resource assign status */ @Data +@AllArgsConstructor public class WorkerProfile implements IdentifiedDataSerializable { private Address address; diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java new file mode 100644 index 00000000000..9c8595e1676 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager; + +import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation; +import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; +import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile; + +import com.hazelcast.cluster.Address; +import com.hazelcast.spi.impl.NodeEngine; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; + +/** Used to test ResourceManager, override init method to register more workers. */ +public class FakeResourceManager extends AbstractResourceManager { + public FakeResourceManager(NodeEngine nodeEngine) { + super(nodeEngine); + init(); + } + + @Override + public void init() { + try { + Address address1 = new Address("localhost", 5801); + WorkerProfile workerProfile1 = + new WorkerProfile( + address1, + new ResourceProfile(), + new ResourceProfile(), + new SlotProfile[] {}, + new SlotProfile[] {}); + this.registerWorker.put(address1, workerProfile1); + + Address address2 = new Address("localhost", 5802); + WorkerProfile workerProfile2 = + new WorkerProfile( + address2, + new ResourceProfile(), + new ResourceProfile(), + new SlotProfile[] {}, + new SlotProfile[] {}); + this.registerWorker.put(address2, workerProfile2); + Address address3 = new Address("localhost", 5803); + WorkerProfile workerProfile3 = + new WorkerProfile( + address3, + new ResourceProfile(), + new ResourceProfile(), + new SlotProfile[] {}, + new SlotProfile[] {}); + this.registerWorker.put(address3, workerProfile3); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Override + protected CompletableFuture sendToMember(Operation operation, Address address) { + if (operation instanceof RequestSlotOperation) { + return (CompletableFuture) + CompletableFuture.completedFuture( + new SlotAndWorkerProfile( + new WorkerProfile( + address, + new ResourceProfile(), + new ResourceProfile(), + new SlotProfile[] {}, + new SlotProfile[] {}), + new SlotProfile(address, 1, new ResourceProfile(), ""))); + } else { + return super.sendToMember(operation, address); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java new file mode 100644 index 00000000000..acb4237f070 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager; + +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.hazelcast.cluster.Address; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +public class ResourceManagerFunctionTest + extends AbstractSeaTunnelServerTest { + + @Test + public void testApplyResourceWithRandomResult() + throws ExecutionException, InterruptedException { + FakeResourceManager resourceManager = new FakeResourceManager(nodeEngine); + + List resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + List slotProfiles = resourceManager.applyResources(1L, resourceProfiles).get(); + Assertions.assertEquals(slotProfiles.size(), 5); + + Set
addresses = + slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet()); + Assertions.assertTrue(addresses.size() > 1); + } +}