Skip to content

Commit

Permalink
[Weighted Shard Routing] API versioning (opensearch-project#5255)
Browse files Browse the repository at this point in the history
* Support API versioning for weighted shard routing

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
anshu1106 authored Jan 9, 2023
1 parent 32b0e61 commit e0ddf52
Show file tree
Hide file tree
Showing 27 changed files with 612 additions and 99 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public void testInvariantsAndLogsOnDecommissionedNodes() throws Exception {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -453,6 +454,7 @@ private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterMana
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -565,6 +567,7 @@ public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -649,6 +652,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -769,6 +773,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -809,6 +814,7 @@ public void testDecommissionFailedWithOnlyOneAttributeValueForLeader() throws Ex
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -922,6 +928,7 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.cluster.routing;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
Expand Down Expand Up @@ -79,6 +81,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand All @@ -88,6 +91,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(0)
.get();
assertEquals(response.isAcknowledged(), true);
}
Expand Down Expand Up @@ -215,6 +219,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -304,6 +309,7 @@ public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exceptio
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -361,6 +367,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -398,12 +405,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() {
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
ResourceNotFoundException exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get()
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}

public void testDeleteWeightedRouting_WeightsAreSet() {
public void testDeleteWeightedRouting_WeightsAreSet() throws IOException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
Expand All @@ -430,13 +439,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testPutAndDeleteWithVersioning() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");

Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// update weights api call with correct version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get();
assertTrue(response.isAcknowledged());

// update weights api call with incorrect version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights);
UnsupportedWeightedRoutingStateException exception = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get()
);
assertEquals(exception.status(), RestStatus.CONFLICT);

// get weights call
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();

// update weights call using version returned by get api call
weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(weightedRoutingResponse.getVersion())
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights by awareness attribute
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.setVersion(2)
.get();
assertTrue(deleteResponse.isAcknowledged());

// update weights again and make sure that version number got updated on delete
weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights
deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get();
assertTrue(deleteResponse.isAcknowledged());

// delete weights call, incorrect version number
UnsupportedWeightedRoutingStateException deleteException = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get()
);
assertEquals(RestStatus.CONFLICT, deleteException.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void testSearchWithWRRShardRouting() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -129,7 +130,7 @@ public void testSearchWithWRRShardRouting() throws IOException {

logger.info("--> deleted shard routing weights for weighted round robin");

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertEquals(deleteResponse.isAcknowledged(), true);

hitNodes = new HashSet<>();
Expand Down
Loading

0 comments on commit e0ddf52

Please sign in to comment.