prelim framework for jobscheduler and datasource
jowg-amazon committed Oct 2, 2023
1 parent 52190ab commit 994739c
Showing 9 changed files with 436 additions and 142 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -32,6 +32,7 @@ buildscript {

dependencies {
classpath "org.opensearch.gradle:build-tools:${opensearch_version}"
classpath "io.freefair.gradle:lombok-plugin:6.4.3"
}
}

@@ -48,6 +49,7 @@ apply plugin: 'opensearch.java-rest-test'
apply plugin: 'opensearch.pluginzip'
apply from: 'build-tools/opensearchplugin-coverage.gradle'
apply from: 'gradle/formatting.gradle'
apply plugin: 'io.freefair.lombok'

ext {
projectSubstitutions = [:]
@@ -159,6 +161,7 @@ dependencies {
api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
implementation "org.apache.commons:commons-csv:1.10.0"

// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'alerting', version: "${opensearch_build}"
src/main/java/org/opensearch/securityanalytics/threatintel/common/StashedThreadContext.java
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.common;

import java.util.function.Supplier;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;

/**
* Helper class to run code with a stashed thread context.
*
* Code needs to be run with a stashed thread context if it interacts with a system index
* when the security plugin is enabled.
*/
public class StashedThreadContext {
/**
* Set the thread context to default; this is needed to allow actions on system indices
* when the security plugin is enabled
* @param client the client whose thread context is stashed
* @param function runnable to execute after the thread context has been stashed; accepts and returns nothing
*/
public static void run(final Client client, final Runnable function) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
function.run();
}
}

/**
* Set the thread context to default; this is needed to allow actions on system indices
* when the security plugin is enabled
* @param client the client whose thread context is stashed
* @param function supplier to execute after the thread context has been stashed
* @return the value returned by the supplier
*/
public static <T> T run(final Client client, final Supplier<T> function) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
return function.get();
}
}
}
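
For readers unfamiliar with the pattern, a minimal usage sketch of the two overloads (not part of this commit; the datasource id "my-feed" is a placeholder, and the types match those used elsewhere in this change):

    // Supplier overload: return the GetResponse produced inside the stashed context.
    public GetResponse readJobDoc(final Client client) {
        return StashedThreadContext.run(
            client,
            () -> client.get(new GetRequest(DatasourceExtension.JOB_INDEX_NAME, "my-feed")).actionGet()
        );
    }

    // Runnable overload: fire-and-forget style call inside the stashed context.
    public void refreshJobIndex(final Client client) {
        StashedThreadContext.run(client, () -> {
            client.admin().indices().refresh(new RefreshRequest(DatasourceExtension.JOB_INDEX_NAME)).actionGet();
        });
    }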

src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelExecutor.java
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.common;

import java.util.concurrent.ExecutorService;

import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;

/**
* Provides executor builders and executor services for threat intel tasks
*/
public class ThreatIntelExecutor {
private static final String THREAD_POOL_NAME = "_plugin_securityanalytics_threatintel_datasource_update";
private final ThreadPool threadPool;

public ThreatIntelExecutor(final ThreadPool threadPool) {
this.threadPool = threadPool;
}

/**
* We use a fixed thread count of 1 for updating the datasource because the update runs in the
* background at most once a day and there is no need to expedite the task.
*
* @param settings the settings
* @return the executor builder
*/
public static ExecutorBuilder executorBuilder(final Settings settings) {
return new FixedExecutorBuilder(settings, THREAD_POOL_NAME, 1, 1000, THREAD_POOL_NAME, false);
}

/**
* Return an executor service for the datasource update task
*
* @return the executor service
*/
public ExecutorService forDatasourceUpdate() {
return threadPool.executor(THREAD_POOL_NAME);
}
}
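
A hedged sketch of how a plugin would typically register and use this executor (the overriding plugin class is illustrative; getExecutorBuilders is the standard OpenSearch Plugin extension point):

    // In the plugin class (illustrative): register the fixed thread pool with the node.
    @Override
    public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
        return List.of(ThreatIntelExecutor.executorBuilder(settings));
    }

    // Later, with the node's ThreadPool available, submit a datasource update task.
    ThreatIntelExecutor threatIntelExecutor = new ThreatIntelExecutor(threadPool);
    threatIntelExecutor.forDatasourceUpdate().submit(() -> {
        // long-running datasource update work would go here
    });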
src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelSettings.java
@@ -18,80 +18,81 @@
*/
public class ThreatIntelSettings {

/**
* Default endpoint to be used in threatIP datasource creation API
*/
public static final Setting<String> DATASOURCE_ENDPOINT = Setting.simpleString(
"plugins.securityanalytics.threatintel.datasource.endpoint",
"https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json",
new DatasourceEndpointValidator(),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Default update interval to be used in Ip2Geo datasource creation API
*/
public static final Setting<Long> DATASOURCE_UPDATE_INTERVAL = Setting.longSetting(
"plugins.geospatial.ip2geo.datasource.update_interval_in_days",
3l,
1l,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Bulk size for indexing GeoIP data
*/
public static final Setting<Integer> BATCH_SIZE = Setting.intSetting(
"plugins.geospatial.ip2geo.datasource.batch_size",
10000,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// /**
// * Default endpoint to be used in threatIP datasource creation API
// */
// public static final Setting<String> DATASOURCE_ENDPOINT = Setting.simpleString(
// "plugins.security_analytics.threatintel.datasource.endpoint",
// "https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json",
// new DatasourceEndpointValidator(),
// Setting.Property.NodeScope,
// Setting.Property.Dynamic
// );
//
// /**
// * Default update interval to be used in Ip2Geo datasource creation API
// */
// public static final Setting<Long> DATASOURCE_UPDATE_INTERVAL = Setting.longSetting(
// "plugins.security_analytics.threatintel.datasource.update_interval_in_days",
// 3l,
// 1l,
// Setting.Property.NodeScope,
// Setting.Property.Dynamic
// );
//
// /**
// * Bulk size for indexing GeoIP data
// */
// public static final Setting<Integer> BATCH_SIZE = Setting.intSetting(
// "plugins.security_analytics.threatintel.datasource.batch_size",
// 10000,
// 1,
// Setting.Property.NodeScope,
// Setting.Property.Dynamic
// );
//
/**
* Timeout value for threat intel index operations
*/
public static final Setting<TimeValue> TIMEOUT = Setting.timeSetting(
"plugins.geospatial.ip2geo.timeout",
"plugins.security_analytics.index_timeout",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(1),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Max size for geo data cache
*/
public static final Setting<Long> CACHE_SIZE = Setting.longSetting(
"plugins.geospatial.ip2geo.processor.cache_size",
1000,
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
// /**
// * Max size for geo data cache
// */
// public static final Setting<Long> CACHE_SIZE = Setting.longSetting(
// "plugins.security_analytics.threatintel.processor.cache_size",
// 1000,
// 0,
// Setting.Property.NodeScope,
// Setting.Property.Dynamic
// );

/**
* Return all settings of Ip2Geo feature
* @return a list of all settings for Ip2Geo feature
* Return all settings of threatIntel feature
* @return a list of all settings for threatIntel feature
*/
public static final List<Setting<?>> settings() {
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, BATCH_SIZE, TIMEOUT, CACHE_SIZE);
// return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, BATCH_SIZE, TIMEOUT, CACHE_SIZE);
return List.of(TIMEOUT);
}

/**
* Visible for testing
*/
protected static class DatasourceEndpointValidator implements Setting.Validator<String> {
@Override
public void validate(final String value) {
try {
new URL(value).toURI();
} catch (MalformedURLException | URISyntaxException e) {
throw new IllegalArgumentException("Invalid URL format is provided");
}
}
}
// /**
// * Visible for testing
// */
// protected static class DatasourceEndpointValidator implements Setting.Validator<String> {
// @Override
// public void validate(final String value) {
// try {
// new URL(value).toURI();
// } catch (MalformedURLException | URISyntaxException e) {
// throw new IllegalArgumentException("Invalid URL format is provided");
// }
// }
// }
}
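
For context, a minimal sketch of how the remaining TIMEOUT setting would typically be registered and consumed (the plugin class is illustrative; getSettings is the standard Plugin extension point, and DatasourceDao below reads the setting the same way):

    // In the plugin class (illustrative): expose the settings so the node accepts them.
    @Override
    public List<Setting<?>> getSettings() {
        return ThreatIntelSettings.settings();
    }

    // At a call site with access to ClusterSettings:
    TimeValue timeout = clusterSettings.get(ThreatIntelSettings.TIMEOUT);
    GetResponse response = client.get(request).actionGet(timeout);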
src/main/java/org/opensearch/securityanalytics/threatintel/dao/DatasourceDao.java
@@ -1,4 +1,143 @@
package org.opensearch.securityanalytics.threatintel.dao;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;

import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.securityanalytics.model.DetectorTrigger;
import org.opensearch.securityanalytics.threatintel.common.StashedThreadContext;
import org.opensearch.securityanalytics.threatintel.common.ThreatIntelSettings;
import org.opensearch.securityanalytics.threatintel.jobscheduler.DatasourceExtension;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.securityanalytics.threatintel.jobscheduler.Datasource;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.stream.Collectors;

public class DatasourceDao {
private static final Logger log = LogManager.getLogger(DatasourceDao.class);

private static final Integer MAX_SIZE = 1000;
private final Client client;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;

public DatasourceDao(final Client client, final ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.clusterSettings = clusterService.getClusterSettings();
}

// /**
// * Create datasource index
// *
// * @param stepListener setup listener
// */
// public void createIndexIfNotExists(final StepListener<Void> stepListener) {
// if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) {
// stepListener.onResponse(null);
// return;
// }
// final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
// .settings(DatasourceExtension.INDEX_SETTING);
//
// StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
// @Override
// public void onResponse(final CreateIndexResponse createIndexResponse) {
// stepListener.onResponse(null);
// }
//
// @Override
// public void onFailure(final Exception e) {
// if (e instanceof ResourceAlreadyExistsException) {
// log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME);
// stepListener.onResponse(null);
// return;
// }
// stepListener.onFailure(e);
// }
// }));
// }

private String getIndexMapping() {
try {
try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threatintel_datasource.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Get a datasource from the index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param name the name of the datasource
* @return the datasource, or null if it does not exist
* @throws IOException if the datasource document cannot be parsed
*/
public Datasource getDatasource(final String name) throws IOException {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetResponse response;
try {
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(ThreatIntelSettings.TIMEOUT)));
if (response.isExists() == false) {
log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME);
return null;
}
} catch (IndexNotFoundException e) {
log.error("Index[{}] is not found", DatasourceExtension.JOB_INDEX_NAME);
return null;
}

XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getSourceAsBytesRef()
);
return Datasource.PARSER.parse(parser, null);
}

/**
* Update a datasource in the index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param datasource the datasource to update
* @return the index response
*/
public IndexResponse updateDatasource(final Datasource datasource) {
datasource.setLastUpdateTime(Instant.now());
return StashedThreadContext.run(client, () -> {
try {
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute()
.actionGet(clusterSettings.get(ThreatIntelSettings.TIMEOUT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

}
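
A brief usage sketch of the two public methods above (the datasource name and the caller are made up for illustration; getDatasource declares IOException, so the caller propagates it):

    // Hypothetical caller: load a datasource, mutate it, and write it back.
    void touchDatasource(final DatasourceDao datasourceDao) throws IOException {
        Datasource datasource = datasourceDao.getDatasource("my-feed");
        if (datasource != null) {
            // ... mutate fields on the datasource here ...
            IndexResponse response = datasourceDao.updateDatasource(datasource);
            // response.getResult() is CREATED on the first write, UPDATED afterwards
        }
    }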
