diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java index 060af2a89853d..6b68a83da94e2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java @@ -16,8 +16,13 @@ import org.opensearch.action.search.SearchTask; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.cluster.service.ClusterService; @@ -37,7 +42,6 @@ import org.opensearch.search.backpressure.SearchBackpressureIT.TestResponse; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; -import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -55,17 +59,17 @@ import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.threadpool.ThreadPool.Names.SAME; import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER; import static org.hamcrest.Matchers.instanceOf; -@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0) public class WorkloadManagementIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { final static String PUT = "PUT"; final static String MEMORY = "MEMORY"; final static String CPU = "CPU"; final static String ENABLED = "enabled"; final static String DELETE = "DELETE"; - private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); + private static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.SECONDS); public WorkloadManagementIT(Settings nodeSettings) { super(nodeSettings); @@ -207,7 +211,7 @@ public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) assertEquals(0, listener.getLatch().getCount()); } - public static class TestClusterUpdateRequest extends ActionRequest { + public static class TestClusterUpdateRequest extends ClusterManagerNodeRequest { final private String method; final private QueryGroup queryGroup; @@ -243,22 +247,49 @@ public String getMethod() { } } - public static class TestClusterUpdateTransportAction extends HandledTransportAction { + public static class TestClusterUpdateTransportAction extends TransportClusterManagerNodeAction { public static final ActionType ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new); - private final ClusterService clusterService; @Inject public TestClusterUpdateTransportAction( + ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, - ActionFilters actionFilters + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { - super(ACTION.name(), transportService, actionFilters, TestClusterUpdateRequest::new); - this.clusterService = clusterService; + super( + ACTION.name(), + transportService, + clusterService, + threadPool, + actionFilters, + TestClusterUpdateRequest::new, + indexNameExpressionResolver + ); + } + + @Override + protected String executor() { + return SAME; + } + + @Override + protected TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TestClusterUpdateRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } @Override - protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener listener) { + protected void clusterManagerOperation( + TestClusterUpdateRequest request, + ClusterState clusterState, + ActionListener listener + ) { clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -350,7 +381,6 @@ protected void doExecute(Task task, TestQueryGroupTaskRequest request, ActionLis ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); assertEquals(request.getQueryGroupId(), ((QueryGroupTask) task).getQueryGroupId()); long startTime = System.nanoTime(); - while (System.nanoTime() - startTime < TIMEOUT.getNanos()) { doWork(request); if (cancellableTask.isCancelled()) { @@ -379,8 +409,9 @@ private void doWork(TestQueryGroupTaskRequest request) throws InterruptedExcepti } while (i < iterations); break; case "MEMORY": - Byte[] bytes = new Byte[100000]; - int[] ints = new int[1000000]; + int bytesToAllocate = (int) (Runtime.getRuntime().totalMemory() * 0.01); + Byte[] bytes = new Byte[bytesToAllocate]; + int[] ints = new int[bytesToAllocate]; break; } }