Skip to content

Commit

Permalink
Add VssService and base service setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
G8XSU committed Sep 30, 2024
1 parent ee24608 commit 5ba4b28
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 0 deletions.
15 changes: 15 additions & 0 deletions rust/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "server"
version = "0.1.0"
edition = "2021"

[dependencies]
api = { path = "../api" }
impls = { path = "../impls" }

hyper = { version = "1", default-features = false, features = ["server", "http1"] }
http-body-util = { version = "0.1", default-features = false }
hyper-util = { version = "0.1", default-features = false, features = ["server-graceful"] }
tokio = { version = "1.38.0", default-features = false, features = ["time", "signal", "rt-multi-thread", "macros"] }
prost = { version = "0.11.6", default-features = false, features = ["std"] }
bytes = "1.4.0"
75 changes: 75 additions & 0 deletions rust/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use hyper::service::Service;
use prost::Message;
use std::net::SocketAddr;

use tokio::net::TcpListener;
use tokio::signal::unix::SignalKind;

use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;

use crate::vss_service::VssService;
use api::auth::NoopAuthorizer;
use api::kv_store::KvStore;
use impls::postgres_store::PostgresBackendImpl;
use std::str::FromStr;
use std::sync::Arc;

pub(crate) mod vss_service;

fn main() {
// Define the address to bind the server to
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

let runtime = match tokio::runtime::Builder::new_multi_thread().enable_all().build() {
Ok(runtime) => Arc::new(runtime),
Err(e) => {
eprintln!("Failed to setup tokio runtime: {}", e);
std::process::exit(-1);
},
};

runtime.block_on(async {
let mut sigterm_stream = match tokio::signal::unix::signal(SignalKind::terminate()) {
Ok(stream) => stream,
Err(e) => {
println!("Failed to register for SIGTERM stream: {}", e);
std::process::exit(-1);
},
};
let authorizer = Arc::new(NoopAuthorizer {});
let store = Arc::new(
PostgresBackendImpl::new("postgresql://postgres:postgres@localhost:5432/postgres")
.await
.unwrap(),
);
let rest_svc_listener =
TcpListener::bind(&addr).await.expect("Failed to bind listening port");
loop {
tokio::select! {
res = rest_svc_listener.accept() => {
match res {
Ok((stream, _)) => {
let io_stream = TokioIo::new(stream);
let vss_service = VssService::new(Arc::clone(&store) as Arc<dyn KvStore>,authorizer );
runtime.spawn(async move {
if let Err(err) = http1::Builder::new().serve_connection(io_stream, vss_service).await {
eprintln!("Failed to serve connection: {}", err);
}
});
},
Err(e) => eprintln!("Failed to accept connection: {}", e),
}
}
_ = tokio::signal::ctrl_c() => {
println!("Received CTRL-C, shutting down..");
break;
}
_ = sigterm_stream.recv() => {
println!("Received SIGTERM, shutting down..");
break;
}
}
}
});
}
154 changes: 154 additions & 0 deletions rust/server/src/vss_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
use hyper::service::Service;
use hyper::{Error, Request, Response, StatusCode};
use std::collections::HashMap;

use prost::Message;

use api::auth::Authorizer;
use api::error::VssError;
use api::kv_store::KvStore;
use api::types::{
DeleteObjectRequest, DeleteObjectResponse, ErrorCode, ErrorResponse, GetObjectRequest,
GetObjectResponse, ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest,
PutObjectResponse,
};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

#[derive(Clone)]
pub struct VssService {
store: Arc<dyn KvStore>,
authorizer: Arc<dyn Authorizer>,
}

impl VssService {
pub(crate) fn new(store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>) -> Self {
Self { store, authorizer }
}
}

impl Service<Request<Incoming>> for VssService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
let store = Arc::clone(&self.store);
let authorizer = Arc::clone(&self.authorizer);
let path = req.uri().path().to_owned();
Box::pin(async move {
match path.as_str() {
"/getObject" => {
handle_request(store, authorizer, req, handle_get_object_request).await
},
"/putObjects" => {
handle_request(store, authorizer, req, handle_put_object_request).await
},
"/deleteObject" => {
handle_request(store, authorizer, req, handle_delete_object_request).await
},
"/listKeyVersions" => {
handle_request(store, authorizer, req, handle_list_object_request).await
},
_ => {
let error = format!("Unknown request: {}", path).into_bytes();
Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Full::new(Bytes::from(error)))
.unwrap())
},
}
})
}
}

async fn handle_get_object_request(
store: Arc<dyn KvStore>, user_token: String, request: GetObjectRequest,
) -> Result<GetObjectResponse, VssError> {
store.get(user_token, request).await
}
async fn handle_put_object_request(
store: Arc<dyn KvStore>, user_token: String, request: PutObjectRequest,
) -> Result<PutObjectResponse, VssError> {
store.put(user_token, request).await
}
async fn handle_delete_object_request(
store: Arc<dyn KvStore>, user_token: String, request: DeleteObjectRequest,
) -> Result<DeleteObjectResponse, VssError> {
store.delete(user_token, request).await
}
async fn handle_list_object_request(
store: Arc<dyn KvStore>, user_token: String, request: ListKeyVersionsRequest,
) -> Result<ListKeyVersionsResponse, VssError> {
store.list_key_versions(user_token, request).await
}
async fn handle_request<
T: Message + Default,
R: Message,
F: FnOnce(Arc<dyn KvStore>, String, T) -> Fut + Send + 'static,
Fut: Future<Output = Result<R, VssError>> + Send,
>(
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, request: Request<Incoming>,
handler: F,
) -> Result<<VssService as Service<Request<Incoming>>>::Response, hyper::Error> {
// TODO: we should bound the amount of data we read to avoid allocating too much memory.
let (parts, body) = request.into_parts();
let headers_map = parts
.headers
.iter()
.map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or("").to_string()))
.collect::<HashMap<String, String>>();

let user_token = match authorizer.verify(&headers_map).await {
Ok(auth_response) => auth_response.user_token,
Err(e) => return build_error_response(e),
};

let bytes = body.collect().await?.to_bytes();
match T::decode(bytes) {
Ok(request) => match handler(store.clone(), user_token, request).await {
Ok(response) => Ok(Response::builder()
.body(Full::new(Bytes::from(response.encode_to_vec())))
// unwrap safety: body only errors when previous chained calls failed.
.unwrap()),
Err(e) => build_error_response(e),
},
Err(_) => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Full::new(Bytes::from(b"Error parsing request".to_vec())))
// unwrap safety: body only errors when previous chained calls failed.
.unwrap()),
}
}

fn build_error_response(e: VssError) -> Result<Response<Full<Bytes>>, Error> {
let error_response = match e {
VssError::NoSuchKeyError(msg) => ErrorResponse {
error_code: ErrorCode::NoSuchKeyException.into(),
message: msg.to_string(),
},
VssError::ConflictError(msg) => ErrorResponse {
error_code: ErrorCode::ConflictException.into(),
message: msg.to_string(),
},
VssError::InvalidRequestError(msg) => ErrorResponse {
error_code: ErrorCode::InvalidRequestException.into(),
message: msg.to_string(),
},
VssError::AuthError(msg) => {
ErrorResponse { error_code: ErrorCode::AuthException.into(), message: msg.to_string() }
},
_ => ErrorResponse {
error_code: ErrorCode::InternalServerException.into(),
message: "Unknown Server Error occurred.".to_string(),
},
};
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Full::new(Bytes::from(error_response.encode_to_vec())))
// unwrap safety: body only errors when previous chained calls failed.
.unwrap())
}

0 comments on commit 5ba4b28

Please sign in to comment.