Skip to content

Commit

Permalink
update momento sdk to latest version
Browse files Browse the repository at this point in the history
Changes to update momento sdk to latest version.
  • Loading branch information
brayniac committed Jun 6, 2024
1 parent 9783725 commit 3cbcc9c
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 154 deletions.
14 changes: 8 additions & 6 deletions src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::workload::ClientRequest;
use crate::workload::ClientWorkItem as WorkItem;
use crate::*;
use ::momento::MomentoError;

use ::momento::{MomentoError, MomentoErrorCode};
use async_channel::Receiver;
use std::io::{Error, ErrorKind, Result};
use std::time::Instant;
use tokio::io::*;
use tokio::runtime::Runtime;
use tokio::time::{timeout, Duration};

use std::io::{Error, ErrorKind, Result};
use std::time::Instant;

mod http1;
mod http2;
mod memcache;
Expand Down Expand Up @@ -73,9 +75,9 @@ pub enum ResponseError {

impl From<MomentoError> for ResponseError {
fn from(other: MomentoError) -> Self {
match other {
MomentoError::LimitExceeded { .. } => ResponseError::Ratelimited,
MomentoError::Timeout { .. } => ResponseError::BackendTimeout,
match other.error_code {
MomentoErrorCode::LimitExceededError { .. } => ResponseError::Ratelimited,
MomentoErrorCode::TimeoutError { .. } => ResponseError::BackendTimeout,
_ => ResponseError::Exception,
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/clients/momento/commands/hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ pub async fn hash_set(
if request.data.len() == 1 {
let (field, value) = request.data.iter().next().unwrap();

let r = DictionarySetFieldRequest::new(cache_name, &*request.key, &**field, &**value)
let field: Vec<u8> = field.to_vec();
let value: Vec<u8> = value.to_vec();

let r = DictionarySetFieldRequest::new(cache_name, &*request.key, field, value)
.ttl(CollectionTtl::new(request.ttl, false));

let result = timeout(
Expand All @@ -32,10 +35,10 @@ pub async fn hash_set(

record_result!(result, HASH_SET)
} else {
let d: Vec<(&[u8], &[u8])> = request
let d: Vec<(Vec<u8>, Vec<u8>)> = request
.data
.iter()
.map(|(k, v)| (k.as_ref(), v.as_ref()))
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect();

let r = DictionarySetFieldsRequest::new(cache_name, &*request.key, d)
Expand Down
10 changes: 4 additions & 6 deletions src/clients/momento/commands/list_push_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ pub async fn list_push_back(
} else {
// note: we need to reverse because the semantics of list
// concat do not match the redis push semantics
let mut r = ListConcatenateBackRequest::new(
cache_name,
&*request.key,
request.elements.iter().map(|v| &**v).rev(),
)
.ttl(CollectionTtl::new(None, false));
let elements: Vec<&[u8]> = request.elements.iter().map(|v| &**v).rev().collect();

let mut r = ListConcatenateBackRequest::new(cache_name, &*request.key, elements)
.ttl(CollectionTtl::new(None, false));

if let Some(t) = request.truncate {
r = r.truncate_front_to_size(t);
Expand Down
10 changes: 4 additions & 6 deletions src/clients/momento/commands/list_push_front.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ pub async fn list_push_front(
} else {
// note: we need to reverse because the semantics of list
// concat do not match the redis push semantics
let mut r = ListConcatenateFrontRequest::new(
cache_name,
&*request.key,
request.elements.iter().map(|v| &**v).rev(),
)
.ttl(CollectionTtl::new(None, false));
let elements: Vec<&[u8]> = request.elements.iter().map(|v| &**v).rev().collect();

let mut r = ListConcatenateFrontRequest::new(cache_name, &*request.key, elements)
.ttl(CollectionTtl::new(None, false));

if let Some(t) = request.truncate {
r = r.truncate_back_to_size(t);
Expand Down
3 changes: 1 addition & 2 deletions src/clients/momento/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;

use paste::paste;

mod delete;
Expand All @@ -20,7 +21,6 @@ mod set_add;
mod set_members;
mod set_remove;
mod sorted_set_add;
mod sorted_set_increment;
mod sorted_set_range;
mod sorted_set_rank;
mod sorted_set_remove;
Expand All @@ -45,7 +45,6 @@ pub use set_add::*;
pub use set_members::*;
pub use set_remove::*;
pub use sorted_set_add::*;
pub use sorted_set_increment::*;
pub use sorted_set_range::*;
pub use sorted_set_rank::*;
pub use sorted_set_remove::*;
Expand Down
14 changes: 6 additions & 8 deletions src/clients/momento/commands/set_add.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use ::momento::cache::CollectionTtl;
use ::momento::cache::SetAddElementsRequest;

use ::momento::cache::{CollectionTtl, SetAddElementsRequest};

/// Adds one or more members (elements) to a set.
///
Expand All @@ -14,12 +14,10 @@ pub async fn set_add(
) -> std::result::Result<(), ResponseError> {
SET_ADD.increment();

let r = SetAddElementsRequest::new(
cache_name,
&*request.key,
request.members.iter().map(|v| &**v),
)
.ttl(CollectionTtl::new(request.ttl, false));
let members: Vec<&[u8]> = request.members.iter().map(|v| &**v).collect();

let r = SetAddElementsRequest::new(cache_name, &*request.key, members)
.ttl(CollectionTtl::new(request.ttl, false));

let result = timeout(
config.client().unwrap().request_timeout(),
Expand Down
2 changes: 1 addition & 1 deletion src/clients/momento/commands/set_remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub async fn set_remove(
) -> std::result::Result<(), ResponseError> {
SET_REMOVE.increment();

let members: Vec<&[u8]> = request.members.iter().map(|v| v.borrow()).collect();
let members: Vec<&[u8]> = request.members.iter().map(|v| &**v).collect();

let result = timeout(
config.client().unwrap().request_timeout(),
Expand Down
11 changes: 5 additions & 6 deletions src/clients/momento/commands/sorted_set_add.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::*;
use ::momento::cache::CollectionTtl;
use ::momento::cache::SortedSetPutElementRequest;
use ::momento::cache::SortedSetPutElementsRequest;

use ::momento::cache::{CollectionTtl, SortedSetPutElementRequest, SortedSetPutElementsRequest};

pub async fn sorted_set_add(
client: &mut CacheClient,
Expand All @@ -18,7 +17,7 @@ pub async fn sorted_set_add(
if request.members.len() == 1 {
let (member, score) = request.members.first().unwrap();

let r = SortedSetPutElementRequest::new(cache_name, &*request.key, &**member, *score)
let r = SortedSetPutElementRequest::new(cache_name, &*request.key, member.to_vec(), *score)
.ttl(CollectionTtl::new(request.ttl, false));

let result = timeout(
Expand All @@ -29,10 +28,10 @@ pub async fn sorted_set_add(

record_result!(result, SORTED_SET_ADD)
} else {
let d: Vec<(&[u8], f64)> = request
let d: Vec<(Vec<u8>, f64)> = request
.members
.iter()
.map(|(m, s)| (m.as_ref(), *s))
.map(|(m, s)| (m.to_vec(), *s))
.collect();

let r = SortedSetPutElementsRequest::new(cache_name, &*request.key, d)
Expand Down
27 changes: 0 additions & 27 deletions src/clients/momento/commands/sorted_set_increment.rs

This file was deleted.

53 changes: 23 additions & 30 deletions src/clients/momento/commands/sorted_set_range.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,49 @@
use super::*;
use core::ops::Bound;

use ::momento::cache::{SortedSetFetchByRankRequest, SortedSetFetchByScoreRequest, SortedSetOrder};

/// Performs a range query on a sorted set, returning the specified range of
/// elements. Supports selecting a range of keys by index (rank).
pub async fn sorted_set_range(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::SortedSetRange,
) -> std::result::Result<(), ResponseError> {
SORTED_SET_RANGE.increment();

let result = if !request.by_score {
let start = match request.start {
None => Bound::Unbounded,
Some(v) => Bound::Included(v),
};
let mut r = SortedSetFetchByRankRequest::new(cache_name, &*request.key)
.order(SortedSetOrder::Ascending);

if let Some(start) = request.start {
r = r.start_rank(start);
}

let end = match request.end {
None => Bound::Unbounded,
Some(v) => Bound::Included(v),
};
if let Some(end) = request.end {
r = r.end_rank(end);
}

timeout(
config.client().unwrap().request_timeout(),
client.sorted_set_fetch_by_index(
cache_name,
&*request.key,
momento::sorted_set::Order::Ascending,
(start, end),
),
client.send_request(r),
)
.await
} else {
let start = match request.start {
None => Bound::Unbounded,
Some(v) => Bound::Included(v as f64),
};
let mut r = SortedSetFetchByScoreRequest::new(cache_name, &*request.key)
.order(SortedSetOrder::Ascending);

if let Some(start) = request.start {
r = r.min_score(start.into());
}

let end = match request.end {
None => Bound::Unbounded,
Some(v) => Bound::Included(v as f64),
};
if let Some(end) = request.end {
r = r.max_score(end.into());
}

timeout(
config.client().unwrap().request_timeout(),
client.sorted_set_fetch_by_score(
cache_name,
&*request.key,
momento::sorted_set::Order::Ascending,
(start, end),
),
client.send_request(r),
)
.await
};
Expand Down
4 changes: 3 additions & 1 deletion src/clients/momento/commands/sorted_set_rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use super::*;

/// Retrieve the rank for a member of a sorted set.
pub async fn sorted_set_rank(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::SortedSetRank,
) -> std::result::Result<(), ResponseError> {
SORTED_SET_RANK.increment();

let result = timeout(
config.client().unwrap().request_timeout(),
client.sorted_set_get_rank(cache_name, &*request.key, &*request.member),
)
.await;

record_result!(result, SORTED_SET_RANK)
}
11 changes: 8 additions & 3 deletions src/clients/momento/commands/sorted_set_remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ use super::*;

/// Removes one or more members of a sorted set.
pub async fn sorted_set_remove(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::SortedSetRemove,
) -> std::result::Result<(), ResponseError> {
SORTED_SET_REMOVE.increment();
let members: Vec<&[u8]> = request.members.iter().map(|v| v.borrow()).collect();

let result = timeout(
config.client().unwrap().request_timeout(),
client.sorted_set_remove(cache_name, &*request.key, members),
client.sorted_set_remove_elements(
cache_name,
&*request.key,
request.members.iter().map(|v| v.to_vec()),
),
)
.await;

record_result!(result, SORTED_SET_REMOVE)
}
14 changes: 11 additions & 3 deletions src/clients/momento/commands/sorted_set_score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@ use super::*;

/// Retrieve the score of one or more members of a sorted set.
pub async fn sorted_set_score(
client: &mut SimpleCacheClient,
client: &mut CacheClient,
config: &Config,
cache_name: &str,
request: workload::client::SortedSetScore,
) -> std::result::Result<(), ResponseError> {
if request.members.is_empty() {
return Ok(());
} else if request.members.len() > 1 {
REQUEST_UNSUPPORTED.increment();
return Ok(());
}

SORTED_SET_SCORE.increment();
let members: Vec<&[u8]> = request.members.iter().map(|v| v.borrow()).collect();

let result = timeout(
config.client().unwrap().request_timeout(),
client.sorted_set_get_score(cache_name, &*request.key, members),
client.sorted_set_get_score(cache_name, &*request.key, &*request.members[0]),
)
.await;

record_result!(result, SORTED_SET_SCORE)
}
Loading

0 comments on commit 3cbcc9c

Please sign in to comment.