Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS S3 Image and Document uploads #52

Merged
merged 26 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,20 @@ verify_token = "<your verification token here>"
app_secret = "<your verification app secret here>"
access_token = "<you access token here>"

[[pre_processors]]
type = "aws_s3"
name = "aws_s3"
aws_access_key_id="<your-aws-console-access-key>"
aws_secret_access_key="<your-aws-console-secret-key>"
region = "<your-aws-s3-bucket-region>"
bucket = "<your-aws-bucket-name>"

[[services]]
webhook_path = "/<webhook-pathname>"
plugin = "openai_test"
store = "memory_test"
handler = "whatsapp_test"
pre_processors =["aws_s3"]
```

##### Example: Messenger & Llama 2 (via Hugging Face)
Expand Down
59 changes: 58 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,64 @@
<systemStubsVersion>2.1.1</systemStubsVersion>
<custom.jarName>${project.artifactId}-${project.version}</custom.jarName>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.24.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.8.3</version>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</exclusion>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sso</artifactId>
<version>2.13.15</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>ssooidc</artifactId>
<version>2.17.94</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>2.25.35</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -111,7 +168,7 @@
<dependency>
<groupId>ai.djl.huggingface</groupId>
<artifactId>tokenizers</artifactId>
<version>0.23.0</version>
<version>0.28.0</version>
</dependency>
</dependencies>
<build>
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/meta/cp4m/PreProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.meta.cp4m;

import com.meta.cp4m.message.Message;
import com.meta.cp4m.message.ThreadState;

@FunctionalInterface
public interface PreProcessor<T extends Message> {

ThreadState<T> run(ThreadState<T> in);
}
23 changes: 23 additions & 0 deletions src/main/java/com/meta/cp4m/PreProcessorConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.meta.cp4m;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.meta.cp4m.message.Message;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = S3PreProcessorConfig.class, name = "aws_s3")
})
public interface PreProcessorConfig {
String name();

<T extends Message> PreProcessor<T> toPreProcessor();
}
97 changes: 97 additions & 0 deletions src/main/java/com/meta/cp4m/S3PreProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.meta.cp4m;

import com.meta.cp4m.message.Message;
import com.meta.cp4m.message.Payload;
import com.meta.cp4m.message.ThreadState;
import java.time.Instant;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

public class S3PreProcessor<T extends Message> implements PreProcessor<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3PreProcessor.class);
private final String awsAccessKeyID;
private final String awsSecretAccessKey;
private final String region;
private final String bucket;
private final @Nullable String textMessageAddition;
private final StaticCredentialsProvider credentialsProvider;

public S3PreProcessor(
String awsAccessKeyID,
String awsSecretAccessKey,
String region,
String bucket,
@Nullable String textMessageAddition) {
this.awsAccessKeyID = awsAccessKeyID;
this.awsSecretAccessKey = awsSecretAccessKey;
this.region = region;
this.bucket = bucket;
this.textMessageAddition = textMessageAddition;

AwsSessionCredentials sessionCredentials =
AwsSessionCredentials.create(this.awsAccessKeyID, this.awsSecretAccessKey, "");

this.credentialsProvider = StaticCredentialsProvider.create(sessionCredentials);
}

@Override
public ThreadState<T> run(ThreadState<T> in) {

switch (in.tail().payload()) {
case Payload.Image i -> {
this.sendRequest(i.value(), in.userId().toString(), i.extension());
}
case Payload.Document i -> {
this.sendRequest(i.value(), in.userId().toString(), i.extension());
}
default -> {
return in;
}
}

return textMessageAddition == null
? in
: in.with(
in.newMessageFromUser(
Instant.now(),
textMessageAddition,
Identifier.random())); // TODO: remove last message
}

public void sendRequest(byte[] media, String senderID, String extension) {
String key = senderID + '_' + Instant.now().toEpochMilli() + '.' + extension;

try (S3Client s3Client =
S3Client.builder()
.region(Region.of(this.region))
.credentialsProvider(this.credentialsProvider)
.build()) {

PutObjectRequest request =
PutObjectRequest.builder()
.bucket(this.bucket)
.key(key)
.contentType("application/" + extension)
.build();
s3Client.putObject(request, RequestBody.fromBytes(media));
LOGGER.info("Media upload to AWS S3 successful");
} catch (Exception e) {
LOGGER.warn("Media upload to AWS S3 failed, {e}", e);
}
}
}
58 changes: 58 additions & 0 deletions src/main/java/com/meta/cp4m/S3PreProcessorConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.meta.cp4m;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.meta.cp4m.message.Message;
import java.util.Objects;
import java.util.regex.Pattern;
import org.checkerframework.checker.nullness.qual.Nullable;

public record S3PreProcessorConfig(
String name,
String awsAccessKeyId,
String awsSecretAccessKey,
String region,
String bucket,
@Nullable String textMessageAddition)
implements PreProcessorConfig {

private static final Pattern KEBAB_CASE = Pattern.compile("^[a-z0-9]+(-[a-z0-9]+)*$");

@JsonCreator
public S3PreProcessorConfig(
@JsonProperty("name") String name,
@JsonProperty("aws_access_key_id") String awsAccessKeyId,
@JsonProperty("aws_secret_access_key") String awsSecretAccessKey,
@JsonProperty("region") String region,
@JsonProperty("bucket") String bucket,
@JsonProperty("text_message_addition") @Nullable String textMessageAddition) {

Preconditions.checkArgument(
KEBAB_CASE.matcher(bucket).matches(),
"bucket does not match the aws region format(kebab case) or is empty");

this.name = Objects.requireNonNull(name, "name is a required parameter");
this.awsAccessKeyId =
Objects.requireNonNull(awsAccessKeyId, "aws access key is a required parameter");
this.awsSecretAccessKey =
Objects.requireNonNull(awsSecretAccessKey, "aws secret access key is a required parameter");
this.region = Objects.requireNonNull(region, "region is a required parameter");
this.bucket = Objects.requireNonNull(bucket, "bucket is a required parameter");
this.textMessageAddition = textMessageAddition;
}

@Override
public <T extends Message> PreProcessor<T> toPreProcessor() {
return new S3PreProcessor<>(
awsAccessKeyId, awsSecretAccessKey, region, bucket, textMessageAddition);
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/meta/cp4m/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand All @@ -34,12 +35,23 @@ public class Service<T extends Message> {
private final Plugin<T> plugin;

private final String path;
private final List <PreProcessor<T>> preProcessors;

public Service(ChatStore<T> store, MessageHandler<T> handler, Plugin<T> plugin, String path) {
this.handler = Objects.requireNonNull(handler);
this.store = Objects.requireNonNull(store);
this.plugin = Objects.requireNonNull(plugin);
this.path = Objects.requireNonNull(path);
this.preProcessors = Collections.emptyList();
}

public Service(
ChatStore<T> store, MessageHandler<T> handler, Plugin<T> plugin, List<PreProcessor<T>> preProcessors, String path) {
this.handler = Objects.requireNonNull(handler);
this.store = Objects.requireNonNull(store);
this.plugin = plugin;
this.path = path;
this.preProcessors = Collections.unmodifiableList(preProcessors);
}

<IN> void handler(Context ctx, IN in, RequestProcessor<IN, T> processor) {
Expand Down Expand Up @@ -80,6 +92,11 @@ public Plugin<T> plugin() {
}

private void execute(ThreadState<T> thread) {
ThreadState<T> preproccessed = thread;
for (PreProcessor<T> preProcessor : preProcessors) {
preproccessed = preProcessor.run(preproccessed);
}

T pluginResponse;
try {
pluginResponse = plugin.handle(thread);
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/meta/cp4m/ServiceConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,43 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ServiceConfiguration {
private final String webhookPath;
private final String handler;
private final @Nullable String store;
private final String plugin;
private final @Nullable List<String> preProcessors;

@JsonCreator
ServiceConfiguration(
@JsonProperty("webhook_path") String webhookPath,
@JsonProperty("handler") String handler,
@JsonProperty("store") @Nullable String store,
@JsonProperty("plugin") String plugin) {
@JsonProperty("plugin") String plugin,
@JsonProperty("pre_processors") @Nullable List<String> preProcessors) {
Preconditions.checkArgument(
webhookPath != null && webhookPath.startsWith("/"),
"webhook_path must be present and it must start with a forward slash (/)");
this.webhookPath = webhookPath;
this.handler = Objects.requireNonNull(handler, "handler must be present");
this.store = store;
this.plugin = Objects.requireNonNull(plugin, "plugin must be present");
this.preProcessors = preProcessors == null ? Collections.emptyList() : preProcessors;
}

public String webhookPath() {
return webhookPath;
}

public List<String> preProcessors() {
return preProcessors;
}

public String handler() {
return handler;
}
Expand Down
Loading
Loading