Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Views for OpenSearch #139

Closed
wants to merge 13 commits into from
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
import org.opensearch.action.admin.indices.upgrade.post.UpgradeSettingsAction;
import org.opensearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.opensearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.opensearch.action.admin.indices.view.CreateViewAction;
import org.opensearch.action.bulk.BulkAction;
import org.opensearch.action.bulk.TransportBulkAction;
import org.opensearch.action.bulk.TransportShardBulkAction;
Expand Down Expand Up @@ -721,6 +722,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class);
actions.register(DataStreamsStatsAction.INSTANCE, DataStreamsStatsAction.TransportAction.class);

// Views:
actions.register(CreateViewAction.INSTANCE, CreateViewAction.TransportAction.class);

// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package org.opensearch.action.admin.indices.view;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.ActionType;
import org.opensearch.action.ValidateActions;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ViewService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/** Action to create a view */
public class CreateViewAction extends ActionType<CreateViewAction.Response> {

public static final CreateViewAction INSTANCE = new CreateViewAction();
public static final String NAME = "cluster:views:create";

private CreateViewAction() {
super(NAME, CreateViewAction.Response::new);
}


/** View target representation for create requests */
public static class ViewTarget implements Writeable {
public final String indexPattern;

public ViewTarget(final String indexPattern) {
this.indexPattern = indexPattern;
}

public ViewTarget(final StreamInput in) throws IOException {
this.indexPattern = in.readString();
}

public String getIndexPattern() {
return indexPattern;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexPattern);
}

public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

if (Strings.isNullOrEmpty(indexPattern)) {
validationException = ValidateActions.addValidationError("index pattern cannot be empty or null", validationException);
}

return validationException;
}

}

/**
* Request for Creating View
*/
public static class Request extends ClusterManagerNodeRequest<Request> {
private final String name;
private final String description;
private final List<ViewTarget> targets;

public Request(final String name, final String description, final List<ViewTarget> targets) {
this.name = name;
this.description = description;
this.targets = targets;
}

public String getName() {
return name;
}

public String getDescription() {
return description;
}

public List<ViewTarget> getTargets() {
return new ArrayList<>(targets);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (Strings.isNullOrEmpty(name)) {
validationException = ValidateActions.addValidationError("Name is cannot be empty or null", validationException);
}
if (targets.isEmpty()) {
validationException = ValidateActions.addValidationError("targets cannot be empty", validationException);
}

for (final ViewTarget target : targets) {
validationException = target.validate();
}

return validationException;
}

public Request(final StreamInput in) throws IOException {
super(in);
this.name = in.readString();
this.description = in.readString();
this.targets = in.readList(ViewTarget::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(description);
out.writeList(targets);
}
}

/** Response after view is created */
public static class Response extends ActionResponse {

private final org.opensearch.cluster.metadata.View createdView;

public Response(final org.opensearch.cluster.metadata.View createdView) {
this.createdView = createdView;
}

public Response(final StreamInput in) throws IOException {
super(in);
this.createdView = new org.opensearch.cluster.metadata.View(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
this.createdView.writeTo(out);
}
}

/**
* Transport Action for creating a View
*/
public static class TransportAction extends TransportClusterManagerNodeAction<Request, Response> {

private final ViewService viewService;

@Inject
public TransportAction(
final TransportService transportService,
final ClusterService clusterService,
final ThreadPool threadPool,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final ViewService viewService
) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.viewService = viewService;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected Response read(StreamInput in) throws IOException {
return new Response(in);
}

@Override
protected void clusterManagerOperation(Request request, ClusterState state, ActionListener<Response> listener)
throws Exception {
viewService.createView(request, listener);
}

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** View transport handlers. */
package org.opensearch.action.admin.indices.view;
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;

private final String localClusterAlias;
private final long absoluteStartMillis;
private final boolean finalReduce;
protected final String localClusterAlias;
protected final long absoluteStartMillis;
protected final boolean finalReduce;

private SearchType searchType = SearchType.DEFAULT;

private String[] indices = Strings.EMPTY_ARRAY;
protected String[] indices = Strings.EMPTY_ARRAY;

@Nullable
private String routing;
Expand Down Expand Up @@ -189,7 +189,7 @@ static SearchRequest subSearchRequest(
return new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce);
}

private SearchRequest(
protected SearchRequest(
SearchRequest searchRequest,
String[] indices,
String localClusterAlias,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.View;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;

import static org.opensearch.action.ValidateActions.addValidationError;

/** Wraps the functionality of search requests and tailors for what is available when searching through views
*/
@ExperimentalApi
public class ViewSearchRequest extends SearchRequest {

public final View view;

public ViewSearchRequest(final View view) {
super();
this.view = view;
}

public ViewSearchRequest(final StreamInput in) throws IOException {
super(in);
view = new View(in);
}

@Override
public ActionRequestValidationException validate() {
final Function<String, String> unsupported = (String x) -> x + " is not supported when searching views";
ActionRequestValidationException validationException = super.validate();

if (scroll() != null) {
validationException = addValidationError(unsupported.apply("Scroll"), validationException);
}

// TODO: Filter out anything additional search features that are not supported

return validationException;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
view.writeTo(out);
}

@Override
public boolean equals(final Object o) {
// TODO: Maybe this isn't standard practice
return this.hashCode() == o.hashCode();
}

@Override
public int hashCode() {
return Objects.hash(view, super.hashCode());
}

@Override
public String toString() {
return super.toString().replace("SearchRequest{", "ViewSearchRequest{view=" + view + ",");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -195,6 +196,7 @@ public static List<Entry> getNamedWriteables() {
ComposableIndexTemplateMetadata::readDiffFrom
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(entries, ViewMetadata.TYPE, ViewMetadata::new, ViewMetadata::readDiffFrom);
registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom);
registerMetadataCustom(
entries,
Expand Down Expand Up @@ -292,6 +294,7 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
DataStreamMetadata::fromXContent
)
);
entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(ViewMetadata.TYPE), ViewMetadata::fromXContent));
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,10 @@ public Map<String, DataStream> dataStreams() {
.orElse(Collections.emptyMap());
}

public Map<String, View> views() {
return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap());
}

public DecommissionAttributeMetadata decommissionAttributeMetadata() {
return custom(DecommissionAttributeMetadata.TYPE);
}
Expand Down Expand Up @@ -1325,6 +1329,36 @@ public Builder removeDataStream(String name) {
return this;
}

private Map<String, View> getViews() {
return Optional.ofNullable(customs.get(ViewMetadata.TYPE))
.map(o -> (ViewMetadata) o)
.map(vmd -> vmd.views())
.orElse(new HashMap<>());
}

public View view(final String viewName) {
return getViews().get(viewName);
}

public Builder views(final Map<String, View> views) {
this.customs.put(ViewMetadata.TYPE, new ViewMetadata(views));
return this;
}

public Builder put(final View view) {
Objects.requireNonNull(view, "view cannot be null");
final var replacementViews = new HashMap<>(getViews());
replacementViews.put(view.name, view);
return views(replacementViews);
}

public Builder removeView(final String viewName) {
Objects.requireNonNull(viewName, "viewName cannot be null");
final var replacementViews = new HashMap<>(getViews());
replacementViews.remove(viewName);
return views(replacementViews);
}

public Custom getCustom(String type) {
return customs.get(type);
}
Expand Down
Loading
Loading