From 0e52ac95d6b17cd8e7d38a471ac7c20c75e5c5d8 Mon Sep 17 00:00:00 2001 From: Bar Shaul <88437685+barshaul@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:58:29 +0200 Subject: [PATCH 1/3] Improve retry logic and update unmaintained dependencies for Rust lint CI (#2673) --- CHANGELOG.md | 1 + benchmarks/rust/Cargo.toml | 1 - deny.toml | 4 +- glide-core/Cargo.toml | 3 +- glide-core/redis-rs/redis/Cargo.toml | 12 ++--- .../redis/src/aio/connection_manager.rs | 19 ++++--- .../redis-rs/redis/src/cluster_async/mod.rs | 54 +++++++++---------- .../redis-rs/redis/src/cluster_topology.rs | 22 ++++---- .../src/client/reconnecting_connection.rs | 8 ++- glide-core/src/retry_strategies.rs | 30 ++++++++--- go/Cargo.toml | 1 - go/src/lib.rs | 26 ++++++--- 12 files changed, 104 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aff7be5e44..4ad57b356b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ * Core: Add support for sending multi-slot JSON.MSET and JSON.MGET commands ([#2587]https://github.com/valkey-io/valkey-glide/pull/2587) * Node: Add `JSON.DEBUG` command ([#2572](https://github.com/valkey-io/valkey-glide/pull/2572)) * Node: Add `JSON.NUMINCRBY` and `JSON.NUMMULTBY` command ([#2555](https://github.com/valkey-io/valkey-glide/pull/2555)) +* Core: Improve retry logic and update unmaintained dependencies for Rust lint CI ([#2673](https://github.com/valkey-io/valkey-glide/pull/2643)) #### Breaking Changes diff --git a/benchmarks/rust/Cargo.toml b/benchmarks/rust/Cargo.toml index d63bc98e57..1c7baf0b70 100644 --- a/benchmarks/rust/Cargo.toml +++ b/benchmarks/rust/Cargo.toml @@ -15,7 +15,6 @@ redis = { path = "../../glide-core/redis-rs/redis", features = ["aio"] } futures = "0.3.28" rand = "0.8.5" itoa = "1.0.6" -futures-time = "^3.0.0" clap = { version = "4.3.8", features = ["derive"] } chrono = "0.4.26" serde_json = "1.0.99" diff --git a/deny.toml b/deny.toml index e45b094cee..0a43bfa193 100644 --- a/deny.toml +++ b/deny.toml @@ -24,8 +24,6 @@ yanked = "deny" ignore = [ # Unmaintained dependency error that needs more attention due to nested dependencies "RUSTSEC-2024-0370", - "RUSTSEC-2024-0384", - "RUSTSEC-2024-0388", ] # Threshold for security vulnerabilities, any vulnerability with a CVSS score # lower than the range specified will be ignored. Note that ignored advisories @@ -59,7 +57,7 @@ allow = [ "Unicode-DFS-2016", "ISC", "OpenSSL", - "MPL-2.0", + "MPL-2.0" ] # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the diff --git a/glide-core/Cargo.toml b/glide-core/Cargo.toml index bbbf08e0cc..bd12bb09c9 100644 --- a/glide-core/Cargo.toml +++ b/glide-core/Cargo.toml @@ -24,7 +24,8 @@ logger_core = { path = "../logger_core" } dispose = "0.5.0" tokio-util = { version = "^0.7", features = ["rt"], optional = true } num_cpus = { version = "^1.15", optional = true } -tokio-retry = "0.3.0" +tokio-retry2 = {version = "0.5", features = ["jitter"]} + protobuf = { version = "3", features = [ "bytes", "with-bytes", diff --git a/glide-core/redis-rs/redis/Cargo.toml b/glide-core/redis-rs/redis/Cargo.toml index 0b9da67e33..3320ba8ec7 100644 --- a/glide-core/redis-rs/redis/Cargo.toml +++ b/glide-core/redis-rs/redis/Cargo.toml @@ -53,7 +53,6 @@ dispose = { version = "0.5.0", optional = true } # Only needed for the connection manager arc-swap = { version = "1.7.1" } futures = { version = "0.3.3", optional = true } -tokio-retry = { version = "0.3.0", optional = true } # Only needed for the r2d2 feature r2d2 = { version = "0.8.8", optional = true } @@ -61,7 +60,6 @@ r2d2 = { version = "0.8.8", optional = true } # Only needed for cluster crc16 = { version = "0.4", optional = true } rand = { version = "0.8", optional = true } -derivative = { version = "2.2.0", optional = true } # Only needed for async cluster dashmap = { version = "6.0", optional = true } @@ -69,9 +67,7 @@ dashmap = { version = "6.0", optional = true } async-trait = { version = "0.1.24", optional = true } # Only needed for tokio support -backoff-tokio = { package = "backoff", version = "0.4.0", optional = true, features = [ - "tokio", -] } +tokio-retry2 = {version = "0.5", features = ["jitter"], optional = true} # Only needed for native tls native-tls = { version = "0.2", optional = true } @@ -134,7 +130,7 @@ aio = [ ] geospatial = [] json = ["serde", "serde/derive", "serde_json"] -cluster = ["crc16", "rand", "derivative"] +cluster = ["crc16", "rand"] script = ["sha1_smol"] tls-native-tls = ["native-tls"] tls-rustls = [ @@ -145,10 +141,10 @@ tls-rustls = [ ] tls-rustls-insecure = ["tls-rustls"] tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"] -tokio-comp = ["aio", "tokio/net", "backoff-tokio"] +tokio-comp = ["aio", "tokio/net", "tokio-retry2"] tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"] tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"] -connection-manager = ["futures", "aio", "tokio-retry"] +connection-manager = ["futures", "aio", "tokio-retry2"] streams = [] cluster-async = ["cluster", "futures", "futures-util", "dashmap"] keep-alive = ["socket2"] diff --git a/glide-core/redis-rs/redis/src/aio/connection_manager.rs b/glide-core/redis-rs/redis/src/aio/connection_manager.rs index 83d680ae53..02e6976d15 100644 --- a/glide-core/redis-rs/redis/src/aio/connection_manager.rs +++ b/glide-core/redis-rs/redis/src/aio/connection_manager.rs @@ -15,8 +15,8 @@ use futures::{ }; use futures_util::future::BoxFuture; use std::sync::Arc; -use tokio_retry::strategy::{jitter, ExponentialBackoff}; -use tokio_retry::Retry; +use tokio_retry2::strategy::{jitter, ExponentialBackoff}; +use tokio_retry2::{Retry, RetryError}; /// A `ConnectionManager` is a proxy that wraps a [multiplexed /// connection][multiplexed-connection] and automatically reconnects to the @@ -191,12 +191,15 @@ impl ConnectionManager { connection_timeout: std::time::Duration, ) -> RedisResult { let retry_strategy = exponential_backoff.map(jitter).take(number_of_retries); - Retry::spawn(retry_strategy, || { - client.get_multiplexed_async_connection_with_timeouts( - response_timeout, - connection_timeout, - GlideConnectionOptions::default(), - ) + Retry::spawn(retry_strategy, || async { + client + .get_multiplexed_async_connection_with_timeouts( + response_timeout, + connection_timeout, + GlideConnectionOptions::default(), + ) + .await + .map_err(RetryError::transient) }) .await } diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 9fe5f8821b..426601ca02 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -33,7 +33,11 @@ use crate::{ client::GlideConnectionOptions, cluster_routing::{Routable, RoutingInfo}, cluster_slotmap::SlotMap, - cluster_topology::SLOT_SIZE, + cluster_topology::{ + calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES, + DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR, + SLOT_SIZE, + }, cmd, commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC}, FromRedisValue, InfoDict, ToRedisArgs, @@ -69,10 +73,6 @@ use crate::{ self, MultipleNodeRoutingInfo, Redirect, ResponsePolicy, Route, SingleNodeRoutingInfo, SlotAddr, }, - cluster_topology::{ - calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES, - DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL, DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL, - }, connection::{PubSubSubscriptionInfo, PubSubSubscriptionKind}, push_manager::PushInfo, Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult, @@ -84,9 +84,10 @@ use std::time::Duration; #[cfg(feature = "tokio-comp")] use async_trait::async_trait; #[cfg(feature = "tokio-comp")] -use backoff_tokio::future::retry; +use tokio_retry2::strategy::{jitter_range, ExponentialFactorBackoff}; #[cfg(feature = "tokio-comp")] -use backoff_tokio::{Error as BackoffError, ExponentialBackoff}; +use tokio_retry2::{Retry, RetryError}; + #[cfg(feature = "tokio-comp")] use tokio::{sync::Notify, time::timeout}; @@ -1518,16 +1519,24 @@ where let mut res = Ok(()); if !skip_slots_refresh { - let retry_strategy = ExponentialBackoff { - initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL, - max_interval: DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL, - max_elapsed_time: None, - ..Default::default() - }; + let retry_strategy = ExponentialFactorBackoff::from_millis( + DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, + DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR, + ) + .map(jitter_range(0.8, 1.2)) + .take(DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES); let retries_counter = AtomicUsize::new(0); - res = retry(retry_strategy, || { + res = Retry::spawn(retry_strategy, || async { let curr_retry = retries_counter.fetch_add(1, atomic::Ordering::Relaxed); Self::refresh_slots(inner.clone(), curr_retry) + .await + .map_err(|err| { + if err.kind() == ErrorKind::AllConnectionsUnavailable { + RetryError::permanent(err) + } else { + RetryError::transient(err) + } + }) }) .await; } @@ -1706,26 +1715,13 @@ where false } - async fn refresh_slots( - inner: Arc>, - curr_retry: usize, - ) -> Result<(), BackoffError> { + async fn refresh_slots(inner: Arc>, curr_retry: usize) -> RedisResult<()> { // Update the slot refresh last run timestamp let now = SystemTime::now(); let mut last_run_wlock = inner.slot_refresh_state.last_run.write().await; *last_run_wlock = Some(now); drop(last_run_wlock); - Self::refresh_slots_inner(inner, curr_retry) - .await - .map_err(|err| { - if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES - || err.kind() == ErrorKind::AllConnectionsUnavailable - { - BackoffError::Permanent(err) - } else { - BackoffError::from(err) - } - }) + Self::refresh_slots_inner(inner, curr_retry).await } pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool { diff --git a/glide-core/redis-rs/redis/src/cluster_topology.rs b/glide-core/redis-rs/redis/src/cluster_topology.rs index a2ce9ea078..df8debd681 100644 --- a/glide-core/redis-rs/redis/src/cluster_topology.rs +++ b/glide-core/redis-rs/redis/src/cluster_topology.rs @@ -8,7 +8,6 @@ use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap}; use crate::{cluster::TlsMode, ErrorKind, RedisError, RedisResult, Value}; #[cfg(all(feature = "cluster-async", not(feature = "tokio-comp")))] use async_std::sync::RwLock; -use derivative::Derivative; use std::collections::{hash_map::DefaultHasher, HashMap}; use std::hash::{Hash, Hasher}; use std::sync::atomic::AtomicBool; @@ -21,11 +20,10 @@ use tracing::info; // Exponential backoff constants for retrying a slot refresh /// The default number of refresh topology retries in the same call pub const DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES: usize = 3; -/// The default maximum interval between two retries of the same call for topology refresh -pub const DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(1); -/// The default initial interval for retrying topology refresh -pub const DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL: Duration = Duration::from_millis(500); - +/// The default base duration for retrying topology refresh +pub const DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS: u64 = 500; +/// The default base factor for retrying topology refresh +pub const DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR: f64 = 1.5; // Constants for the intervals between two independent consecutive refresh slots calls /// The default wait duration between two consecutive refresh slots calls #[cfg(feature = "cluster-async")] @@ -58,17 +56,21 @@ impl SlotRefreshState { } } -#[derive(Derivative)] -#[derivative(PartialEq, Eq)] #[derive(Debug)] pub(crate) struct TopologyView { pub(crate) hash_value: TopologyHash, - #[derivative(PartialEq = "ignore")] pub(crate) nodes_count: u16, - #[derivative(PartialEq = "ignore")] slots_and_count: (u16, Vec), } +impl PartialEq for TopologyView { + fn eq(&self, other: &Self) -> bool { + self.hash_value == other.hash_value + } +} + +impl Eq for TopologyView {} + pub(crate) fn slot(key: &[u8]) -> u16 { crc16::State::::calculate(key) % SLOT_SIZE } diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index 245e7900d3..c567b6b5a6 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -17,7 +17,7 @@ use telemetrylib::Telemetry; use tokio::sync::{mpsc, Notify}; use tokio::task; use tokio::time::timeout; -use tokio_retry::Retry; +use tokio_retry2::{Retry, RetryError}; use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT}; @@ -121,7 +121,11 @@ async fn create_connection( TokioDisconnectNotifier::new(), )), }; - let action = || get_multiplexed_connection(client, &connection_options); + let action = || async { + get_multiplexed_connection(client, &connection_options) + .await + .map_err(RetryError::transient) + }; match Retry::spawn(retry_strategy.get_iterator(), action).await { Ok(connection) => { diff --git a/glide-core/src/retry_strategies.rs b/glide-core/src/retry_strategies.rs index d851cb63dd..1a5157d225 100644 --- a/glide-core/src/retry_strategies.rs +++ b/glide-core/src/retry_strategies.rs @@ -3,7 +3,7 @@ */ use crate::client::ConnectionRetryStrategy; use std::time::Duration; -use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tokio_retry2::strategy::{jitter_range, ExponentialBackoff}; #[derive(Clone, Debug)] pub(super) struct RetryStrategy { @@ -27,7 +27,7 @@ impl RetryStrategy { pub(super) fn get_iterator(&self) -> impl Iterator { ExponentialBackoff::from_millis(self.exponent_base as u64) .factor(self.factor as u64) - .map(jitter) + .map(jitter_range(0.8, 1.2)) .take(self.number_of_retries as usize) } } @@ -78,23 +78,39 @@ mod tests { let mut counter = 0; for duration in intervals { counter += 1; - assert!(duration.as_millis() <= interval_duration as u128); + let upper_limit = (interval_duration as f32 * 1.2) as u128; + let lower_limit = (interval_duration as f32 * 0.8) as u128; + assert!( + lower_limit <= duration.as_millis() || duration.as_millis() <= upper_limit, + "{:?}ms <= {:?}ms <= {:?}ms", + lower_limit, + duration.as_millis(), + upper_limit + ); } assert_eq!(counter, retries); } #[test] fn test_exponential_backoff_with_jitter() { - let retries = 3; - let base = 10; - let factor = 5; + let retries = 5; + let base = 2; + let factor = 100; let intervals = get_exponential_backoff(base, factor, retries).get_iterator(); let mut counter = 0; for duration in intervals { counter += 1; let unjittered_duration = factor * (base.pow(counter)); - assert!(duration.as_millis() <= unjittered_duration as u128); + let upper_limit = (unjittered_duration as f32 * 1.2) as u128; + let lower_limit = (unjittered_duration as f32 * 0.8) as u128; + assert!( + lower_limit <= duration.as_millis() || duration.as_millis() <= upper_limit, + "{:?}ms <= {:?}ms <= {:?}ms", + lower_limit, + duration.as_millis(), + upper_limit + ); } assert_eq!(counter, retries); diff --git a/go/Cargo.toml b/go/Cargo.toml index 05d34e7108..48556820fd 100644 --- a/go/Cargo.toml +++ b/go/Cargo.toml @@ -13,7 +13,6 @@ redis = { path = "../glide-core/redis-rs/redis", features = ["aio", "tokio-comp" glide-core = { path = "../glide-core", features = ["socket-layer"] } tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] } protobuf = { version = "3.3.0", features = [] } -derivative = "2.2.0" [profile.release] lto = true diff --git a/go/src/lib.rs b/go/src/lib.rs index 57e6e3a44a..344dac6e45 100644 --- a/go/src/lib.rs +++ b/go/src/lib.rs @@ -3,7 +3,6 @@ */ #![deny(unsafe_op_in_unsafe_fn)] -use derivative::Derivative; use glide_core::client::Client as GlideClient; use glide_core::connection_request; use glide_core::errors; @@ -17,6 +16,7 @@ use std::{ ffi::{c_void, CString}, mem, os::raw::{c_char, c_double, c_long, c_ulong}, + ptr, }; use tokio::runtime::Builder; use tokio::runtime::Runtime; @@ -28,8 +28,7 @@ use tokio::runtime::Runtime; /// The struct is freed by the external caller by using `free_command_response` to avoid memory leaks. /// TODO: Add a type enum to validate what type of response is being sent in the CommandResponse. #[repr(C)] -#[derive(Derivative)] -#[derivative(Debug, Default)] +#[derive(Debug)] pub struct CommandResponse { response_type: ResponseType, int_value: c_long, @@ -39,26 +38,39 @@ pub struct CommandResponse { /// Below two values are related to each other. /// `string_value` represents the string. /// `string_value_len` represents the length of the string. - #[derivative(Default(value = "std::ptr::null_mut()"))] string_value: *mut c_char, string_value_len: c_long, /// Below two values are related to each other. /// `array_value` represents the array of CommandResponse. /// `array_value_len` represents the length of the array. - #[derivative(Default(value = "std::ptr::null_mut()"))] array_value: *mut CommandResponse, array_value_len: c_long, /// Below two values represent the Map structure inside CommandResponse. /// The map is transformed into an array of (map_key: CommandResponse, map_value: CommandResponse) and passed to Go. /// These are represented as pointers as the map can be null (optionally present). - #[derivative(Default(value = "std::ptr::null_mut()"))] map_key: *mut CommandResponse, - #[derivative(Default(value = "std::ptr::null_mut()"))] map_value: *mut CommandResponse, } +impl Default for CommandResponse { + fn default() -> Self { + CommandResponse { + response_type: ResponseType::default(), + int_value: 0, + float_value: 0.0, + bool_value: false, + string_value: ptr::null_mut(), + string_value_len: 0, + array_value: ptr::null_mut(), + array_value_len: 0, + map_key: ptr::null_mut(), + map_value: ptr::null_mut(), + } + } +} + #[repr(C)] #[derive(Debug, Default)] pub enum ResponseType { From 137c02eb20483a8c73ad967f2adde0f49aa64bf8 Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 6 Nov 2024 15:35:13 +0000 Subject: [PATCH 2/3] Core: Release the read lock while creating connections inrefresh_connections Signed-off-by: barshaul --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ad57b356b..128b5bd154 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ * Node: Add `JSON.DEBUG` command ([#2572](https://github.com/valkey-io/valkey-glide/pull/2572)) * Node: Add `JSON.NUMINCRBY` and `JSON.NUMMULTBY` command ([#2555](https://github.com/valkey-io/valkey-glide/pull/2555)) * Core: Improve retry logic and update unmaintained dependencies for Rust lint CI ([#2673](https://github.com/valkey-io/valkey-glide/pull/2643)) +* Core: Release the read lock while creating connections in `refresh_connections` ([#2630](https://github.com/valkey-io/valkey-glide/issues/2630)) #### Breaking Changes From c374d29634392f6b48f5ca321bfcb904390cc2eb Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 12 Nov 2024 15:22:04 -0800 Subject: [PATCH 3/3] Node: Add `FT.PROFILE` command (#2633) * Node: Add FT.PROFILE command --------- Signed-off-by: Yury-Fridlyand Signed-off-by: Andrew Carbonetto Co-authored-by: Yury-Fridlyand --- CHANGELOG.md | 1 + glide-core/src/client/value_conversion.rs | 4 +- node/npm/glide/index.ts | 4 + node/src/server-modules/GlideFt.ts | 409 +++++++++----- node/src/server-modules/GlideFtOptions.ts | 14 +- node/tests/ServerModules.test.ts | 627 +++++++++++++--------- 6 files changed, 660 insertions(+), 399 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 128b5bd154..184b7abd76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * Node: Added `FT.CREATE` ([#2501](https://github.com/valkey-io/valkey-glide/pull/2501)) * Node: Added `FT.INFO` ([#2540](https://github.com/valkey-io/valkey-glide/pull/2540)) * Node: Added `FT.AGGREGATE` ([#2554](https://github.com/valkey-io/valkey-glide/pull/2554)) +* Node: Added `FT.PROFILE` ([#2633](https://github.com/valkey-io/valkey-glide/pull/2633)) * Java: Added `JSON.DEBUG` ([#2520](https://github.com/valkey-io/valkey-glide/pull/2520)) * Java: Added `JSON.ARRINSERT` and `JSON.ARRLEN` ([#2476](https://github.com/valkey-io/valkey-glide/pull/2476)) * Java: Added `JSON.ARRINDEX` ([#2546](https://github.com/valkey-io/valkey-glide/pull/2546)) diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index ca4a0371c9..6ba9dc757c 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -1132,8 +1132,8 @@ pub(crate) fn convert_to_expected_type( let res = vec![ convert_to_expected_type(array.remove(0), *type_of_query)?, convert_to_expected_type(array.remove(0), Some(ExpectedReturnType::Map { - key_type: &None, - value_type: &None, + key_type: &Some(ExpectedReturnType::SimpleString), + value_type: &Some(ExpectedReturnType::Double), }))?]; Ok(Value::Array(res)) diff --git a/node/npm/glide/index.ts b/node/npm/glide/index.ts index 8e7ef5de14..43b80b463d 100644 --- a/node/npm/glide/index.ts +++ b/node/npm/glide/index.ts @@ -136,7 +136,9 @@ function initialize() { FtAggregateSortBy, FtAggregateSortProperty, FtAggregateApply, + FtAggregateReturnType, FtSearchReturnType, + FtProfileOtions, GlideRecord, GlideString, JsonGetOptions, @@ -268,7 +270,9 @@ function initialize() { FtAggregateSortBy, FtAggregateSortProperty, FtAggregateApply, + FtAggregateReturnType, FtSearchReturnType, + FtProfileOtions, GlideRecord, GlideJson, GlideString, diff --git a/node/src/server-modules/GlideFt.ts b/node/src/server-modules/GlideFt.ts index aaf4335c01..61e7c36a9f 100644 --- a/node/src/server-modules/GlideFt.ts +++ b/node/src/server-modules/GlideFt.ts @@ -36,6 +36,11 @@ export type FtSearchReturnType = [ GlideRecord>, ]; +/** + * Response type for the {@link GlideFt.aggregate | ft.aggregate} command. + */ +export type FtAggregateReturnType = GlideRecord[]; + /** Module for Vector Search commands. */ export class GlideFt { /** @@ -263,7 +268,7 @@ export class GlideFt { * }, * ], * }; - * const result = await GlideFt.aggregate("myIndex", "*", options); + * const result = await GlideFt.aggregate(client, "myIndex", "*", options); * console.log(result); // Output: * // [ * // [ @@ -304,90 +309,19 @@ export class GlideFt { indexName: GlideString, query: GlideString, options?: DecoderOption & FtAggregateOptions, - ): Promise[]> { - const args: GlideString[] = ["FT.AGGREGATE", indexName, query]; - - if (options) { - if (options.loadAll) args.push("LOAD", "*"); - else if (options.loadFields) - args.push( - "LOAD", - options.loadFields.length.toString(), - ...options.loadFields, - ); - - if (options.timeout) - args.push("TIMEOUT", options.timeout.toString()); - - if (options.params) { - args.push( - "PARAMS", - (options.params.length * 2).toString(), - ...options.params.flatMap((pair) => pair), - ); - } - - if (options.clauses) { - for (const clause of options.clauses) { - switch (clause.type) { - case "LIMIT": - args.push( - clause.type, - clause.offset.toString(), - clause.count.toString(), - ); - break; - case "FILTER": - args.push(clause.type, clause.expression); - break; - case "GROUPBY": - args.push( - clause.type, - clause.properties.length.toString(), - ...clause.properties, - ); - - for (const reducer of clause.reducers) { - args.push( - "REDUCE", - reducer.function, - reducer.args.length.toString(), - ...reducer.args, - ); - if (reducer.name) args.push("AS", reducer.name); - } - - break; - case "SORTBY": - args.push( - clause.type, - (clause.properties.length * 2).toString(), - ); - for (const property of clause.properties) - args.push(property.property, property.order); - if (clause.max) - args.push("MAX", clause.max.toString()); - break; - case "APPLY": - args.push( - clause.type, - clause.expression, - "AS", - clause.name, - ); - break; - default: - throw new Error( - "Unknown clause type in FtAggregateOptions", - ); - } - } - } - } - - return _handleCustomCommand(client, args, options) as Promise< - GlideRecord[] - >; + ): Promise { + const args: GlideString[] = [ + "FT.AGGREGATE", + indexName, + query, + ..._addFtAggregateOptions(options), + ]; + + return _handleCustomCommand( + client, + args, + options, + ) as Promise; } /** @@ -571,59 +505,135 @@ export class GlideFt { query: GlideString, options?: FtSearchOptions & DecoderOption, ): Promise { - const args: GlideString[] = ["FT.SEARCH", indexName, query]; + const args: GlideString[] = [ + "FT.SEARCH", + indexName, + query, + ..._addFtSearchOptions(options), + ]; + + return _handleCustomCommand(client, args, options) as Promise< + [number, GlideRecord>] + >; + } + + /** + * Runs a search query and collects performance profiling information. + * + * @param client - The client to execute the command. + * @param indexName - The index name. + * @param query - The text query to search. + * @param options - (Optional) See {@link FtSearchOptions} and {@link DecoderOption}. Additionally: + * - `limited` (Optional) - Either provide a full verbose output or some brief version. + * + * @returns A two-element array. The first element contains results of the search query being profiled, the + * second element stores profiling information. + * + * @example + * ```typescript + * // Example of running profile on a search query + * const vector = Buffer.alloc(24); + * const result = await GlideFt.profileSearch(client, "json_idx1", "*=>[KNN 2 @VEC $query_vec]", {params: [{key: "query_vec", value: vector}]}); + * console.log(result); // Output: + * // result[0] contains `FT.SEARCH` response with the given query + * // result[1] contains profiling data as a `Record` + * ``` + */ + static async profileSearch( + client: GlideClient | GlideClusterClient, + indexName: GlideString, + query: GlideString, + options?: DecoderOption & + FtSearchOptions & { + limited?: boolean; + }, + ): Promise<[FtSearchReturnType, Record]> { + const args: GlideString[] = ["FT.PROFILE", indexName, "SEARCH"]; + + if (options?.limited) { + args.push("LIMITED"); + } + + args.push("QUERY", query); if (options) { - // RETURN - if (options.returnFields) { - const returnFields: GlideString[] = []; - options.returnFields.forEach((returnField) => - returnField.alias - ? returnFields.push( - returnField.fieldIdentifier, - "AS", - returnField.alias, - ) - : returnFields.push(returnField.fieldIdentifier), - ); - args.push( - "RETURN", - returnFields.length.toString(), - ...returnFields, - ); - } + args.push(..._addFtSearchOptions(options)); + } - // TIMEOUT - if (options.timeout) { - args.push("TIMEOUT", options.timeout.toString()); - } + return ( + _handleCustomCommand( + client, + args, + options as DecoderOption, + ) as Promise<[FtSearchReturnType, GlideRecord]> + ).then((v) => [v[0], convertGlideRecordToRecord(v[1])]); + } - // PARAMS - if (options.params) { - args.push("PARAMS", (options.params.length * 2).toString()); - options.params.forEach((param) => - args.push(param.key, param.value), - ); - } + /** + * Runs an aggregate query and collects performance profiling information. + * + * @param client - The client to execute the command. + * @param indexName - The index name. + * @param query - The text query to search. + * @param options - (Optional) See {@link FtAggregateOptions} and {@link DecoderOption}. Additionally: + * - `limited` (Optional) - Either provide a full verbose output or some brief version. + * + * @returns A two-element array. The first element contains results of the aggregate query being profiled, the + * second element stores profiling information. + * + * @example + * ```typescript + * // Example of running profile on an aggregate query + * const options: FtAggregateOptions = { + * loadFields: ["__key"], + * clauses: [ + * { + * type: "GROUPBY", + * properties: ["@condition"], + * reducers: [ + * { + * function: "TOLIST", + * args: ["__key"], + * name: "bicycles", + * }, + * ], + * }, + * ], + * }; + * const result = await GlideFt.profileAggregate(client, "myIndex", "*", options); + * console.log(result); // Output: + * // result[0] contains `FT.AGGREGATE` response with the given query + * // result[1] contains profiling data as a `Record` + * ``` + */ + static async profileAggregate( + client: GlideClient | GlideClusterClient, + indexName: GlideString, + query: GlideString, + options?: DecoderOption & + FtAggregateOptions & { + limited?: boolean; + }, + ): Promise<[FtAggregateReturnType, Record]> { + const args: GlideString[] = ["FT.PROFILE", indexName, "AGGREGATE"]; + + if (options?.limited) { + args.push("LIMITED"); + } - // LIMIT - if (options.limit) { - args.push( - "LIMIT", - options.limit.offset.toString(), - options.limit.count.toString(), - ); - } + args.push("QUERY", query); - // COUNT - if (options.count) { - args.push("COUNT"); - } + if (options) { + args.push(..._addFtAggregateOptions(options)); } - return _handleCustomCommand(client, args, options) as Promise< - [number, GlideRecord>] - >; + return ( + _handleCustomCommand( + client, + args, + options as DecoderOption, + ) as Promise<[FtAggregateReturnType, GlideRecord]> + ).then((v) => [v[0], convertGlideRecordToRecord(v[1])]); } /** @@ -723,6 +733,145 @@ export class GlideFt { } } +/** + * @internal + */ +function _addFtAggregateOptions(options?: FtAggregateOptions): GlideString[] { + if (!options) return []; + + const args: GlideString[] = []; + + if (options.loadAll) args.push("LOAD", "*"); + else if (options.loadFields) + args.push( + "LOAD", + options.loadFields.length.toString(), + ...options.loadFields, + ); + + if (options.timeout) args.push("TIMEOUT", options.timeout.toString()); + + if (options.params) { + args.push( + "PARAMS", + (options.params.length * 2).toString(), + ...options.params.flatMap((param) => [param.key, param.value]), + ); + } + + if (options.clauses) { + for (const clause of options.clauses) { + switch (clause.type) { + case "LIMIT": + args.push( + clause.type, + clause.offset.toString(), + clause.count.toString(), + ); + break; + case "FILTER": + args.push(clause.type, clause.expression); + break; + case "GROUPBY": + args.push( + clause.type, + clause.properties.length.toString(), + ...clause.properties, + ); + + for (const reducer of clause.reducers) { + args.push( + "REDUCE", + reducer.function, + reducer.args.length.toString(), + ...reducer.args, + ); + if (reducer.name) args.push("AS", reducer.name); + } + + break; + case "SORTBY": + args.push( + clause.type, + (clause.properties.length * 2).toString(), + ); + for (const property of clause.properties) + args.push(property.property, property.order); + if (clause.max) args.push("MAX", clause.max.toString()); + break; + case "APPLY": + args.push( + clause.type, + clause.expression, + "AS", + clause.name, + ); + break; + default: + throw new Error( + "Unknown clause type in FtAggregateOptions", + ); + } + } + } + + return args; +} + +/** + * @internal + */ +function _addFtSearchOptions(options?: FtSearchOptions): GlideString[] { + if (!options) return []; + + const args: GlideString[] = []; + + // RETURN + if (options.returnFields) { + const returnFields: GlideString[] = []; + options.returnFields.forEach((returnField) => + returnField.alias + ? returnFields.push( + returnField.fieldIdentifier, + "AS", + returnField.alias, + ) + : returnFields.push(returnField.fieldIdentifier), + ); + args.push("RETURN", returnFields.length.toString(), ...returnFields); + } + + // TIMEOUT + if (options.timeout) { + args.push("TIMEOUT", options.timeout.toString()); + } + + // PARAMS + if (options.params) { + args.push( + "PARAMS", + (options.params.length * 2).toString(), + ...options.params.flatMap((param) => [param.key, param.value]), + ); + } + + // LIMIT + if (options.limit) { + args.push( + "LIMIT", + options.limit.offset.toString(), + options.limit.count.toString(), + ); + } + + // COUNT + if (options.count) { + args.push("COUNT"); + } + + return args; +} + /** * @internal */ diff --git a/node/src/server-modules/GlideFtOptions.ts b/node/src/server-modules/GlideFtOptions.ts index e6fe836154..6d9e8f4528 100644 --- a/node/src/server-modules/GlideFtOptions.ts +++ b/node/src/server-modules/GlideFtOptions.ts @@ -138,8 +138,11 @@ export type FtAggregateOptions = { | FtAggregateSortBy | FtAggregateApply )[]; - /** The key/value pairs can be referenced from within the query expression. */ - params?: [GlideString, GlideString][]; + /** + * Query parameters, which could be referenced in the query by `$` sign, followed by + * the parameter name. + */ + params?: GlideRecord; } & ( | { /** List of fields to load from the index. */ @@ -224,11 +227,15 @@ export interface FtAggregateApply { /** The new property name to store the result of apply. This name can be referenced by further operations down the pipeline. */ name: GlideString; } + /** * Represents the input options to be used in the FT.SEARCH command. * All fields in this class are optional inputs for FT.SEARCH. */ export type FtSearchOptions = { + /** Query timeout in milliseconds. */ + timeout?: number; + /** * Add a field to be returned. * @param fieldIdentifier field name to return. @@ -236,9 +243,6 @@ export type FtSearchOptions = { */ returnFields?: { fieldIdentifier: GlideString; alias?: GlideString }[]; - /** Query timeout in milliseconds. */ - timeout?: number; - /** * Query parameters, which could be referenced in the query by `$` sign, followed by * the parameter name. diff --git a/node/tests/ServerModules.test.ts b/node/tests/ServerModules.test.ts index 01e26cb7ff..e572466b8a 100644 --- a/node/tests/ServerModules.test.ts +++ b/node/tests/ServerModules.test.ts @@ -15,6 +15,8 @@ import { convertGlideRecordToRecord, Decoder, FtAggregateOptions, + FtAggregateReturnType, + FtSearchOptions, FtSearchReturnType, GlideClusterClient, GlideFt, @@ -2501,7 +2503,80 @@ describe("Server Module Tests", () => { }); it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( - "FT.AGGREGATE ft.aggregate", + "FT.INFO ft.info", + async (protocol) => { + client = await GlideClusterClient.createClient( + getClientConfigurationOption( + cluster.getAddresses(), + protocol, + ), + ); + + const index = uuidv4(); + expect( + await GlideFt.create( + client, + Buffer.from(index), + [ + { + type: "VECTOR", + name: "$.vec", + alias: "VEC", + attributes: { + algorithm: "HNSW", + distanceMetric: "COSINE", + dimensions: 42, + }, + }, + { type: "TEXT", name: "$.name" }, + ], + { dataType: "JSON", prefixes: ["123"] }, + ), + ).toEqual("OK"); + + let response = await GlideFt.info(client, Buffer.from(index)); + + expect(response).toMatchObject({ + index_name: index, + key_type: "JSON", + key_prefixes: ["123"], + fields: [ + { + identifier: "$.name", + type: "TEXT", + field_name: "$.name", + option: "", + }, + { + identifier: "$.vec", + type: "VECTOR", + field_name: "VEC", + option: "", + vector_params: { + distance_metric: "COSINE", + dimension: 42, + }, + }, + ], + }); + + response = await GlideFt.info(client, index, { + decoder: Decoder.Bytes, + }); + expect(response).toMatchObject({ + index_name: Buffer.from(index), + }); + + expect(await GlideFt.dropindex(client, index)).toEqual("OK"); + // querying a missing index + await expect(GlideFt.info(client, index)).rejects.toThrow( + "Index not found", + ); + }, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "FT.AGGREGATE on JSON", async (protocol) => { client = await GlideClusterClient.createClient( getClientConfigurationOption( @@ -2513,8 +2588,7 @@ describe("Server Module Tests", () => { const isResp3 = protocol == ProtocolVersion.RESP3; const prefixBicycles = "{bicycles}:"; const indexBicycles = prefixBicycles + uuidv4(); - const prefixMovies = "{movies}:"; - const indexMovies = prefixMovies + uuidv4(); + const query = "*"; // FT.CREATE idx:bicycle ON JSON PREFIX 1 bicycle: SCHEMA $.model AS model TEXT $.description AS // description TEXT $.price AS price NUMERIC $.condition AS condition TAG SEPARATOR , @@ -2642,7 +2716,7 @@ describe("Server Module Tests", () => { ); // FT.AGGREGATE idx:bicycle * LOAD 1 __key GROUPBY 1 @condition REDUCE COUNT 0 AS bicycles - let options: FtAggregateOptions = { + const options: FtAggregateOptions = { loadFields: ["__key"], clauses: [ { @@ -2658,15 +2732,13 @@ describe("Server Module Tests", () => { }, ], }; - let aggreg = ( - await GlideFt.aggregate(client, indexBicycles, "*", options) - ) - .map(convertGlideRecordToRecord) - // elements (records in array) could be reordered - .sort((a, b) => - a["condition"]! > b["condition"]! ? 1 : -1, - ); - expect(aggreg).toEqual([ + const aggreg = await GlideFt.aggregate( + client, + indexBicycles, + query, + options, + ); + const expectedAggreg = [ { condition: "new", bicycles: isResp3 ? 5 : "5", @@ -2679,7 +2751,54 @@ describe("Server Module Tests", () => { condition: "used", bicycles: isResp3 ? 4 : "4", }, - ]); + ]; + expect( + aggreg + .map(convertGlideRecordToRecord) + // elements (records in array) could be reordered + .sort((a, b) => + a["condition"]! > b["condition"]! ? 1 : -1, + ), + ).toEqual(expectedAggreg); + + const aggregProfile: [ + FtAggregateReturnType, + Record, + ] = await GlideFt.profileAggregate( + client, + indexBicycles, + "*", + options, + ); + // profile metrics and categories are subject to change + expect(aggregProfile[1]).toBeTruthy(); + expect( + aggregProfile[0] + .map(convertGlideRecordToRecord) + // elements (records in array) could be reordered + .sort((a, b) => + a["condition"]! > b["condition"]! ? 1 : -1, + ), + ).toEqual(expectedAggreg); + + await GlideFt.dropindex(client, indexBicycles); + }, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "FT.AGGREGATE on HASH", + async (protocol) => { + client = await GlideClusterClient.createClient( + getClientConfigurationOption( + cluster.getAddresses(), + protocol, + ), + ); + + const isResp3 = protocol == ProtocolVersion.RESP3; + const prefixMovies = "{movies}:"; + const indexMovies = prefixMovies + uuidv4(); + const query = "*"; // FT.CREATE idx:movie ON hash PREFIX 1 "movie:" SCHEMA title TEXT release_year NUMERIC // rating NUMERIC genre TAG votes NUMERIC @@ -2742,7 +2861,7 @@ describe("Server Module Tests", () => { // FT.AGGREGATE idx:movie * LOAD * APPLY ceil(@rating) as r_rating GROUPBY 1 @genre REDUCE // COUNT 0 AS nb_of_movies REDUCE SUM 1 votes AS nb_of_votes REDUCE AVG 1 r_rating AS avg_rating // SORTBY 4 @avg_rating DESC @nb_of_votes DESC - options = { + const options: FtAggregateOptions = { loadAll: true, clauses: [ { @@ -2786,13 +2905,13 @@ describe("Server Module Tests", () => { }, ], }; - aggreg = ( - await GlideFt.aggregate(client, indexMovies, "*", options) - ) - .map(convertGlideRecordToRecord) - // elements (records in array) could be reordered - .sort((a, b) => (a["genre"]! > b["genre"]! ? 1 : -1)); - expect(aggreg).toEqual([ + const aggreg = await GlideFt.aggregate( + client, + indexMovies, + query, + options, + ); + const expectedAggreg = [ { genre: "Action", nb_of_movies: isResp3 ? 2.0 : "2", @@ -2811,15 +2930,38 @@ describe("Server Module Tests", () => { nb_of_votes: isResp3 ? 559490.0 : "559490", avg_rating: isResp3 ? 9.0 : "9", }, - ]); + ]; + expect( + aggreg + .map(convertGlideRecordToRecord) + // elements (records in array) could be reordered + .sort((a, b) => (a["genre"]! > b["genre"]! ? 1 : -1)), + ).toEqual(expectedAggreg); + + const aggregProfile: [ + FtAggregateReturnType, + Record, + ] = await GlideFt.profileAggregate( + client, + indexMovies, + query, + options, + ); + // profile metrics and categories are subject to change + expect(aggregProfile[1]).toBeTruthy(); + expect( + aggregProfile[0] + .map(convertGlideRecordToRecord) + // elements (records in array) could be reordered + .sort((a, b) => (a["genre"]! > b["genre"]! ? 1 : -1)), + ).toEqual(expectedAggreg); await GlideFt.dropindex(client, indexMovies); - await GlideFt.dropindex(client, indexBicycles); }, ); it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( - "FT.INFO ft.info", + "FT.SEARCH binary on HASH", async (protocol) => { client = await GlideClusterClient.createClient( getClientConfigurationOption( @@ -2827,67 +2969,231 @@ describe("Server Module Tests", () => { protocol, ), ); + const prefix = "{" + uuidv4() + "}:"; + const index = prefix + "index"; + const query = "*=>[KNN 2 @VEC $query_vec]"; - const index = uuidv4(); + // setup a hash index: expect( await GlideFt.create( client, - Buffer.from(index), + index, [ { type: "VECTOR", - name: "$.vec", + name: "vec", alias: "VEC", attributes: { algorithm: "HNSW", - distanceMetric: "COSINE", - dimensions: 42, + distanceMetric: "L2", + dimensions: 2, }, }, - { type: "TEXT", name: "$.name" }, ], - { dataType: "JSON", prefixes: ["123"] }, + { + dataType: "HASH", + prefixes: [prefix], + }, ), ).toEqual("OK"); - let response = await GlideFt.info(client, Buffer.from(index)); + const binaryValue1 = Buffer.alloc(8); + expect( + await client.hset(Buffer.from(prefix + "0"), [ + // value of + { field: "vec", value: binaryValue1 }, + ]), + ).toEqual(1); - expect(response).toMatchObject({ - index_name: index, - key_type: "JSON", - key_prefixes: ["123"], - fields: [ + const binaryValue2: Buffer = Buffer.alloc(8); + binaryValue2[6] = 0x80; + binaryValue2[7] = 0xbf; + expect( + await client.hset(Buffer.from(prefix + "1"), [ + // value of + { field: "vec", value: binaryValue2 }, + ]), + ).toEqual(1); + + // let server digest the data and update index + const sleep = new Promise((resolve) => + setTimeout(resolve, DATA_PROCESSING_TIMEOUT), + ); + await sleep; + + // With the `COUNT` parameters - returns only the count + const optionsWithCount: FtSearchOptions = { + params: [{ key: "query_vec", value: binaryValue1 }], + timeout: 10000, + count: true, + }; + const binaryResultCount: FtSearchReturnType = + await GlideFt.search(client, index, query, { + decoder: Decoder.Bytes, + ...optionsWithCount, + }); + expect(binaryResultCount).toEqual([2]); + + const options: FtSearchOptions = { + params: [{ key: "query_vec", value: binaryValue1 }], + timeout: 10000, + }; + const binaryResult: FtSearchReturnType = await GlideFt.search( + client, + index, + query, + { + decoder: Decoder.Bytes, + ...options, + }, + ); + + const expectedBinaryResult: FtSearchReturnType = [ + 2, + [ { - identifier: "$.name", - type: "TEXT", - field_name: "$.name", - option: "", + key: Buffer.from(prefix + "1"), + value: [ + { + key: Buffer.from("vec"), + value: binaryValue2, + }, + { + key: Buffer.from("__VEC_score"), + value: Buffer.from("1"), + }, + ], }, { - identifier: "$.vec", - type: "VECTOR", - field_name: "VEC", - option: "", - vector_params: { - distance_metric: "COSINE", - dimension: 42, - }, + key: Buffer.from(prefix + "0"), + value: [ + { + key: Buffer.from("vec"), + value: binaryValue1, + }, + { + key: Buffer.from("__VEC_score"), + value: Buffer.from("0"), + }, + ], }, ], - }); + ]; + expect(binaryResult).toEqual(expectedBinaryResult); - response = await GlideFt.info(client, index, { + const binaryProfileResult: [ + FtSearchReturnType, + Record, + ] = await GlideFt.profileSearch(client, index, query, { decoder: Decoder.Bytes, + ...options, }); - expect(response).toMatchObject({ - index_name: Buffer.from(index), - }); + // profile metrics and categories are subject to change + expect(binaryProfileResult[1]).toBeTruthy(); + expect(binaryProfileResult[0]).toEqual(expectedBinaryResult); + }, + ); - expect(await GlideFt.dropindex(client, index)).toEqual("OK"); - // querying a missing index - await expect(GlideFt.info(client, index)).rejects.toThrow( - "Index not found", + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "FT.SEARCH binary on JSON", + async (protocol) => { + client = await GlideClusterClient.createClient( + getClientConfigurationOption( + cluster.getAddresses(), + protocol, + ), ); + + const prefix = "{" + uuidv4() + "}:"; + const index = prefix + "index"; + const query = "*"; + + // set string values + expect( + await GlideJson.set( + client, + prefix + "1", + "$", + '[{"arr": 42}, {"val": "hello"}, {"val": "world"}]', + ), + ).toEqual("OK"); + + // setup a json index: + expect( + await GlideFt.create( + client, + index, + [ + { + type: "NUMERIC", + name: "$..arr", + alias: "arr", + }, + { + type: "TEXT", + name: "$..val", + alias: "val", + }, + ], + { + dataType: "JSON", + prefixes: [prefix], + }, + ), + ).toEqual("OK"); + + // let server digest the data and update index + const sleep = new Promise((resolve) => + setTimeout(resolve, DATA_PROCESSING_TIMEOUT), + ); + await sleep; + + const optionsWithLimit: FtSearchOptions = { + returnFields: [ + { fieldIdentifier: "$..arr", alias: "myarr" }, + { fieldIdentifier: "$..val", alias: "myval" }, + ], + timeout: 10000, + limit: { offset: 0, count: 2 }, + }; + const stringResult: FtSearchReturnType = await GlideFt.search( + client, + index, + query, + optionsWithLimit, + ); + const expectedStringResult: FtSearchReturnType = [ + 1, + [ + { + key: prefix + "1", + value: [ + { + key: "myarr", + value: "42", + }, + { + key: "myval", + value: "hello", + }, + ], + }, + ], + ]; + expect(stringResult).toEqual(expectedStringResult); + + const stringProfileResult: [ + FtSearchReturnType, + Record, + ] = await GlideFt.profileSearch( + client, + index, + query, + optionsWithLimit, + ); + // profile metrics and categories are subject to change + expect(stringProfileResult[1]).toBeTruthy(); + expect(stringProfileResult[0]).toEqual(expectedStringResult); }, ); @@ -2956,209 +3262,6 @@ describe("Server Module Tests", () => { ).rejects.toThrow("Index not found"); }); - it("FT.SEARCH binary test", async () => { - client = await GlideClusterClient.createClient( - getClientConfigurationOption( - cluster.getAddresses(), - ProtocolVersion.RESP3, - ), - ); - const prefix = "{" + uuidv4() + "}:"; - const index = prefix + "index"; - - // setup a hash index: - expect( - await GlideFt.create( - client, - index, - [ - { - type: "VECTOR", - name: "vec", - alias: "VEC", - attributes: { - algorithm: "HNSW", - distanceMetric: "L2", - dimensions: 2, - }, - }, - ], - { - dataType: "HASH", - prefixes: [prefix], - }, - ), - ).toEqual("OK"); - - const binaryValue1 = Buffer.alloc(8); - expect( - await client.hset(Buffer.from(prefix + "0"), [ - // value of - { field: "vec", value: binaryValue1 }, - ]), - ).toEqual(1); - - const binaryValue2: Buffer = Buffer.alloc(8); - binaryValue2[6] = 0x80; - binaryValue2[7] = 0xbf; - expect( - await client.hset(Buffer.from(prefix + "1"), [ - // value of - { field: "vec", value: binaryValue2 }, - ]), - ).toEqual(1); - - // let server digest the data and update index - const sleep = new Promise((resolve) => - setTimeout(resolve, DATA_PROCESSING_TIMEOUT), - ); - await sleep; - - // With the `COUNT` parameters - returns only the count - const binaryResultCount: FtSearchReturnType = await GlideFt.search( - client, - index, - "*=>[KNN 2 @VEC $query_vec]", - { - params: [{ key: "query_vec", value: binaryValue1 }], - timeout: 10000, - count: true, - decoder: Decoder.Bytes, - }, - ); - expect(binaryResultCount).toEqual([2]); - - const binaryResult: FtSearchReturnType = await GlideFt.search( - client, - index, - "*=>[KNN 2 @VEC $query_vec]", - { - params: [{ key: "query_vec", value: binaryValue1 }], - timeout: 10000, - decoder: Decoder.Bytes, - }, - ); - - const expectedBinaryResult: FtSearchReturnType = [ - 2, - [ - { - key: Buffer.from(prefix + "1"), - value: [ - { - key: Buffer.from("vec"), - value: binaryValue2, - }, - { - key: Buffer.from("__VEC_score"), - value: Buffer.from("1"), - }, - ], - }, - { - key: Buffer.from(prefix + "0"), - value: [ - { - key: Buffer.from("vec"), - value: binaryValue1, - }, - { - key: Buffer.from("__VEC_score"), - value: Buffer.from("0"), - }, - ], - }, - ], - ]; - expect(binaryResult).toEqual(expectedBinaryResult); - }); - - it("FT.SEARCH string test", async () => { - client = await GlideClusterClient.createClient( - getClientConfigurationOption( - cluster.getAddresses(), - ProtocolVersion.RESP3, - ), - ); - - const prefix = "{" + uuidv4() + "}:"; - const index = prefix + "index"; - - // set string values - expect( - await GlideJson.set( - client, - prefix + "1", - "$", - '[{"arr": 42}, {"val": "hello"}, {"val": "world"}]', - ), - ).toEqual("OK"); - - // setup a json index: - expect( - await GlideFt.create( - client, - index, - [ - { - type: "NUMERIC", - name: "$..arr", - alias: "arr", - }, - { - type: "TEXT", - name: "$..val", - alias: "val", - }, - ], - { - dataType: "JSON", - prefixes: [prefix], - }, - ), - ).toEqual("OK"); - - // let server digest the data and update index - const sleep = new Promise((resolve) => - setTimeout(resolve, DATA_PROCESSING_TIMEOUT), - ); - await sleep; - - const stringResult: FtSearchReturnType = await GlideFt.search( - client, - index, - "*", - { - returnFields: [ - { fieldIdentifier: "$..arr", alias: "myarr" }, - { fieldIdentifier: "$..val", alias: "myval" }, - ], - timeout: 10000, - decoder: Decoder.String, - limit: { offset: 0, count: 2 }, - }, - ); - const expectedStringResult: FtSearchReturnType = [ - 1, - [ - { - key: prefix + "1", - value: [ - { - key: "myarr", - value: "42", - }, - { - key: "myval", - value: "hello", - }, - ], - }, - ], - ]; - expect(stringResult).toEqual(expectedStringResult); - }); - it("FT.ALIASADD, FT.ALIASUPDATE and FT.ALIASDEL test", async () => { client = await GlideClusterClient.createClient( getClientConfigurationOption(