Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qsb framework changes #13007

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ protected void executePhaseOnShard(
if (request != null) {
request.setInboundNetworkTime(System.currentTimeMillis());
}
request.setSandboxId(getTask().getSandboxId());
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SandboxableTask;
import org.opensearch.tasks.SearchBackpressureTask;

import java.util.Map;
Expand All @@ -50,9 +51,10 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask, SandboxableTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By making the SanboxableTask an interface like this it would imply there are numerous tasks which may eventually support sandboxing (or whatever it ends up being called); do you have some other ideas around what this may look like?

I think that may help with defining the interface more accurately. Personally, I had thought it was only for search so I could have been misunderstanding things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were thinking that this feature can supersede the AdmissionControl and SearchBackpressure, We can extend this idea to indexing traffic as well.

// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;
private String sandboxId;

public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, () -> "");
Expand All @@ -75,6 +77,14 @@ public String getTaskMetadata() {
return metadataSupplier.get();
}

public String getSandboxId() {
return sandboxId;
}

public void setSandboxId(String sandboxId) {
this.sandboxId = sandboxId;
}

@Override
public boolean supportsResourceTracking() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SandboxableTask;
import org.opensearch.tasks.SearchBackpressureTask;

import java.util.Map;
Expand All @@ -49,10 +50,11 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
public class SearchTask extends CancellableTask implements SearchBackpressureTask, SandboxableTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private String sandboxId;

public SearchTask(
long id,
Expand Down Expand Up @@ -95,6 +97,14 @@ public final void setProgressListener(SearchProgressListener progressListener) {
this.progressListener = progressListener;
}

public String getSandboxId() {
return sandboxId;
}

public void setSandboxId(String sandboxId) {
this.sandboxId = sandboxId;
}

/**
* Return the {@link SearchProgressListener} attached to this task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.sandbox.RequestSandboxClassifier;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
Expand Down Expand Up @@ -187,6 +188,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,

private SearchQueryCategorizer searchQueryCategorizer;

private final RequestSandboxClassifier requestSandboxClassifier;

@Inject
public TransportSearchAction(
NodeClient client,
Expand All @@ -201,6 +204,7 @@ public TransportSearchAction(
IndexNameExpressionResolver indexNameExpressionResolver,
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
RequestSandboxClassifier requestSandboxClassifier
MetricsRegistry metricsRegistry,
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory,
Tracer tracer
Expand All @@ -220,6 +224,7 @@ public TransportSearchAction(
this.searchPipelineService = searchPipelineService;
this.metricsRegistry = metricsRegistry;
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.requestSandboxClassifier = requestSandboxClassifier;
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
Expand Down Expand Up @@ -1104,6 +1109,10 @@ private void executeSearch(
concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size()
);
// Set sandbox for the request
final String requestSandboxId = requestSandboxClassifier.resolveSandboxFor(searchRequest);
task.setSandboxId(requestSandboxId);

searchAsyncActionProvider.asyncSearchAction(
task,
searchRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.opensearch.search.sandbox.QuerySandboxServiceSettings;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
Expand Down Expand Up @@ -726,6 +727,11 @@ public void apply(Settings value, Settings current, Settings previous) {

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING

// Query Sandbox Settings
QuerySandboxServiceSettings.MAX_SANDBOX_COUNT,
QuerySandboxServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD,
QuerySandboxServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ public void executeDfsPhase(
ActionListener<SearchPhaseResult> listener
) {
final IndexShard shard = getShard(request);
task.setSandboxId(request.getSandboxId());
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest rewritten) {
Expand Down Expand Up @@ -556,6 +557,7 @@ public void executeQueryPhase(
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
final IndexShard shard = getShard(request);
task.setSandboxId(request.getSandboxId());
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
Expand Down Expand Up @@ -663,6 +665,7 @@ public void executeQueryPhase(
throw e;
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
// TODO: might want to explore this execution path
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
Expand All @@ -686,6 +689,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
task.setSandboxId(shardSearchRequest.getSandboxId());
runAsync(getExecutor(readerContext.indexShard()), () -> {
readerContext.setAggregatedDfs(request.dfs());
try (
Expand Down Expand Up @@ -767,6 +771,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
task.setSandboxId(shardSearchRequest.getSandboxId());
runAsync(getExecutor(readerContext.indexShard()), () -> {
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
if (request.lastEmittedDoc() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
private SearchSourceBuilder source;
private final ShardSearchContextId readerId;
private final TimeValue keepAlive;
private String sandboxId;

public ShardSearchRequest(
OriginalIndices originalIndices,
Expand Down Expand Up @@ -265,6 +266,9 @@ public ShardSearchRequest(StreamInput in) throws IOException {
bottomSortValues = in.readOptionalWriteable(SearchSortValuesAndFormats::new);
readerId = in.readOptionalWriteable(ShardSearchContextId::new);
keepAlive = in.readOptionalTimeValue();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
sandboxId = in.readOptionalString();
}
originalIndices = OriginalIndices.readOriginalIndices(in);
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}
Expand All @@ -290,6 +294,7 @@ public ShardSearchRequest(ShardSearchRequest clone) {
this.originalIndices = clone.originalIndices;
this.readerId = clone.readerId;
this.keepAlive = clone.keepAlive;
this.sandboxId = clone.sandboxId;
}

@Override
Expand Down Expand Up @@ -335,6 +340,10 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce
out.writeOptionalWriteable(readerId);
out.writeOptionalTimeValue(keepAlive);
}

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalString(sandboxId);
}
}

@Override
Expand All @@ -353,6 +362,14 @@ public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}

public String getSandboxId() {
return sandboxId;
}

public void setSandboxId(String sandboxId) {
this.sandboxId = sandboxId;
}

public ShardId shardId() {
return shardId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.sandbox;

import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;

/**
* This interface defines the key APIs for implementing Sandbox persistence
*/
public interface Persistable {
/**
* persists the @link Sandbox in a durable storage
* @param sandbox
*/
<U extends ActionResponse> void persist(Object sandbox, ActionListener<U> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.sandbox;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;

/**
* Main service which will run periodically to track and cancel resource constraint violating tasks in sandboxes
*/
public class QuerySandboxService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(QuerySandboxService.class);

private final SandboxedRequestTrackerService requestTrackerService;
private volatile Scheduler.Cancellable scheduledFuture;
private final QuerySandboxServiceSettings sandboxServiceSettings;
private final ThreadPool threadPool;

public QuerySandboxService(
SandboxedRequestTrackerService requestTrackerService,
QuerySandboxServiceSettings sandboxServiceSettings,
ThreadPool threadPool
) {
this.requestTrackerService = requestTrackerService;
this.sandboxServiceSettings = sandboxServiceSettings;
this.threadPool = threadPool;
}

private void doRun() {
requestTrackerService.updateSandboxResourceUsages();
requestTrackerService.cancelViolatingTasks();
requestTrackerService.pruneSandboxes();
}

@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
doRun();
} catch (Exception e) {
logger.debug("Exception occurred in Query Sandbox service", e);
}
}, sandboxServiceSettings.getRunIntervalMillis(), ThreadPool.Names.GENERIC);
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
}

@Override
protected void doClose() throws IOException {}

public SandboxStatsHolder stats() {
return requestTrackerService.getSandboxLevelStats();
}
}
Loading
Loading