Skip to content

Commit

Permalink
URI path filtering support in cluster stats API
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
  • Loading branch information
Swetha Guptha committed Sep 17, 2024
1 parent 12ff5ed commit 0038530
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.xcontent.ToXContentFragment;
Expand All @@ -47,6 +48,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/**
* Cluster Stats per index
Expand All @@ -68,14 +71,57 @@ public class ClusterStatsIndices implements ToXContentFragment {
private MappingStats mappings;

public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) {
this(Metric.allIndicesMetrics(), nodeResponses, mappingStats, analysisStats);

}

public ClusterStatsIndices(
Set<String> requestedMetrics,
List<ClusterStatsNodeResponse> nodeResponses,
MappingStats mappingStats,
AnalysisStats analysisStats
) {
Map<String, ShardStats> countsPerIndex = new HashMap<>();

this.docs = new DocsStats();
this.store = new StoreStats();
this.fieldData = new FieldDataStats();
this.queryCache = new QueryCacheStats();
this.completion = new CompletionStats();
this.segments = new SegmentsStats();
Consumer<DocsStats> docsStatsConsumer = (docs) -> {
if (Metric.DOCS.containedIn(requestedMetrics)) {
if (this.docs == null) this.docs = new DocsStats();
this.docs.add(docs);
}
};
Consumer<StoreStats> storeStatsConsumer = (store) -> {
if (Metric.STORE.containedIn(requestedMetrics)) {
if (this.store == null) this.store = new StoreStats();
this.store.add(store);
}
};
Consumer<FieldDataStats> fieldDataConsumer = (fieldDataStats) -> {
if (Metric.FIELDDATA.containedIn(requestedMetrics)) {
if (this.fieldData == null) this.fieldData = new FieldDataStats();
this.fieldData.add(fieldDataStats);
}
};

Consumer<QueryCacheStats> queryCacheStatsConsumer = (queryCacheStats) -> {
if (Metric.QUERY_CACHE.containedIn(requestedMetrics)) {
if (this.queryCache == null) this.queryCache = new QueryCacheStats();
this.queryCache.add(queryCacheStats);
}
};

Consumer<CompletionStats> completionStatsConsumer = (completionStats) -> {
if (Metric.COMPLETION.containedIn(requestedMetrics)) {
if (this.completion == null) this.completion = new CompletionStats();
this.completion.add(completionStats);
}
};

Consumer<SegmentsStats> segmentsStatsConsumer = (segmentsStats) -> {
if (Metric.SEGMENTS.containedIn(requestedMetrics)) {
if (this.segments == null) this.segments = new SegmentsStats();
this.segments.add(segmentsStats);
}
};

for (ClusterStatsNodeResponse r : nodeResponses) {
// Aggregated response from the node
Expand All @@ -92,12 +138,12 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
}
}

docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
docsStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.docs);
storeStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.store);
fieldDataConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCacheStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completionStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.completion);
segmentsStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.segments);
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
Expand All @@ -113,21 +159,23 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
docsStatsConsumer.accept(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
storeStatsConsumer.accept(shardCommonStats.store);
fieldDataConsumer.accept(shardCommonStats.fieldData);
queryCacheStatsConsumer.accept(shardCommonStats.queryCache);
completionStatsConsumer.accept(shardCommonStats.completion);
segmentsStatsConsumer.accept(shardCommonStats.segments);
}
}
}

shards = new ShardStats();
indexCount = countsPerIndex.size();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
if (Metric.SHARDS.containedIn(requestedMetrics)) {
shards = new ShardStats();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
}
}

this.mappings = mappingStats;
Expand Down Expand Up @@ -186,13 +234,27 @@ static final class Fields {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.COUNT, indexCount);
shards.toXContent(builder, params);
docs.toXContent(builder, params);
store.toXContent(builder, params);
fieldData.toXContent(builder, params);
queryCache.toXContent(builder, params);
completion.toXContent(builder, params);
segments.toXContent(builder, params);
if (shards != null) {
shards.toXContent(builder, params);
}
if (docs != null) {
docs.toXContent(builder, params);
}
if (store != null) {
store.toXContent(builder, params);
}
if (fieldData != null) {
fieldData.toXContent(builder, params);
}
if (queryCache != null) {
queryCache.toXContent(builder, params);
}
if (completion != null) {
completion.toXContent(builder, params);
}
if (segments != null) {
segments.toXContent(builder, params);
}
if (mappings != null) {
mappings.toXContent(builder, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.annotation.PublicApi;
Expand Down Expand Up @@ -90,37 +91,47 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final IngestStats ingestStats;

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this(Metric.allNodesMetrics(), nodeResponses);
}

ClusterStatsNodes(Set<String> requestedMetrics, List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
this.fs = new FsInfo.Path();
this.plugins = new HashSet<>();
boolean isFSInfoRequested = Metric.FS.containedIn(requestedMetrics);
boolean isPluginsInfoRequested = Metric.PLUGINS.containedIn(requestedMetrics);
this.fs = isFSInfoRequested ? new FsInfo.Path() : null;
this.plugins = isPluginsInfoRequested ? new HashSet<>() : null;

Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.size());
List<NodeInfo> nodeInfos = new ArrayList<>(nodeResponses.size());
List<NodeStats> nodeStats = new ArrayList<>(nodeResponses.size());
List<NodeStats> nodesStats = new ArrayList<>(nodeResponses.size());
for (ClusterStatsNodeResponse nodeResponse : nodeResponses) {
nodeInfos.add(nodeResponse.nodeInfo());
nodeStats.add(nodeResponse.nodeStats());
this.versions.add(nodeResponse.nodeInfo().getVersion());
this.plugins.addAll(nodeResponse.nodeInfo().getInfo(PluginsAndModules.class).getPluginInfos());
NodeInfo nodeInfo = nodeResponse.nodeInfo();
NodeStats nodeStats = nodeResponse.nodeStats();
nodeInfos.add(nodeInfo);
nodesStats.add(nodeStats);
this.versions.add(nodeInfo.getVersion());
if (isPluginsInfoRequested) {
this.plugins.addAll(nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos());
}

// now do the stats that should be deduped by hardware (implemented by ip deduping)
TransportAddress publishAddress = nodeResponse.nodeInfo().getInfo(TransportInfo.class).address().publishAddress();
TransportAddress publishAddress = nodeInfo.getInfo(TransportInfo.class).address().publishAddress();
final InetAddress inetAddress = publishAddress.address().getAddress();
if (!seenAddresses.add(inetAddress)) {
continue;
}
if (nodeResponse.nodeStats().getFs() != null) {
this.fs.add(nodeResponse.nodeStats().getFs().getTotal());
if (isFSInfoRequested && nodeStats.getFs() != null) {
this.fs.add(nodeStats.getFs().getTotal());
}
}
this.counts = new Counts(nodeInfos);
this.os = new OsStats(nodeInfos, nodeStats);
this.process = new ProcessStats(nodeStats);
this.jvm = new JvmStats(nodeInfos, nodeStats);
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
this.os = Metric.OS.containedIn(requestedMetrics) ? new OsStats(nodeInfos, nodesStats) : null;
this.process = Metric.PROCESS.containedIn(requestedMetrics) ? new ProcessStats(nodesStats) : null;
this.jvm = Metric.JVM.containedIn(requestedMetrics) ? new JvmStats(nodeInfos, nodesStats) : null;
this.networkTypes = Metric.NETWORK_TYPES.containedIn(requestedMetrics) ? new NetworkTypes(nodeInfos) : null;
this.discoveryTypes = Metric.DISCOVERY_TYPES.containedIn(requestedMetrics) ? new DiscoveryTypes(nodeInfos) : null;
this.packagingTypes = Metric.PACKAGING_TYPES.containedIn(requestedMetrics) ? new PackagingTypes(nodeInfos) : null;
this.ingestStats = Metric.INGEST.containedIn(requestedMetrics) ? new IngestStats(nodesStats) : null;
}

public Counts getCounts() {
Expand Down Expand Up @@ -179,36 +190,54 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endArray();

builder.startObject(Fields.OS);
os.toXContent(builder, params);
builder.endObject();
if (os != null) {
builder.startObject(Fields.OS);
os.toXContent(builder, params);
builder.endObject();
}

builder.startObject(Fields.PROCESS);
process.toXContent(builder, params);
builder.endObject();
if (process != null) {
builder.startObject(Fields.PROCESS);
process.toXContent(builder, params);
builder.endObject();
}

builder.startObject(Fields.JVM);
jvm.toXContent(builder, params);
builder.endObject();
if (jvm != null) {
builder.startObject(Fields.JVM);
jvm.toXContent(builder, params);
builder.endObject();
}

builder.field(Fields.FS);
fs.toXContent(builder, params);
if (fs != null) {
builder.field(Fields.FS);
fs.toXContent(builder, params);
}

builder.startArray(Fields.PLUGINS);
for (PluginInfo pluginInfo : plugins) {
pluginInfo.toXContent(builder, params);
if (plugins != null) {
builder.startArray(Fields.PLUGINS);
for (PluginInfo pluginInfo : plugins) {
pluginInfo.toXContent(builder, params);
}
builder.endArray();
}
builder.endArray();

builder.startObject(Fields.NETWORK_TYPES);
networkTypes.toXContent(builder, params);
builder.endObject();
if (networkTypes != null) {
builder.startObject(Fields.NETWORK_TYPES);
networkTypes.toXContent(builder, params);
builder.endObject();
}

discoveryTypes.toXContent(builder, params);
if (discoveryTypes != null) {
discoveryTypes.toXContent(builder, params);
}

packagingTypes.toXContent(builder, params);
if (packagingTypes != null) {
packagingTypes.toXContent(builder, params);
}

ingestStats.toXContent(builder, params);
if (ingestStats != null) {
ingestStats.toXContent(builder, params);
}

return builder;
}
Expand Down
Loading

0 comments on commit 0038530

Please sign in to comment.