Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Kwok <andy.kwok@improving.com>
  • Loading branch information
andy-k-improving committed Nov 16, 2024
1 parent 8b3fa5b commit 9a3a0ad
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 2 deletions.
37 changes: 37 additions & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
plugins {
id 'java'
id 'maven-publish'
}

group = 'org.opensearch'
version = '3.0.0.0-SNAPSHOT'

repositories {
mavenLocal()
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
implementation(project(":${rootProject.name}-common"))

testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

publishing {
publications {
mavenJava(MavenPublication) {
from components.java
}
}
repositories {
mavenLocal()
}
}

test {
useJUnitPlatform()
}
35 changes: 35 additions & 0 deletions client/src/main/java/org/opensearch/IpEnrichmentActionClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.geospatial.action.IpEnrichmentAction;
import org.opensearch.geospatial.action.IpEnrichmentRequest;
import org.opensearch.geospatial.action.IpEnrichmentResponse;

import java.util.concurrent.ExecutionException;

/**
* Proxy for the node client operations.
*/
public class IpEnrichmentActionClient {

NodeClient nodeClient;

public IpEnrichmentActionClient(NodeClient nodeClient) {
this.nodeClient = nodeClient;
}

public String enrichIp(String ipString) throws ExecutionException, InterruptedException {
ActionFuture<ActionResponse> responseActionFuture = nodeClient.execute(IpEnrichmentAction.INSTANCE, new IpEnrichmentRequest(ipString));
ActionResponse genericActionResponse = responseActionFuture.get();
IpEnrichmentResponse enrichmentResponse = IpEnrichmentResponse.fromActionResponse(genericActionResponse);
return enrichmentResponse.getAnswer();
}

}
3 changes: 2 additions & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
plugins {
id 'java'
id 'maven-publish'

}

group = 'org.opensearch'
Expand All @@ -15,6 +14,8 @@ repositories {
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"

testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.action;

import org.opensearch.action.ActionType;
import org.opensearch.core.action.ActionResponse;

public class IpEnrichmentAction extends ActionType<ActionResponse> {


public static final IpEnrichmentAction INSTANCE = new IpEnrichmentAction();

public static final String NAME = "cluster:admin/geospatial/ipenrichment/get";

public IpEnrichmentAction() {
super(NAME, IpEnrichmentResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class IpEnrichmentRequest extends ActionRequest {

private String ipString;


public IpEnrichmentRequest() {
}

public IpEnrichmentRequest(String ipString) {
this.ipString = ipString;
}

/**
* Constructor for TransportAction.
* @param streamInput
*/
public IpEnrichmentRequest(StreamInput streamInput) throws IOException {
super(streamInput);
ipString = streamInput.readString();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (ipString == null) {
errors = new ActionRequestValidationException();
errors.addValidationError("ip string should not be null");
}
return errors;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(ipString);
}

public String getIpString() {
return ipString;
}

public static IpEnrichmentRequest fromActionRequest(ActionRequest actionRequest) {
// From the same classloader
if (actionRequest instanceof IpEnrichmentRequest) {
return (IpEnrichmentRequest) actionRequest;
}

// Or else convert it
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
actionRequest.writeTo(osso);
try (StreamInput input =
new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
return new IpEnrichmentRequest(input);
}
} catch (IOException e) {
throw new UncheckedIOException("failed to parse ActionRequest into IpEnrichmentRequest", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.action;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;


public class IpEnrichmentResponse extends ActionResponse {


private String answer;

public IpEnrichmentResponse(String answer) {
this.answer = answer;
}

public IpEnrichmentResponse(StreamInput streamInput) throws IOException {
super(streamInput);
answer = streamInput.readString();
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(answer);
}

public String getAnswer() {
return answer;
}

public static IpEnrichmentResponse fromActionResponse(ActionResponse actionResponse) {
// From the same classloader
if (actionResponse instanceof IpEnrichmentResponse) {
return (IpEnrichmentResponse) actionResponse;
}

// Or else convert it
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
actionResponse.writeTo(osso);
try (StreamInput input =
new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
return new IpEnrichmentResponse(input);
}
} catch (IOException e) {
throw new UncheckedIOException("failed to parse ActionResponse into IpEnrichmentResponse", e);
}
}

@Override
public String toString() {
return "IpEnrichmentResponse{" +
"answer='" + answer + '\'' +
'}';
}
}
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ include ":libs:h3"

include 'common'
project(":common").name = rootProject.name + "-common"
include 'client'
project(":client").name = rootProject.name + "-client"
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.action.model;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.geospatial.action.IpEnrichmentAction;
import org.opensearch.geospatial.action.IpEnrichmentRequest;
import org.opensearch.geospatial.action.IpEnrichmentResponse;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class IpEnrichmentTransportAction extends HandledTransportAction<ActionRequest,
ActionResponse> {


@Inject
public IpEnrichmentTransportAction(
TransportService transportService,
ActionFilters actionFilters) {
super(IpEnrichmentAction.NAME, transportService, actionFilters, IpEnrichmentRequest::new);
}

@Override
protected void doExecute(Task task, ActionRequest request, ActionListener<ActionResponse> listener) {
IpEnrichmentRequest enrichmentRequest = IpEnrichmentRequest.fromActionRequest(request);
listener.onResponse(new IpEnrichmentResponse(enrichmentRequest.getIpString() + " Done!"));
}


// @Override
// protected void doExecute(Task task, ActionRequest request, ActionListener<IpEnrichmentResponse> listener) {
// IpEnrichmentRequest enrichmentRequest = IpEnrichmentRequest.fromActionRequest(request);
// listener.onResponse(new IpEnrichmentResponse(enrichmentRequest.getIpString() + " Done!"));
// }



}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.geospatial.action.IpEnrichmentAction;
import org.opensearch.geospatial.action.model.IpEnrichmentTransportAction;
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction;
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONTransportAction;
import org.opensearch.geospatial.index.mapper.xypoint.XYPointFieldMapper;
Expand Down Expand Up @@ -221,11 +223,16 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(UpdateDatasourceAction.INSTANCE, UpdateDatasourceTransportAction.class),
new ActionHandler<>(DeleteDatasourceAction.INSTANCE, DeleteDatasourceTransportAction.class)
);
String testStr = CommonMain.TEST_STR;

// Inter-cluster IP enrichment request
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> ipEnrichmentHandlers = List.of(
new ActionHandler<>(IpEnrichmentAction.INSTANCE, IpEnrichmentTransportAction.class)
);

List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> allHandlers = new ArrayList<>();
allHandlers.addAll(geoJsonHandlers);
allHandlers.addAll(ip2geoHandlers);
allHandlers.addAll(ipEnrichmentHandlers);
return allHandlers;
}

Expand Down

0 comments on commit 9a3a0ad

Please sign in to comment.