Skip to content

Commit

Permalink
check hash and retry in 'sendFiles'
Browse files Browse the repository at this point in the history
  • Loading branch information
pepoviola committed Dec 11, 2024
1 parent 56e26a8 commit cdac3de
Showing 1 changed file with 88 additions and 49 deletions.
137 changes: 88 additions & 49 deletions crates/provider/src/kubernetes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ where
trace!("snap: {db_snapshot}");
let url_of_snap = match db_snapshot {
AssetLocation::Url(location) => location.clone(),
AssetLocation::FilePath(filepath) => self.upload_to_fileserver(filepath).await?,
AssetLocation::FilePath(filepath) => {
let (url, _) = self.upload_to_fileserver(filepath).await?;
url
},
};

// we need to get the snapshot from a public access
Expand Down Expand Up @@ -349,13 +352,19 @@ where
.unwrap_or_else(|| panic!("namespace shouldn't be dropped, {}", THIS_IS_A_BUG))
}

async fn upload_to_fileserver(&self, location: &Path) -> Result<Url, ProviderError> {
async fn upload_to_fileserver(&self, location: &Path) -> Result<(Url, String), ProviderError> {
let file_name = if let Some(name) = location.file_name() {
name.to_string_lossy()
} else {
"unnamed".into()
};

let data = self.filesystem.read(location).await?;
let hashed_path = hex::encode(sha2::Sha256::digest(&data));
let content_hashed = hex::encode(sha2::Sha256::digest(&data));
let req = self
.http_client
.head(format!(
"http://{}/{hashed_path}",
"http://{}/{content_hashed}__{file_name}",
self.file_server_local_host().await?
))
.build()
Expand All @@ -380,7 +389,7 @@ where
})?;
}

Ok(url)
Ok((url, content_hashed))
}

async fn file_server_local_host(&self) -> Result<String, ProviderError> {
Expand All @@ -394,6 +403,66 @@ where
"file server port not bound locally"
)))
}

async fn download_file(&self, url: &str, remote_file_path: &Path, hash: Option<&str>) -> Result<(), ProviderError> {
let r = self
.k8s_client
.pod_exec(
&self.namespace_name(),
&self.name,
vec![
"/cfg/curl",
url,
"--output",
&remote_file_path.to_string_lossy(),
],
)
.await
.map_err(|err| {
ProviderError::DownloadFile(
remote_file_path.to_string_lossy().to_string(),
anyhow!(format!("node: {}, err: {}", self.name(), err)),
)
})?;

trace!("download url {} result: {:?}",url, r);

if r.is_err() {
return Err(ProviderError::DownloadFile(
remote_file_path.to_string_lossy().to_string(),
anyhow!(format!("node: {}, err downloading file", self.name()))
));
}

if let Some(hash) = hash {
// check if the hash of the file is correct
let res = self
.k8s_client
.pod_exec(&self.namespace_name(), &self.name, vec!["/cfg/coreutils", "sha256sum", &remote_file_path.to_string_lossy()]).await
.map_err(|err| {
ProviderError::DownloadFile(
remote_file_path.to_string_lossy().to_string(),
anyhow!(format!("node: {}, err: {}", self.name(), err)),
)
})?;

if let Ok(output) = res {
if ! output.contains(hash) {
return Err(ProviderError::DownloadFile(
remote_file_path.to_string_lossy().to_string(),
anyhow!(format!("node: {}, invalid sha256sum for file", self.name()))
));
}
} else {
return Err(ProviderError::DownloadFile(
remote_file_path.to_string_lossy().to_string(),
anyhow!(format!("node: {}, err calculating sha256sum for file {:?}", self.name(),res))
));
}
}

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -545,55 +614,25 @@ where
remote_file_path: &Path,
mode: &str,
) -> Result<(), ProviderError> {
let data = self.filesystem.read(local_file_path).await.map_err(|err| {
ProviderError::SendFile(
self.name.clone(),
local_file_path.to_string_lossy().to_string(),
err.into(),
)
})?;

if let Some(remote_parent_dir) = self.get_remote_parent_dir(remote_file_path) {
self.create_remote_dir(&remote_parent_dir).await?;
}

self.http_client
.post(format!(
"http://{}{}",
self.file_server_local_host().await?,
remote_file_path.to_string_lossy()
))
.body(data)
.send()
.await
.map_err(|err| {
ProviderError::SendFile(
self.name.clone(),
local_file_path.to_string_lossy().to_string(),
err.into(),
)
})?;
trace!(
"Uploading file: {} IFF not present in the fileserver",
local_file_path.to_string_lossy()
);

let _ = self
.k8s_client
.pod_exec(
&self.namespace_name(),
&self.name,
vec![
"/cfg/curl",
&format!("fileserver{}", remote_file_path.to_string_lossy()),
"--output",
&remote_file_path.to_string_lossy(),
],
)
.await
.map_err(|err| {
ProviderError::SendFile(
self.name.clone(),
local_file_path.to_string_lossy().to_string(),
err.into(),
)
})?;
// we need to override the url to use inside the pod
let (mut url, hash) = self.upload_to_fileserver(local_file_path).await?;
let _ = url.set_host(Some("fileserver"));
let _ = url.set_port(Some(80));

let res = self.download_file(&url.to_string(),&remote_file_path, Some(&hash)).await;
if res.is_err() {
// re-try one time
self.download_file(&url.to_string(),&remote_file_path, Some(&hash)).await?;
}

let _ = self
.k8s_client
Expand Down

0 comments on commit cdac3de

Please sign in to comment.