From 9a3a0ad4dca1379a08a2acf9c4548c4b5fa3a34f Mon Sep 17 00:00:00 2001 From: Andy Kwok Date: Fri, 15 Nov 2024 16:13:39 -0800 Subject: [PATCH] POC Signed-off-by: Andy Kwok --- client/build.gradle | 37 +++++++++ .../opensearch/IpEnrichmentActionClient.java | 35 ++++++++ common/build.gradle | 3 +- .../geospatial/action/IpEnrichmentAction.java | 21 +++++ .../action/IpEnrichmentRequest.java | 79 +++++++++++++++++++ .../action/IpEnrichmentResponse.java | 69 ++++++++++++++++ settings.gradle | 2 + .../model/IpEnrichmentTransportAction.java | 46 +++++++++++ .../geospatial/plugin/GeospatialPlugin.java | 9 ++- 9 files changed, 299 insertions(+), 2 deletions(-) create mode 100644 client/build.gradle create mode 100644 client/src/main/java/org/opensearch/IpEnrichmentActionClient.java create mode 100644 common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentAction.java create mode 100644 common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentRequest.java create mode 100644 common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentResponse.java create mode 100644 src/main/java/org/opensearch/geospatial/action/model/IpEnrichmentTransportAction.java diff --git a/client/build.gradle b/client/build.gradle new file mode 100644 index 0000000000..11cbe0947e --- /dev/null +++ b/client/build.gradle @@ -0,0 +1,37 @@ +plugins { + id 'java' + id 'maven-publish' +} + +group = 'org.opensearch' +version = '3.0.0.0-SNAPSHOT' + +repositories { + mavenLocal() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } + mavenCentral() + maven { url "https://plugins.gradle.org/m2/" } +} + +dependencies { + compileOnly "org.opensearch:opensearch:${opensearch_version}" + implementation(project(":${rootProject.name}-common")) + + testImplementation platform('org.junit:junit-bom:5.10.0') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +publishing { + publications { + mavenJava(MavenPublication) { + from components.java + } + } + repositories { + mavenLocal() + } +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/client/src/main/java/org/opensearch/IpEnrichmentActionClient.java b/client/src/main/java/org/opensearch/IpEnrichmentActionClient.java new file mode 100644 index 0000000000..152cbd591e --- /dev/null +++ b/client/src/main/java/org/opensearch/IpEnrichmentActionClient.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.geospatial.action.IpEnrichmentAction; +import org.opensearch.geospatial.action.IpEnrichmentRequest; +import org.opensearch.geospatial.action.IpEnrichmentResponse; + +import java.util.concurrent.ExecutionException; + +/** + * Proxy for the node client operations. + */ +public class IpEnrichmentActionClient { + + NodeClient nodeClient; + + public IpEnrichmentActionClient(NodeClient nodeClient) { + this.nodeClient = nodeClient; + } + + public String enrichIp(String ipString) throws ExecutionException, InterruptedException { + ActionFuture responseActionFuture = nodeClient.execute(IpEnrichmentAction.INSTANCE, new IpEnrichmentRequest(ipString)); + ActionResponse genericActionResponse = responseActionFuture.get(); + IpEnrichmentResponse enrichmentResponse = IpEnrichmentResponse.fromActionResponse(genericActionResponse); + return enrichmentResponse.getAnswer(); + } + +} diff --git a/common/build.gradle b/common/build.gradle index 3fcb256675..6dff39b000 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -1,7 +1,6 @@ plugins { id 'java' id 'maven-publish' - } group = 'org.opensearch' @@ -15,6 +14,8 @@ repositories { } dependencies { + compileOnly "org.opensearch:opensearch:${opensearch_version}" + testImplementation platform('org.junit:junit-bom:5.10.0') testImplementation 'org.junit.jupiter:junit-jupiter' } diff --git a/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentAction.java b/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentAction.java new file mode 100644 index 0000000000..16ba321e52 --- /dev/null +++ b/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentAction.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.action; + +import org.opensearch.action.ActionType; +import org.opensearch.core.action.ActionResponse; + +public class IpEnrichmentAction extends ActionType { + + + public static final IpEnrichmentAction INSTANCE = new IpEnrichmentAction(); + + public static final String NAME = "cluster:admin/geospatial/ipenrichment/get"; + + public IpEnrichmentAction() { + super(NAME, IpEnrichmentResponse::new); + } +} diff --git a/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentRequest.java b/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentRequest.java new file mode 100644 index 0000000000..ff47cd7035 --- /dev/null +++ b/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentRequest.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; + +public class IpEnrichmentRequest extends ActionRequest { + + private String ipString; + + + public IpEnrichmentRequest() { + } + + public IpEnrichmentRequest(String ipString) { + this.ipString = ipString; + } + + /** + * Constructor for TransportAction. + * @param streamInput + */ + public IpEnrichmentRequest(StreamInput streamInput) throws IOException { + super(streamInput); + ipString = streamInput.readString(); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = null; + if (ipString == null) { + errors = new ActionRequestValidationException(); + errors.addValidationError("ip string should not be null"); + } + return errors; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(ipString); + } + + public String getIpString() { + return ipString; + } + + public static IpEnrichmentRequest fromActionRequest(ActionRequest actionRequest) { + // From the same classloader + if (actionRequest instanceof IpEnrichmentRequest) { + return (IpEnrichmentRequest) actionRequest; + } + + // Or else convert it + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) { + actionRequest.writeTo(osso); + try (StreamInput input = + new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) { + return new IpEnrichmentRequest(input); + } + } catch (IOException e) { + throw new UncheckedIOException("failed to parse ActionRequest into IpEnrichmentRequest", e); + } + } +} diff --git a/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentResponse.java b/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentResponse.java new file mode 100644 index 0000000000..b9f14e9cd1 --- /dev/null +++ b/common/src/main/java/org/opensearch/geospatial/action/IpEnrichmentResponse.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.action; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; + + +public class IpEnrichmentResponse extends ActionResponse { + + + private String answer; + + public IpEnrichmentResponse(String answer) { + this.answer = answer; + } + + public IpEnrichmentResponse(StreamInput streamInput) throws IOException { + super(streamInput); + answer = streamInput.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(answer); + } + + public String getAnswer() { + return answer; + } + + public static IpEnrichmentResponse fromActionResponse(ActionResponse actionResponse) { + // From the same classloader + if (actionResponse instanceof IpEnrichmentResponse) { + return (IpEnrichmentResponse) actionResponse; + } + + // Or else convert it + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) { + actionResponse.writeTo(osso); + try (StreamInput input = + new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) { + return new IpEnrichmentResponse(input); + } + } catch (IOException e) { + throw new UncheckedIOException("failed to parse ActionResponse into IpEnrichmentResponse", e); + } + } + + @Override + public String toString() { + return "IpEnrichmentResponse{" + + "answer='" + answer + '\'' + + '}'; + } +} diff --git a/settings.gradle b/settings.gradle index 5a76591554..70918f704d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,3 +14,5 @@ include ":libs:h3" include 'common' project(":common").name = rootProject.name + "-common" +include 'client' +project(":client").name = rootProject.name + "-client" diff --git a/src/main/java/org/opensearch/geospatial/action/model/IpEnrichmentTransportAction.java b/src/main/java/org/opensearch/geospatial/action/model/IpEnrichmentTransportAction.java new file mode 100644 index 0000000000..6b254466c0 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/action/model/IpEnrichmentTransportAction.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.action.model; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.geospatial.action.IpEnrichmentAction; +import org.opensearch.geospatial.action.IpEnrichmentRequest; +import org.opensearch.geospatial.action.IpEnrichmentResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class IpEnrichmentTransportAction extends HandledTransportAction { + + + @Inject + public IpEnrichmentTransportAction( + TransportService transportService, + ActionFilters actionFilters) { + super(IpEnrichmentAction.NAME, transportService, actionFilters, IpEnrichmentRequest::new); + } + + @Override + protected void doExecute(Task task, ActionRequest request, ActionListener listener) { + IpEnrichmentRequest enrichmentRequest = IpEnrichmentRequest.fromActionRequest(request); + listener.onResponse(new IpEnrichmentResponse(enrichmentRequest.getIpString() + " Done!")); + } + + +// @Override +// protected void doExecute(Task task, ActionRequest request, ActionListener listener) { +// IpEnrichmentRequest enrichmentRequest = IpEnrichmentRequest.fromActionRequest(request); +// listener.onResponse(new IpEnrichmentResponse(enrichmentRequest.getIpString() + " Done!")); +// } + + + +} diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 68a949b5ac..bfc8de3bd7 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -31,6 +31,8 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.geospatial.action.IpEnrichmentAction; +import org.opensearch.geospatial.action.model.IpEnrichmentTransportAction; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONTransportAction; import org.opensearch.geospatial.index.mapper.xypoint.XYPointFieldMapper; @@ -221,11 +223,16 @@ public List getRestHandlers( new ActionHandler<>(UpdateDatasourceAction.INSTANCE, UpdateDatasourceTransportAction.class), new ActionHandler<>(DeleteDatasourceAction.INSTANCE, DeleteDatasourceTransportAction.class) ); - String testStr = CommonMain.TEST_STR; + + // Inter-cluster IP enrichment request + List> ipEnrichmentHandlers = List.of( + new ActionHandler<>(IpEnrichmentAction.INSTANCE, IpEnrichmentTransportAction.class) + ); List> allHandlers = new ArrayList<>(); allHandlers.addAll(geoJsonHandlers); allHandlers.addAll(ip2geoHandlers); + allHandlers.addAll(ipEnrichmentHandlers); return allHandlers; }