Skip to content

Commit

Permalink
Added support for snapshot related metrics
Browse files Browse the repository at this point in the history
Closes: #295
Closes: #165

Note:
This commit is a result of squashing the following three commits:

* 2ac18e9
* a3a4058
* 03bd45f

Co-authored-by: Smit Patel <psmit@uber.com>

Signed-off-by: Lukáš Vlček <lukas.vlcek@aiven.io>
  • Loading branch information
psmitj authored and lukas-vlcek committed Jan 2, 2025
1 parent 8efef9a commit 57d6337
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 14 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ To disable exporting cluster settings use:
prometheus.cluster.settings: false
```

#### Snapshot metrics

By default, snapshot metrics are disabled. To enable exporting snapshot metrics use:
```
prometheus.snapshots: true
```

#### Nodes filter

Metrics include statistics about individual OpenSearch nodes.
Expand Down
8 changes: 7 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import org.opensearch.gradle.PropertyNormalization
import org.opensearch.gradle.test.RestIntegTestTask

import java.util.concurrent.Callable
Expand Down Expand Up @@ -113,7 +114,7 @@ dependencies {

restResources {
restApi {
includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index'
includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index', 'snapshot'
}
}

Expand All @@ -139,8 +140,13 @@ tasks.named("check").configure { dependsOn(integTest) }
// Temporary disable task :testingConventions
testingConventions.enabled = false

// Directory for snapshot repository
File repositoryDir = new File(project.layout.buildDirectory.get().asFile, "shared-repository")

testClusters.all {
numberOfNodes = 2
// Configuring repo path for 'fs' type snapshot repository
setting 'path.repo', "${repositoryDir.absolutePath}", PropertyNormalization.IGNORE_VALUE

// It seems cluster name can not be customized here. It gives an error:
// Testclusters does not allow the following settings to be changed:[cluster.name] for node{::yamlRestTest-0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.compuscene.metrics.prometheus;

import org.opensearch.action.ClusterStatsData;
import org.opensearch.action.SnapshotsResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand All @@ -37,6 +38,8 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand All @@ -54,19 +57,23 @@ public class PrometheusMetricsCollector {

private boolean isPrometheusClusterSettings;
private boolean isPrometheusIndices;
private boolean isPrometheusSnapshots;
private PrometheusMetricsCatalog catalog;

/**
* A constructor.
* @param catalog {@link PrometheusMetricsCatalog}
* @param isPrometheusIndices boolean flag for index level metric
* @param isPrometheusSnapshots boolean flag for snapshots related metrics
* @param isPrometheusClusterSettings boolean flag cluster settings metrics
*/
public PrometheusMetricsCollector(PrometheusMetricsCatalog catalog,
boolean isPrometheusIndices,
boolean isPrometheusSnapshots,
boolean isPrometheusClusterSettings) {
this.isPrometheusClusterSettings = isPrometheusClusterSettings;
this.isPrometheusIndices = isPrometheusIndices;
this.isPrometheusSnapshots = isPrometheusSnapshots;
this.catalog = catalog;
}

Expand All @@ -80,6 +87,7 @@ public void registerMetrics() {
registerNodeMetrics();
registerIndicesMetrics();
registerPerIndexMetrics();
registerSnapshotMetrics();
registerTransportMetrics();
registerHTTPMetrics();
registerThreadPoolMetrics();
Expand Down Expand Up @@ -465,6 +473,30 @@ private void updatePerIndexMetrics(@Nullable ClusterHealthResponse chr, @Nullabl
}
}

@SuppressWarnings("checkstyle:LineLength")
private void registerSnapshotMetrics() {
catalog.registerClusterGauge("min_snapshot_age", "Time elapsed in milliseconds since the most recent successful snapshot's start time", "sm_policy");
}

private void updateSnapshotsMetrics(@Nullable SnapshotsResponse snapshotsResponse) {
if (snapshotsResponse == null) {
return;
}
Map<String, Long> smPolicyMinSnapshotAge = new HashMap<>();
for (SnapshotInfo snapshotInfo : snapshotsResponse.getSnapshotInfos()) {
// emit min_snapshot_age metric only for successful snapshots
if (snapshotInfo.state() != SnapshotState.SUCCESS) {
continue;
}
String smPolicy = snapshotInfo.userMetadata() == null ? "adhoc" : snapshotInfo.userMetadata().getOrDefault("sm_policy", "adhoc").toString();
long snapshotAge = System.currentTimeMillis() - snapshotInfo.startTime();
smPolicyMinSnapshotAge.compute(smPolicy, (key, oldValue) -> oldValue == null ? snapshotAge : Math.min(oldValue, snapshotAge));
}
for(Map.Entry<String, Long> entry : smPolicyMinSnapshotAge.entrySet()) {
catalog.setClusterGauge("min_snapshot_age", entry.getValue(), entry.getKey());
}
}

@SuppressWarnings("checkstyle:LineLength")
private void updatePerIndexContextMetrics(String indexName, String context, CommonStats idx) {
catalog.setClusterGauge("index_doc_number", idx.getDocs().getCount(), indexName, context);
Expand Down Expand Up @@ -920,12 +952,14 @@ private void updateESSettings(@Nullable ClusterStatsData stats) {
* @param nodeStats NodeStats filtered using nodes filter
* @param indicesStats IndicesStatsResponse
* @param clusterStatsData ClusterStatsData
* @param snapshotsResponse SnapshotsResponse
*/
public void updateMetrics(String originNodeName, String originNodeId,
@Nullable ClusterHealthResponse clusterHealthResponse,
NodeStats[] nodeStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStatsData clusterStatsData) {
@Nullable ClusterStatsData clusterStatsData,
@Nullable SnapshotsResponse snapshotsResponse) {
Summary.Timer timer = catalog.startSummaryTimer(
new Tuple<>(originNodeName, originNodeId),
"metrics_generate_time_seconds");
Expand Down Expand Up @@ -956,7 +990,9 @@ public void updateMetrics(String originNodeName, String originNodeId,
if (isPrometheusClusterSettings) {
updateESSettings(clusterStatsData);
}

if (isPrometheusSnapshots) {
updateSnapshotsMetrics(snapshotsResponse);
}
timer.observeDuration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,35 @@ public enum INDEX_FILTER_OPTIONS {

static String PROMETHEUS_CLUSTER_SETTINGS_KEY = "prometheus.cluster.settings";
static String PROMETHEUS_INDICES_KEY = "prometheus.indices";
static String PROMETHEUS_SNAPSHOTS_KEY = "prometheus.snapshots";
static String PROMETHEUS_NODES_FILTER_KEY = "prometheus.nodes.filter";
static String PROMETHEUS_SELECTED_INDICES_KEY = "prometheus.indices_filter.selected_indices";
static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option";

/**
* This setting is used configure weather to expose cluster settings metrics or not. The default value is true.
* This setting is used configure whether to expose cluster settings metrics or not. The default value is true.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_CLUSTER_SETTINGS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_CLUSTER_SETTINGS =
Setting.boolSetting(PROMETHEUS_CLUSTER_SETTINGS_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure weather to expose low level index metrics or not. The default value is true.
* This setting is used configure whether to expose low level index metrics or not. The default value is true.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_INDICES_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_INDICES =
Setting.boolSetting(PROMETHEUS_INDICES_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure whether to expose snapshot metrics or not. The default value is false.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_SNAPSHOTS =
Setting.boolSetting(PROMETHEUS_SNAPSHOTS_KEY, false,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure which cluster nodes to gather metrics from. The default value is _local.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_NODES_FILTER_KEY}.
Expand Down Expand Up @@ -97,6 +106,7 @@ public enum INDEX_FILTER_OPTIONS {

private volatile boolean clusterSettings;
private volatile boolean indices;
private volatile boolean snapshots;
private volatile String nodesFilter;
private volatile String selectedIndices;
private volatile INDEX_FILTER_OPTIONS selectedOption;
Expand All @@ -109,11 +119,13 @@ public enum INDEX_FILTER_OPTIONS {
public PrometheusSettings(Settings settings, ClusterSettings clusterSettings) {
setPrometheusClusterSettings(PROMETHEUS_CLUSTER_SETTINGS.get(settings));
setPrometheusIndices(PROMETHEUS_INDICES.get(settings));
setPrometheusSnapshots(PROMETHEUS_SNAPSHOTS.get(settings));
setPrometheusNodesFilter(PROMETHEUS_NODES_FILTER.get(settings));
setPrometheusSelectedIndices(PROMETHEUS_SELECTED_INDICES.get(settings));
setPrometheusSelectedOption(PROMETHEUS_SELECTED_OPTION.get(settings));
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_CLUSTER_SETTINGS, this::setPrometheusClusterSettings);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_INDICES, this::setPrometheusIndices);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SNAPSHOTS, this::setPrometheusSnapshots);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_NODES_FILTER, this::setPrometheusNodesFilter);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_INDICES, this::setPrometheusSelectedIndices);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_OPTION, this::setPrometheusSelectedOption);
Expand All @@ -127,6 +139,10 @@ private void setPrometheusIndices(boolean flag) {
this.indices = flag;
}

private void setPrometheusSnapshots(boolean flag) {
this.snapshots = flag;
}

private void setPrometheusNodesFilter(String filter) { this.nodesFilter = filter; }

private void setPrometheusSelectedIndices(String selectedIndices) {
Expand All @@ -153,6 +169,14 @@ public boolean getPrometheusIndices() {
return this.indices;
}

/**
* Get value of settings key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
* @return boolean value of the key
*/
public boolean getPrometheusSnapshots() {
return this.snapshots;
}

/**
* Get value of settings key {@link #PROMETHEUS_NODES_FILTER_KEY}.
* @return boolean value of the key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.opensearch.action;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.search.pipeline.SearchPipelineStats;

import java.io.IOException;

Expand All @@ -43,6 +45,7 @@ public class NodePrometheusMetricsResponse extends ActionResponse {
private final NodeStats[] nodeStats;
@Nullable private final IndicesStatsResponse indicesStats;
private ClusterStatsData clusterStatsData = null;
@Nullable private final SnapshotsResponse snapshotsResponse;

/**
* A constructor that materialize the instance from inputStream.
Expand All @@ -56,6 +59,11 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException {
nodeStats = in.readArray(NodeStats::new, NodeStats[]::new);
indicesStats = PackageAccessHelper.createIndicesStatsResponse(in);
clusterStatsData = new ClusterStatsData(in);
if (in.getVersion().onOrAfter(Version.V_2_17_1)) {
snapshotsResponse = new SnapshotsResponse(in);
} else {
snapshotsResponse = null;
}
}

/**
Expand All @@ -65,6 +73,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException {
* @param nodesStats NodesStats
* @param indicesStats IndicesStats
* @param clusterStateResponse ClusterStateResponse
* @param snapshotsResponse SnapshotsResponse
* @param settings Settings
* @param clusterSettings ClusterSettings
*/
Expand All @@ -73,6 +82,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth,
NodeStats[] nodesStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStateResponse clusterStateResponse,
@Nullable SnapshotsResponse snapshotsResponse,
Settings settings,
ClusterSettings clusterSettings) {
this.clusterHealth = clusterHealth;
Expand All @@ -82,6 +92,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth,
if (clusterStateResponse != null) {
this.clusterStatsData = new ClusterStatsData(clusterStateResponse, settings, clusterSettings);
}
this.snapshotsResponse = snapshotsResponse;
}

/**
Expand All @@ -106,6 +117,15 @@ public NodeStats[] getNodeStats() {
return this.nodeStats;
}

/**
* Get internal {@link SnapshotsResponse} object.
* @return SnapshotsResponse object
*/
@Nullable
public SnapshotsResponse getSnapshotsResponse() {
return this.snapshotsResponse;
}

/**
* Get internal {@link IndicesStatsResponse} object.
* @return IndicesStatsResponse object
Expand All @@ -131,5 +151,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeArray(nodeStats);
out.writeOptionalWriteable(indicesStats);
clusterStatsData.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_17_1)) {
snapshotsResponse.writeTo(out);
}
}
}
79 changes: 79 additions & 0 deletions src/main/java/org/opensearch/action/SnapshotsResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright [2018] [Vincent VAN HOLLEBEKE]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.opensearch.action;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Represents a container class for holding response data related to snapshots.
*/
public class SnapshotsResponse extends ActionResponse {
private final List<SnapshotInfo> snapshotInfos;

/**
* A constructor.
* @param in A streamInput to materialize the instance from
* @throws IOException if there is an exception reading from inputStream
*/
public SnapshotsResponse(StreamInput in) throws IOException {
super(in);
snapshotInfos = Collections.synchronizedList(in.readList(SnapshotInfo::new));
}

/**
* A constructor.
*/
public SnapshotsResponse() {
this.snapshotInfos = Collections.synchronizedList(new ArrayList<>());
}

/**
* Writes the instance into {@link StreamOutput}.
*
* @param out the output stream to which the instance is to be written
* @throws IOException if there is an exception writing to the output stream
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(snapshotInfos);
}

/**
* Getter for {@code snapshotInfos} list.
*
* @return the list of {@link SnapshotInfo} objects
*/
public List<SnapshotInfo> getSnapshotInfos() {
return snapshotInfos;
}

/**
* Adds {@code snapshotInfosToAdd} to the {@code snapshotInfos} list.
*/
public void addSnapshotInfos(List<SnapshotInfo> snapshotInfosToAdd) {
snapshotInfos.addAll(snapshotInfosToAdd);
}
}
Loading

0 comments on commit 57d6337

Please sign in to comment.