Skip to content

Commit

Permalink
Java: Changed handling of large requests to transfer them as leaked p…
Browse files Browse the repository at this point in the history
…ointers (valkey-io#1708)

* Restructure Java FFI layer to handle errors properly

* Address clippy lints

* Add tests for error and panic handling

* Fix FFI tests

* Apply Spotless

* Fix some minor issue I forgot about

* Add API to create the leaked bytes vec

* Bridge the MAX_REQUEST_ARGS_LENGTH constant from Rust to Java

* Fix warnings in Rust

* Update Java client to utilize the pointer with large argument sizes

* Update createLeakedBytesVec to handle panics

* spotless

* Add docs and run Rust linters

* Add large value tests

* Fix transactions and add transaction tests

* dummy commit for CI

* Revert "dummy commit for CI"

This reverts commit 3ed1937.

* Fix JDK11 build issue

Due to using a JDK17 function

* Fix another JDK11 issue

* Fix merge issues.

* Remove unneccesary mut prefix

* Clarify the MAX_REQUEST_ARGS_LENGTH_IN_BYTES constant

* Fix merge issue

---------

Co-authored-by: Jonathan Louie <jonathanl@bitquilltech.com>
  • Loading branch information
2 people authored and cyip10 committed Jul 16, 2024
1 parent 7d23838 commit ebe24eb
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 287 deletions.
1 change: 1 addition & 0 deletions java/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ logger_core = {path = "../logger_core"}
tracing-subscriber = "0.3.16"
jni = "0.21.1"
log = "0.4.20"
bytes = { version = "1.6.0" }

[profile.release]
lto = true
Expand Down
437 changes: 218 additions & 219 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java

Large diffs are not rendered by default.

19 changes: 8 additions & 11 deletions java/client/src/main/java/glide/api/models/ClusterTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.commons.lang3.ArrayUtils;
import redis_request.RedisRequestOuterClass;

/**
* Extends BaseTransaction class for cluster mode commands. Transactions allow the execution of a
Expand Down Expand Up @@ -50,9 +49,8 @@ protected ClusterTransaction getThis() {
*/
public ClusterTransaction sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
RedisRequestOuterClass.Command.ArgsArray commandArgs =
buildArgs(ArrayUtils.addFirst(sortClusterOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(Sort, commandArgs));
protobufTransaction.addCommands(
buildCommand(Sort, ArrayUtils.addFirst(sortClusterOptions.toArgs(), key)));
return this;
}

Expand All @@ -69,9 +67,8 @@ public ClusterTransaction sort(
*/
public ClusterTransaction sortReadOnly(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
RedisRequestOuterClass.Command.ArgsArray commandArgs =
buildArgs(ArrayUtils.addFirst(sortClusterOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(SortReadOnly, commandArgs));
protobufTransaction.addCommands(
buildCommand(SortReadOnly, ArrayUtils.addFirst(sortClusterOptions.toArgs(), key)));
return this;
}

Expand All @@ -94,10 +91,10 @@ public ClusterTransaction sortStore(
@NonNull String destination,
@NonNull SortClusterOptions sortClusterOptions) {
String[] storeArguments = new String[] {STORE_COMMAND_STRING, destination};
RedisRequestOuterClass.Command.ArgsArray commandArgs =
buildArgs(
concatenateArrays(new String[] {key}, sortClusterOptions.toArgs(), storeArguments));
protobufTransaction.addCommands(buildCommand(Sort, commandArgs));
protobufTransaction.addCommands(
buildCommand(
Sort,
concatenateArrays(new String[] {key}, sortClusterOptions.toArgs(), storeArguments)));
return this;
}
}
25 changes: 10 additions & 15 deletions java/client/src/main/java/glide/api/models/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.commons.lang3.ArrayUtils;
import redis_request.RedisRequestOuterClass.Command.ArgsArray;

/**
* Extends BaseTransaction class for Redis standalone commands. Transactions allow the execution of
Expand Down Expand Up @@ -53,9 +52,7 @@ protected Transaction getThis() {
* @return Command Response - A simple <code>OK</code> response.
*/
public Transaction select(long index) {
ArgsArray commandArgs = buildArgs(Long.toString(index));

protobufTransaction.addCommands(buildCommand(Select, commandArgs));
protobufTransaction.addCommands(buildCommand(Select, Long.toString(index)));
return this;
}

Expand All @@ -71,8 +68,7 @@ public Transaction select(long index) {
* exist in the source database.
*/
public Transaction move(String key, long dbIndex) {
ArgsArray commandArgs = buildArgs(key, Long.toString(dbIndex));
protobufTransaction.addCommands(buildCommand(Move, commandArgs));
protobufTransaction.addCommands(buildCommand(Move, key, Long.toString(dbIndex)));
return this;
}

Expand Down Expand Up @@ -113,8 +109,7 @@ public Transaction copy(
if (replace) {
args = ArrayUtils.add(args, REPLACE_REDIS_API);
}
ArgsArray commandArgs = buildArgs(args);
protobufTransaction.addCommands(buildCommand(Copy, commandArgs));
protobufTransaction.addCommands(buildCommand(Copy, args));
return this;
}

Expand All @@ -129,8 +124,8 @@ public Transaction copy(
* @return Command Response - An <code>Array</code> of sorted elements.
*/
public Transaction sort(@NonNull String key, @NonNull SortOptions sortOptions) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(sortOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(Sort, commandArgs));
protobufTransaction.addCommands(
buildCommand(Sort, ArrayUtils.addFirst(sortOptions.toArgs(), key)));
return this;
}

Expand All @@ -145,8 +140,8 @@ public Transaction sort(@NonNull String key, @NonNull SortOptions sortOptions) {
* @return Command Response - An <code>Array</code> of sorted elements.
*/
public Transaction sortReadOnly(@NonNull String key, @NonNull SortOptions sortOptions) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(sortOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(SortReadOnly, commandArgs));
protobufTransaction.addCommands(
buildCommand(SortReadOnly, ArrayUtils.addFirst(sortOptions.toArgs(), key)));
return this;
}

Expand All @@ -166,9 +161,9 @@ public Transaction sortReadOnly(@NonNull String key, @NonNull SortOptions sortOp
public Transaction sortStore(
@NonNull String key, @NonNull String destination, @NonNull SortOptions sortOptions) {
String[] storeArguments = new String[] {STORE_COMMAND_STRING, destination};
ArgsArray arguments =
buildArgs(concatenateArrays(new String[] {key}, sortOptions.toArgs(), storeArguments));
protobufTransaction.addCommands(buildCommand(Sort, arguments));
protobufTransaction.addCommands(
buildCommand(
Sort, concatenateArrays(new String[] {key}, sortOptions.toArgs(), storeArguments)));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
import response.ResponseOuterClass.Response;

public class RedisValueResolver {
public static final long MAX_REQUEST_ARGS_LENGTH_IN_BYTES;

// TODO: consider lazy loading the glide_rs library
static {
NativeUtils.loadGlideLib();

// Note: This is derived from a native call instead of hard-coded to ensure consistency
// between Java and native clients.
MAX_REQUEST_ARGS_LENGTH_IN_BYTES = getMaxRequestArgsLengthInBytes();
}

/**
Expand All @@ -26,4 +31,20 @@ public class RedisValueResolver {
* @return A RESP3 value
*/
public static native Object valueFromPointerBinary(long pointer);

/**
* Copy the given array of byte arrays to a native series of byte arrays and return a C-style
* pointer.
*
* @param args The arguments to copy.
* @return A C-style pointer to a native representation of the arguments.
*/
public static native long createLeakedBytesVec(byte[][] args);

/**
* Get the maximum length in bytes of all request arguments.
*
* @return The maximum length in bytes of all request arguments.
*/
private static native long getMaxRequestArgsLengthInBytes();
}
104 changes: 68 additions & 36 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
import glide.api.models.exceptions.RequestException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.ffi.resolvers.RedisValueResolver;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass;
import redis_request.RedisRequestOuterClass.Command;
Expand Down Expand Up @@ -198,18 +202,12 @@ protected <T> CompletableFuture<T> submitCommandToChannel(
*/
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, String[] arguments, Route route) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFromUtf8(arg));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

var builder =
RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());

return prepareRedisRequestRoute(builder, route);
}
Expand All @@ -225,18 +223,12 @@ protected RedisRequest.Builder prepareRedisRequest(
*/
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, GlideString[] arguments, Route route) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFrom(arg.getBytes()));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

var builder =
RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());

return prepareRedisRequestRoute(builder, route);
}
Expand Down Expand Up @@ -298,17 +290,11 @@ protected RedisRequest.Builder prepareRedisRequest(
* adding a callback id.
*/
protected RedisRequest.Builder prepareRedisRequest(RequestType requestType, String[] arguments) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFromUtf8(arg));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

return RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());
}

/**
Expand All @@ -321,17 +307,11 @@ protected RedisRequest.Builder prepareRedisRequest(RequestType requestType, Stri
*/
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, GlideString[] arguments) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFrom(arg.getBytes()));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

return RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());
}

private RedisRequest.Builder prepareRedisRequestRoute(RedisRequest.Builder builder, Route route) {
Expand Down Expand Up @@ -392,4 +372,56 @@ private Response exceptionHandler(Throwable e) {
}
throw new RuntimeException(e);
}

/**
* Add the given set of arguments to the output Command.Builder.
*
* @param arguments The arguments to add to the builder.
* @param outputBuilder The builder to populate with arguments.
*/
public static void populateCommandWithArgs(String[] arguments, Command.Builder outputBuilder) {
populateCommandWithArgs(
Arrays.stream(arguments)
.map(value -> value.getBytes(StandardCharsets.UTF_8))
.collect(Collectors.toList()),
outputBuilder);
}

/**
* Add the given set of arguments to the output Command.Builder.
*
* @param arguments The arguments to add to the builder.
* @param outputBuilder The builder to populate with arguments.
*/
private static void populateCommandWithArgs(
GlideString[] arguments, Command.Builder outputBuilder) {
populateCommandWithArgs(
Arrays.stream(arguments).map(GlideString::getBytes).collect(Collectors.toList()),
outputBuilder);
}

/**
* Add the given set of arguments to the output Command.Builder.
*
* <p>Implementation note: When the length in bytes of all arguments supplied to the given command
* exceed {@link RedisValueResolver#MAX_REQUEST_ARGS_LENGTH_IN_BYTES}, the Command will hold a
* handle to leaked vector of byte arrays in the native layer in the <code>ArgsVecPointer</code>
* field. In the normal case where the command arguments are small, they'll be serialized as to an
* {@link ArgsArray} message.
*
* @param arguments The arguments to add to the builder.
* @param outputBuilder The builder to populate with arguments.
*/
private static void populateCommandWithArgs(
List<byte[]> arguments, Command.Builder outputBuilder) {
final long totalArgSize = arguments.stream().mapToLong(arg -> arg.length).sum();
if (totalArgSize < RedisValueResolver.MAX_REQUEST_ARGS_LENGTH_IN_BYTES) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
arguments.forEach(arg -> commandArgs.addArgs(ByteString.copyFrom(arg)));
outputBuilder.setArgsArray(commandArgs);
} else {
outputBuilder.setArgsVecPointer(
RedisValueResolver.createLeakedBytesVec(arguments.toArray(new byte[][] {})));
}
}
}
6 changes: 3 additions & 3 deletions java/integTest/src/test/java/glide/SharedClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public static void teardown() {
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void send_and_receive_large_values(BaseClient client) {
int length = 1 << 16;
String key = getRandomString(length);
String value = getRandomString(length);
int length = 1 << 25; // 33mb
String key = "0".repeat(length);
String value = "0".repeat(length);

assertEquals(length, key.length());
assertEquals(length, value.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ public void keyless_transactions_with_group_of_commands(
assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void test_transaction_large_values() {
int length = 1 << 25; // 33mb
String key = "0".repeat(length);
String value = "0".repeat(length);

ClusterTransaction transaction = new ClusterTransaction();
transaction.set(key, value);
transaction.get(key);

Object[] expectedResult =
new Object[] {
OK, // transaction.set(key, value);
value, // transaction.get(key);
};

Object[] result = clusterClient.exec(transaction).get();
assertArrayEquals(expectedResult, result);
}

@Test
@SneakyThrows
public void lastsave() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ public void keyless_transactions_with_group_of_commands(
assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void test_transaction_large_values() {
int length = 1 << 25; // 33mb
String key = "0".repeat(length);
String value = "0".repeat(length);

Transaction transaction = new Transaction();
transaction.set(key, value);
transaction.get(key);

Object[] expectedResult =
new Object[] {
OK, // transaction.set(key, value);
value, // transaction.get(key);
};

Object[] result = client.exec(transaction).get();
assertArrayEquals(expectedResult, result);
}

@SneakyThrows
@Test
public void test_standalone_transaction() {
Expand Down
2 changes: 1 addition & 1 deletion java/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/
use jni::{errors::Error as JNIError, JNIEnv};
use log::error;
Expand Down
Loading

0 comments on commit ebe24eb

Please sign in to comment.