diff --git a/build.gradle b/build.gradle
index ab170a9f4..546679e67 100644
--- a/build.gradle
+++ b/build.gradle
@@ -126,9 +126,12 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.10.0'
- implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.0-rc3'
- implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.0-rc3'
- implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'
+ implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.7.0'
+ implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.7.0'
+ implementation 'software.amazon.randomcutforest:randomcutforest-core:3.7.0'
+ //implementation files('lib/randomcutforest-core-3.5.0.jar')
+ //implementation files('lib/randomcutforest-serialization-3.5.0.jar')
+ //implementation files('lib/randomcutforest-parkservices-3.5.0.jar')
// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
@@ -402,7 +405,8 @@ testClusters.integTest {
return new RegularFile() {
@Override
File getAsFile() {
- return configurations.zipArchive.asFileTree.getSingleFile()
+ // TODO(review): dev-only override below — revert to configurations.zipArchive.asFileTree.getSingleFile() before merge
+ return fileTree("src/test/resources/job-scheduler").getSingleFile()
}
}
}
diff --git a/src/main/java/org/opensearch/ad/ADJobProcessor.java b/src/main/java/org/opensearch/ad/ADJobProcessor.java
new file mode 100644
index 000000000..7621f9fc9
--- /dev/null
+++ b/src/main/java/org/opensearch/ad/ADJobProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.ad;
+
+import org.opensearch.ad.indices.ADIndex;
+import org.opensearch.ad.indices.ADIndexManagement;
+import org.opensearch.ad.model.ADTask;
+import org.opensearch.ad.model.ADTaskType;
+import org.opensearch.ad.model.AnomalyResult;
+import org.opensearch.ad.settings.AnomalyDetectorSettings;
+import org.opensearch.ad.task.ADTaskCacheManager;
+import org.opensearch.ad.task.ADTaskManager;
+import org.opensearch.ad.transport.AnomalyResultAction;
+import org.opensearch.ad.transport.AnomalyResultRequest;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.timeseries.AnalysisType;
+import org.opensearch.timeseries.JobProcessor;
+import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
+import org.opensearch.timeseries.transport.ResultRequest;
+
+public class ADJobProcessor extends
+ JobProcessor {
+
+ private static ADJobProcessor INSTANCE;
+
+ public static ADJobProcessor getInstance() {
+ if (INSTANCE != null) {
+ return INSTANCE;
+ }
+ synchronized (JobProcessor.class) {
+ if (INSTANCE != null) {
+ return INSTANCE;
+ }
+ INSTANCE = new ADJobProcessor();
+ return INSTANCE;
+ }
+ }
+
+ private ADJobProcessor() {
+ // Singleton class, use getInstance() instead of constructor
+ super(AnalysisType.AD, TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME, AnomalyResultAction.INSTANCE);
+ }
+
+ public void registerSettings(Settings settings) {
+ super.registerSettings(settings, AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION);
+ }
+
+ @Override
+ protected ResultRequest createResultRequest(String configId, long start, long end) {
+ return new AnomalyResultRequest(configId, start, end);
+ }
+}
diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
index d9d5e3f7b..af5816497 100644
--- a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
+++ b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
@@ -15,7 +15,6 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
-import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_FIND_CONFIG_MSG;
import java.util.List;
import java.util.Map;
@@ -35,10 +34,8 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
-import org.opensearch.ad.model.DetectorState;
import org.opensearch.ad.model.InitProgressProfile;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
@@ -49,9 +46,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
-import org.opensearch.ad.util.ExceptionUtil;
-import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
-import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -68,11 +62,19 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.timeseries.AnalysisType;
+import org.opensearch.timeseries.ProfileUtil;
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
+import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
+import org.opensearch.timeseries.model.ConfigState;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
+import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
+import org.opensearch.timeseries.util.ExceptionUtil;
+import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
+import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
@@ -136,11 +138,11 @@ private void calculateTotalResponsesToWait(
listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
} else {
- listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_CONFIG_MSG + detectorId, BAD_REQUEST));
+ listener.onFailure(new OpenSearchStatusException(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId, BAD_REQUEST));
}
}, exception -> {
- logger.error(FAIL_TO_FIND_CONFIG_MSG + detectorId, exception);
- listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_CONFIG_MSG + detectorId, INTERNAL_SERVER_ERROR));
+ logger.error(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId, exception);
+ listener.onFailure(new OpenSearchStatusException(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId, INTERNAL_SERVER_ERROR));
}));
}
@@ -159,7 +161,7 @@ private void prepareProfile(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
- AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
+ Job job = Job.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();
boolean isMultiEntityDetector = detector.isHighCardinality();
@@ -211,7 +213,7 @@ private void prepareProfile(
false
);
if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
- adTaskManager.getAndExecuteOnLatestDetectorLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTask -> {
+ adTaskManager.getAndExecuteOnLatestConfigLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTask -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (adTask.isPresent()) {
long lastUpdateTimeMs = adTask.get().getLastUpdateTime().toEpochMilli();
@@ -315,6 +317,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener listener, Set profiles) {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (profiles.contains(DetectorProfileName.STATE)) {
- profileBuilder.state(DetectorState.DISABLED);
+ profileBuilder.state(ConfigState.DISABLED);
}
if (profiles.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestHistoricalTaskProfile(detectorId, transportService, profileBuilder.build(), listener);
@@ -409,7 +413,7 @@ private void profileStateRelated(
} else {
DetectorProfile.Builder builder = new DetectorProfile.Builder();
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
- builder.state(DetectorState.DISABLED);
+ builder.state(ConfigState.DISABLED);
}
listener.onResponse(builder.build());
}
@@ -418,7 +422,7 @@ private void profileStateRelated(
private void profileModels(
AnomalyDetector detector,
Set profiles,
- AnomalyDetectorJob job,
+ Job job,
boolean forMultiEntityDetector,
MultiResponsesDelegateActionListener listener
) {
@@ -430,7 +434,7 @@ private void profileModels(
private ActionListener onModelResponse(
AnomalyDetector detector,
Set profilesToCollect,
- AnomalyDetectorJob job,
+ Job job,
MultiResponsesDelegateActionListener listener
) {
boolean isMultientityDetector = detector.isHighCardinality();
@@ -464,7 +468,7 @@ private ActionListener onModelResponse(
}
private void profileMultiEntityDetectorStateRelated(
- AnomalyDetectorJob job,
+ Job job,
Set profilesToCollect,
ProfileResponse profileResponse,
DetectorProfile.Builder profileBuilder,
@@ -478,10 +482,11 @@ private void profileMultiEntityDetectorStateRelated(
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
- .confirmDetectorRealtimeInitStatus(
+ .confirmRealtimeInitStatus(
detector,
enabledTime,
client,
+ AnalysisType.AD,
onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener)
);
} else {
@@ -490,7 +495,7 @@ private void profileMultiEntityDetectorStateRelated(
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
- profileBuilder.state(DetectorState.DISABLED);
+ profileBuilder.state(ConfigState.DISABLED);
}
listener.onResponse(profileBuilder.build());
}
@@ -577,7 +582,7 @@ private ActionListener onPollRCFUpdates(
private void createRunningStateAndInitProgress(Set profilesToCollect, DetectorProfile.Builder builder) {
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
- builder.state(DetectorState.RUNNING).build();
+ builder.state(ConfigState.RUNNING).build();
}
if (profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
@@ -595,7 +600,7 @@ private void processInitResponse(
MultiResponsesDelegateActionListener listener
) {
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
- builder.state(DetectorState.INIT);
+ builder.state(ConfigState.INIT);
}
if (profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
index 90b7d350f..9c6ce5cde 100644
--- a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
+++ b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
@@ -25,19 +25,19 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchSecurityException;
import org.opensearch.action.ActionListener;
-import org.opensearch.ad.constant.CommonValue;
-import org.opensearch.ad.feature.FeatureManager;
-import org.opensearch.ad.feature.Features;
-import org.opensearch.ad.ml.ModelManager;
+import org.opensearch.ad.ml.ADModelManager;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityAnomalyResult;
-import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.timeseries.constant.CommonValue;
+import org.opensearch.timeseries.feature.FeatureManager;
+import org.opensearch.timeseries.feature.Features;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;
+import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
/**
* Runner to trigger an anomaly detector.
@@ -45,11 +45,11 @@
public final class AnomalyDetectorRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorRunner.class);
- private final ModelManager modelManager;
+ private final ADModelManager modelManager;
private final FeatureManager featureManager;
private final int maxPreviewResults;
- public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureManager, int maxPreviewResults) {
+ public AnomalyDetectorRunner(ADModelManager modelManager, FeatureManager featureManager, int maxPreviewResults) {
this.modelManager = modelManager;
this.featureManager = featureManager;
this.maxPreviewResults = maxPreviewResults;
@@ -168,24 +168,24 @@ private List parsePreviewResult(
AnomalyResult result;
if (results != null && results.size() > i) {
- ThresholdingResult thresholdingResult = results.get(i);
- List resultsToSave = thresholdingResult
- .toIndexableResults(
- detector,
- Instant.ofEpochMilli(timeRange.getKey()),
- Instant.ofEpochMilli(timeRange.getValue()),
- null,
- null,
- featureDatas,
- Optional.ofNullable(entity),
- CommonValue.NO_SCHEMA_VERSION,
- null,
- null,
- null
+ anomalyResults
+ .addAll(
+ results
+ .get(i)
+ .toIndexableResults(
+ detector,
+ Instant.ofEpochMilli(timeRange.getKey()),
+ Instant.ofEpochMilli(timeRange.getValue()),
+ null,
+ null,
+ featureDatas,
+ Optional.ofNullable(entity),
+ CommonValue.NO_SCHEMA_VERSION,
+ null,
+ null,
+ null
+ )
);
- for (AnomalyResult r : resultsToSave) {
- anomalyResults.add(r);
- }
} else {
result = new AnomalyResult(
detector.getId(),
diff --git a/src/main/java/org/opensearch/ad/EntityProfileRunner.java b/src/main/java/org/opensearch/ad/EntityProfileRunner.java
index 491e8088f..479260e21 100644
--- a/src/main/java/org/opensearch/ad/EntityProfileRunner.java
+++ b/src/main/java/org/opensearch/ad/EntityProfileRunner.java
@@ -28,7 +28,6 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityProfile;
import org.opensearch.ad.model.EntityProfileName;
@@ -38,8 +37,6 @@
import org.opensearch.ad.transport.EntityProfileAction;
import org.opensearch.ad.transport.EntityProfileRequest;
import org.opensearch.ad.transport.EntityProfileResponse;
-import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
-import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -53,11 +50,15 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
+import org.opensearch.timeseries.model.Job;
+import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.ParseUtils;
+import org.opensearch.timeseries.util.SecurityClientUtil;
public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
@@ -188,6 +189,7 @@ private void validateEntity(
client::search,
detector.getId(),
client,
+ AnalysisType.AD,
searchResponseListener
);
@@ -228,7 +230,7 @@ private void getJob(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
- AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
+ Job job = Job.parse(parser);
int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
@@ -331,7 +333,7 @@ private void profileStateRelated(
Entity entityValue,
Set profilesToCollect,
AnomalyDetector detector,
- AnomalyDetectorJob job,
+ Job job,
MultiResponsesDelegateActionListener delegateListener
) {
if (totalUpdates == 0) {
diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
index 4b05295ae..f16faac36 100644
--- a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
+++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
@@ -11,371 +11,120 @@
package org.opensearch.ad;
-import static org.opensearch.ad.constant.ADCommonMessages.CAN_NOT_FIND_LATEST_TASK;
-
import java.time.Instant;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.update.UpdateResponse;
-import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
-import org.opensearch.ad.model.AnomalyDetector;
+import org.opensearch.ad.model.ADTask;
+import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyResult;
-import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
-import org.opensearch.ad.transport.AnomalyResultResponse;
-import org.opensearch.ad.transport.ProfileAction;
-import org.opensearch.ad.transport.ProfileRequest;
-import org.opensearch.ad.transport.RCFPollingAction;
-import org.opensearch.ad.transport.RCFPollingRequest;
-import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
-import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
-import org.opensearch.cluster.node.DiscoveryNode;
-import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
-import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
-import org.opensearch.timeseries.common.exception.EndRunException;
-import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
-import org.opensearch.timeseries.common.exception.TimeSeriesException;
+import org.opensearch.timeseries.AnalysisType;
+import org.opensearch.timeseries.ExecuteResultResponseRecorder;
+import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.model.FeatureData;
-import org.opensearch.timeseries.model.IntervalTimeConfiguration;
+import org.opensearch.timeseries.task.TaskCacheManager;
+import org.opensearch.timeseries.transport.ResultResponse;
+import org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
-public class ExecuteADResultResponseRecorder {
- private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);
+public class ExecuteADResultResponseRecorder extends
+ ExecuteResultResponseRecorder {
- private ADIndexManagement anomalyDetectionIndices;
- private AnomalyIndexHandler anomalyResultHandler;
- private ADTaskManager adTaskManager;
- private DiscoveryNodeFilterer nodeFilter;
- private ThreadPool threadPool;
- private Client client;
- private NodeStateManager nodeStateManager;
- private ADTaskCacheManager adTaskCacheManager;
- private int rcfMinSamples;
+ private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);
public ExecuteADResultResponseRecorder(
- ADIndexManagement anomalyDetectionIndices,
- AnomalyIndexHandler anomalyResultHandler,
- ADTaskManager adTaskManager,
+ ADIndexManagement indexManagement,
+ ResultBulkIndexingHandler resultHandler,
+ ADTaskManager taskManager,
DiscoveryNodeFilterer nodeFilter,
ThreadPool threadPool,
Client client,
NodeStateManager nodeStateManager,
- ADTaskCacheManager adTaskCacheManager,
+ TaskCacheManager taskCacheManager,
int rcfMinSamples
) {
- this.anomalyDetectionIndices = anomalyDetectionIndices;
- this.anomalyResultHandler = anomalyResultHandler;
- this.adTaskManager = adTaskManager;
- this.nodeFilter = nodeFilter;
- this.threadPool = threadPool;
- this.client = client;
- this.nodeStateManager = nodeStateManager;
- this.adTaskCacheManager = adTaskCacheManager;
- this.rcfMinSamples = rcfMinSamples;
+ super(
+ indexManagement,
+ resultHandler,
+ taskManager,
+ nodeFilter,
+ threadPool,
+ TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME,
+ client,
+ nodeStateManager,
+ taskCacheManager,
+ rcfMinSamples,
+ ADIndex.RESULT,
+ AnalysisType.AD
+ );
}
- public void indexAnomalyResult(
- Instant detectionStartTime,
- Instant executionStartTime,
- AnomalyResultResponse response,
- AnomalyDetector detector
+ @Override
+ protected AnomalyResult createErrorResult(
+ String configId,
+ Instant dataStartTime,
+ Instant dataEndTime,
+ Instant executeEndTime,
+ String errorMessage,
+ User user
) {
- String detectorId = detector.getId();
- try {
- // skipping writing to the result index if not necessary
- // For a single-entity detector, the result is not useful if error is null
- // and rcf score (thus anomaly grade/confidence) is null.
- // For a HCAD detector, we don't need to save on the detector level.
- // We return 0 or Double.NaN rcf score if there is no error.
- if ((response.getAnomalyScore() <= 0 || Double.isNaN(response.getAnomalyScore())) && response.getError() == null) {
- updateRealtimeTask(response, detectorId);
- return;
- }
- IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay();
- Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
- Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
- User user = detector.getUser();
-
- if (response.getError() != null) {
- log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError());
- }
-
- AnomalyResult anomalyResult = response
- .toAnomalyResult(
- detectorId,
- dataStartTime,
- dataEndTime,
- executionStartTime,
- Instant.now(),
- anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
- user,
- response.getError()
- );
-
- String resultIndex = detector.getCustomResultIndex();
- anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
- updateRealtimeTask(response, detectorId);
- } catch (EndRunException e) {
- throw e;
- } catch (Exception e) {
- log.error("Failed to index anomaly result for " + detectorId, e);
- }
+ return new AnomalyResult(
+ configId,
+ null, // no task id
+ new ArrayList(),
+ dataStartTime,
+ dataEndTime,
+ executeEndTime,
+ Instant.now(),
+ errorMessage,
+ Optional.empty(), // single-stream detectors have no entity
+ user,
+ indexManagement.getSchemaVersion(resultIndex),
+ null // no model id
+ );
}
/**
* Update real time task (one document per detector in state index). If the real-time task has no changes compared with local cache,
- * the task won't update. Task only updates when the state changed, or any error happened, or AD job stopped. Task is mainly consumed
- * by the front-end to track detector status. For single-stream detectors, we embed model total updates in AnomalyResultResponse and
- * update state accordingly. For HCAD, we won't wait for model finishing updating before returning a response to the job scheduler
+ * the task won't update. The task only updates when the state changes, an error occurs, or the job stops. The task is mainly consumed
+ * by the front-end to track analysis status. For single-stream analyses, we embed the model's total updates in ResultResponse and
+ * update state accordingly. For HC analyses, we won't wait for the model to finish updating before returning a response to the job scheduler
* since it might be long before all entities finish execution. So we don't embed model total updates in AnomalyResultResponse.
* Instead, we issue a profile request to poll each model node and get the maximum total updates among all models.
* @param response response returned from executing AnomalyResultAction
- * @param detectorId Detector Id
+ * @param configId config Id
*/
- private void updateRealtimeTask(AnomalyResultResponse response, String detectorId) {
- if (response.isHCDetector() != null && response.isHCDetector()) {
- if (adTaskManager.skipUpdateHCRealtimeTask(detectorId, response.getError())) {
+ @Override
+ protected void updateRealtimeTask(ResultResponse response, String configId) {
+ if (response.isHC() != null && response.isHC()) {
+ if (taskManager.skipUpdateRealtimeTask(configId, response.getError())) {
return;
}
- DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
- Set profiles = new HashSet<>();
- profiles.add(DetectorProfileName.INIT_PROGRESS);
- ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, true, dataNodes);
- Runnable profileHCInitProgress = () -> {
- client.execute(ProfileAction.INSTANCE, profileRequest, ActionListener.wrap(r -> {
- log.debug("Update latest realtime task for HC detector {}, total updates: {}", detectorId, r.getTotalUpdates());
- updateLatestRealtimeTask(detectorId, null, r.getTotalUpdates(), response.getIntervalInMinutes(), response.getError());
- }, e -> { log.error("Failed to update latest realtime task for " + detectorId, e); }));
- };
- if (!adTaskManager.isHCRealtimeTaskStartInitializing(detectorId)) {
- // real time init progress is 0 may mean this is a newly started detector
- // Delay real time cache update by one minute. If we are in init status, the delay may give the model training time to
- // finish. We can change the detector running immediately instead of waiting for the next interval.
- threadPool
- .schedule(profileHCInitProgress, new TimeValue(60, TimeUnit.SECONDS), TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME);
- } else {
- profileHCInitProgress.run();
- }
-
+ delayedUpdate(response, configId);
} else {
log
.debug(
"Update latest realtime task for single stream detector {}, total updates: {}",
- detectorId,
+ configId,
response.getRcfTotalUpdates()
);
- updateLatestRealtimeTask(detectorId, null, response.getRcfTotalUpdates(), response.getIntervalInMinutes(), response.getError());
- }
- }
-
- private void updateLatestRealtimeTask(
- String detectorId,
- String taskState,
- Long rcfTotalUpdates,
- Long detectorIntervalInMinutes,
- String error
- ) {
- // Don't need info as this will be printed repeatedly in each interval
- ActionListener listener = ActionListener.wrap(r -> {
- if (r != null) {
- log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
- }
- }, e -> {
- if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
- // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
- log.error("Can't find latest realtime task of detector " + detectorId);
- adTaskManager.removeRealtimeTaskCache(detectorId);
- } else {
- log.error("Failed to update latest realtime task for detector " + detectorId, e);
- }
- });
-
- // rcfTotalUpdates is null when we save exception messages
- if (!adTaskCacheManager.hasQueriedResultIndex(detectorId) && rcfTotalUpdates != null && rcfTotalUpdates < rcfMinSamples) {
- // confirm the total updates number since it is possible that we have already had results after job enabling time
- // If yes, total updates should be at least rcfMinSamples so that the init progress reaches 100%.
- confirmTotalRCFUpdatesFound(
- detectorId,
- taskState,
- rcfTotalUpdates,
- detectorIntervalInMinutes,
- error,
- ActionListener
- .wrap(
- r -> adTaskManager
- .updateLatestRealtimeTaskOnCoordinatingNode(
- detectorId,
- taskState,
- r,
- detectorIntervalInMinutes,
- error,
- listener
- ),
- e -> {
- log.error("Fail to confirm rcf update", e);
- adTaskManager
- .updateLatestRealtimeTaskOnCoordinatingNode(
- detectorId,
- taskState,
- rcfTotalUpdates,
- detectorIntervalInMinutes,
- error,
- listener
- );
- }
- )
- );
- } else {
- adTaskManager
- .updateLatestRealtimeTaskOnCoordinatingNode(
- detectorId,
- taskState,
- rcfTotalUpdates,
- detectorIntervalInMinutes,
- error,
- listener
- );
- }
- }
-
- /**
- * The function is not only indexing the result with the exception, but also updating the task state after
- * 60s if the exception is related to cold start (index not found exceptions) for a single stream detector.
- *
- * @param detectionStartTime execution start time
- * @param executionStartTime execution end time
- * @param errorMessage Error message to record
- * @param taskState AD task state (e.g., stopped)
- * @param detector Detector config accessor
- */
- public void indexAnomalyResultException(
- Instant detectionStartTime,
- Instant executionStartTime,
- String errorMessage,
- String taskState,
- AnomalyDetector detector
- ) {
- String detectorId = detector.getId();
- try {
- IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay();
- Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
- Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
- User user = detector.getUser();
-
- AnomalyResult anomalyResult = new AnomalyResult(
- detectorId,
- null, // no task id
- new ArrayList(),
- dataStartTime,
- dataEndTime,
- executionStartTime,
- Instant.now(),
- errorMessage,
- Optional.empty(), // single-stream detectors have no entity
- user,
- anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
- null // no model id
+ updateLatestRealtimeTask(
+ configId,
+ null,
+ response.getRcfTotalUpdates(),
+ response.getConfigIntervalInMinutes(),
+ response.getError()
);
- String resultIndex = detector.getCustomResultIndex();
- if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) {
- // Set result index as null, will write exception to default result index.
- anomalyResultHandler.index(anomalyResult, detectorId, null);
- } else {
- anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
- }
-
- if (errorMessage.contains(ADCommonMessages.NO_MODEL_ERR_MSG) && !detector.isHighCardinality()) {
- // single stream detector raises ResourceNotFoundException containing CommonErrorMessages.NO_CHECKPOINT_ERR_MSG
- // when there is no checkpoint.
- // Delay real time cache update by one minute so we will have trained models by then and update the state
- // document accordingly.
- threadPool.schedule(() -> {
- RCFPollingRequest request = new RCFPollingRequest(detectorId);
- client.execute(RCFPollingAction.INSTANCE, request, ActionListener.wrap(rcfPollResponse -> {
- long totalUpdates = rcfPollResponse.getTotalUpdates();
- // if there are updates, don't record failures
- updateLatestRealtimeTask(
- detectorId,
- taskState,
- totalUpdates,
- detector.getIntervalInMinutes(),
- totalUpdates > 0 ? "" : errorMessage
- );
- }, e -> {
- log.error("Fail to execute RCFRollingAction", e);
- updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage);
- }));
- }, new TimeValue(60, TimeUnit.SECONDS), TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME);
- } else {
- updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage);
- }
-
- } catch (Exception e) {
- log.error("Failed to index anomaly result for " + detectorId, e);
}
}
-
- private void confirmTotalRCFUpdatesFound(
- String detectorId,
- String taskState,
- Long rcfTotalUpdates,
- Long detectorIntervalInMinutes,
- String error,
- ActionListener listener
- ) {
- nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
- if (!detectorOptional.isPresent()) {
- listener.onFailure(new TimeSeriesException(detectorId, "fail to get detector"));
- return;
- }
- nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
- if (!jobOptional.isPresent()) {
- listener.onFailure(new TimeSeriesException(detectorId, "fail to get job"));
- return;
- }
-
- ProfileUtil
- .confirmDetectorRealtimeInitStatus(
- detectorOptional.get(),
- jobOptional.get().getEnabledTime().toEpochMilli(),
- client,
- ActionListener.wrap(searchResponse -> {
- ActionListener.completeWith(listener, () -> {
- SearchHits hits = searchResponse.getHits();
- Long correctedTotalUpdates = rcfTotalUpdates;
- if (hits.getTotalHits().value > 0L) {
- // correct the number if we have already had results after job enabling time
- // so that the detector won't stay initialized
- correctedTotalUpdates = Long.valueOf(rcfMinSamples);
- }
- adTaskCacheManager.markResultIndexQueried(detectorId);
- return correctedTotalUpdates;
- });
- }, exception -> {
- if (ExceptionUtil.isIndexNotAvailable(exception)) {
- // anomaly result index is not created yet
- adTaskCacheManager.markResultIndexQueried(detectorId);
- listener.onResponse(0L);
- } else {
- listener.onFailure(exception);
- }
- })
- );
- }, e -> listener.onFailure(new TimeSeriesException(detectorId, "fail to get job"))));
- }, e -> listener.onFailure(new TimeSeriesException(detectorId, "fail to get detector"))));
- }
}
diff --git a/src/main/java/org/opensearch/ad/ProfileUtil.java b/src/main/java/org/opensearch/ad/ProfileUtil.java
deleted file mode 100644
index 8afd98dc3..000000000
--- a/src/main/java/org/opensearch/ad/ProfileUtil.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- *
- * Modifications Copyright OpenSearch Contributors. See
- * GitHub history for details.
- */
-
-package org.opensearch.ad;
-
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.ad.constant.ADCommonName;
-import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.model.AnomalyResult;
-import org.opensearch.client.Client;
-import org.opensearch.index.query.BoolQueryBuilder;
-import org.opensearch.index.query.ExistsQueryBuilder;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
-import org.opensearch.timeseries.constant.CommonName;
-
-public class ProfileUtil {
- /**
- * Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time.
- * Note this function is only meant to check for status of real time analysis.
- *
- * @param detectorId detector id
- * @param enabledTime the time when AD job is enabled in milliseconds
- * @return the search request
- */
- private static SearchRequest createRealtimeInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
- BoolQueryBuilder filterQuery = new BoolQueryBuilder();
- filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
- filterQuery.filter(QueryBuilders.rangeQuery(CommonName.EXECUTION_END_TIME_FIELD).gte(enabledTime));
- filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));
- // Historical analysis result also stored in result index, which has non-null task_id.
- // For realtime detection result, we should filter task_id == null
- ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(CommonName.TASK_ID_FIELD);
- filterQuery.mustNot(taskIdExistsFilter);
-
- SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);
-
- SearchRequest request = new SearchRequest(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS);
- request.source(source);
- if (resultIndex != null) {
- request.indices(resultIndex);
- }
- return request;
- }
-
- public static void confirmDetectorRealtimeInitStatus(
- AnomalyDetector detector,
- long enabledTime,
- Client client,
- ActionListener listener
- ) {
- SearchRequest searchLatestResult = createRealtimeInittedEverRequest(detector.getId(), enabledTime, detector.getCustomResultIndex());
- client.search(searchLatestResult, listener);
- }
-}
diff --git a/src/main/java/org/opensearch/ad/caching/ADCacheBuffer.java b/src/main/java/org/opensearch/ad/caching/ADCacheBuffer.java
new file mode 100644
index 000000000..828146516
--- /dev/null
+++ b/src/main/java/org/opensearch/ad/caching/ADCacheBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.ad.caching;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import org.opensearch.ad.indices.ADIndex;
+import org.opensearch.ad.indices.ADIndexManagement;
+import org.opensearch.ad.ml.ADCheckpointDao;
+import org.opensearch.ad.ratelimit.ADCheckpointMaintainWorker;
+import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
+import org.opensearch.timeseries.MemoryTracker;
+import org.opensearch.timeseries.MemoryTracker.Origin;
+import org.opensearch.timeseries.caching.CacheBuffer;
+
+import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
+
+/**
+ * We use a layered cache to manage active entities’ states. We have a two-level
+ * cache that stores active entity states in each node. Each detector has its
+ * dedicated cache that stores ten (dynamically adjustable) entities’ states per
+ * node. A detector’s hottest entities load their states in the dedicated cache.
+ * If less than 10 entities use the dedicated cache, the secondary cache can use
+ * the rest of the free memory available to AD. The secondary cache is a shared
+ * memory among all detectors for the long tail. The shared cache size is 10%
+ * heap minus all of the dedicated cache consumed by single-entity and multi-entity
+ * detectors. The shared cache’s size shrinks as the dedicated cache is filled
+ * up or more detectors are started.
+ *
+ * Implementation-wise, both dedicated cache and shared cache are stored in items
+ * and minimumCapacity controls the boundary. If items size is equal to or less
+ * than minimumCapacity, consider items as dedicated cache; otherwise, consider
+ * top minimumCapacity active entities (last X entities in priorityList) as in dedicated
+ * cache and all others in shared cache.
+ */
+public class ADCacheBuffer extends
+ CacheBuffer {
+
+ public ADCacheBuffer(
+ int minimumCapacity,
+ Clock clock,
+ MemoryTracker memoryTracker,
+ int checkpointIntervalHrs,
+ Duration modelTtl,
+ long memoryConsumptionPerEntity,
+ ADCheckpointWriteWorker checkpointWriteQueue,
+ ADCheckpointMaintainWorker checkpointMaintainQueue,
+ String configId,
+ long intervalSecs
+ ) {
+ super(
+ minimumCapacity,
+ clock,
+ memoryTracker,
+ checkpointIntervalHrs,
+ modelTtl,
+ memoryConsumptionPerEntity,
+ checkpointWriteQueue,
+ checkpointMaintainQueue,
+ configId,
+ intervalSecs,
+ Origin.REAL_TIME_DETECTOR
+ );
+ }
+}
diff --git a/src/main/java/org/opensearch/ad/caching/ADCacheProvider.java b/src/main/java/org/opensearch/ad/caching/ADCacheProvider.java
new file mode 100644
index 000000000..e71c89962
--- /dev/null
+++ b/src/main/java/org/opensearch/ad/caching/ADCacheProvider.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.ad.caching;
+
+import org.opensearch.timeseries.caching.CacheProvider;
+
+import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
+
+/**
+ * Allows Guice dependency based on types. Otherwise, Guice cannot
+ * decide which instance to inject based on generic types of CacheProvider
+ *
+ */
+public class ADCacheProvider extends CacheProvider {
+
+}
diff --git a/src/main/java/org/opensearch/ad/caching/ADPriorityCache.java b/src/main/java/org/opensearch/ad/caching/ADPriorityCache.java
new file mode 100644
index 000000000..95decc8e8
--- /dev/null
+++ b/src/main/java/org/opensearch/ad/caching/ADPriorityCache.java
@@ -0,0 +1,130 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.ad.caching;
+
+import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_DEDICATED_CACHE_SIZE;
+import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import org.opensearch.ad.indices.ADIndex;
+import org.opensearch.ad.indices.ADIndexManagement;
+import org.opensearch.ad.ml.ADCheckpointDao;
+import org.opensearch.ad.ratelimit.ADCheckpointMaintainWorker;
+import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
+import org.opensearch.ad.settings.ADEnabledSetting;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.timeseries.MemoryTracker;
+import org.opensearch.timeseries.MemoryTracker.Origin;
+import org.opensearch.timeseries.caching.PriorityCache;
+import org.opensearch.timeseries.ml.ModelManager;
+import org.opensearch.timeseries.ml.ModelState;
+import org.opensearch.timeseries.model.Config;
+
+import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
+
+public class ADPriorityCache extends
+ PriorityCache {
+ private ADCheckpointWriteWorker checkpointWriteQueue;
+ private ADCheckpointMaintainWorker checkpointMaintainQueue;
+
+ public ADPriorityCache(
+ ADCheckpointDao checkpointDao,
+ int hcDedicatedCacheSize,
+ Setting checkpointTtl,
+ int maxInactiveStates,
+ MemoryTracker memoryTracker,
+ int numberOfTrees,
+ Clock clock,
+ ClusterService clusterService,
+ Duration modelTtl,
+ ThreadPool threadPool,
+ String threadPoolName,
+ int maintenanceFreqConstant,
+ Settings settings,
+ Setting checkpointSavingFreq,
+ ADCheckpointWriteWorker checkpointWriteQueue,
+ ADCheckpointMaintainWorker checkpointMaintainQueue
+ ) {
+ super(
+ checkpointDao,
+ hcDedicatedCacheSize,
+ checkpointTtl,
+ maxInactiveStates,
+ memoryTracker,
+ numberOfTrees,
+ clock,
+ clusterService,
+ modelTtl,
+ threadPool,
+ threadPoolName,
+ maintenanceFreqConstant,
+ settings,
+ checkpointSavingFreq,
+ Origin.REAL_TIME_DETECTOR,
+ AD_DEDICATED_CACHE_SIZE,
+ AD_MODEL_MAX_SIZE_PERCENTAGE
+ );
+
+ this.checkpointWriteQueue = checkpointWriteQueue;
+ this.checkpointMaintainQueue = checkpointMaintainQueue;
+ }
+
+ @Override
+ protected ADCacheBuffer createEmptyCacheBuffer(Config detector, long memoryConsumptionPerEntity) {
+ return new ADCacheBuffer(
+ detector.isHighCardinality() ? hcDedicatedCacheSize : 1,
+ clock,
+ memoryTracker,
+ checkpointIntervalHrs,
+ modelTtl,
+ memoryConsumptionPerEntity,
+ checkpointWriteQueue,
+ checkpointMaintainQueue,
+ detector.getId(),
+ detector.getIntervalInSeconds()
+ );
+ }
+
+ @Override
+ protected Callable> createInactiveEntityCacheLoader(String modelId, String detectorId) {
+ return new Callable>() {
+ @Override
+ public ModelState call() {
+ return new ModelState<>(
+ null,
+ modelId,
+ detectorId,
+ ModelManager.ModelType.TRCF.getName(),
+ clock,
+ 0,
+ null,
+ Optional.empty(),
+ new ArrayDeque<>()
+ );
+ }
+ };
+ }
+
+ @Override
+ protected boolean isDoorKeeperInCacheEnabled() {
+ return ADEnabledSetting.isDoorKeeperInCacheEnabled();
+ }
+}
diff --git a/src/main/java/org/opensearch/ad/caching/CacheProvider.java b/src/main/java/org/opensearch/ad/caching/CacheProvider.java
deleted file mode 100644
index ab8fd191c..000000000
--- a/src/main/java/org/opensearch/ad/caching/CacheProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- *
- * Modifications Copyright OpenSearch Contributors. See
- * GitHub history for details.
- */
-
-package org.opensearch.ad.caching;
-
-import org.opensearch.common.inject.Provider;
-
-/**
- * A wrapper to call concrete implementation of caching. Used in transport
- * action. Don't use interface because transport action handler constructor
- * requires a concrete class as input.
- *
- */
-public class CacheProvider implements Provider {
- private EntityCache cache;
-
- public CacheProvider() {
-
- }
-
- @Override
- public EntityCache get() {
- return cache;
- }
-
- public void set(EntityCache cache) {
- this.cache = cache;
- }
-}
diff --git a/src/main/java/org/opensearch/ad/caching/EntityCache.java b/src/main/java/org/opensearch/ad/caching/EntityCache.java
deleted file mode 100644
index 0a6a303d6..000000000
--- a/src/main/java/org/opensearch/ad/caching/EntityCache.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- *
- * Modifications Copyright OpenSearch Contributors. See
- * GitHub history for details.
- */
-
-package org.opensearch.ad.caching;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.opensearch.ad.CleanState;
-import org.opensearch.ad.DetectorModelSize;
-import org.opensearch.ad.MaintenanceState;
-import org.opensearch.ad.ml.EntityModel;
-import org.opensearch.ad.ml.ModelState;
-import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.model.ModelProfile;
-import org.opensearch.timeseries.model.Entity;
-
-public interface EntityCache extends MaintenanceState, CleanState, DetectorModelSize {
- /**
- * Get the ModelState associated with the entity. May or may not load the
- * ModelState depending on the underlying cache's eviction policy.
- *
- * @param modelId Model Id
- * @param detector Detector config object
- * @return the ModelState associated with the model or null if no cached item
- * for the entity
- */
- ModelState get(String modelId, AnomalyDetector detector);
-
- /**
- * Get the number of active entities of a detector
- * @param detector Detector Id
- * @return The number of active entities
- */
- int getActiveEntities(String detector);
-
- /**
- *
- * @return total active entities in the cache
- */
- int getTotalActiveEntities();
-
- /**
- * Whether an entity is active or not
- * @param detectorId The Id of the detector that an entity belongs to
- * @param entityModelId Entity model Id
- * @return Whether an entity is active or not
- */
- boolean isActive(String detectorId, String entityModelId);
-
- /**
- * Get total updates of detector's most active entity's RCF model.
- *
- * @param detectorId detector id
- * @return RCF model total updates of most active entity.
- */
- long getTotalUpdates(String detectorId);
-
- /**
- * Get RCF model total updates of specific entity
- *
- * @param detectorId detector id
- * @param entityModelId entity model id
- * @return RCF model total updates of specific entity.
- */
- long getTotalUpdates(String detectorId, String entityModelId);
-
- /**
- * Gets modelStates of all model hosted on a node
- *
- * @return list of modelStates
- */
- List> getAllModels();
-
- /**
- * Return when the last active time of an entity's state.
- *
- * If the entity's state is active in the cache, the value indicates when the cache
- * is lastly accessed (get/put). If the entity's state is inactive in the cache,
- * the value indicates when the cache state is created or when the entity is evicted
- * from active entity cache.
- *
- * @param detectorId The Id of the detector that an entity belongs to
- * @param entityModelId Entity's Model Id
- * @return if the entity is in the cache, return the timestamp in epoch
- * milliseconds when the entity's state is lastly used. Otherwise, return -1.
- */
- long getLastActiveMs(String detectorId, String entityModelId);
-
- /**
- * Release memory when memory circuit breaker is open
- */
- void releaseMemoryForOpenCircuitBreaker();
-
- /**
- * Select candidate entities for which we can load models
- * @param cacheMissEntities Cache miss entities
- * @param detectorId Detector Id
- * @param detector Detector object
- * @return A list of entities that are admitted into the cache as a result of the
- * update and the left-over entities
- */
- Pair, List> selectUpdateCandidate(
- Collection cacheMissEntities,
- String detectorId,
- AnomalyDetector detector
- );
-
- /**
- *
- * @param detector Detector config
- * @param toUpdate Model state candidate
- * @return if we can host the given model state
- */
- boolean hostIfPossible(AnomalyDetector detector, ModelState toUpdate);
-
- /**
- *
- * @param detectorId Detector Id
- * @return a detector's model information
- */
- List getAllModelProfile(String detectorId);
-
- /**
- * Gets an entity's model sizes
- *
- * @param detectorId Detector Id
- * @param entityModelId Entity's model Id
- * @return the entity's memory size
- */
- Optional getModelProfile(String detectorId, String entityModelId);
-
- /**
- * Get a model state without incurring priority update. Used in maintenance.
- * @param detectorId Detector Id
- * @param modelId Model Id
- * @return Model state
- */
- Optional> getForMaintainance(String detectorId, String modelId);
-
- /**
- * Remove entity model from active entity buffer and delete checkpoint. Used to clean corrupted model.
- * @param detectorId Detector Id
- * @param entityModelId Model Id
- */
- void removeEntityModel(String detectorId, String entityModelId);
-}
diff --git a/src/main/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanup.java b/src/main/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanup.java
index 325361aec..3f8fe461e 100644
--- a/src/main/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanup.java
+++ b/src/main/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanup.java
@@ -22,7 +22,6 @@
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.IndicesOptions;
-import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
@@ -30,6 +29,7 @@
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.index.store.StoreStats;
+import org.opensearch.timeseries.util.ClientUtil;
/**
* Clean up the old docs for indices.
diff --git a/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java b/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java
index e20dc8fd1..091a24bd1 100644
--- a/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java
+++ b/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java
@@ -43,7 +43,6 @@ public class ADCommonMessages {
public static String EXCEED_HISTORICAL_ANALYSIS_LIMIT = "Exceed max historical analysis limit per node";
public static String NO_ELIGIBLE_NODE_TO_RUN_DETECTOR = "No eligible node to run detector ";
public static String EMPTY_STALE_RUNNING_ENTITIES = "Empty stale running entities";
- public static String CAN_NOT_FIND_LATEST_TASK = "can't find latest task";
public static String NO_ENTITY_FOUND = "No entity found";
public static String HISTORICAL_ANALYSIS_CANCELLED = "Historical analysis cancelled by user";
public static String HC_DETECTOR_TASK_IS_UPDATING = "HC detector task is updating";
diff --git a/src/main/java/org/opensearch/ad/constant/ADCommonName.java b/src/main/java/org/opensearch/ad/constant/ADCommonName.java
index 3a97db889..55a2a58be 100644
--- a/src/main/java/org/opensearch/ad/constant/ADCommonName.java
+++ b/src/main/java/org/opensearch/ad/constant/ADCommonName.java
@@ -59,7 +59,6 @@ public class ADCommonName {
public static final String MODELS = "models";
public static final String MODEL = "model";
public static final String INIT_PROGRESS = "init_progress";
- public static final String CATEGORICAL_FIELD = "category_field";
public static final String TOTAL_ENTITIES = "total_entities";
public static final String ACTIVE_ENTITIES = "active_entities";
public static final String ENTITY_INFO = "entity_info";
@@ -87,11 +86,8 @@ public class ADCommonName {
public static final String CONFIDENCE_JSON_KEY = "confidence";
public static final String ANOMALY_GRADE_JSON_KEY = "anomalyGrade";
public static final String QUEUE_JSON_KEY = "queue";
- // ======================================
- // Used for backward-compatibility in messaging
- // ======================================
- public static final String EMPTY_FIELD = "";
+ // ======================================
// Validation
// ======================================
// detector validation aspect
diff --git a/src/main/java/org/opensearch/ad/constant/CommonValue.java b/src/main/java/org/opensearch/ad/constant/ADCommonValue.java
similarity index 81%
rename from src/main/java/org/opensearch/ad/constant/CommonValue.java
rename to src/main/java/org/opensearch/ad/constant/ADCommonValue.java
index f5d5b15eb..91b9f72f7 100644
--- a/src/main/java/org/opensearch/ad/constant/CommonValue.java
+++ b/src/main/java/org/opensearch/ad/constant/ADCommonValue.java
@@ -11,9 +11,7 @@
package org.opensearch.ad.constant;
-public class CommonValue {
- // unknown or no schema version
- public static Integer NO_SCHEMA_VERSION = 0;
+public class ADCommonValue {
public static String INTERNAL_ACTION_PREFIX = "cluster:admin/opendistro/adinternal/";
public static String EXTERNAL_ACTION_PREFIX = "cluster:admin/opendistro/ad/";
}
diff --git a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java b/src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java
similarity index 60%
rename from src/main/java/org/opensearch/ad/ml/CheckpointDao.java
rename to src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java
index 738acd197..a261cc979 100644
--- a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java
+++ b/src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java
@@ -11,64 +11,52 @@
package org.opensearch.ad.ml;
+import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
+
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Clock;
-import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
-import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
+import java.util.Deque;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.opensearch.ExceptionsHelper;
-import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
-import org.opensearch.action.bulk.BulkAction;
-import org.opensearch.action.bulk.BulkItemResponse;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.delete.DeleteRequest;
-import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.get.MultiGetAction;
-import org.opensearch.action.get.MultiGetRequest;
-import org.opensearch.action.get.MultiGetResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.IndicesOptions;
-import org.opensearch.action.update.UpdateRequest;
-import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
-import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.Client;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.xcontent.DeprecationHandler;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.MatchQueryBuilder;
-import org.opensearch.index.reindex.BulkByScrollResponse;
-import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
-import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
-import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
+import org.opensearch.timeseries.ml.CheckpointDao;
+import org.opensearch.timeseries.ml.ModelManager;
+import org.opensearch.timeseries.ml.ModelState;
+import org.opensearch.timeseries.ml.Sample;
+import org.opensearch.timeseries.ml.SingleStreamModelIdMapper;
import org.opensearch.timeseries.model.Entity;
+import org.opensearch.timeseries.util.ClientUtil;
import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
@@ -89,29 +77,18 @@
/**
* DAO for model checkpoints.
*/
-public class CheckpointDao {
-
- private static final Logger logger = LogManager.getLogger(CheckpointDao.class);
- static final String TIMEOUT_LOG_MSG = "Timeout while deleting checkpoints of";
- static final String BULK_FAILURE_LOG_MSG = "Bulk failure while deleting checkpoints of";
- static final String SEARCH_FAILURE_LOG_MSG = "Search failure while deleting checkpoints of";
- static final String DOC_GOT_DELETED_LOG_MSG = "checkpoints docs get deleted";
- static final String INDEX_DELETED_LOG_MSG = "Checkpoint index has been deleted. Has nothing to do:";
- static final String NOT_ABLE_TO_DELETE_LOG_MSG = "Cannot delete all checkpoints of detector";
+public class ADCheckpointDao extends CheckpointDao {
+ private static final Logger logger = LogManager.getLogger(ADCheckpointDao.class);
+ // ======================================
+ // Model serialization/deserialization
+ // ======================================
public static final String ENTITY_RCF = "rcf";
public static final String ENTITY_THRESHOLD = "th";
public static final String ENTITY_TRCF = "trcf";
public static final String FIELD_MODELV2 = "modelV2";
public static final String DETECTOR_ID = "detectorId";
- // dependencies
- private final Client client;
- private final ClientUtil clientUtil;
-
- // configuration
- private final String indexName;
-
private Gson gson;
private RandomCutForestMapper mapper;
@@ -130,11 +107,7 @@ public class CheckpointDao {
private final ADIndexManagement indexUtil;
private final JsonParser parser = new JsonParser();
- // we won't read/write a checkpoint larger than a threshold
- private final int maxCheckpointBytes;
- private final GenericObjectPool serializeRCFBufferPool;
- private final int serializeRCFBufferSize;
// anomaly rate
private double anomalyRate;
@@ -156,10 +129,9 @@ public class CheckpointDao {
* @param serializeRCFBufferSize the size of the buffer for RCF serialization
* @param anomalyRate anomaly rate
*/
- public CheckpointDao(
+ public ADCheckpointDao(
Client client,
ClientUtil clientUtil,
- String indexName,
Gson gson,
RandomCutForestMapper mapper,
V1JsonToV3StateConverter converter,
@@ -170,36 +142,29 @@ public CheckpointDao(
int maxCheckpointBytes,
GenericObjectPool serializeRCFBufferPool,
int serializeRCFBufferSize,
- double anomalyRate
+ double anomalyRate,
+ Clock clock
) {
- this.client = client;
- this.clientUtil = clientUtil;
- this.indexName = indexName;
- this.gson = gson;
+ super(
+ client,
+ clientUtil,
+ ADCommonName.CHECKPOINT_INDEX_NAME,
+ gson,
+ maxCheckpointBytes,
+ serializeRCFBufferPool,
+ serializeRCFBufferSize,
+ indexUtil,
+ clock
+ );
this.mapper = mapper;
this.converter = converter;
this.trcfMapper = trcfMapper;
this.trcfSchema = trcfSchema;
this.thresholdingModelClass = thresholdingModelClass;
this.indexUtil = indexUtil;
- this.maxCheckpointBytes = maxCheckpointBytes;
- this.serializeRCFBufferPool = serializeRCFBufferPool;
- this.serializeRCFBufferSize = serializeRCFBufferSize;
this.anomalyRate = anomalyRate;
}
- private void saveModelCheckpointSync(Map source, String modelId) {
- clientUtil.timedRequest(new IndexRequest(indexName).id(modelId).source(source), logger, client::index);
- }
-
- private void putModelCheckpoint(String modelId, Map source, ActionListener listener) {
- if (indexUtil.doesCheckpointIndexExist()) {
- saveModelCheckpointAsync(source, modelId, listener);
- } else {
- onCheckpointNotExist(source, modelId, true, listener);
- }
- }
-
/**
* Puts a rcf model checkpoint in the storage.
*
@@ -234,66 +199,25 @@ public void putThresholdCheckpoint(String modelId, ThresholdingModel threshold,
putModelCheckpoint(modelId, source, listener);
}
- private void onCheckpointNotExist(Map source, String modelId, boolean isAsync, ActionListener listener) {
- indexUtil.initCheckpointIndex(ActionListener.wrap(initResponse -> {
- if (initResponse.isAcknowledged()) {
- if (isAsync) {
- saveModelCheckpointAsync(source, modelId, listener);
- } else {
- saveModelCheckpointSync(source, modelId);
- }
- } else {
- throw new RuntimeException("Creating checkpoint with mappings call not acknowledged.");
- }
- }, exception -> {
- if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
- // It is possible the index has been created while we sending the create request
- if (isAsync) {
- saveModelCheckpointAsync(source, modelId, listener);
- } else {
- saveModelCheckpointSync(source, modelId);
- }
- } else {
- logger.error(String.format(Locale.ROOT, "Unexpected error creating index %s", indexName), exception);
- }
- }));
- }
-
- /**
- * Update the model doc using fields in source. This ensures we won't touch
- * the old checkpoint and nodes with old/new logic can coexist in a cluster.
- * This is useful for introducing compact rcf new model format.
- *
- * @param source fields to update
- * @param modelId model Id, used as doc id in the checkpoint index
- * @param listener Listener to return response
- */
- private void saveModelCheckpointAsync(Map source, String modelId, ActionListener listener) {
-
- UpdateRequest updateRequest = new UpdateRequest(indexName, modelId);
- updateRequest.doc(source);
- // If the document does not already exist, the contents of the upsert element are inserted as a new document.
- // If the document exists, update fields in the map
- updateRequest.docAsUpsert(true);
- clientUtil
- .asyncRequest(
- updateRequest,
- client::update,
- ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)
- );
- }
-
/**
* Prepare for index request using the contents of the given model state
* @param modelState an entity model state
* @return serialized JSON map or empty map if the state is too bloated
* @throws IOException when serialization fails
*/
- public Map toIndexSource(ModelState modelState) throws IOException {
+ @Override
+ public Map toIndexSource(ModelState modelState) throws IOException {
String modelId = modelState.getModelId();
Map source = new HashMap<>();
- EntityModel model = modelState.getModel();
- Optional serializedModel = toCheckpoint(model, modelId);
+
+ Object model = modelState.getModel();
+ if (modelState.getEntity().isEmpty()) {
+ throw new IllegalArgumentException("Expect model state to be an entity model");
+ }
+
+ ThresholdedRandomCutForest entityModel = (ThresholdedRandomCutForest) model;
+
+ Optional serializedModel = toCheckpoint(entityModel, modelId);
if (!serializedModel.isPresent() || serializedModel.get().length() > maxCheckpointBytes) {
logger
.warn(
@@ -305,13 +229,25 @@ public Map toIndexSource(ModelState modelState) thr
);
return source;
}
- String detectorId = modelState.getId();
+ source.put(FIELD_MODELV2, serializedModel.get());
+
+ if (modelState.getSamples() != null && !(modelState.getSamples().isEmpty())) {
+ source.put(CommonName.ENTITY_SAMPLE_QUEUE, toCheckpoint(modelState.getSamples()).get());
+ }
+
+ // if there are no samples and no model, no need to index as other information are meta data
+ if (!source.containsKey(CommonName.ENTITY_SAMPLE_QUEUE) && !source.containsKey(FIELD_MODELV2)) {
+ return source;
+ }
+
+ String detectorId = modelState.getConfigId();
source.put(DETECTOR_ID, detectorId);
// we cannot pass Optional as OpenSearch does not know how to serialize an Optional value
- source.put(FIELD_MODELV2, serializedModel.get());
+
source.put(CommonName.TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
- source.put(CommonName.SCHEMA_VERSION_FIELD, indexUtil.getSchemaVersion(ADIndex.CHECKPOINT));
- Optional entity = model.getEntity();
+ source.put(org.opensearch.timeseries.constant.CommonName.SCHEMA_VERSION_FIELD, indexUtil.getSchemaVersion(ADIndex.CHECKPOINT));
+
+ Optional entity = modelState.getEntity();
if (entity.isPresent()) {
source.put(CommonName.ENTITY_KEY, entity.get());
}
@@ -325,7 +261,7 @@ public Map toIndexSource(ModelState modelState) thr
* @param modelId model id
* @return serialized string
*/
- public Optional toCheckpoint(EntityModel model, String modelId) {
+ public Optional toCheckpoint(ThresholdedRandomCutForest model, String modelId) {
return AccessController.doPrivileged((PrivilegedAction>) () -> {
if (model == null) {
logger.warn("Empty model");
@@ -333,11 +269,8 @@ public Optional toCheckpoint(EntityModel model, String modelId) {
}
try {
JsonObject json = new JsonObject();
- if (model.getSamples() != null && !(model.getSamples().isEmpty())) {
- json.add(CommonName.ENTITY_SAMPLE, gson.toJsonTree(model.getSamples()));
- }
- if (model.getTrcf().isPresent()) {
- json.addProperty(ENTITY_TRCF, toCheckpoint(model.getTrcf().get()));
+ if (model != null) {
+ json.addProperty(ENTITY_TRCF, toCheckpoint(model));
}
// if json is empty, it will be an empty Json string {}. No need to save it on disk.
return json.entrySet().isEmpty() ? Optional.empty() : Optional.ofNullable(gson.toJson(json));
@@ -382,21 +315,6 @@ private String toCheckpoint(ThresholdedRandomCutForest trcf) {
return checkpoint;
}
- private Map.Entry checkoutOrNewBuffer() {
- LinkedBuffer buffer = null;
- boolean isCheckout = true;
- try {
- buffer = serializeRCFBufferPool.borrowObject();
- } catch (Exception e) {
- logger.warn("Failed to borrow a buffer from pool", e);
- }
- if (buffer == null) {
- buffer = LinkedBuffer.allocate(serializeRCFBufferSize);
- isCheckout = false;
- }
- return new SimpleImmutableEntry(buffer, isCheckout);
- }
-
private String toCheckpoint(ThresholdedRandomCutForest trcf, LinkedBuffer buffer) {
try {
byte[] bytes = AccessController.doPrivileged((PrivilegedAction) () -> {
@@ -409,73 +327,6 @@ private String toCheckpoint(ThresholdedRandomCutForest trcf, LinkedBuffer buffer
}
}
- /**
- * Deletes the model checkpoint for the model.
- *
- * @param modelId id of the model
- * @param listener onReponse is called with null when the operation is completed
- */
- public void deleteModelCheckpoint(String modelId, ActionListener listener) {
- clientUtil
- .asyncRequest(
- new DeleteRequest(indexName, modelId),
- client::delete,
- ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)
- );
- }
-
- /**
- * Delete checkpoints associated with a detector. Used in multi-entity detector.
- * @param detectorID Detector Id
- */
- public void deleteModelCheckpointByDetectorId(String detectorID) {
- // A bulk delete request is performed for each batch of matching documents. If a
- // search or bulk request is rejected, the requests are retried up to 10 times,
- // with exponential back off. If the maximum retry limit is reached, processing
- // halts and all failed requests are returned in the response. Any delete
- // requests that completed successfully still stick, they are not rolled back.
- DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(ADCommonName.CHECKPOINT_INDEX_NAME)
- .setQuery(new MatchQueryBuilder(DETECTOR_ID, detectorID))
- .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
- .setAbortOnVersionConflict(false) // when current delete happens, previous might not finish.
- // Retry in this case
- .setRequestsPerSecond(500); // throttle delete requests
- logger.info("Delete checkpoints of detector {}", detectorID);
- client.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> {
- if (response.isTimedOut() || !response.getBulkFailures().isEmpty() || !response.getSearchFailures().isEmpty()) {
- logFailure(response, detectorID);
- }
- // can return 0 docs get deleted because:
- // 1) we cannot find matching docs
- // 2) bad stats from OpenSearch. In this case, docs are deleted, but
- // OpenSearch says deleted is 0.
- logger.info("{} " + DOC_GOT_DELETED_LOG_MSG, response.getDeleted());
- }, exception -> {
- if (exception instanceof IndexNotFoundException) {
- logger.info(INDEX_DELETED_LOG_MSG + " {}", detectorID);
- } else {
- // Gonna eventually delete in daily cron.
- logger.error(NOT_ABLE_TO_DELETE_LOG_MSG, exception);
- }
- }));
- }
-
- private void logFailure(BulkByScrollResponse response, String detectorID) {
- if (response.isTimedOut()) {
- logger.warn(TIMEOUT_LOG_MSG + " {}", detectorID);
- } else if (!response.getBulkFailures().isEmpty()) {
- logger.warn(BULK_FAILURE_LOG_MSG + " {}", detectorID);
- for (BulkItemResponse.Failure bulkFailure : response.getBulkFailures()) {
- logger.warn(bulkFailure);
- }
- } else {
- logger.warn(SEARCH_FAILURE_LOG_MSG + " {}", detectorID);
- for (ScrollableHitSource.SearchFailure searchFailure : response.getSearchFailures()) {
- logger.warn(searchFailure);
- }
- }
- }
-
/**
* Load json checkpoint into models
*
@@ -484,9 +335,14 @@ private void logFailure(BulkByScrollResponse response, String detectorID) {
* @return a pair of entity model and its last checkpoint time; or empty if
* the raw checkpoint is too large
*/
- public Optional> fromEntityModelCheckpoint(Map checkpoint, String modelId) {
+ @Override
+ protected ModelState fromEntityModelCheckpoint(
+ Map checkpoint,
+ String modelId,
+ String configId
+ ) {
try {
- return AccessController.doPrivileged((PrivilegedAction>>) () -> {
+ return AccessController.doPrivileged((PrivilegedAction>) () -> {
Object modelObj = checkpoint.get(FIELD_MODELV2);
if (modelObj == null) {
// in case there is an old-format checkpoint
@@ -494,24 +350,14 @@ public Optional> fromEntityModelCheckpoint(Map maxCheckpointBytes) {
logger.warn(new ParameterizedMessage("[{}]'s model too large: [{}] bytes", modelId, model.length()));
- return Optional.empty();
+ return null;
}
JsonObject json = parser.parse(model).getAsJsonObject();
- ArrayDeque samples = null;
- if (json.has(CommonName.ENTITY_SAMPLE)) {
- // verified, don't need privileged call to get permission
- samples = new ArrayDeque<>(
- Arrays.asList(this.gson.fromJson(json.getAsJsonArray(CommonName.ENTITY_SAMPLE), new double[0][0].getClass()))
- );
- } else {
- // avoid possible null pointer exception
- samples = new ArrayDeque<>();
- }
ThresholdedRandomCutForest trcf = null;
if (json.has(ENTITY_TRCF)) {
@@ -540,6 +386,25 @@ public Optional> fromEntityModelCheckpoint(Map sampleQueue = new ArrayDeque<>();
+ Object samples = checkpoint.get(CommonName.ENTITY_SAMPLE_QUEUE);
+ if (samples != null) {
+ try (
+ XContentParser sampleParser = JsonXContent.jsonXContent
+ .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, (String) samples)
+ ) {
+ ensureExpectedToken(XContentParser.Token.START_ARRAY, sampleParser.currentToken(), sampleParser);
+ while (sampleParser.nextToken() != XContentParser.Token.END_ARRAY) {
+ sampleQueue.add(Sample.parse(sampleParser));
+ }
+ } catch (Exception e) {
+ logger.warn("Exception while deserializing samples for " + modelId, e);
+ // checkpoint corrupted (e.g., a checkpoint not recognized by current code
+ // due to bugs). Better redo training.
+ return null;
+ }
+ }
+
String lastCheckpointTimeString = (String) (checkpoint.get(CommonName.TIMESTAMP));
Instant timestamp = Instant.parse(lastCheckpointTimeString);
Entity entity = null;
@@ -551,14 +416,27 @@ public Optional> fromEntityModelCheckpoint(Map(entityModel, timestamp));
+
+ ModelState modelState = new ModelState(
+ trcf,
+ modelId,
+ configId,
+ ModelManager.ModelType.TRCF.getName(),
+ clock,
+ 0,
+ // TODO: track last processed sample in AD
+ new Sample(),
+ Optional.ofNullable(entity),
+ sampleQueue
+ );
+ modelState.setLastCheckpointTime(timestamp);
+ return modelState;
});
} catch (Exception e) {
logger.warn("Exception while deserializing checkpoint " + modelId, e);
// checkpoint corrupted (e.g., a checkpoint not recognized by current code
// due to bugs). Better redo training.
- return Optional.empty();
+ return null;
}
}
@@ -634,33 +512,14 @@ private void deserializeTRCFModel(
}
}
- /**
- * Read a checkpoint from the index and return the EntityModel object
- * @param modelId Model Id
- * @param listener Listener to return a pair of entity model and its last checkpoint time
- */
- public void deserializeModelCheckpoint(String modelId, ActionListener>> listener) {
- clientUtil
- .asyncRequest(
- new GetRequest(indexName, modelId),
- client::get,
- ActionListener.wrap(response -> { listener.onResponse(processGetResponse(response, modelId)); }, listener::onFailure)
- );
- }
-
- /**
- * Process a checkpoint GetResponse and return the EntityModel object
- * @param response Checkpoint Index GetResponse
- * @param modelId Model Id
- * @return a pair of entity model and its last checkpoint time
- */
- public Optional> processGetResponse(GetResponse response, String modelId) {
- Optional
*
*/
-public abstract class AbstractAnomalyDetectorActionHandler {
- public static final String EXCEEDED_MAX_MULTI_ENTITY_DETECTORS_PREFIX_MSG = "Can't create more than %d multi-entity anomaly detectors.";
- public static final String EXCEEDED_MAX_SINGLE_ENTITY_DETECTORS_PREFIX_MSG =
- "Can't create more than %d single-entity anomaly detectors.";
+public abstract class AbstractAnomalyDetectorActionHandler extends
+ AbstractTimeSeriesActionHandler {
+ protected final Logger logger = LogManager.getLogger(AbstractAnomalyDetectorActionHandler.class);
+
+ public static final String EXCEEDED_MAX_HC_DETECTORS_PREFIX_MSG = "Can't create more than %d HC anomaly detectors.";
+ public static final String EXCEEDED_MAX_SINGLE_STREAM_DETECTORS_PREFIX_MSG =
+ "Can't create more than %d single-stream anomaly detectors.";
public static final String NO_DOCS_IN_USER_INDEX_MSG = "Can't create anomaly detector as no document is found in the indices: ";
- public static final String ONLY_ONE_CATEGORICAL_FIELD_ERR_MSG = "We can have only one categorical field.";
- public static final String CATEGORICAL_FIELD_TYPE_ERR_MSG = "A categorical field must be of type keyword or ip.";
- public static final String CATEGORY_NOT_FOUND_ERR_MSG = "Can't find the categorical field %s";
public static final String DUPLICATE_DETECTOR_MSG = "Cannot create anomaly detector with name [%s] as it's already used by detector %s";
- public static final String NAME_REGEX = "[a-zA-Z0-9._-]+";
- public static final Integer MAX_DETECTOR_NAME_SIZE = 64;
- private static final Set DEFAULT_VALIDATION_ASPECTS = Sets.newHashSet(ValidationAspect.DETECTOR);
-
- public static String INVALID_NAME_SIZE = "Name should be shortened. The maximum limit is " + MAX_DETECTOR_NAME_SIZE + " characters.";
-
- protected final ADIndexManagement anomalyDetectionIndices;
- protected final String detectorId;
- protected final Long seqNo;
- protected final Long primaryTerm;
- protected final WriteRequest.RefreshPolicy refreshPolicy;
- protected final AnomalyDetector anomalyDetector;
- protected final ClusterService clusterService;
+ public static final String VALIDATION_FEATURE_FAILURE = "Validation failed for feature(s) of detector %s";
- protected final Logger logger = LogManager.getLogger(AbstractAnomalyDetectorActionHandler.class);
- protected final TimeValue requestTimeout;
- protected final Integer maxSingleEntityAnomalyDetectors;
- protected final Integer maxMultiEntityAnomalyDetectors;
- protected final Integer maxAnomalyFeatures;
- protected final AnomalyDetectorActionHandler handler = new AnomalyDetectorActionHandler();
- protected final RestRequest.Method method;
- protected final Client client;
- protected final SecurityClientUtil clientUtil;
- protected final TransportService transportService;
- protected final NamedXContentRegistry xContentRegistry;
- protected final ActionListener listener;
- protected final User user;
- protected final ADTaskManager adTaskManager;
- protected final SearchFeatureDao searchFeatureDao;
- protected final boolean isDryRun;
+ protected final Integer maxSingleStreamDetectors;
+ protected final Integer maxHCAnomalyDetectors;
+ protected final TaskManager adTaskManager;
protected final Clock clock;
- protected final String validationType;
protected final Settings settings;
/**
@@ -165,7 +116,6 @@ public abstract class AbstractAnomalyDetectorActionHandler listener,
ADIndexManagement anomalyDetectionIndices,
String detectorId,
Long seqNo,
@@ -199,748 +149,164 @@ public AbstractAnomalyDetectorActionHandler(
WriteRequest.RefreshPolicy refreshPolicy,
AnomalyDetector anomalyDetector,
TimeValue requestTimeout,
- Integer maxSingleEntityAnomalyDetectors,
- Integer maxMultiEntityAnomalyDetectors,
- Integer maxAnomalyFeatures,
+ Integer maxSingleStreamAnomalyDetectors,
+ Integer maxHCAnomalyDetectors,
+ Integer maxFeatures,
+ Integer maxCategoricalFields,
RestRequest.Method method,
NamedXContentRegistry xContentRegistry,
User user,
- ADTaskManager adTaskManager,
+ TaskManager adTaskManager,
SearchFeatureDao searchFeatureDao,
String validationType,
boolean isDryRun,
Clock clock,
Settings settings
) {
- this.clusterService = clusterService;
- this.client = client;
- this.clientUtil = clientUtil;
- this.transportService = transportService;
- this.anomalyDetectionIndices = anomalyDetectionIndices;
- this.listener = listener;
- this.detectorId = detectorId;
- this.seqNo = seqNo;
- this.primaryTerm = primaryTerm;
- this.refreshPolicy = refreshPolicy;
- this.anomalyDetector = anomalyDetector;
- this.requestTimeout = requestTimeout;
- this.maxSingleEntityAnomalyDetectors = maxSingleEntityAnomalyDetectors;
- this.maxMultiEntityAnomalyDetectors = maxMultiEntityAnomalyDetectors;
- this.maxAnomalyFeatures = maxAnomalyFeatures;
- this.method = method;
- this.xContentRegistry = xContentRegistry;
- this.user = user;
+ super(
+ anomalyDetector,
+ anomalyDetectionIndices,
+ isDryRun,
+ client,
+ detectorId,
+ clientUtil,
+ user,
+ method,
+ clusterService,
+ xContentRegistry,
+ transportService,
+ requestTimeout,
+ refreshPolicy,
+ seqNo,
+ primaryTerm,
+ validationType,
+ searchFeatureDao,
+ maxFeatures,
+ maxCategoricalFields,
+ AnalysisType.AD
+ );
+ this.maxSingleStreamDetectors = maxSingleStreamAnomalyDetectors;
+ this.maxHCAnomalyDetectors = maxHCAnomalyDetectors;
this.adTaskManager = adTaskManager;
- this.searchFeatureDao = searchFeatureDao;
- this.validationType = validationType;
- this.isDryRun = isDryRun;
this.clock = clock;
this.settings = settings;
}
- /**
- * Start function to process create/update/validate anomaly detector request.
- * If detector is not using custom result index, check if anomaly detector
- * index exist first, if not, will create first. Otherwise, check if custom
- * result index exists or not. If exists, will check if index mapping matches
- * AD result index mapping and if user has correct permission to write index.
- * If doesn't exist, will create custom result index with AD result index
- * mapping.
- */
- public void start() {
- String resultIndex = anomalyDetector.getCustomResultIndex();
- // use default detector result index which is system index
- if (resultIndex == null) {
- createOrUpdateDetector();
- return;
- }
-
- if (this.isDryRun) {
- if (anomalyDetectionIndices.doesIndexExist(resultIndex)) {
- anomalyDetectionIndices
- .validateCustomResultIndexAndExecute(
- resultIndex,
- () -> createOrUpdateDetector(),
- ActionListener.wrap(r -> createOrUpdateDetector(), ex -> {
- logger.error(ex);
- listener
- .onFailure(
- new ValidationException(ex.getMessage(), ValidationIssueType.RESULT_INDEX, ValidationAspect.DETECTOR)
- );
- return;
- })
- );
- return;
- } else {
- createOrUpdateDetector();
- return;
- }
- }
- // use custom result index if not validating and resultIndex not null
- anomalyDetectionIndices.initCustomResultIndexAndExecute(resultIndex, () -> createOrUpdateDetector(), listener);
- }
-
- // if isDryRun is true then this method is being executed through Validation API meaning actual
- // index won't be created, only validation checks will be executed throughout the class
- private void createOrUpdateDetector() {
- try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
- if (!anomalyDetectionIndices.doesConfigIndexExist() && !this.isDryRun) {
- logger.info("AnomalyDetector Indices do not exist");
- anomalyDetectionIndices
- .initConfigIndex(
- ActionListener
- .wrap(response -> onCreateMappingsResponse(response, false), exception -> listener.onFailure(exception))
- );
- } else {
- logger.info("AnomalyDetector Indices do exist, calling prepareAnomalyDetectorIndexing");
- logger.info("DryRun variable " + this.isDryRun);
- validateDetectorName(this.isDryRun);
- }
- } catch (Exception e) {
- logger.error("Failed to create or update detector " + detectorId, e);
- listener.onFailure(e);
- }
+ @Override
+ protected TimeSeriesException createValidationException(String msg, ValidationIssueType type) {
+ return new ValidationException(msg, type, ValidationAspect.DETECTOR);
}
- // These validation checks are executed here and not in AnomalyDetector.parse()
- // in order to not break any past detectors that were made with invalid names
- // because it was never check on the backend in the past
- protected void validateDetectorName(boolean indexingDryRun) {
- if (!anomalyDetector.getName().matches(NAME_REGEX)) {
- listener.onFailure(new ValidationException(CommonMessages.INVALID_NAME, ValidationIssueType.NAME, ValidationAspect.DETECTOR));
- return;
-
- }
- if (anomalyDetector.getName().length() > MAX_DETECTOR_NAME_SIZE) {
- listener.onFailure(new ValidationException(INVALID_NAME_SIZE, ValidationIssueType.NAME, ValidationAspect.DETECTOR));
- return;
- }
- validateTimeField(indexingDryRun);
+ @Override
+ protected AnomalyDetector parse(XContentParser parser, GetResponse response) throws IOException {
+ return AnomalyDetector.parse(parser, response.getId(), response.getVersion());
}
- protected void validateTimeField(boolean indexingDryRun) {
- String givenTimeField = anomalyDetector.getTimeField();
- GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest();
- getMappingsRequest.indices(anomalyDetector.getIndices().toArray(new String[0])).fields(givenTimeField);
- getMappingsRequest.indicesOptions(IndicesOptions.strictExpand());
-
- // comments explaining fieldMappingResponse parsing can be found inside following method:
- // AbstractAnomalyDetectorActionHandler.validateCategoricalField(String, boolean)
- ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> {
- boolean foundField = false;
- Map> mappingsByIndex = getMappingsResponse.mappings();
-
- for (Map mappingsByField : mappingsByIndex.values()) {
- for (Map.Entry field2Metadata : mappingsByField.entrySet()) {
-
- GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue();
- if (fieldMetadata != null) {
- // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field
- Map fieldMap = fieldMetadata.sourceAsMap();
- if (fieldMap != null) {
- for (Object type : fieldMap.values()) {
- if (type instanceof Map) {
- foundField = true;
- Map metadataMap = (Map) type;
- String typeName = (String) metadataMap.get(CommonName.TYPE);
- if (!typeName.equals(CommonName.DATE_TYPE)) {
- listener
- .onFailure(
- new ValidationException(
- String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField),
- ValidationIssueType.TIMEFIELD_FIELD,
- ValidationAspect.DETECTOR
- )
- );
- return;
- }
- }
- }
- }
- }
- }
- }
- if (!foundField) {
- listener
- .onFailure(
- new ValidationException(
- String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField),
- ValidationIssueType.TIMEFIELD_FIELD,
- ValidationAspect.DETECTOR
- )
- );
- return;
- }
- prepareAnomalyDetectorIndexing(indexingDryRun);
- }, error -> {
- String message = String.format(Locale.ROOT, "Fail to get the index mapping of %s", anomalyDetector.getIndices());
- logger.error(message, error);
- listener.onFailure(new IllegalArgumentException(message));
- });
- clientUtil.executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, mappingsListener);
- }
-
- /**
- * Prepare for indexing a new anomaly detector.
- * @param indexingDryRun if this is dryrun for indexing; when validation, it is true; when create/update, it is false
- */
- protected void prepareAnomalyDetectorIndexing(boolean indexingDryRun) {
- if (method == RestRequest.Method.PUT) {
- handler
- .getDetectorJob(
- clusterService,
- client,
- detectorId,
- listener,
- () -> updateAnomalyDetector(detectorId, indexingDryRun),
- xContentRegistry
- );
- } else {
- createAnomalyDetector(indexingDryRun);
- }
- }
-
- protected void updateAnomalyDetector(String detectorId, boolean indexingDryRun) {
- GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, detectorId);
- client
- .get(
- request,
- ActionListener
- .wrap(
- response -> onGetAnomalyDetectorResponse(response, indexingDryRun, detectorId),
- exception -> listener.onFailure(exception)
- )
- );
- }
-
- private void onGetAnomalyDetectorResponse(GetResponse response, boolean indexingDryRun, String detectorId) {
- if (!response.isExists()) {
- listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_CONFIG_MSG + detectorId, RestStatus.NOT_FOUND));
- return;
- }
- try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
- ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
- AnomalyDetector existingDetector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());
- // If detector category field changed, frontend may not be able to render AD result for different detector types correctly.
- // For example, if detector changed from HC to single entity detector, AD result page may show multiple anomaly
- // result points on the same time point if there are multiple entities have anomaly results.
- // If single-category HC changed category field from IP to error type, the AD result page may show both IP and error type
- // in top N entities list. That's confusing.
- // So we decide to block updating detector category field.
- if (!listEqualsWithoutConsideringOrder(existingDetector.getCategoryFields(), anomalyDetector.getCategoryFields())) {
- listener.onFailure(new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_CATEGORY_FIELD, RestStatus.BAD_REQUEST));
- return;
- }
- if (!Objects.equals(existingDetector.getCustomResultIndex(), anomalyDetector.getCustomResultIndex())) {
- listener
- .onFailure(new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_CUSTOM_RESULT_INDEX, RestStatus.BAD_REQUEST));
- return;
- }
-
- adTaskManager.getAndExecuteOnLatestDetectorLevelTask(detectorId, HISTORICAL_DETECTOR_TASK_TYPES, (adTask) -> {
- if (adTask.isPresent() && !adTask.get().isDone()) {
- // can't update detector if there is AD task running
- listener.onFailure(new OpenSearchStatusException("Detector is running", RestStatus.INTERNAL_SERVER_ERROR));
- } else {
- validateExistingDetector(existingDetector, indexingDryRun);
- }
- }, transportService, true, listener);
- } catch (IOException e) {
- String message = "Failed to parse anomaly detector " + detectorId;
- logger.error(message, e);
- listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR));
- }
-
- }
-
- protected void validateExistingDetector(AnomalyDetector existingDetector, boolean indexingDryRun) {
- if (!hasCategoryField(existingDetector) && hasCategoryField(this.anomalyDetector)) {
- validateAgainstExistingMultiEntityAnomalyDetector(detectorId, indexingDryRun);
- } else {
- validateCategoricalField(detectorId, indexingDryRun);
- }
- }
-
- protected boolean hasCategoryField(AnomalyDetector detector) {
- return detector.getCategoryFields() != null && !detector.getCategoryFields().isEmpty();
- }
-
- protected void validateAgainstExistingMultiEntityAnomalyDetector(String detectorId, boolean indexingDryRun) {
- if (anomalyDetectionIndices.doesConfigIndexExist()) {
- QueryBuilder query = QueryBuilders.boolQuery().filter(QueryBuilders.existsQuery(AnomalyDetector.CATEGORY_FIELD));
-
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeout);
-
- SearchRequest searchRequest = new SearchRequest(CommonName.CONFIG_INDEX).source(searchSourceBuilder);
- client
- .search(
- searchRequest,
- ActionListener
- .wrap(
- response -> onSearchMultiEntityAdResponse(response, detectorId, indexingDryRun),
- exception -> listener.onFailure(exception)
- )
- );
- } else {
- validateCategoricalField(detectorId, indexingDryRun);
- }
-
- }
-
- protected void createAnomalyDetector(boolean indexingDryRun) {
- try {
- List categoricalFields = anomalyDetector.getCategoryFields();
- if (categoricalFields != null && categoricalFields.size() > 0) {
- validateAgainstExistingMultiEntityAnomalyDetector(null, indexingDryRun);
+ @Override
+ protected void confirmHistoricalRunning(String id, ActionListener listener) {
+ adTaskManager.getAndExecuteOnLatestConfigLevelTask(id, HISTORICAL_DETECTOR_TASK_TYPES, (adTask) -> {
+ if (adTask.isPresent() && !adTask.get().isDone()) {
+ // can't update detector if there is AD task running
+ listener.onFailure(new OpenSearchStatusException("Detector is running", RestStatus.INTERNAL_SERVER_ERROR));
} else {
- if (anomalyDetectionIndices.doesConfigIndexExist()) {
- QueryBuilder query = QueryBuilders.matchAllQuery();
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeout);
-
- SearchRequest searchRequest = new SearchRequest(CommonName.CONFIG_INDEX).source(searchSourceBuilder);
-
- client
- .search(
- searchRequest,
- ActionListener
- .wrap(
- response -> onSearchSingleEntityAdResponse(response, indexingDryRun),
- exception -> listener.onFailure(exception)
- )
- );
- } else {
- searchAdInputIndices(null, indexingDryRun);
- }
-
+ listener.onResponse(null);
}
- } catch (Exception e) {
- listener.onFailure(e);
- }
+ }, transportService, true, listener);
}
- protected void onSearchSingleEntityAdResponse(SearchResponse response, boolean indexingDryRun) throws IOException {
- if (response.getHits().getTotalHits().value >= maxSingleEntityAnomalyDetectors) {
- String errorMsgSingleEntity = String
- .format(Locale.ROOT, EXCEEDED_MAX_SINGLE_ENTITY_DETECTORS_PREFIX_MSG, maxSingleEntityAnomalyDetectors);
- logger.error(errorMsgSingleEntity);
- if (indexingDryRun) {
- listener
- .onFailure(
- new ValidationException(errorMsgSingleEntity, ValidationIssueType.GENERAL_SETTINGS, ValidationAspect.DETECTOR)
- );
- return;
- }
- listener.onFailure(new IllegalArgumentException(errorMsgSingleEntity));
- } else {
- searchAdInputIndices(null, indexingDryRun);
- }
+ @Override
+ protected String getExceedMaxSingleStreamConfigsErrorMsg(int maxSingleStreamConfigs) {
+ return String.format(Locale.ROOT, EXCEEDED_MAX_SINGLE_STREAM_DETECTORS_PREFIX_MSG, getMaxSingleStreamConfigs());
}
- protected void onSearchMultiEntityAdResponse(SearchResponse response, String detectorId, boolean indexingDryRun) throws IOException {
- if (response.getHits().getTotalHits().value >= maxMultiEntityAnomalyDetectors) {
- String errorMsg = String.format(Locale.ROOT, EXCEEDED_MAX_MULTI_ENTITY_DETECTORS_PREFIX_MSG, maxMultiEntityAnomalyDetectors);
- logger.error(errorMsg);
- if (indexingDryRun) {
- listener.onFailure(new ValidationException(errorMsg, ValidationIssueType.GENERAL_SETTINGS, ValidationAspect.DETECTOR));
- return;
- }
- listener.onFailure(new IllegalArgumentException(errorMsg));
- } else {
- validateCategoricalField(detectorId, indexingDryRun);
- }
+ @Override
+ protected String getExceedMaxHCConfigsErrorMsg(int maxHCConfigs) {
+ return String.format(Locale.ROOT, EXCEEDED_MAX_HC_DETECTORS_PREFIX_MSG, getMaxHCConfigs());
}
- @SuppressWarnings("unchecked")
- protected void validateCategoricalField(String detectorId, boolean indexingDryRun) {
- List categoryField = anomalyDetector.getCategoryFields();
-
- if (categoryField == null) {
- searchAdInputIndices(detectorId, indexingDryRun);
- return;
- }
-
- // we only support a certain number of categorical field
- // If there is more fields than required, AnomalyDetector's constructor
- // throws ADValidationException before reaching this line
- int maxCategoryFields = ADNumericSetting.maxCategoricalFields();
- if (categoryField.size() > maxCategoryFields) {
- listener
- .onFailure(
- new ValidationException(
- CommonMessages.getTooManyCategoricalFieldErr(maxCategoryFields),
- ValidationIssueType.CATEGORY,
- ValidationAspect.DETECTOR
- )
- );
- return;
- }
-
- String categoryField0 = categoryField.get(0);
-
- GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest();
- getMappingsRequest.indices(anomalyDetector.getIndices().toArray(new String[0])).fields(categoryField.toArray(new String[0]));
- getMappingsRequest.indicesOptions(IndicesOptions.strictExpand());
-
- ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> {
- // example getMappingsResponse:
- // GetFieldMappingsResponse{mappings={server-metrics={_doc={service=FieldMappingMetadata{fullName='service',
- // source=org.opensearch.core.common.bytes.BytesArray@7ba87dbd}}}}}
- // for nested field, it would be
- // GetFieldMappingsResponse{mappings={server-metrics={_doc={host_nest.host2=FieldMappingMetadata{fullName='host_nest.host2',
- // source=org.opensearch.core.common.bytes.BytesArray@8fb4de08}}}}}
- boolean foundField = false;
-
- // Review why the change from FieldMappingMetadata to GetFieldMappingsResponse.FieldMappingMetadata
- Map> mappingsByIndex = getMappingsResponse.mappings();
-
- for (Map mappingsByField : mappingsByIndex.values()) {
- for (Map.Entry field2Metadata : mappingsByField.entrySet()) {
- // example output:
- // host_nest.host2=FieldMappingMetadata{fullName='host_nest.host2',
- // source=org.opensearch.core.common.bytes.BytesArray@8fb4de08}
-
- // Review why the change from FieldMappingMetadata to GetFieldMappingsResponse.FieldMappingMetadata
-
- GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue();
-
- if (fieldMetadata != null) {
- // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field
- Map fieldMap = fieldMetadata.sourceAsMap();
- if (fieldMap != null) {
- for (Object type : fieldMap.values()) {
- if (type != null && type instanceof Map) {
- foundField = true;
- Map metadataMap = (Map) type;
- String typeName = (String) metadataMap.get(CommonName.TYPE);
- if (!typeName.equals(CommonName.KEYWORD_TYPE) && !typeName.equals(CommonName.IP_TYPE)) {
- listener
- .onFailure(
- new ValidationException(
- CATEGORICAL_FIELD_TYPE_ERR_MSG,
- ValidationIssueType.CATEGORY,
- ValidationAspect.DETECTOR
- )
- );
- return;
- }
- }
- }
- }
-
- }
- }
- }
-
- if (foundField == false) {
- listener
- .onFailure(
- new ValidationException(
- String.format(Locale.ROOT, CATEGORY_NOT_FOUND_ERR_MSG, categoryField0),
- ValidationIssueType.CATEGORY,
- ValidationAspect.DETECTOR
- )
- );
- return;
- }
-
- searchAdInputIndices(detectorId, indexingDryRun);
- }, error -> {
- String message = String.format(Locale.ROOT, "Fail to get the index mapping of %s", anomalyDetector.getIndices());
- logger.error(message, error);
- listener.onFailure(new IllegalArgumentException(message));
- });
-
- clientUtil.executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, mappingsListener);
+ @Override
+ protected String getNoDocsInUserIndexErrorMsg(String suppliedIndices) {
+ return String.format(Locale.ROOT, NO_DOCS_IN_USER_INDEX_MSG, suppliedIndices);
}
- protected void searchAdInputIndices(String detectorId, boolean indexingDryRun) {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(0)
- .timeout(requestTimeout);
-
- SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0])).source(searchSourceBuilder);
-
- ActionListener searchResponseListener = ActionListener
- .wrap(
- searchResponse -> onSearchAdInputIndicesResponse(searchResponse, detectorId, indexingDryRun),
- exception -> listener.onFailure(exception)
- );
-
- clientUtil.asyncRequestWithInjectedSecurity(searchRequest, client::search, user, client, searchResponseListener);
+ @Override
+ protected String getDuplicateConfigErrorMsg(String name, List otherConfigIds) {
+ return String.format(Locale.ROOT, DUPLICATE_DETECTOR_MSG, name, otherConfigIds);
}
- protected void onSearchAdInputIndicesResponse(SearchResponse response, String detectorId, boolean indexingDryRun) throws IOException {
- if (response.getHits().getTotalHits().value == 0) {
- String errorMsg = NO_DOCS_IN_USER_INDEX_MSG + Arrays.toString(anomalyDetector.getIndices().toArray(new String[0]));
- logger.error(errorMsg);
- if (indexingDryRun) {
- listener.onFailure(new ValidationException(errorMsg, ValidationIssueType.INDICES, ValidationAspect.DETECTOR));
- return;
- }
- listener.onFailure(new IllegalArgumentException(errorMsg));
- } else {
- validateAnomalyDetectorFeatures(detectorId, indexingDryRun);
- }
+ @Override
+ protected AnomalyDetector copyConfig(User user, Config config) {
+ return new AnomalyDetector(
+ config.getId(),
+ config.getVersion(),
+ config.getName(),
+ config.getDescription(),
+ config.getTimeField(),
+ config.getIndices(),
+ config.getFeatureAttributes(),
+ config.getFilterQuery(),
+ config.getInterval(),
+ config.getWindowDelay(),
+ config.getShingleSize(),
+ config.getUiMetadata(),
+ config.getSchemaVersion(),
+ Instant.now(),
+ config.getCategoryFields(),
+ user,
+ config.getCustomResultIndex(),
+ config.getImputationOption()
+ );
}
- protected void checkADNameExists(String detectorId, boolean indexingDryRun) throws IOException {
- if (anomalyDetectionIndices.doesConfigIndexExist()) {
- BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
- // src/main/resources/mappings/anomaly-detectors.json#L14
- boolQueryBuilder.must(QueryBuilders.termQuery("name.keyword", anomalyDetector.getName()));
- if (StringUtils.isNotBlank(detectorId)) {
- boolQueryBuilder.mustNot(QueryBuilders.termQuery(RestHandlerUtils._ID, detectorId));
- }
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(boolQueryBuilder).timeout(requestTimeout);
- SearchRequest searchRequest = new SearchRequest(CommonName.CONFIG_INDEX).source(searchSourceBuilder);
- client
- .search(
- searchRequest,
- ActionListener
- .wrap(
- searchResponse -> onSearchADNameResponse(searchResponse, detectorId, anomalyDetector.getName(), indexingDryRun),
- exception -> listener.onFailure(exception)
- )
- );
- } else {
- tryIndexingAnomalyDetector(indexingDryRun);
- }
-
+ @SuppressWarnings("unchecked")
+ @Override
+ protected T createIndexConfigResponse(IndexResponse indexResponse, Config config) {
+ return (T) new IndexAnomalyDetectorResponse(
+ indexResponse.getId(),
+ indexResponse.getVersion(),
+ indexResponse.getSeqNo(),
+ indexResponse.getPrimaryTerm(),
+ (AnomalyDetector) config,
+ RestStatus.CREATED
+ );
}
- protected void onSearchADNameResponse(SearchResponse response, String detectorId, String name, boolean indexingDryRun)
- throws IOException {
- if (response.getHits().getTotalHits().value > 0) {
- String errorMsg = String
- .format(
- Locale.ROOT,
- DUPLICATE_DETECTOR_MSG,
- name,
- Arrays.stream(response.getHits().getHits()).map(hit -> hit.getId()).collect(Collectors.toList())
- );
- logger.warn(errorMsg);
- listener.onFailure(new ValidationException(errorMsg, ValidationIssueType.NAME, ValidationAspect.DETECTOR));
- } else {
- tryIndexingAnomalyDetector(indexingDryRun);
- }
+ @Override
+ protected Set getDefaultValidationType() {
+ return Sets.newHashSet(ValidationAspect.DETECTOR);
}
- protected void tryIndexingAnomalyDetector(boolean indexingDryRun) throws IOException {
- if (!indexingDryRun) {
- indexAnomalyDetector(detectorId);
- } else {
- finishDetectorValidationOrContinueToModelValidation();
- }
+ @Override
+ protected Integer getMaxSingleStreamConfigs() {
+ return maxSingleStreamDetectors;
}
- protected Set getValidationTypes(String validationType) {
- if (StringUtils.isBlank(validationType)) {
- return DEFAULT_VALIDATION_ASPECTS;
- } else {
- Set typesInRequest = new HashSet<>(Arrays.asList(validationType.split(",")));
- return ValidationAspect
- .getNames(Sets.intersection(RestValidateAnomalyDetectorAction.ALL_VALIDATION_ASPECTS_STRS, typesInRequest));
- }
+ @Override
+ protected Integer getMaxHCConfigs() {
+ return maxHCAnomalyDetectors;
}
- protected void finishDetectorValidationOrContinueToModelValidation() {
- logger.info("Skipping indexing detector. No blocking issue found so far.");
- if (!getValidationTypes(validationType).contains(ValidationAspect.MODEL)) {
- listener.onResponse(null);
- } else {
- ModelValidationActionHandler modelValidationActionHandler = new ModelValidationActionHandler(
- clusterService,
- client,
- clientUtil,
- (ActionListener) listener,
- anomalyDetector,
- requestTimeout,
- xContentRegistry,
- searchFeatureDao,
- validationType,
- clock,
- settings,
- user
- );
- modelValidationActionHandler.checkIfMultiEntityDetector();
- }
+ @Override
+ protected String getFeatureErrorMsg(String name) {
+ return String.format(Locale.ROOT, VALIDATION_FEATURE_FAILURE, name);
}
- @SuppressWarnings("unchecked")
- protected void indexAnomalyDetector(String detectorId) throws IOException {
- AnomalyDetector detector = new AnomalyDetector(
- anomalyDetector.getId(),
- anomalyDetector.getVersion(),
- anomalyDetector.getName(),
- anomalyDetector.getDescription(),
- anomalyDetector.getTimeField(),
- anomalyDetector.getIndices(),
- anomalyDetector.getFeatureAttributes(),
- anomalyDetector.getFilterQuery(),
- anomalyDetector.getInterval(),
- anomalyDetector.getWindowDelay(),
- anomalyDetector.getShingleSize(),
- anomalyDetector.getUiMetadata(),
- anomalyDetector.getSchemaVersion(),
- Instant.now(),
- anomalyDetector.getCategoryFields(),
- user,
- anomalyDetector.getCustomResultIndex(),
- anomalyDetector.getImputationOption()
+ @Override
+ protected void validateModel(ActionListener listener) {
+ ModelValidationActionHandler modelValidationActionHandler = new ModelValidationActionHandler(
+ clusterService,
+ client,
+ clientUtil,
+ (ActionListener) listener,
+ (AnomalyDetector) config,
+ requestTimeout,
+ xContentRegistry,
+ searchFeatureDao,
+ validationType,
+ clock,
+ settings,
+ user
);
- IndexRequest indexRequest = new IndexRequest(CommonName.CONFIG_INDEX)
- .setRefreshPolicy(refreshPolicy)
- .source(detector.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITH_TYPE))
- .setIfSeqNo(seqNo)
- .setIfPrimaryTerm(primaryTerm)
- .timeout(requestTimeout);
- if (StringUtils.isNotBlank(detectorId)) {
- indexRequest.id(detectorId);
- }
-
- client.index(indexRequest, new ActionListener() {
- @Override
- public void onResponse(IndexResponse indexResponse) {
- String errorMsg = checkShardsFailure(indexResponse);
- if (errorMsg != null) {
- listener.onFailure(new OpenSearchStatusException(errorMsg, indexResponse.status()));
- return;
- }
- listener
- .onResponse(
- (T) new IndexAnomalyDetectorResponse(
- indexResponse.getId(),
- indexResponse.getVersion(),
- indexResponse.getSeqNo(),
- indexResponse.getPrimaryTerm(),
- detector,
- RestStatus.CREATED
- )
- );
- }
-
- @Override
- public void onFailure(Exception e) {
- logger.warn("Failed to update detector", e);
- if (e.getMessage() != null && e.getMessage().contains("version conflict")) {
- listener
- .onFailure(
- new IllegalArgumentException("There was a problem updating the historical detector:[" + detectorId + "]")
- );
- } else {
- listener.onFailure(e);
- }
- }
- });
- }
-
- protected void onCreateMappingsResponse(CreateIndexResponse response, boolean indexingDryRun) throws IOException {
- if (response.isAcknowledged()) {
- logger.info("Created {} with mappings.", CommonName.CONFIG_INDEX);
- prepareAnomalyDetectorIndexing(indexingDryRun);
- } else {
- logger.warn("Created {} with mappings call not acknowledged.", CommonName.CONFIG_INDEX);
- listener
- .onFailure(
- new OpenSearchStatusException(
- "Created " + CommonName.CONFIG_INDEX + "with mappings call not acknowledged.",
- RestStatus.INTERNAL_SERVER_ERROR
- )
- );
- }
- }
-
- protected String checkShardsFailure(IndexResponse response) {
- StringBuilder failureReasons = new StringBuilder();
- if (response.getShardInfo().getFailed() > 0) {
- for (ReplicationResponse.ShardInfo.Failure failure : response.getShardInfo().getFailures()) {
- failureReasons.append(failure);
- }
- return failureReasons.toString();
- }
- return null;
- }
-
- /**
- * Validate config/syntax, and runtime error of detector features
- * @param detectorId detector id
- * @param indexingDryRun if false, then will eventually index detector; true, skip indexing detector
- * @throws IOException when fail to parse feature aggregation
- */
- // TODO: move this method to util class so that it can be re-usable for more use cases
- // https://github.com/opensearch-project/anomaly-detection/issues/39
- protected void validateAnomalyDetectorFeatures(String detectorId, boolean indexingDryRun) throws IOException {
- if (anomalyDetector != null
- && (anomalyDetector.getFeatureAttributes() == null || anomalyDetector.getFeatureAttributes().isEmpty())) {
- checkADNameExists(detectorId, indexingDryRun);
- return;
- }
- // checking configuration/syntax error of detector features
- String error = RestHandlerUtils.checkFeaturesSyntax(anomalyDetector, maxAnomalyFeatures);
- if (StringUtils.isNotBlank(error)) {
- if (indexingDryRun) {
- listener.onFailure(new ValidationException(error, ValidationIssueType.FEATURE_ATTRIBUTES, ValidationAspect.DETECTOR));
- return;
- }
- listener.onFailure(new OpenSearchStatusException(error, RestStatus.BAD_REQUEST));
- return;
- }
- // checking runtime error from feature query
- ActionListener>> validateFeatureQueriesListener = ActionListener
- .wrap(
- response -> { checkADNameExists(detectorId, indexingDryRun); },
- exception -> {
- listener
- .onFailure(
- new ValidationException(
- exception.getMessage(),
- ValidationIssueType.FEATURE_ATTRIBUTES,
- ValidationAspect.DETECTOR
- )
- );
- }
- );
- MultiResponsesDelegateActionListener>> multiFeatureQueriesResponseListener =
- new MultiResponsesDelegateActionListener>>(
- validateFeatureQueriesListener,
- anomalyDetector.getFeatureAttributes().size(),
- String.format(Locale.ROOT, "Validation failed for feature(s) of detector %s", anomalyDetector.getName()),
- false
- );
-
- for (Feature feature : anomalyDetector.getFeatureAttributes()) {
- SearchSourceBuilder ssb = new SearchSourceBuilder().size(1).query(QueryBuilders.matchAllQuery());
- AggregatorFactories.Builder internalAgg = parseAggregators(
- feature.getAggregation().toString(),
- xContentRegistry,
- feature.getId()
- );
- ssb.aggregation(internalAgg.getAggregatorFactories().iterator().next());
- SearchRequest searchRequest = new SearchRequest().indices(anomalyDetector.getIndices().toArray(new String[0])).source(ssb);
- ActionListener searchResponseListener = ActionListener.wrap(response -> {
- Optional aggFeatureResult = searchFeatureDao.parseResponse(response, Arrays.asList(feature.getId()));
- if (aggFeatureResult.isPresent()) {
- multiFeatureQueriesResponseListener
- .onResponse(
- new MergeableList>(new ArrayList>(Arrays.asList(aggFeatureResult)))
- );
- } else {
- String errorMessage = CommonMessages.FEATURE_WITH_EMPTY_DATA_MSG + feature.getName();
- logger.error(errorMessage);
- multiFeatureQueriesResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST));
- }
- }, e -> {
- String errorMessage;
- if (isExceptionCausedByInvalidQuery(e)) {
- errorMessage = CommonMessages.FEATURE_WITH_INVALID_QUERY_MSG + feature.getName();
- } else {
- errorMessage = CommonMessages.UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG + feature.getName();
- }
- logger.error(errorMessage, e);
- multiFeatureQueriesResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e));
- });
- clientUtil.asyncRequestWithInjectedSecurity(searchRequest, client::search, user, client, searchResponseListener);
- }
+ modelValidationActionHandler.checkIfMultiEntityDetector();
}
}
diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java
index b401ce007..51e3df820 100644
--- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java
+++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java
@@ -11,14 +11,14 @@
package org.opensearch.ad.rest.handler;
-import org.opensearch.action.ActionListener;
import org.opensearch.action.support.WriteRequest;
-import org.opensearch.ad.feature.SearchFeatureDao;
+import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
+import org.opensearch.ad.model.ADTask;
+import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.task.ADTaskManager;
+import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
-import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
@@ -26,6 +26,9 @@
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.rest.RestRequest;
+import org.opensearch.timeseries.feature.SearchFeatureDao;
+import org.opensearch.timeseries.task.TaskManager;
+import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;
/**
@@ -50,9 +53,10 @@ public class IndexAnomalyDetectorActionHandler extends AbstractAnomalyDetectorAc
* @param refreshPolicy refresh policy
* @param anomalyDetector anomaly detector instance
* @param requestTimeout request time out configuration
- * @param maxSingleEntityAnomalyDetectors max single-entity anomaly detectors allowed
- * @param maxMultiEntityAnomalyDetectors max multi-entity detectors allowed
- * @param maxAnomalyFeatures max features allowed per detector
+ * @param maxSingleStreamDetectors max single-stream anomaly detectors allowed
+ * @param maxHCDetectors max HC detectors allowed
+ * @param maxFeatures max features allowed per detector
+ * @param maxCategoricalFields max number of categorical fields
* @param method Rest Method type
* @param xContentRegistry Registry which is used for XContentParser
* @param user User context
@@ -65,7 +69,6 @@ public IndexAnomalyDetectorActionHandler(
Client client,
SecurityClientUtil clientUtil,
TransportService transportService,
- ActionListener listener,
ADIndexManagement anomalyDetectionIndices,
String detectorId,
Long seqNo,
@@ -73,13 +76,14 @@ public IndexAnomalyDetectorActionHandler(
WriteRequest.RefreshPolicy refreshPolicy,
AnomalyDetector anomalyDetector,
TimeValue requestTimeout,
- Integer maxSingleEntityAnomalyDetectors,
- Integer maxMultiEntityAnomalyDetectors,
- Integer maxAnomalyFeatures,
+ Integer maxSingleStreamDetectors,
+ Integer maxHCDetectors,
+ Integer maxFeatures,
+ Integer maxCategoricalFields,
RestRequest.Method method,
NamedXContentRegistry xContentRegistry,
User user,
- ADTaskManager adTaskManager,
+ TaskManager adTaskManager,
SearchFeatureDao searchFeatureDao,
Settings settings
) {
@@ -88,7 +92,6 @@ public IndexAnomalyDetectorActionHandler(
client,
clientUtil,
transportService,
- listener,
anomalyDetectionIndices,
detectorId,
seqNo,
@@ -96,9 +99,10 @@ public IndexAnomalyDetectorActionHandler(
refreshPolicy,
anomalyDetector,
requestTimeout,
- maxSingleEntityAnomalyDetectors,
- maxMultiEntityAnomalyDetectors,
- maxAnomalyFeatures,
+ maxSingleStreamDetectors,
+ maxHCDetectors,
+ maxFeatures,
+ maxCategoricalFields,
method,
xContentRegistry,
user,
@@ -110,12 +114,4 @@ public IndexAnomalyDetectorActionHandler(
settings
);
}
-
- /**
- * Start function to process create/update anomaly detector request.
- */
- @Override
- public void start() {
- super.start();
- }
}
diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java
deleted file mode 100644
index 824c6fc21..000000000
--- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- *
- * Modifications Copyright OpenSearch Contributors. See
- * GitHub history for details.
- */
-
-package org.opensearch.ad.rest.handler;
-
-import static org.opensearch.action.DocWriteResponse.Result.CREATED;
-import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
-import static org.opensearch.ad.util.ExceptionUtil.getShardsFailure;
-import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
-import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.opensearch.OpenSearchStatusException;
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.get.GetRequest;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
-import org.opensearch.action.support.WriteRequest;
-import org.opensearch.ad.ExecuteADResultResponseRecorder;
-import org.opensearch.ad.indices.ADIndexManagement;
-import org.opensearch.ad.model.ADTaskState;
-import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.model.AnomalyDetectorJob;
-import org.opensearch.ad.task.ADTaskManager;
-import org.opensearch.ad.transport.AnomalyDetectorJobResponse;
-import org.opensearch.ad.transport.AnomalyResultAction;
-import org.opensearch.ad.transport.AnomalyResultRequest;
-import org.opensearch.ad.transport.StopDetectorAction;
-import org.opensearch.ad.transport.StopDetectorRequest;
-import org.opensearch.ad.transport.StopDetectorResponse;
-import org.opensearch.client.Client;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.xcontent.XContentFactory;
-import org.opensearch.core.rest.RestStatus;
-import org.opensearch.core.xcontent.NamedXContentRegistry;
-import org.opensearch.core.xcontent.XContentParser;
-import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
-import org.opensearch.jobscheduler.spi.schedule.Schedule;
-import org.opensearch.timeseries.constant.CommonName;
-import org.opensearch.timeseries.function.ExecutorFunction;
-import org.opensearch.timeseries.model.IntervalTimeConfiguration;
-import org.opensearch.timeseries.util.RestHandlerUtils;
-import org.opensearch.transport.TransportService;
-
-import com.google.common.base.Throwables;
-
-/**
- * Anomaly detector job REST action handler to process POST/PUT request.
- */
-public class IndexAnomalyDetectorJobActionHandler {
-
- private final ADIndexManagement anomalyDetectionIndices;
- private final String detectorId;
- private final Long seqNo;
- private final Long primaryTerm;
- private final Client client;
- private final NamedXContentRegistry xContentRegistry;
- private final TransportService transportService;
- private final ADTaskManager adTaskManager;
-
- private final Logger logger = LogManager.getLogger(IndexAnomalyDetectorJobActionHandler.class);
- private final TimeValue requestTimeout;
- private final ExecuteADResultResponseRecorder recorder;
-
- /**
- * Constructor function.
- *
- * @param client ES node client that executes actions on the local node
- * @param anomalyDetectionIndices anomaly detector index manager
- * @param detectorId detector identifier
- * @param seqNo sequence number of last modification
- * @param primaryTerm primary term of last modification
- * @param requestTimeout request time out configuration
- * @param xContentRegistry Registry which is used for XContentParser
- * @param transportService transport service
- * @param adTaskManager AD task manager
- * @param recorder Utility to record AnomalyResultAction execution result
- */
- public IndexAnomalyDetectorJobActionHandler(
- Client client,
- ADIndexManagement anomalyDetectionIndices,
- String detectorId,
- Long seqNo,
- Long primaryTerm,
- TimeValue requestTimeout,
- NamedXContentRegistry xContentRegistry,
- TransportService transportService,
- ADTaskManager adTaskManager,
- ExecuteADResultResponseRecorder recorder
- ) {
- this.client = client;
- this.anomalyDetectionIndices = anomalyDetectionIndices;
- this.detectorId = detectorId;
- this.seqNo = seqNo;
- this.primaryTerm = primaryTerm;
- this.requestTimeout = requestTimeout;
- this.xContentRegistry = xContentRegistry;
- this.transportService = transportService;
- this.adTaskManager = adTaskManager;
- this.recorder = recorder;
- }
-
- /**
- * Start anomaly detector job.
- * 1. If job doesn't exist, create new job.
- * 2. If job exists: a). if job enabled, return error message; b). if job disabled, enable job.
- * @param detector anomaly detector
- * @param listener Listener to send responses
- */
- public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) {
- // this start listener is created & injected throughout the job handler so that whenever the job response is received,
- // there's the extra step of trying to index results and update detector state with a 60s delay.
- ActionListener startListener = ActionListener.wrap(r -> {
- try {
- Instant executionEndTime = Instant.now();
- IntervalTimeConfiguration schedule = (IntervalTimeConfiguration) detector.getInterval();
- Instant executionStartTime = executionEndTime.minus(schedule.getInterval(), schedule.getUnit());
- AnomalyResultRequest getRequest = new AnomalyResultRequest(
- detector.getId(),
- executionStartTime.toEpochMilli(),
- executionEndTime.toEpochMilli()
- );
- client
- .execute(
- AnomalyResultAction.INSTANCE,
- getRequest,
- ActionListener
- .wrap(
- response -> recorder.indexAnomalyResult(executionStartTime, executionEndTime, response, detector),
- exception -> {
-
- recorder
- .indexAnomalyResultException(
- executionStartTime,
- executionEndTime,
- Throwables.getStackTraceAsString(exception),
- null,
- detector
- );
- }
- )
- );
- } catch (Exception ex) {
- listener.onFailure(ex);
- return;
- }
- listener.onResponse(r);
-
- }, listener::onFailure);
- if (!anomalyDetectionIndices.doesJobIndexExist()) {
- anomalyDetectionIndices.initJobIndex(ActionListener.wrap(response -> {
- if (response.isAcknowledged()) {
- logger.info("Created {} with mappings.", CommonName.CONFIG_INDEX);
- createJob(detector, startListener);
- } else {
- logger.warn("Created {} with mappings call not acknowledged.", CommonName.CONFIG_INDEX);
- startListener
- .onFailure(
- new OpenSearchStatusException(
- "Created " + CommonName.CONFIG_INDEX + " with mappings call not acknowledged.",
- RestStatus.INTERNAL_SERVER_ERROR
- )
- );
- }
- }, exception -> startListener.onFailure(exception)));
- } else {
- createJob(detector, startListener);
- }
- }
-
- private void createJob(AnomalyDetector detector, ActionListener listener) {
- try {
- IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getInterval();
- Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit());
- Duration duration = Duration.of(interval.getInterval(), interval.getUnit());
-
- AnomalyDetectorJob job = new AnomalyDetectorJob(
- detector.getId(),
- schedule,
- detector.getWindowDelay(),
- true,
- Instant.now(),
- null,
- Instant.now(),
- duration.getSeconds(),
- detector.getUser(),
- detector.getCustomResultIndex()
- );
-
- getAnomalyDetectorJobForWrite(detector, job, listener);
- } catch (Exception e) {
- String message = "Failed to parse anomaly detector job " + detectorId;
- logger.error(message, e);
- listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR));
- }
- }
-
- private void getAnomalyDetectorJobForWrite(
- AnomalyDetector detector,
- AnomalyDetectorJob job,
- ActionListener listener
- ) {
- GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
-
- client
- .get(
- getRequest,
- ActionListener
- .wrap(
- response -> onGetAnomalyDetectorJobForWrite(response, detector, job, listener),
- exception -> listener.onFailure(exception)
- )
- );
- }
-
- private void onGetAnomalyDetectorJobForWrite(
- GetResponse response,
- AnomalyDetector detector,
- AnomalyDetectorJob job,
- ActionListener listener
- ) throws IOException {
- if (response.isExists()) {
- try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
- ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
- AnomalyDetectorJob currentAdJob = AnomalyDetectorJob.parse(parser);
- if (currentAdJob.isEnabled()) {
- listener
- .onFailure(new OpenSearchStatusException("Anomaly detector job is already running: " + detectorId, RestStatus.OK));
- return;
- } else {
- AnomalyDetectorJob newJob = new AnomalyDetectorJob(
- job.getName(),
- job.getSchedule(),
- job.getWindowDelay(),
- job.isEnabled(),
- Instant.now(),
- currentAdJob.getDisabledTime(),
- Instant.now(),
- job.getLockDurationSeconds(),
- job.getUser(),
- job.getCustomResultIndex()
- );
- // Get latest realtime task and check its state before index job. Will reset running realtime task
- // as STOPPED first if job disabled, then start new job and create new realtime task.
- adTaskManager
- .startDetector(
- detector,
- null,
- job.getUser(),
- transportService,
- ActionListener
- .wrap(
- r -> { indexAnomalyDetectorJob(newJob, null, listener); },
- e -> {
- // Have logged error message in ADTaskManager#startDetector
- listener.onFailure(e);
- }
- )
- );
- }
- } catch (IOException e) {
- String message = "Failed to parse anomaly detector job " + job.getName();
- logger.error(message, e);
- listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR));
- }
- } else {
- adTaskManager
- .startDetector(
- detector,
- null,
- job.getUser(),
- transportService,
- ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null, listener); }, e -> listener.onFailure(e))
- );
- }
- }
-
- private void indexAnomalyDetectorJob(
- AnomalyDetectorJob job,
- ExecutorFunction function,
- ActionListener listener
- ) throws IOException {
- IndexRequest indexRequest = new IndexRequest(CommonName.JOB_INDEX)
- .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
- .source(job.toXContent(XContentFactory.jsonBuilder(), RestHandlerUtils.XCONTENT_WITH_TYPE))
- .setIfSeqNo(seqNo)
- .setIfPrimaryTerm(primaryTerm)
- .timeout(requestTimeout)
- .id(detectorId);
- client
- .index(
- indexRequest,
- ActionListener
- .wrap(
- response -> onIndexAnomalyDetectorJobResponse(response, function, listener),
- exception -> listener.onFailure(exception)
- )
- );
- }
-
- private void onIndexAnomalyDetectorJobResponse(
- IndexResponse response,
- ExecutorFunction function,
- ActionListener listener
- ) {
- if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) {
- String errorMsg = getShardsFailure(response);
- listener.onFailure(new OpenSearchStatusException(errorMsg, response.status()));
- return;
- }
- if (function != null) {
- function.execute();
- } else {
- AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse(
- response.getId(),
- response.getVersion(),
- response.getSeqNo(),
- response.getPrimaryTerm(),
- RestStatus.OK
- );
- listener.onResponse(anomalyDetectorJobResponse);
- }
- }
-
- /**
- * Stop anomaly detector job.
- * 1.If job not exists, return error message
- * 2.If job exists: a).if job state is disabled, return error message; b).if job state is enabled, disable job.
- *
- * @param detectorId detector identifier
- * @param listener Listener to send responses
- */
- public void stopAnomalyDetectorJob(String detectorId, ActionListener listener) {
- GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
-
- client.get(getRequest, ActionListener.wrap(response -> {
- if (response.isExists()) {
- try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
- ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
- AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
- if (!job.isEnabled()) {
- adTaskManager.stopLatestRealtimeTask(detectorId, ADTaskState.STOPPED, null, transportService, listener);
- } else {
- AnomalyDetectorJob newJob = new AnomalyDetectorJob(
- job.getName(),
- job.getSchedule(),
- job.getWindowDelay(),
- false,
- job.getEnabledTime(),
- Instant.now(),
- Instant.now(),
- job.getLockDurationSeconds(),
- job.getUser(),
- job.getCustomResultIndex()
- );
- indexAnomalyDetectorJob(
- newJob,
- () -> client
- .execute(
- StopDetectorAction.INSTANCE,
- new StopDetectorRequest(detectorId),
- stopAdDetectorListener(detectorId, listener)
- ),
- listener
- );
- }
- } catch (IOException e) {
- String message = "Failed to parse anomaly detector job " + detectorId;
- logger.error(message, e);
- listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR));
- }
- } else {
- listener.onFailure(new OpenSearchStatusException("Anomaly detector job not exist: " + detectorId, RestStatus.BAD_REQUEST));
- }
- }, exception -> listener.onFailure(exception)));
- }
-
- private ActionListener stopAdDetectorListener(
- String detectorId,
- ActionListener listener
- ) {
- return new ActionListener() {
- @Override
- public void onResponse(StopDetectorResponse stopDetectorResponse) {
- if (stopDetectorResponse.success()) {
- logger.info("AD model deleted successfully for detector {}", detectorId);
- // StopDetectorTransportAction will send out DeleteModelAction which will clear all realtime cache.
- // Pass null transport service to method "stopLatestRealtimeTask" to not re-clear coordinating node cache.
- adTaskManager.stopLatestRealtimeTask(detectorId, ADTaskState.STOPPED, null, null, listener);
- } else {
- logger.error("Failed to delete AD model for detector {}", detectorId);
- // If failed to clear all realtime cache, will try to re-clear coordinating node cache.
- adTaskManager
- .stopLatestRealtimeTask(
- detectorId,
- ADTaskState.FAILED,
- new OpenSearchStatusException("Failed to delete AD model", RestStatus.INTERNAL_SERVER_ERROR),
- transportService,
- listener
- );
- }
- }
-
- @Override
- public void onFailure(Exception e) {
- logger.error("Failed to delete AD model for detector " + detectorId, e);
- // If failed to clear all realtime cache, will try to re-clear coordinating node cache.
- adTaskManager
- .stopLatestRealtimeTask(
- detectorId,
- ADTaskState.FAILED,
- new OpenSearchStatusException("Failed to execute stop detector action", RestStatus.INTERNAL_SERVER_ERROR),
- transportService,
- listener
- );
- }
- };
- }
-
-}
diff --git a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java
index ada684808..1ffb271ff 100644
--- a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java
+++ b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java
@@ -35,13 +35,9 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.ADCommonMessages;
-import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.model.MergeableList;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.transport.ValidateAnomalyDetectorResponse;
-import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
-import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
@@ -68,15 +64,21 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
+import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.ValidationException;
import org.opensearch.timeseries.constant.CommonMessages;
+import org.opensearch.timeseries.feature.SearchFeatureDao;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
+import org.opensearch.timeseries.model.MergeableList;
import org.opensearch.timeseries.model.TimeConfiguration;
import org.opensearch.timeseries.model.ValidationAspect;
import org.opensearch.timeseries.model.ValidationIssueType;
+import org.opensearch.timeseries.rest.handler.ConfigUpdateConfirmer;
+import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.ParseUtils;
+import org.opensearch.timeseries.util.SecurityClientUtil;
/**
*
This class executes all validation checks that are not blocking on the 'model' level.
@@ -94,7 +96,7 @@ public class ModelValidationActionHandler {
protected final ClusterService clusterService;
protected final Logger logger = LogManager.getLogger(AbstractAnomalyDetectorActionHandler.class);
protected final TimeValue requestTimeout;
- protected final AnomalyDetectorActionHandler handler = new AnomalyDetectorActionHandler();
+ protected final ConfigUpdateConfirmer handler = new ConfigUpdateConfirmer();
protected final Client client;
protected final SecurityClientUtil clientUtil;
protected final NamedXContentRegistry xContentRegistry;
@@ -104,6 +106,7 @@ public class ModelValidationActionHandler {
protected final String validationType;
protected final Settings settings;
protected final User user;
+ protected final AnalysisType context;
/**
* Constructor function.
@@ -147,6 +150,7 @@ public ModelValidationActionHandler(
this.clock = clock;
this.settings = settings;
this.user = user;
+ this.context = AnalysisType.AD;
}
// Need to first check if multi entity detector or not before doing any sort of validation.
@@ -253,6 +257,7 @@ private void getTopEntity(ActionListener