From 87e357a1dba91059f18b331f660a6af168fc18d5 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Fri, 28 Jun 2024 04:24:54 -0700 Subject: [PATCH 01/25] adding Document and Image messages to S3 via uploader --- pom.xml | 59 ++++++++++++++- src/main/java/com/meta/cp4m/PreProcessor.java | 18 +++++ .../java/com/meta/cp4m/S3PreProcessor.java | 72 +++++++++++++++++++ src/main/java/com/meta/cp4m/Service.java | 18 +++++ .../com/meta/cp4m/ServiceConfiguration.java | 9 ++- .../cp4m/configuration/RootConfiguration.java | 4 +- .../meta/cp4m/message/WAMessageHandler.java | 71 ++++++++++++++++++ .../com/meta/cp4m/S3PreProcessorTest.java | 31 ++++++++ 8 files changed, 279 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/meta/cp4m/PreProcessor.java create mode 100644 src/main/java/com/meta/cp4m/S3PreProcessor.java create mode 100644 src/test/java/com/meta/cp4m/S3PreProcessorTest.java diff --git a/pom.xml b/pom.xml index 90a3a3a..2842a65 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,64 @@ 2.1.1 ${project.artifactId}-${project.version} + + + + software.amazon.awssdk + bom + 2.24.1 + pom + import + + + + + software.amazon.awssdk + s3 + 2.8.3 + + + software.amazon.awssdk + netty-nio-client + + + software.amazon.awssdk + apache-client + + + + + commons-logging + commons-logging + 1.3.1 + + + software.amazon.awssdk + sso + 2.13.15 + + + software.amazon.awssdk + ssooidc + 2.17.94 + + + software.amazon.awssdk + apache-client + 2.25.35 + + + commons-logging + commons-logging + + + + + org.json + json + 20240303 + com.google.guava guava @@ -111,7 +168,7 @@ ai.djl.huggingface tokenizers - 0.23.0 + 0.28.0 diff --git a/src/main/java/com/meta/cp4m/PreProcessor.java b/src/main/java/com/meta/cp4m/PreProcessor.java new file mode 100644 index 0000000..b4b7f27 --- /dev/null +++ b/src/main/java/com/meta/cp4m/PreProcessor.java @@ -0,0 +1,18 @@ +/* + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.meta.cp4m; + +import com.meta.cp4m.message.Message; +import com.meta.cp4m.message.ThreadState; + +@FunctionalInterface +public interface PreProcessor { + + ThreadState run(ThreadState in); +} \ No newline at end of file diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java new file mode 100644 index 0000000..a543001 --- /dev/null +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -0,0 +1,72 @@ +/* + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.meta.cp4m; +import com.meta.cp4m.message.Message; +import com.meta.cp4m.message.Payload; +import com.meta.cp4m.message.ThreadState; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.internal.util.Mimetype; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.*; + +public class S3PreProcessor implements PreProcessor { +// private final String aws_access_key_id; +// private final String aws_secret_access_key; + private S3Client s3Client; + + + @Override + public ThreadState run(ThreadState in) { + // TODO: intercept the last message and redirect to upload it to S3 +// System.out.println(in.tail()); +// System.out.println(in.tail().payload().value()); + this.sendRequest((byte[]) in.tail().payload().value()); + System.out.println("Inside Run"); + return null; + } + + + public S3PreProcessor( +// @JsonProperty("app_secret") String appSecret, +// @JsonProperty("access_token") String accessToken + ) { + AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create( + "testKey", + "testsecret", + "" + ); + + // Create a credentials provider + StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); + + // Create an S3 client using the session credentials + System.out.println("S3 client building..."); + this.s3Client = S3Client.builder() + .region(Region.US_EAST_2) // Specify your region (TODO: might be worth adding this to config) + .credentialsProvider(credentialsProvider) + .build(); + } + + + public void sendRequest(byte[] media) { + String bucket = "cp4m-general-purpose-bucket-1"; + String key = "paper-7"; + System.out.println("Sending request to S3"); +// System.out.println(media); + System.out.println(media.length); + + PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).contentType("application/pdf") + .build(), + RequestBody.fromBytes(media)); + s3Client.close(); + } +} \ No newline at end of file diff --git a/src/main/java/com/meta/cp4m/Service.java b/src/main/java/com/meta/cp4m/Service.java index 3b6074b..6558b9e 100644 --- a/src/main/java/com/meta/cp4m/Service.java +++ b/src/main/java/com/meta/cp4m/Service.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -34,12 +35,23 @@ public class Service { private final Plugin plugin; private final String path; + private final List > preProcessors; public Service(ChatStore store, MessageHandler handler, Plugin plugin, String path) { this.handler = Objects.requireNonNull(handler); this.store = Objects.requireNonNull(store); this.plugin = Objects.requireNonNull(plugin); this.path = Objects.requireNonNull(path); + this.preProcessors = Collections.emptyList(); + } + + public Service( + ChatStore store, MessageHandler handler, Plugin plugin, List> preProcessors, String path) { + this.handler = Objects.requireNonNull(handler); + this.store = Objects.requireNonNull(store); + this.plugin = plugin; + this.path = path; + this.preProcessors = Collections.unmodifiableList(preProcessors); } void handler(Context ctx, IN in, RequestProcessor processor) { @@ -54,6 +66,7 @@ void handler(Context ctx, IN in, RequestProcessor processor) { .setMessage("unable to process request") .setCause(e) .log(); + // something like messages = Collections.emptyList(); needs to go here, look at s3 branch on github throw e; } // TODO: once we have a non-volatile store, on startup send stored but not replied to messages @@ -80,6 +93,11 @@ public Plugin plugin() { } private void execute(ThreadState thread) { + ThreadState preproccessed = thread; + for (PreProcessor preProcessor : preProcessors) { + preproccessed = preProcessor.run(preproccessed); + } + T pluginResponse; try { pluginResponse = plugin.handle(thread); diff --git a/src/main/java/com/meta/cp4m/ServiceConfiguration.java b/src/main/java/com/meta/cp4m/ServiceConfiguration.java index ee6d8ed..ea6aaf6 100644 --- a/src/main/java/com/meta/cp4m/ServiceConfiguration.java +++ b/src/main/java/com/meta/cp4m/ServiceConfiguration.java @@ -19,13 +19,15 @@ public class ServiceConfiguration { private final String handler; private final @Nullable String store; private final String plugin; + private final @Nullable String awsS3; @JsonCreator ServiceConfiguration( @JsonProperty("webhook_path") String webhookPath, @JsonProperty("handler") String handler, @JsonProperty("store") @Nullable String store, - @JsonProperty("plugin") String plugin) { + @JsonProperty("plugin") String plugin, + @JsonProperty("aws_s3") String awsS3) { Preconditions.checkArgument( webhookPath != null && webhookPath.startsWith("/"), "webhook_path must be present and it must start with a forward slash (/)"); @@ -33,6 +35,7 @@ public class ServiceConfiguration { this.handler = Objects.requireNonNull(handler, "handler must be present"); this.store = store; this.plugin = Objects.requireNonNull(plugin, "plugin must be present"); + this.awsS3 = awsS3; } public String webhookPath() { @@ -51,4 +54,8 @@ public String store() { public String plugin() { return plugin; } + + // TODO: optional; if we want to add awss3 as a service + @Nullable + public String awsS3() {return awsS3;} } diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index 76a5156..893e9a3 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.meta.cp4m.S3PreProcessor; import com.meta.cp4m.Service; import com.meta.cp4m.ServiceConfiguration; import com.meta.cp4m.ServicesRunner; @@ -24,6 +25,7 @@ import com.meta.cp4m.store.StoreConfig; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -137,7 +139,7 @@ private Service createService( } else { store = new NullStore(); } - return new Service<>(store, handler, plugin, serviceConfig.webhookPath()); + return new Service<>(store, handler, plugin, List.of(new S3PreProcessor<>()), serviceConfig.webhookPath()); } public ServicesRunner toServicesRunner() { diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index 731366d..322744b 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -8,6 +8,13 @@ package com.meta.cp4m.message; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.hc.client5.http.fluent.Content; + import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.json.JsonMapper; @@ -72,6 +79,7 @@ private List> post(Context ctx, WebhookPayload payload) { .forEachOrdered( change -> { Identifier phoneNumberId = change.value().metadata().phoneNumberId(); + //use phone number from above for (WebhookMessage message : change.value().messages()) { if (messageDeduplicator.addAndGetIsDuplicate(message.id())) { continue; // message is a duplicate @@ -79,6 +87,28 @@ private List> post(Context ctx, WebhookPayload payload) { Payload payloadValue; switch (message) { case TextWebhookMessage m -> payloadValue = new Payload.Text(m.text().body()); + case ImageWebhookMessage m -> { + try { + String url = this.getUrlFromID(m.image().id()); + Content media = this.getMediaFromUrl(url); + System.out.println(media.getType()); + payloadValue = new Payload.Image(media.asBytes(), m.image().mimeType()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + case DocumentWebhookMessage m -> { + try { + String url = this.getUrlFromID(m.document().id()); + Content media = this.getMediaFromUrl(url); + System.out.println(media.getType()); + payloadValue = new Payload.Document(media.asBytes(), m.document().mimeType()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + case WelcomeWebhookMessage ignored -> { if (welcomeMessage != null) { WAMessage welcome = @@ -257,4 +287,45 @@ private void markRead(Identifier phoneNumberId, String messageId) { LOGGER.error("unable to mark message as read", e); } } + + private String getUrlFromID(String mediaID) throws IOException { + String graphAPI = "https://graph.facebook.com/v20.0/"; + try { + + Content content = Request.get(graphAPI + mediaID) + .setHeader("Authorization", "Bearer " + accessToken) + .setHeader("appsecret_proof", appSecretProof) + .execute().returnContent(); + + String jsonResponse = content.asString(); + + // Print the JSON response + System.out.println(jsonResponse); + + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.readTree(jsonResponse); + + System.out.println(jsonNode); + System.out.println("Page Name: " + jsonNode.get("url").asText()); + + return jsonNode.get("url").asText(); + } catch (IOException e) { + LOGGER.error("Unable to retrieve media URL from ID", e); + return ""; + } + } + + private Content getMediaFromUrl(String url) throws IOException { + try { + Content media = Request.get(url) + .setHeader("Authorization", "Bearer " + accessToken) + .setHeader("appsecret_proof", appSecretProof) + .execute().returnContent(); + System.out.println(media.getType()); + return media; + } catch (IOException e) { + LOGGER.error("Unable to fetch media from URL", e); + return new Content(new byte[0], (ContentType) null ); + } + } } diff --git a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java new file mode 100644 index 0000000..d14a5c5 --- /dev/null +++ b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.meta.cp4m; + +import com.meta.cp4m.plugin.DummyPlugin; +import com.meta.cp4m.message.WAMessage; +import com.meta.cp4m.message.WAMessageHandler; +import com.meta.cp4m.message.WAMessengerConfig; +import com.meta.cp4m.store.NullStore; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class S3PreProcessorTest { + + // @Test + void run() { + WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify","", "") //TODO: use credentials from config + .toMessageHandler(); + Service service = new Service<>(new NullStore<>(), waMessageHandler, new DummyPlugin<>("dummy"), List.of(new S3PreProcessor<>()), "/whatsapp"); + ServicesRunner.newInstance().service(service).port(8080).start(); + } +} \ No newline at end of file From 793e4951a3c4cee0b45015dc3af388be1f1fced0 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Sun, 30 Jun 2024 21:06:40 -0700 Subject: [PATCH 02/25] fixing test case --- src/main/java/com/meta/cp4m/PreProcessorConfig.java | 2 ++ src/main/java/com/meta/cp4m/S3PreProcessorConfig.java | 2 ++ src/test/java/com/meta/cp4m/S3PreProcessorTest.java | 6 +++--- 3 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/meta/cp4m/PreProcessorConfig.java create mode 100644 src/main/java/com/meta/cp4m/S3PreProcessorConfig.java diff --git a/src/main/java/com/meta/cp4m/PreProcessorConfig.java b/src/main/java/com/meta/cp4m/PreProcessorConfig.java new file mode 100644 index 0000000..0e11eda --- /dev/null +++ b/src/main/java/com/meta/cp4m/PreProcessorConfig.java @@ -0,0 +1,2 @@ +package com.meta.cp4m;public class PreProcessorConfig { +} diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java new file mode 100644 index 0000000..bff74f6 --- /dev/null +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -0,0 +1,2 @@ +package com.meta.cp4m;public class s3PreProcessorConfig { +} diff --git a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java index d14a5c5..9e5ea56 100644 --- a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java +++ b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java @@ -21,11 +21,11 @@ class S3PreProcessorTest { - // @Test + @Test void run() { - WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify","", "") //TODO: use credentials from config + WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify","SomeSecret", "someToken") //TODO: use credentials from config .toMessageHandler(); - Service service = new Service<>(new NullStore<>(), waMessageHandler, new DummyPlugin<>("dummy"), List.of(new S3PreProcessor<>()), "/whatsapp"); + Service service = new Service<>(new NullStore<>(), waMessageHandler, new DummyPlugin<>("dummy"), List.of(new S3PreProcessor<>("someAccessKey", "someSecretKey", "someRegion")), "/whatsapp"); ServicesRunner.newInstance().service(service).port(8080).start(); } } \ No newline at end of file From de4c8bb07df6deceb509faf41b8cd8e46d15bfbe Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Sun, 30 Jun 2024 23:03:46 -0700 Subject: [PATCH 03/25] routing service creation through proper way, dynamic config variables, and senderID as media name --- .../com/meta/cp4m/PreProcessorConfig.java | 23 ++++++- .../java/com/meta/cp4m/S3PreProcessor.java | 55 ++++++++-------- .../com/meta/cp4m/S3PreProcessorConfig.java | 63 ++++++++++++++++++- .../com/meta/cp4m/ServiceConfiguration.java | 14 ++--- .../cp4m/configuration/RootConfiguration.java | 25 ++++++-- .../meta/cp4m/message/WAMessageHandler.java | 11 ---- .../com/meta/cp4m/S3PreProcessorTest.java | 2 +- 7 files changed, 139 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/meta/cp4m/PreProcessorConfig.java b/src/main/java/com/meta/cp4m/PreProcessorConfig.java index 0e11eda..e347ca6 100644 --- a/src/main/java/com/meta/cp4m/PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/PreProcessorConfig.java @@ -1,2 +1,23 @@ -package com.meta.cp4m;public class PreProcessorConfig { +/* + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.meta.cp4m; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.meta.cp4m.message.Message; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = S3PreProcessorConfig.class, name = "aws_s3") +}) +public interface PreProcessorConfig { + String name(); + + PreProcessor toPreProcessor(); } diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index a543001..c711e49 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -8,65 +8,64 @@ package com.meta.cp4m; import com.meta.cp4m.message.Message; -import com.meta.cp4m.message.Payload; import com.meta.cp4m.message.ThreadState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.*; +import java.time.Instant; + public class S3PreProcessor implements PreProcessor { -// private final String aws_access_key_id; -// private final String aws_secret_access_key; + private final String awsAccessKeyID; + private final String awsSecretAccessKey; + private final String region; private S3Client s3Client; + private final String bucket; + private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class); + @Override public ThreadState run(ThreadState in) { - // TODO: intercept the last message and redirect to upload it to S3 -// System.out.println(in.tail()); -// System.out.println(in.tail().payload().value()); - this.sendRequest((byte[]) in.tail().payload().value()); - System.out.println("Inside Run"); + if(in.tail().payload().getClass().getName().contains("Image") || in.tail().payload().getClass().getName().contains("Document")) { + this.sendRequest((byte[]) in.tail().payload().value(), in.tail().senderId().toString()); + } return null; } + public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String region, String bucket) { + this.awsAccessKeyID = awsAccessKeyID; + this.awsSecretAccessKey = awsSecretAccessKey; + this.region = region; + this.bucket = bucket; - public S3PreProcessor( -// @JsonProperty("app_secret") String appSecret, -// @JsonProperty("access_token") String accessToken - ) { AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create( - "testKey", - "testsecret", + this.awsAccessKeyID, + this.awsSecretAccessKey, "" ); - // Create a credentials provider StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); - - // Create an S3 client using the session credentials - System.out.println("S3 client building..."); this.s3Client = S3Client.builder() - .region(Region.US_EAST_2) // Specify your region (TODO: might be worth adding this to config) + // TODO: Add check to make sure the region is in kebab case + .region(Region.of(this.region)) .credentialsProvider(credentialsProvider) .build(); } - public void sendRequest(byte[] media) { - String bucket = "cp4m-general-purpose-bucket-1"; - String key = "paper-7"; - System.out.println("Sending request to S3"); -// System.out.println(media); - System.out.println(media.length); - - PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).contentType("application/pdf") + public void sendRequest(byte[] media, String senderID) { + String key = senderID + "_" + Instant.now().toEpochMilli(); + PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(this.bucket).key(key).contentType("application/*") .build(), RequestBody.fromBytes(media)); s3Client.close(); + LOGGER.info("Media uploaded to AWS S3"); + } } \ No newline at end of file diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java index bff74f6..6fed754 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -1,2 +1,63 @@ -package com.meta.cp4m;public class s3PreProcessorConfig { +/* + * + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.meta.cp4m; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.meta.cp4m.message.Message; +import java.util.Objects; + +public class S3PreProcessorConfig implements PreProcessorConfig{ + private final String name; + private final String awsAccessKeyId; + private final String awsSecretAccessKey; + private final String region; + private final String bucket; + + @JsonCreator + public S3PreProcessorConfig( + @JsonProperty("name") String name, + @JsonProperty("aws_access_key_id") String awsAccessKeyId, + @JsonProperty("aws_secret_access_key") String awsSecretAccessKey, + @JsonProperty("region") String region, + @JsonProperty("bucket") String bucket){ + this.name = Objects.requireNonNull(name, "name is a required parameter"); + this.awsAccessKeyId = Objects.requireNonNull(awsAccessKeyId, "aws access key is a required parameter"); + this.awsSecretAccessKey = Objects.requireNonNull(awsSecretAccessKey, "aws secret access key is a required parameter"); + this.region = Objects.requireNonNull(region, "region is a required parameter"); + this.bucket = Objects.requireNonNull(bucket, "bucket is a required parameter"); + } + + @Override + public String name() { + return name; + } + + public String awsAccessKeyId() { + return awsAccessKeyId; + } + + public String awsSecretAccessKey() { + return awsSecretAccessKey; + } + + public String region() { + return region; + } + + public String bucket() { + return bucket; + } + + @Override + public PreProcessor toPreProcessor() { + return new S3PreProcessor<>(awsAccessKeyId, awsSecretAccessKey, region, bucket); + } } + diff --git a/src/main/java/com/meta/cp4m/ServiceConfiguration.java b/src/main/java/com/meta/cp4m/ServiceConfiguration.java index ea6aaf6..7ae14bc 100644 --- a/src/main/java/com/meta/cp4m/ServiceConfiguration.java +++ b/src/main/java/com/meta/cp4m/ServiceConfiguration.java @@ -19,7 +19,7 @@ public class ServiceConfiguration { private final String handler; private final @Nullable String store; private final String plugin; - private final @Nullable String awsS3; + private final @Nullable String preProcessor; @JsonCreator ServiceConfiguration( @@ -27,7 +27,7 @@ public class ServiceConfiguration { @JsonProperty("handler") String handler, @JsonProperty("store") @Nullable String store, @JsonProperty("plugin") String plugin, - @JsonProperty("aws_s3") String awsS3) { + @JsonProperty("pre_processor") @Nullable String preProcessor) { Preconditions.checkArgument( webhookPath != null && webhookPath.startsWith("/"), "webhook_path must be present and it must start with a forward slash (/)"); @@ -35,13 +35,17 @@ public class ServiceConfiguration { this.handler = Objects.requireNonNull(handler, "handler must be present"); this.store = store; this.plugin = Objects.requireNonNull(plugin, "plugin must be present"); - this.awsS3 = awsS3; + this.preProcessor = preProcessor; } public String webhookPath() { return webhookPath; } + public String preProcessor() { + return preProcessor; + } + public String handler() { return handler; } @@ -54,8 +58,4 @@ public String store() { public String plugin() { return plugin; } - - // TODO: optional; if we want to add awss3 as a service - @Nullable - public String awsS3() {return awsS3;} } diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index 893e9a3..805809b 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -11,10 +11,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.meta.cp4m.S3PreProcessor; -import com.meta.cp4m.Service; -import com.meta.cp4m.ServiceConfiguration; -import com.meta.cp4m.ServicesRunner; +import com.meta.cp4m.*; import com.meta.cp4m.message.HandlerConfig; import com.meta.cp4m.message.Message; import com.meta.cp4m.message.MessageHandler; @@ -35,6 +32,7 @@ public class RootConfiguration { private final Map plugins; private final Map stores; private final Map handlers; + private final Map preProcessors; private final Collection services; private final int port; @@ -45,12 +43,14 @@ public class RootConfiguration { @JsonProperty("plugins") Collection plugins, @JsonProperty("stores") @Nullable Collection stores, @JsonProperty("handlers") Collection handlers, + @JsonProperty("preProcessors") @Nullable Collection preProcessors, @JsonProperty("services") Collection services, @JsonProperty("port") @Nullable Integer port, @JsonProperty("heartbeat_path") @Nullable String heartbeatPath) { this.port = port == null ? 8080 : port; this.heartbeatPath = heartbeatPath == null ? "/heartbeat" : heartbeatPath; stores = stores == null ? Collections.emptyList() : stores; + preProcessors = preProcessors == null ? Collections.emptyList(): preProcessors; Preconditions.checkArgument( this.port >= 0 && this.port <= 65535, "port must be between 0 and 65535"); @@ -72,6 +72,9 @@ public class RootConfiguration { plugins.stream() .collect(Collectors.toUnmodifiableMap(PluginConfig::name, Function.identity())); + this.preProcessors = preProcessors.stream() + .collect(Collectors.toUnmodifiableMap(PreProcessorConfig::name, Function.identity())); + Preconditions.checkArgument( stores.size() == stores.stream() @@ -110,6 +113,10 @@ Collection plugins() { return Collections.unmodifiableCollection(plugins.values()); } + Collection preProcessors() { + return Collections.unmodifiableCollection(preProcessors.values()); + } + Collection stores() { return Collections.unmodifiableCollection(stores.values()); } @@ -139,7 +146,15 @@ private Service createService( } else { store = new NullStore(); } - return new Service<>(store, handler, plugin, List.of(new S3PreProcessor<>()), serviceConfig.webhookPath()); + + PreProcessor preProcessor; + if(serviceConfig.preProcessor() != null){ + preProcessor = preProcessors.get(serviceConfig.preProcessor()).toPreProcessor(); + return new Service<>(store, handler, plugin, List.of(preProcessor), serviceConfig.webhookPath()); + } else { + // TODO: flow never goes into else. Need to debug why method inherits container annotation + return new Service<>(store, handler, plugin, serviceConfig.webhookPath()); + } } public ServicesRunner toServicesRunner() { diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index 322744b..a8460ef 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -91,7 +91,6 @@ private List> post(Context ctx, WebhookPayload payload) { try { String url = this.getUrlFromID(m.image().id()); Content media = this.getMediaFromUrl(url); - System.out.println(media.getType()); payloadValue = new Payload.Image(media.asBytes(), m.image().mimeType()); } catch (IOException e) { throw new RuntimeException(e); @@ -102,7 +101,6 @@ private List> post(Context ctx, WebhookPayload payload) { try { String url = this.getUrlFromID(m.document().id()); Content media = this.getMediaFromUrl(url); - System.out.println(media.getType()); payloadValue = new Payload.Document(media.asBytes(), m.document().mimeType()); } catch (IOException e) { throw new RuntimeException(e); @@ -298,16 +296,8 @@ private String getUrlFromID(String mediaID) throws IOException { .execute().returnContent(); String jsonResponse = content.asString(); - - // Print the JSON response - System.out.println(jsonResponse); - ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(jsonResponse); - - System.out.println(jsonNode); - System.out.println("Page Name: " + jsonNode.get("url").asText()); - return jsonNode.get("url").asText(); } catch (IOException e) { LOGGER.error("Unable to retrieve media URL from ID", e); @@ -321,7 +311,6 @@ private Content getMediaFromUrl(String url) throws IOException { .setHeader("Authorization", "Bearer " + accessToken) .setHeader("appsecret_proof", appSecretProof) .execute().returnContent(); - System.out.println(media.getType()); return media; } catch (IOException e) { LOGGER.error("Unable to fetch media from URL", e); diff --git a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java index 9e5ea56..56924c4 100644 --- a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java +++ b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java @@ -25,7 +25,7 @@ class S3PreProcessorTest { void run() { WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify","SomeSecret", "someToken") //TODO: use credentials from config .toMessageHandler(); - Service service = new Service<>(new NullStore<>(), waMessageHandler, new DummyPlugin<>("dummy"), List.of(new S3PreProcessor<>("someAccessKey", "someSecretKey", "someRegion")), "/whatsapp"); + Service service = new Service<>(new NullStore<>(), waMessageHandler, new DummyPlugin<>("dummy"), List.of(new S3PreProcessor<>("someAccessKey", "someSecretKey", "someRegion", "someRegion")), "/whatsapp"); ServicesRunner.newInstance().service(service).port(8080).start(); } } \ No newline at end of file From 73236392a29ff73f098786c5a7c085d6b62fd257 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Sun, 30 Jun 2024 23:09:12 -0700 Subject: [PATCH 04/25] clean up --- src/main/java/com/meta/cp4m/S3PreProcessor.java | 3 --- src/main/java/com/meta/cp4m/Service.java | 1 - src/main/java/com/meta/cp4m/message/WAMessageHandler.java | 1 - 3 files changed, 5 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index c711e49..5693971 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -28,8 +28,6 @@ public class S3PreProcessor implements PreProcessor { private final String bucket; private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class); - - @Override public ThreadState run(ThreadState in) { if(in.tail().payload().getClass().getName().contains("Image") || in.tail().payload().getClass().getName().contains("Document")) { @@ -58,7 +56,6 @@ public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String r .build(); } - public void sendRequest(byte[] media, String senderID) { String key = senderID + "_" + Instant.now().toEpochMilli(); PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(this.bucket).key(key).contentType("application/*") diff --git a/src/main/java/com/meta/cp4m/Service.java b/src/main/java/com/meta/cp4m/Service.java index 6558b9e..03598a1 100644 --- a/src/main/java/com/meta/cp4m/Service.java +++ b/src/main/java/com/meta/cp4m/Service.java @@ -66,7 +66,6 @@ void handler(Context ctx, IN in, RequestProcessor processor) { .setMessage("unable to process request") .setCause(e) .log(); - // something like messages = Collections.emptyList(); needs to go here, look at s3 branch on github throw e; } // TODO: once we have a non-volatile store, on startup send stored but not replied to messages diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index a8460ef..5955192 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -79,7 +79,6 @@ private List> post(Context ctx, WebhookPayload payload) { .forEachOrdered( change -> { Identifier phoneNumberId = change.value().metadata().phoneNumberId(); - //use phone number from above for (WebhookMessage message : change.value().messages()) { if (messageDeduplicator.addAndGetIsDuplicate(message.id())) { continue; // message is a duplicate From 0bfacf3cff84926c1bf76e6e8fdf8985fa9d042a Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Mon, 1 Jul 2024 11:19:09 -0700 Subject: [PATCH 05/25] adding extension to media in S3 upload and clean up switchCase before sending to S3 --- .../java/com/meta/cp4m/S3PreProcessor.java | 28 ++++++++++++++----- .../com/meta/cp4m/S3PreProcessorConfig.java | 5 ++-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index 5693971..e42bef7 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -8,6 +8,7 @@ package com.meta.cp4m; import com.meta.cp4m.message.Message; +import com.meta.cp4m.message.Payload; import com.meta.cp4m.message.ThreadState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,10 +31,20 @@ public class S3PreProcessor implements PreProcessor { @Override public ThreadState run(ThreadState in) { - if(in.tail().payload().getClass().getName().contains("Image") || in.tail().payload().getClass().getName().contains("Document")) { - this.sendRequest((byte[]) in.tail().payload().value(), in.tail().senderId().toString()); + + switch (in.tail().payload()) { + case Payload.Image i -> { + this.sendRequest(i.value(), in.userId().toString(), i.extension()); + } + case Payload.Document i -> { + this.sendRequest(i.value(), in.userId().toString(), i.extension()); + } + default -> { + System.out.println("Here"); + return in; + } } - return null; + return in; // TODO: remove last message } public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String region, String bucket) { @@ -56,13 +67,16 @@ public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String r .build(); } - public void sendRequest(byte[] media, String senderID) { - String key = senderID + "_" + Instant.now().toEpochMilli(); - PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(this.bucket).key(key).contentType("application/*") + public void sendRequest(byte[] media, String senderID, String extension) { + String key = senderID + + '_' + + Instant.now().toEpochMilli() + + '.' + + extension; + PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(this.bucket).key(key).contentType("application/" + extension) .build(), RequestBody.fromBytes(media)); s3Client.close(); LOGGER.info("Media uploaded to AWS S3"); - } } \ No newline at end of file diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java index 6fed754..f186092 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -13,7 +13,7 @@ import com.meta.cp4m.message.Message; import java.util.Objects; -public class S3PreProcessorConfig implements PreProcessorConfig{ +public class S3PreProcessorConfig implements PreProcessorConfig { private final String name; private final String awsAccessKeyId; private final String awsSecretAccessKey; @@ -26,7 +26,7 @@ public S3PreProcessorConfig( @JsonProperty("aws_access_key_id") String awsAccessKeyId, @JsonProperty("aws_secret_access_key") String awsSecretAccessKey, @JsonProperty("region") String region, - @JsonProperty("bucket") String bucket){ + @JsonProperty("bucket") String bucket) { this.name = Objects.requireNonNull(name, "name is a required parameter"); this.awsAccessKeyId = Objects.requireNonNull(awsAccessKeyId, "aws access key is a required parameter"); this.awsSecretAccessKey = Objects.requireNonNull(awsSecretAccessKey, "aws secret access key is a required parameter"); @@ -60,4 +60,3 @@ public PreProcessor toPreProcessor() { return new S3PreProcessor<>(awsAccessKeyId, awsSecretAccessKey, region, bucket); } } - From e75bd8ae1af783d855a1ae6d1ccf5c5080a0d771 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Mon, 1 Jul 2024 13:23:42 -0700 Subject: [PATCH 06/25] making preProcessors an array in services of configFile --- .../com/meta/cp4m/ServiceConfiguration.java | 10 ++++----- .../cp4m/configuration/RootConfiguration.java | 21 ++++++++++++------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/meta/cp4m/ServiceConfiguration.java b/src/main/java/com/meta/cp4m/ServiceConfiguration.java index 7ae14bc..44602fb 100644 --- a/src/main/java/com/meta/cp4m/ServiceConfiguration.java +++ b/src/main/java/com/meta/cp4m/ServiceConfiguration.java @@ -19,7 +19,7 @@ public class ServiceConfiguration { private final String handler; private final @Nullable String store; private final String plugin; - private final @Nullable String preProcessor; + private final @Nullable String[] preProcessors; @JsonCreator ServiceConfiguration( @@ -27,7 +27,7 @@ public class ServiceConfiguration { @JsonProperty("handler") String handler, @JsonProperty("store") @Nullable String store, @JsonProperty("plugin") String plugin, - @JsonProperty("pre_processor") @Nullable String preProcessor) { + @JsonProperty("pre_processors") @Nullable String[] preProcessors) { Preconditions.checkArgument( webhookPath != null && webhookPath.startsWith("/"), "webhook_path must be present and it must start with a forward slash (/)"); @@ -35,15 +35,15 @@ public class ServiceConfiguration { this.handler = Objects.requireNonNull(handler, "handler must be present"); this.store = store; this.plugin = Objects.requireNonNull(plugin, "plugin must be present"); - this.preProcessor = preProcessor; + this.preProcessors = preProcessors; } public String webhookPath() { return webhookPath; } - public String preProcessor() { - return preProcessor; + public String[] preProcessors() { + return preProcessors; } public String handler() { diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index 805809b..0767bf4 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -20,10 +20,8 @@ import com.meta.cp4m.store.ChatStore; import com.meta.cp4m.store.NullStore; import com.meta.cp4m.store.StoreConfig; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; + +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; import org.checkerframework.checker.nullness.qual.Nullable; @@ -43,7 +41,7 @@ public class RootConfiguration { @JsonProperty("plugins") Collection plugins, @JsonProperty("stores") @Nullable Collection stores, @JsonProperty("handlers") Collection handlers, - @JsonProperty("preProcessors") @Nullable Collection preProcessors, + @JsonProperty("pre_processors") @Nullable Collection preProcessors, @JsonProperty("services") Collection services, @JsonProperty("port") @Nullable Integer port, @JsonProperty("heartbeat_path") @Nullable String heartbeatPath) { @@ -148,9 +146,16 @@ private Service createService( } PreProcessor preProcessor; - if(serviceConfig.preProcessor() != null){ - preProcessor = preProcessors.get(serviceConfig.preProcessor()).toPreProcessor(); - return new Service<>(store, handler, plugin, List.of(preProcessor), serviceConfig.webhookPath()); + List> preProcessorsList = new ArrayList<>(); + + if(serviceConfig.preProcessors() != null){ + String[] preProcessorNames = serviceConfig.preProcessors(); + for (String i : preProcessorNames) { + preProcessor = preProcessors.get(i).toPreProcessor(); + preProcessorsList.add(preProcessor); + } + + return new Service<>(store, handler, plugin, preProcessorsList, serviceConfig.webhookPath()); } else { // TODO: flow never goes into else. Need to debug why method inherits container annotation return new Service<>(store, handler, plugin, serviceConfig.webhookPath()); From 72f616a12920091005536f819f3788ba22106e3d Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Mon, 1 Jul 2024 14:59:33 -0700 Subject: [PATCH 07/25] converting S3PreProcessorConfig to record class --- .../com/meta/cp4m/S3PreProcessorConfig.java | 30 ++----------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java index f186092..7ce4e6e 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -13,13 +13,8 @@ import com.meta.cp4m.message.Message; import java.util.Objects; -public class S3PreProcessorConfig implements PreProcessorConfig { - private final String name; - private final String awsAccessKeyId; - private final String awsSecretAccessKey; - private final String region; - private final String bucket; - +public record S3PreProcessorConfig(String name, String awsAccessKeyId, String awsSecretAccessKey, String region, + String bucket) implements PreProcessorConfig { @JsonCreator public S3PreProcessorConfig( @JsonProperty("name") String name, @@ -34,27 +29,6 @@ public S3PreProcessorConfig( this.bucket = Objects.requireNonNull(bucket, "bucket is a required parameter"); } - @Override - public String name() { - return name; - } - - public String awsAccessKeyId() { - return awsAccessKeyId; - } - - public String awsSecretAccessKey() { - return awsSecretAccessKey; - } - - public String region() { - return region; - } - - public String bucket() { - return bucket; - } - @Override public PreProcessor toPreProcessor() { return new S3PreProcessor<>(awsAccessKeyId, awsSecretAccessKey, region, bucket); From 3448b1a65991aaa39ba103f85ee17d1779bed8a1 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Mon, 1 Jul 2024 15:13:44 -0700 Subject: [PATCH 08/25] format check on aws region parameter to adhere to kebab case --- src/main/java/com/meta/cp4m/S3PreProcessorConfig.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java index 7ce4e6e..f20439f 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import com.meta.cp4m.message.Message; import java.util.Objects; @@ -22,6 +23,11 @@ public S3PreProcessorConfig( @JsonProperty("aws_secret_access_key") String awsSecretAccessKey, @JsonProperty("region") String region, @JsonProperty("bucket") String bucket) { + + String kebabCase = "^[a-z0-9]+(-[a-z0-9]+)*$"; + Preconditions.checkArgument( + bucket.matches(kebabCase), "bucket does not match the aws region format(kebab case) or is empty"); + this.name = Objects.requireNonNull(name, "name is a required parameter"); this.awsAccessKeyId = Objects.requireNonNull(awsAccessKeyId, "aws access key is a required parameter"); this.awsSecretAccessKey = Objects.requireNonNull(awsSecretAccessKey, "aws secret access key is a required parameter"); From 7a101bfd36aefd12eea63bc84cbef09a39e880e0 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Mon, 1 Jul 2024 15:15:30 -0700 Subject: [PATCH 09/25] clean up --- src/main/java/com/meta/cp4m/S3PreProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index e42bef7..e30a313 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -61,7 +61,6 @@ public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String r StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); this.s3Client = S3Client.builder() - // TODO: Add check to make sure the region is in kebab case .region(Region.of(this.region)) .credentialsProvider(credentialsProvider) .build(); From 1350aa147da988c530a2370d46e3f8349404a953 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Tue, 2 Jul 2024 13:26:04 -0700 Subject: [PATCH 10/25] more cleanup --- src/main/java/com/meta/cp4m/S3PreProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index e30a313..1d7daaf 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -40,7 +40,6 @@ public ThreadState run(ThreadState in) { this.sendRequest(i.value(), in.userId().toString(), i.extension()); } default -> { - System.out.println("Here"); return in; } } From 66d2dca095bfd02b95ed4669b15e23fae2ebd62a Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Tue, 2 Jul 2024 13:41:51 -0700 Subject: [PATCH 11/25] Readme updates with required config file changes to enable AWS --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 7c2edfe..025af58 100644 --- a/README.md +++ b/README.md @@ -49,11 +49,20 @@ verify_token = "" app_secret = "" access_token = "" +[[pre_processors]] +type = "aws_s3" +name = "aws_s3" +aws_access_key_id="" +aws_secret_access_key="" +region = "" +bucket = "" + [[services]] webhook_path = "/" plugin = "openai_test" store = "memory_test" handler = "whatsapp_test" +pre_processors =["aws_s3"] ``` ##### Example: Messenger & Llama 2 (via Hugging Face) From 581f84144a8053b30ff9b621ff398281165e8f44 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Tue, 2 Jul 2024 14:40:05 -0700 Subject: [PATCH 12/25] Fix s3 connection open/close timing --- .../java/com/meta/cp4m/S3PreProcessor.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index 1d7daaf..b22edc1 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -27,6 +27,7 @@ public class S3PreProcessor implements PreProcessor { private final String region; private S3Client s3Client; private final String bucket; + private final StaticCredentialsProvider credentialsProvider; private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class); @Override @@ -58,11 +59,7 @@ public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String r "" ); - StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); - this.s3Client = S3Client.builder() - .region(Region.of(this.region)) - .credentialsProvider(credentialsProvider) - .build(); + this.credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); } public void sendRequest(byte[] media, String senderID, String extension) { @@ -71,10 +68,23 @@ public void sendRequest(byte[] media, String senderID, String extension) { Instant.now().toEpochMilli() + '.' + extension; - PutObjectResponse res = s3Client.putObject(PutObjectRequest.builder().bucket(this.bucket).key(key).contentType("application/" + extension) - .build(), - RequestBody.fromBytes(media)); - s3Client.close(); - LOGGER.info("Media uploaded to AWS S3"); + + try (S3Client s3Client = S3Client.builder() + .region(Region.of(this.region)) + .credentialsProvider(this.credentialsProvider) + .build()) { + + PutObjectRequest request = PutObjectRequest + .builder() + .bucket(this.bucket) + .key(key) + .contentType("application/" + extension) + .build(); + s3Client.putObject(request, RequestBody.fromBytes(media)); + s3Client.close(); + LOGGER.info("Media upload to AWS S3 successful"); + } catch (Exception e) { + LOGGER.warn("Media upload to AWS S3 failed, {e}", e); + } } } \ No newline at end of file From 47ac1d3eba345e4cb23696722d7bf126ec9f5053 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Tue, 2 Jul 2024 14:46:52 -0700 Subject: [PATCH 13/25] clean up --- src/main/java/com/meta/cp4m/S3PreProcessor.java | 5 +++-- src/main/java/com/meta/cp4m/S3PreProcessorConfig.java | 1 + src/test/java/com/meta/cp4m/S3PreProcessorTest.java | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index b22edc1..525a97a 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -7,6 +7,7 @@ */ package com.meta.cp4m; + import com.meta.cp4m.message.Message; import com.meta.cp4m.message.Payload; import com.meta.cp4m.message.ThreadState; @@ -34,14 +35,14 @@ public class S3PreProcessor implements PreProcessor { public ThreadState run(ThreadState in) { switch (in.tail().payload()) { - case Payload.Image i -> { + case Payload.Image i -> { this.sendRequest(i.value(), in.userId().toString(), i.extension()); } case Payload.Document i -> { this.sendRequest(i.value(), in.userId().toString(), i.extension()); } default -> { - return in; + return in; } } return in; // TODO: remove last message diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java index f20439f..19568e1 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.meta.cp4m.message.Message; + import java.util.Objects; public record S3PreProcessorConfig(String name, String awsAccessKeyId, String awsSecretAccessKey, String region, diff --git a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java index 56924c4..795faa1 100644 --- a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java +++ b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java @@ -23,9 +23,11 @@ class S3PreProcessorTest { @Test void run() { - WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify","SomeSecret", "someToken") //TODO: use credentials from config + WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify", "SomeSecret", "someToken") .toMessageHandler(); - Service service = new Service<>(new NullStore<>(), waMessageHandler, new DummyPlugin<>("dummy"), List.of(new S3PreProcessor<>("someAccessKey", "someSecretKey", "someRegion", "someRegion")), "/whatsapp"); + Service service = new Service<>(new NullStore<>(), waMessageHandler, + new DummyPlugin<>("dummy"), + List.of(new S3PreProcessor<>("someAccessKey", "someSecretKey", "someRegion", "someRegion")), "/whatsapp"); ServicesRunner.newInstance().service(service).port(8080).start(); } } \ No newline at end of file From dce394e25a36a6ced948f3526626abe9100b7d7d Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 13:30:46 -0700 Subject: [PATCH 14/25] String array to List --- src/main/java/com/meta/cp4m/ServiceConfiguration.java | 8 +++++--- .../com/meta/cp4m/configuration/RootConfiguration.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/meta/cp4m/ServiceConfiguration.java b/src/main/java/com/meta/cp4m/ServiceConfiguration.java index 44602fb..4f35489 100644 --- a/src/main/java/com/meta/cp4m/ServiceConfiguration.java +++ b/src/main/java/com/meta/cp4m/ServiceConfiguration.java @@ -12,6 +12,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; import java.util.Objects; public class ServiceConfiguration { @@ -19,7 +21,7 @@ public class ServiceConfiguration { private final String handler; private final @Nullable String store; private final String plugin; - private final @Nullable String[] preProcessors; + private final @Nullable List preProcessors; @JsonCreator ServiceConfiguration( @@ -27,7 +29,7 @@ public class ServiceConfiguration { @JsonProperty("handler") String handler, @JsonProperty("store") @Nullable String store, @JsonProperty("plugin") String plugin, - @JsonProperty("pre_processors") @Nullable String[] preProcessors) { + @JsonProperty("pre_processors") @Nullable List preProcessors) { Preconditions.checkArgument( webhookPath != null && webhookPath.startsWith("/"), "webhook_path must be present and it must start with a forward slash (/)"); @@ -42,7 +44,7 @@ public String webhookPath() { return webhookPath; } - public String[] preProcessors() { + public List preProcessors() { return preProcessors; } diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index 0767bf4..fb13080 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -149,7 +149,7 @@ private Service createService( List> preProcessorsList = new ArrayList<>(); if(serviceConfig.preProcessors() != null){ - String[] preProcessorNames = serviceConfig.preProcessors(); + List preProcessorNames = serviceConfig.preProcessors(); for (String i : preProcessorNames) { preProcessor = preProcessors.get(i).toPreProcessor(); preProcessorsList.add(preProcessor); From 9315e1a577f02f923f0ebe7008ba62e42b9b8e45 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 13:32:00 -0700 Subject: [PATCH 15/25] empty preprocessor case handled --- src/main/java/com/meta/cp4m/ServiceConfiguration.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/meta/cp4m/ServiceConfiguration.java b/src/main/java/com/meta/cp4m/ServiceConfiguration.java index 4f35489..6851285 100644 --- a/src/main/java/com/meta/cp4m/ServiceConfiguration.java +++ b/src/main/java/com/meta/cp4m/ServiceConfiguration.java @@ -13,6 +13,7 @@ import com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -37,7 +38,7 @@ public class ServiceConfiguration { this.handler = Objects.requireNonNull(handler, "handler must be present"); this.store = store; this.plugin = Objects.requireNonNull(plugin, "plugin must be present"); - this.preProcessors = preProcessors; + this.preProcessors = preProcessors == null ? Collections.emptyList() : preProcessors; } public String webhookPath() { From 4de00a6ea0229dfc313c4b70c0059c257bb93754 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 13:53:05 -0700 Subject: [PATCH 16/25] url builder --- .../com/meta/cp4m/message/WAMessageHandler.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index 5955192..c1cff28 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -286,10 +286,8 @@ private void markRead(Identifier phoneNumberId, String messageId) { } private String getUrlFromID(String mediaID) throws IOException { - String graphAPI = "https://graph.facebook.com/v20.0/"; try { - - Content content = Request.get(graphAPI + mediaID) + Content content = Request.get(new URIBuilder(this.baseURL).appendPath(mediaID).build()) .setHeader("Authorization", "Bearer " + accessToken) .setHeader("appsecret_proof", appSecretProof) .execute().returnContent(); @@ -298,22 +296,19 @@ private String getUrlFromID(String mediaID) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(jsonResponse); return jsonNode.get("url").asText(); - } catch (IOException e) { - LOGGER.error("Unable to retrieve media URL from ID", e); - return ""; + } catch (URISyntaxException e) { + throw new RuntimeException(e); } } private Content getMediaFromUrl(String url) throws IOException { try { - Content media = Request.get(url) + return Request.get(url) .setHeader("Authorization", "Bearer " + accessToken) .setHeader("appsecret_proof", appSecretProof) .execute().returnContent(); - return media; } catch (IOException e) { - LOGGER.error("Unable to fetch media from URL", e); - return new Content(new byte[0], (ContentType) null ); + throw new RuntimeException(e); } } } From cbb097a5609cc7eaa537ee0c47580c40b7baf12c Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 13:57:57 -0700 Subject: [PATCH 17/25] Mapper --- src/main/java/com/meta/cp4m/message/WAMessageHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index c1cff28..d5a81b3 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -293,8 +293,7 @@ private String getUrlFromID(String mediaID) throws IOException { .execute().returnContent(); String jsonResponse = content.asString(); - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.readTree(jsonResponse); + JsonNode jsonNode = MAPPER.readTree(jsonResponse); return jsonNode.get("url").asText(); } catch (URISyntaxException e) { throw new RuntimeException(e); From a61090ac97c85290a90652afcacaf6233120d711 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 14:48:21 -0700 Subject: [PATCH 18/25] handleResponse usage --- .../meta/cp4m/message/WAMessageHandler.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index d5a81b3..6433e5e 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import org.apache.hc.client5.http.fluent.Request; import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.net.URIBuilder; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.common.returnsreceiver.qual.This; @@ -91,7 +92,7 @@ private List> post(Context ctx, WebhookPayload payload) { String url = this.getUrlFromID(m.image().id()); Content media = this.getMediaFromUrl(url); payloadValue = new Payload.Image(media.asBytes(), m.image().mimeType()); - } catch (IOException e) { + } catch (IOException | URISyntaxException e) { throw new RuntimeException(e); } } @@ -101,7 +102,7 @@ private List> post(Context ctx, WebhookPayload payload) { String url = this.getUrlFromID(m.document().id()); Content media = this.getMediaFromUrl(url); payloadValue = new Payload.Document(media.asBytes(), m.document().mimeType()); - } catch (IOException e) { + } catch (IOException | URISyntaxException e) { throw new RuntimeException(e); } } @@ -285,20 +286,20 @@ private void markRead(Identifier phoneNumberId, String messageId) { } } - private String getUrlFromID(String mediaID) throws IOException { - try { - Content content = Request.get(new URIBuilder(this.baseURL).appendPath(mediaID).build()) - .setHeader("Authorization", "Bearer " + accessToken) - .setHeader("appsecret_proof", appSecretProof) - .execute().returnContent(); - - String jsonResponse = content.asString(); - JsonNode jsonNode = MAPPER.readTree(jsonResponse); - return jsonNode.get("url").asText(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } + private String getUrlFromID(String mediaID) throws IOException, URISyntaxException { + return Request.get(new URIBuilder(this.baseURL).appendPath(mediaID).build()) + .setHeader("Authorization", "Bearer " + accessToken) + .setHeader("appsecret_proof", appSecretProof) + .execute().handleResponse(response -> { + try { + String jsonResponse = EntityUtils.toString(response.getEntity()); + JsonNode jsonNode = MAPPER.readTree(jsonResponse); + return jsonNode.get("url").asText(); + } catch (RuntimeException e) { + throw new RuntimeException(e); + } + }); + } private Content getMediaFromUrl(String url) throws IOException { try { From 7d0fa5979212381e983098f8444eca4bda1c430a Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 15:04:57 -0700 Subject: [PATCH 19/25] check empty isntead of null --- .../java/com/meta/cp4m/configuration/RootConfiguration.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index fb13080..9c49272 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -148,7 +148,7 @@ private Service createService( PreProcessor preProcessor; List> preProcessorsList = new ArrayList<>(); - if(serviceConfig.preProcessors() != null){ + if(!serviceConfig.preProcessors().isEmpty()){ List preProcessorNames = serviceConfig.preProcessors(); for (String i : preProcessorNames) { preProcessor = preProcessors.get(i).toPreProcessor(); @@ -157,7 +157,6 @@ private Service createService( return new Service<>(store, handler, plugin, preProcessorsList, serviceConfig.webhookPath()); } else { - // TODO: flow never goes into else. Need to debug why method inherits container annotation return new Service<>(store, handler, plugin, serviceConfig.webhookPath()); } } From dbcb7c2421756b545dcf88171af67881cf696b0c Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Wed, 3 Jul 2024 16:49:40 -0700 Subject: [PATCH 20/25] s3 close --- src/main/java/com/meta/cp4m/S3PreProcessor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index 525a97a..a7f0ebd 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -26,7 +26,6 @@ public class S3PreProcessor implements PreProcessor { private final String awsAccessKeyID; private final String awsSecretAccessKey; private final String region; - private S3Client s3Client; private final String bucket; private final StaticCredentialsProvider credentialsProvider; private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class); @@ -82,7 +81,6 @@ public void sendRequest(byte[] media, String senderID, String extension) { .contentType("application/" + extension) .build(); s3Client.putObject(request, RequestBody.fromBytes(media)); - s3Client.close(); LOGGER.info("Media upload to AWS S3 successful"); } catch (Exception e) { LOGGER.warn("Media upload to AWS S3 failed, {e}", e); From f72e795c6d2340787d9b9a368910c4ced04b1497 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Thu, 4 Jul 2024 13:27:42 -0700 Subject: [PATCH 21/25] empty preProcessor check removed | URL type changed from String to URI --- .../cp4m/configuration/RootConfiguration.java | 17 ++++++++--------- .../com/meta/cp4m/message/WAMessageHandler.java | 14 +++++++------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index 9c49272..e20e34a 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -41,14 +41,13 @@ public class RootConfiguration { @JsonProperty("plugins") Collection plugins, @JsonProperty("stores") @Nullable Collection stores, @JsonProperty("handlers") Collection handlers, - @JsonProperty("pre_processors") @Nullable Collection preProcessors, + @JsonProperty("pre_processors") Collection preProcessors, @JsonProperty("services") Collection services, @JsonProperty("port") @Nullable Integer port, @JsonProperty("heartbeat_path") @Nullable String heartbeatPath) { this.port = port == null ? 8080 : port; this.heartbeatPath = heartbeatPath == null ? "/heartbeat" : heartbeatPath; stores = stores == null ? Collections.emptyList() : stores; - preProcessors = preProcessors == null ? Collections.emptyList(): preProcessors; Preconditions.checkArgument( this.port >= 0 && this.port <= 65535, "port must be between 0 and 65535"); @@ -148,17 +147,17 @@ private Service createService( PreProcessor preProcessor; List> preProcessorsList = new ArrayList<>(); - if(!serviceConfig.preProcessors().isEmpty()){ - List preProcessorNames = serviceConfig.preProcessors(); - for (String i : preProcessorNames) { + List preProcessorNames = serviceConfig.preProcessors(); + for (String i : preProcessorNames) { + try { preProcessor = preProcessors.get(i).toPreProcessor(); preProcessorsList.add(preProcessor); + } catch (NullPointerException | NoSuchElementException e) { + throw new RuntimeException(e); } - - return new Service<>(store, handler, plugin, preProcessorsList, serviceConfig.webhookPath()); - } else { - return new Service<>(store, handler, plugin, serviceConfig.webhookPath()); } + + return new Service<>(store, handler, plugin, preProcessorsList, serviceConfig.webhookPath()); } public ServicesRunner toServicesRunner() { diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index 6433e5e..948b3e1 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -89,7 +89,7 @@ private List> post(Context ctx, WebhookPayload payload) { case TextWebhookMessage m -> payloadValue = new Payload.Text(m.text().body()); case ImageWebhookMessage m -> { try { - String url = this.getUrlFromID(m.image().id()); + URI url = this.getUrlFromID(m.image().id()); Content media = this.getMediaFromUrl(url); payloadValue = new Payload.Image(media.asBytes(), m.image().mimeType()); } catch (IOException | URISyntaxException e) { @@ -99,7 +99,7 @@ private List> post(Context ctx, WebhookPayload payload) { case DocumentWebhookMessage m -> { try { - String url = this.getUrlFromID(m.document().id()); + URI url = this.getUrlFromID(m.document().id()); Content media = this.getMediaFromUrl(url); payloadValue = new Payload.Document(media.asBytes(), m.document().mimeType()); } catch (IOException | URISyntaxException e) { @@ -286,7 +286,7 @@ private void markRead(Identifier phoneNumberId, String messageId) { } } - private String getUrlFromID(String mediaID) throws IOException, URISyntaxException { + private URI getUrlFromID(String mediaID) throws IOException, URISyntaxException { return Request.get(new URIBuilder(this.baseURL).appendPath(mediaID).build()) .setHeader("Authorization", "Bearer " + accessToken) .setHeader("appsecret_proof", appSecretProof) @@ -294,14 +294,14 @@ private String getUrlFromID(String mediaID) throws IOException, URISyntaxExcepti try { String jsonResponse = EntityUtils.toString(response.getEntity()); JsonNode jsonNode = MAPPER.readTree(jsonResponse); - return jsonNode.get("url").asText(); - } catch (RuntimeException e) { + return new URIBuilder(jsonNode.get("url").asText()); + } catch (URISyntaxException e) { throw new RuntimeException(e); } - }); + }).build(); } - private Content getMediaFromUrl(String url) throws IOException { + private Content getMediaFromUrl(URI url) throws IOException { try { return Request.get(url) .setHeader("Authorization", "Bearer " + accessToken) From f7467fb0e2241497166ed975835b54f695a94604 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Thu, 4 Jul 2024 14:51:15 -0700 Subject: [PATCH 22/25] using responseHandler --- .../meta/cp4m/message/WAMessageHandler.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java index 948b3e1..5f3ae26 100644 --- a/src/main/java/com/meta/cp4m/message/WAMessageHandler.java +++ b/src/main/java/com/meta/cp4m/message/WAMessageHandler.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.hc.client5.http.fluent.Content; @@ -90,8 +89,8 @@ private List> post(Context ctx, WebhookPayload payload) { case ImageWebhookMessage m -> { try { URI url = this.getUrlFromID(m.image().id()); - Content media = this.getMediaFromUrl(url); - payloadValue = new Payload.Image(media.asBytes(), m.image().mimeType()); + byte[] media = this.getMediaFromUrl(url); + payloadValue = new Payload.Image(media, m.image().mimeType()); } catch (IOException | URISyntaxException e) { throw new RuntimeException(e); } @@ -100,8 +99,8 @@ private List> post(Context ctx, WebhookPayload payload) { case DocumentWebhookMessage m -> { try { URI url = this.getUrlFromID(m.document().id()); - Content media = this.getMediaFromUrl(url); - payloadValue = new Payload.Document(media.asBytes(), m.document().mimeType()); + byte[] media = this.getMediaFromUrl(url); + payloadValue = new Payload.Document(media, m.document().mimeType()); } catch (IOException | URISyntaxException e) { throw new RuntimeException(e); } @@ -301,14 +300,16 @@ private URI getUrlFromID(String mediaID) throws IOException, URISyntaxException }).build(); } - private Content getMediaFromUrl(URI url) throws IOException { - try { - return Request.get(url) - .setHeader("Authorization", "Bearer " + accessToken) - .setHeader("appsecret_proof", appSecretProof) - .execute().returnContent(); - } catch (IOException e) { - throw new RuntimeException(e); - } + private byte[] getMediaFromUrl(URI url) throws IOException { + return Request.get(url) + .setHeader("Authorization", "Bearer " + accessToken) + .setHeader("appsecret_proof", appSecretProof) + .execute().handleResponse(response -> { + try { + return EntityUtils.toByteArray(response.getEntity()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } } From 1a2887e23c83bdce4839712a8ecf336d094622f3 Mon Sep 17 00:00:00 2001 From: Sachin Karve Date: Thu, 4 Jul 2024 15:02:19 -0700 Subject: [PATCH 23/25] reinstate check --- src/main/java/com/meta/cp4m/configuration/RootConfiguration.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index e20e34a..4ce13f7 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -48,6 +48,7 @@ public class RootConfiguration { this.port = port == null ? 8080 : port; this.heartbeatPath = heartbeatPath == null ? "/heartbeat" : heartbeatPath; stores = stores == null ? Collections.emptyList() : stores; + preProcessors = preProcessors == null ? Collections.emptyList(): preProcessors; Preconditions.checkArgument( this.port >= 0 && this.port <= 65535, "port must be between 0 and 65535"); From 0d744f00746dbad32220eae7773e938c9fc128f0 Mon Sep 17 00:00:00 2001 From: Hunter Jackson Date: Fri, 5 Jul 2024 10:21:48 -0400 Subject: [PATCH 24/25] add an optionally text message to the end of a threadstate when a image or document is uploaded --- .../java/com/meta/cp4m/S3PreProcessor.java | 122 ++++++++++-------- .../com/meta/cp4m/S3PreProcessorConfig.java | 67 ++++++---- .../com/meta/cp4m/S3PreProcessorTest.java | 23 ++-- 3 files changed, 120 insertions(+), 92 deletions(-) diff --git a/src/main/java/com/meta/cp4m/S3PreProcessor.java b/src/main/java/com/meta/cp4m/S3PreProcessor.java index a7f0ebd..6f1b267 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessor.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessor.java @@ -11,79 +11,87 @@ import com.meta.cp4m.message.Message; import com.meta.cp4m.message.Payload; import com.meta.cp4m.message.ThreadState; +import java.time.Instant; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.*; -import java.time.Instant; - public class S3PreProcessor implements PreProcessor { - private final String awsAccessKeyID; - private final String awsSecretAccessKey; - private final String region; - private final String bucket; - private final StaticCredentialsProvider credentialsProvider; - private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class); + private final String awsAccessKeyID; + private final String awsSecretAccessKey; + private final String region; + private final String bucket; + private final @Nullable String textMessageAddition; + private final StaticCredentialsProvider credentialsProvider; - @Override - public ThreadState run(ThreadState in) { + public S3PreProcessor( + String awsAccessKeyID, + String awsSecretAccessKey, + String region, + String bucket, + @Nullable String textMessageAddition) { + this.awsAccessKeyID = awsAccessKeyID; + this.awsSecretAccessKey = awsSecretAccessKey; + this.region = region; + this.bucket = bucket; + this.textMessageAddition = textMessageAddition; - switch (in.tail().payload()) { - case Payload.Image i -> { - this.sendRequest(i.value(), in.userId().toString(), i.extension()); - } - case Payload.Document i -> { - this.sendRequest(i.value(), in.userId().toString(), i.extension()); - } - default -> { - return in; - } - } - return in; // TODO: remove last message - } + AwsSessionCredentials sessionCredentials = + AwsSessionCredentials.create(this.awsAccessKeyID, this.awsSecretAccessKey, ""); - public S3PreProcessor(String awsAccessKeyID, String awsSecretAccessKey, String region, String bucket) { - this.awsAccessKeyID = awsAccessKeyID; - this.awsSecretAccessKey = awsSecretAccessKey; - this.region = region; - this.bucket = bucket; + this.credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); + } - AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create( - this.awsAccessKeyID, - this.awsSecretAccessKey, - "" - ); + @Override + public ThreadState run(ThreadState in) { - this.credentialsProvider = StaticCredentialsProvider.create(sessionCredentials); + switch (in.tail().payload()) { + case Payload.Image i -> { + this.sendRequest(i.value(), in.userId().toString(), i.extension()); + } + case Payload.Document i -> { + this.sendRequest(i.value(), in.userId().toString(), i.extension()); + } + default -> { + return in; + } } - public void sendRequest(byte[] media, String senderID, String extension) { - String key = senderID + - '_' + - Instant.now().toEpochMilli() + - '.' + - extension; + return textMessageAddition == null + ? in + : in.with( + in.newMessageFromUser( + Instant.now(), + textMessageAddition, + Identifier.random())); // TODO: remove last message + } + + public void sendRequest(byte[] media, String senderID, String extension) { + String key = senderID + '_' + Instant.now().toEpochMilli() + '.' + extension; - try (S3Client s3Client = S3Client.builder() - .region(Region.of(this.region)) - .credentialsProvider(this.credentialsProvider) - .build()) { + try (S3Client s3Client = + S3Client.builder() + .region(Region.of(this.region)) + .credentialsProvider(this.credentialsProvider) + .build()) { - PutObjectRequest request = PutObjectRequest - .builder() - .bucket(this.bucket) - .key(key) - .contentType("application/" + extension) - .build(); - s3Client.putObject(request, RequestBody.fromBytes(media)); - LOGGER.info("Media upload to AWS S3 successful"); - } catch (Exception e) { - LOGGER.warn("Media upload to AWS S3 failed, {e}", e); - } + PutObjectRequest request = + PutObjectRequest.builder() + .bucket(this.bucket) + .key(key) + .contentType("application/" + extension) + .build(); + s3Client.putObject(request, RequestBody.fromBytes(media)); + LOGGER.info("Media upload to AWS S3 successful"); + } catch (Exception e) { + LOGGER.warn("Media upload to AWS S3 failed, {e}", e); } -} \ No newline at end of file + } +} diff --git a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java index 19568e1..2888711 100644 --- a/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java +++ b/src/main/java/com/meta/cp4m/S3PreProcessorConfig.java @@ -12,32 +12,47 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.meta.cp4m.message.Message; - import java.util.Objects; +import java.util.regex.Pattern; +import org.checkerframework.checker.nullness.qual.Nullable; + +public record S3PreProcessorConfig( + String name, + String awsAccessKeyId, + String awsSecretAccessKey, + String region, + String bucket, + @Nullable String textMessageAddition) + implements PreProcessorConfig { + + private static final Pattern KEBAB_CASE = Pattern.compile("^[a-z0-9]+(-[a-z0-9]+)*$"); + + @JsonCreator + public S3PreProcessorConfig( + @JsonProperty("name") String name, + @JsonProperty("aws_access_key_id") String awsAccessKeyId, + @JsonProperty("aws_secret_access_key") String awsSecretAccessKey, + @JsonProperty("region") String region, + @JsonProperty("bucket") String bucket, + @JsonProperty("text_message_addition") @Nullable String textMessageAddition) { + + Preconditions.checkArgument( + KEBAB_CASE.matcher(bucket).matches(), + "bucket does not match the aws region format(kebab case) or is empty"); + + this.name = Objects.requireNonNull(name, "name is a required parameter"); + this.awsAccessKeyId = + Objects.requireNonNull(awsAccessKeyId, "aws access key is a required parameter"); + this.awsSecretAccessKey = + Objects.requireNonNull(awsSecretAccessKey, "aws secret access key is a required parameter"); + this.region = Objects.requireNonNull(region, "region is a required parameter"); + this.bucket = Objects.requireNonNull(bucket, "bucket is a required parameter"); + this.textMessageAddition = textMessageAddition; + } -public record S3PreProcessorConfig(String name, String awsAccessKeyId, String awsSecretAccessKey, String region, - String bucket) implements PreProcessorConfig { - @JsonCreator - public S3PreProcessorConfig( - @JsonProperty("name") String name, - @JsonProperty("aws_access_key_id") String awsAccessKeyId, - @JsonProperty("aws_secret_access_key") String awsSecretAccessKey, - @JsonProperty("region") String region, - @JsonProperty("bucket") String bucket) { - - String kebabCase = "^[a-z0-9]+(-[a-z0-9]+)*$"; - Preconditions.checkArgument( - bucket.matches(kebabCase), "bucket does not match the aws region format(kebab case) or is empty"); - - this.name = Objects.requireNonNull(name, "name is a required parameter"); - this.awsAccessKeyId = Objects.requireNonNull(awsAccessKeyId, "aws access key is a required parameter"); - this.awsSecretAccessKey = Objects.requireNonNull(awsSecretAccessKey, "aws secret access key is a required parameter"); - this.region = Objects.requireNonNull(region, "region is a required parameter"); - this.bucket = Objects.requireNonNull(bucket, "bucket is a required parameter"); - } - - @Override - public PreProcessor toPreProcessor() { - return new S3PreProcessor<>(awsAccessKeyId, awsSecretAccessKey, region, bucket); - } + @Override + public PreProcessor toPreProcessor() { + return new S3PreProcessor<>( + awsAccessKeyId, awsSecretAccessKey, region, bucket, textMessageAddition); + } } diff --git a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java index 795faa1..d51351d 100644 --- a/src/test/java/com/meta/cp4m/S3PreProcessorTest.java +++ b/src/test/java/com/meta/cp4m/S3PreProcessorTest.java @@ -8,16 +8,15 @@ package com.meta.cp4m; -import com.meta.cp4m.plugin.DummyPlugin; +import static org.junit.jupiter.api.Assertions.*; + import com.meta.cp4m.message.WAMessage; import com.meta.cp4m.message.WAMessageHandler; import com.meta.cp4m.message.WAMessengerConfig; +import com.meta.cp4m.plugin.DummyPlugin; import com.meta.cp4m.store.NullStore; -import org.junit.jupiter.api.Test; - import java.util.List; - -import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; class S3PreProcessorTest { @@ -25,9 +24,15 @@ class S3PreProcessorTest { void run() { WAMessageHandler waMessageHandler = WAMessengerConfig.of("verify", "SomeSecret", "someToken") .toMessageHandler(); - Service service = new Service<>(new NullStore<>(), waMessageHandler, - new DummyPlugin<>("dummy"), - List.of(new S3PreProcessor<>("someAccessKey", "someSecretKey", "someRegion", "someRegion")), "/whatsapp"); + Service service = + new Service<>( + new NullStore<>(), + waMessageHandler, + new DummyPlugin<>("dummy"), + List.of( + new S3PreProcessor<>( + "someAccessKey", "someSecretKey", "someRegion", "someRegion", null)), + "/whatsapp"); ServicesRunner.newInstance().service(service).port(8080).start(); } -} \ No newline at end of file +} From 68291f945049775be6ffcd2239068f2e5dc32a26 Mon Sep 17 00:00:00 2001 From: Hunter Jackson Date: Fri, 5 Jul 2024 10:22:28 -0400 Subject: [PATCH 25/25] cleanup preprocessor checks in configuration --- .../cp4m/configuration/RootConfiguration.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java index 4ce13f7..a1a9aee 100644 --- a/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java +++ b/src/main/java/com/meta/cp4m/configuration/RootConfiguration.java @@ -20,7 +20,6 @@ import com.meta.cp4m.store.ChatStore; import com.meta.cp4m.store.NullStore; import com.meta.cp4m.store.StoreConfig; - import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -48,7 +47,7 @@ public class RootConfiguration { this.port = port == null ? 8080 : port; this.heartbeatPath = heartbeatPath == null ? "/heartbeat" : heartbeatPath; stores = stores == null ? Collections.emptyList() : stores; - preProcessors = preProcessors == null ? Collections.emptyList(): preProcessors; + preProcessors = preProcessors == null ? Collections.emptyList() : preProcessors; Preconditions.checkArgument( this.port >= 0 && this.port <= 65535, "port must be between 0 and 65535"); @@ -70,7 +69,8 @@ public class RootConfiguration { plugins.stream() .collect(Collectors.toUnmodifiableMap(PluginConfig::name, Function.identity())); - this.preProcessors = preProcessors.stream() + this.preProcessors = + preProcessors.stream() .collect(Collectors.toUnmodifiableMap(PreProcessorConfig::name, Function.identity())); Preconditions.checkArgument( @@ -103,6 +103,11 @@ public class RootConfiguration { s.store() + " must be the name of a store"); Preconditions.checkArgument( this.handlers.containsKey(s.handler()), s.handler() + " must be the name of a handler"); + for (PreProcessorConfig preProcessor : preProcessors) { + Preconditions.checkArgument( + this.preProcessors.containsKey(preProcessor.name()), + preProcessor.name() + " must be the name of a pre-processor"); + } } this.services = services; } @@ -142,7 +147,7 @@ private Service createService( if (serviceConfig.store() != null) { store = stores.get(serviceConfig.store()).toStore(); } else { - store = new NullStore(); + store = new NullStore<>(); } PreProcessor preProcessor; @@ -150,12 +155,9 @@ private Service createService( List preProcessorNames = serviceConfig.preProcessors(); for (String i : preProcessorNames) { - try { - preProcessor = preProcessors.get(i).toPreProcessor(); - preProcessorsList.add(preProcessor); - } catch (NullPointerException | NoSuchElementException e) { - throw new RuntimeException(e); - } + // guaranteed to return a non null value due to check in constructor + preProcessor = preProcessors.get(i).toPreProcessor(); + preProcessorsList.add(preProcessor); } return new Service<>(store, handler, plugin, preProcessorsList, serviceConfig.webhookPath());