Skip to content

Commit

Permalink
Merge pull request #50 from Burning1020/sync-clock
Browse files Browse the repository at this point in the history
vmm: support time synchronization
  • Loading branch information
abel-von authored Aug 22, 2023
2 parents 789f256 + 106ad63 commit a138c50
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 27 deletions.
8 changes: 4 additions & 4 deletions vmm/sandbox/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 60 additions & 5 deletions vmm/sandbox/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ use std::{

use anyhow::anyhow;
use containerd_sandbox::error::{Error, Result};
use log::error;
use log::{debug, error, warn};
use nix::{
sys::socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr},
sys::{
socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr},
time::TimeValLike,
},
time::{clock_gettime, ClockId},
unistd::close,
};
use tokio::{
Expand All @@ -36,7 +40,9 @@ use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient};

use crate::network::{NetworkInterface, Route};

const HVSOCK_RETRY_TIMEOUT: u64 = 10;
const HVSOCK_RETRY_TIMEOUT_IN_MS: u64 = 10;
const TIME_SYNC_PERIOD: u64 = 60;
const TIME_DIFF_TOLERANCE_IN_MS: u64 = 10;

pub(crate) async fn new_sandbox_client(address: &str) -> Result<SandboxServiceClient> {
let client = new_ttrpc_client(address).await?;
Expand Down Expand Up @@ -188,9 +194,9 @@ async fn connect_to_hvsocket(address: &str) -> Result<RawFd> {
.map_err(Error::Other)
};

timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT), fut)
timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT_IN_MS), fut)
.await
.map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT))?
.map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT_IN_MS))?
}

pub fn unix_sock(r#abstract: bool, socket_path: &str) -> Result<UnixAddr> {
Expand Down Expand Up @@ -256,3 +262,52 @@ pub(crate) async fn client_update_routes(
.map_err(|e| anyhow!("failed to update routes: {}", e))?;
Ok(())
}

pub(crate) async fn client_sync_clock(client: &SandboxServiceClient) {
let client = client.clone();
let tolerance_nanos = Duration::from_millis(TIME_DIFF_TOLERANCE_IN_MS).as_nanos() as i64;
let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(TIME_SYNC_PERIOD)).await;
debug!("sync_clock: start sync clock from host to guest");

let mut req = SyncClockPacket::new();
match clock_gettime(clock_id) {
Ok(ts) => req.ClientSendTime = ts.num_nanoseconds(),
Err(e) => {
warn!("failed to get current clock: {}", e);
continue;
}
}
match client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &req)
.await
{
Ok(mut p) => {
match clock_gettime(clock_id) {
Ok(ts) => p.ServerArriveTime = ts.num_nanoseconds(),
Err(e) => {
warn!("failed to get current clock: {}", e);
continue;
}
}
p.Delta = ((p.ClientSendTime - p.ClientArriveTime)
+ (p.ServerArriveTime - p.ServerSendTime))
/ 2;
if p.Delta.abs() > tolerance_nanos {
if let Err(e) = client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &p)
.await
{
error!("sync clock set delta failed: {:?}", e);
}
}
}
Err(e) => {
error!("sync clock get error: {:?}", e);
}
}
}
});
}
5 changes: 3 additions & 2 deletions vmm/sandbox/src/cloud_hypervisor/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ impl Hooks<CloudHypervisorVM> for CloudHypervisorHooks {
}

async fn post_start(&self, sandbox: &mut KuasarSandbox<CloudHypervisorVM>) -> Result<()> {
let data = &mut sandbox.data;
data.task_address = sandbox.vm.agent_socket.to_string();
sandbox.data.task_address = sandbox.vm.agent_socket.to_string();
// sync clock
sandbox.sync_clock().await;
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions vmm/sandbox/src/qemu/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ impl Hooks<QemuVM> for QemuHooks {
}

async fn post_start(&self, sandbox: &mut KuasarSandbox<QemuVM>) -> Result<()> {
let data = &mut sandbox.data;
data.task_address = sandbox.vm.agent_socket.to_string();
sandbox.data.task_address = sandbox.vm.agent_socket.to_string();
// sync clock
sandbox.sync_clock().await;
Ok(())
}
}
Expand Down
12 changes: 11 additions & 1 deletion vmm/sandbox/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use tokio::{
use vmm_common::{api::sandbox_ttrpc::SandboxServiceClient, storage::Storage, SHARED_DIR_SUFFIX};

use crate::{
client::{client_check, client_update_interfaces, client_update_routes, new_sandbox_client},
client::{
client_check, client_sync_clock, client_update_interfaces, client_update_routes,
new_sandbox_client,
},
container::KuasarContainer,
network::{Network, NetworkConfig},
utils::get_resources,
Expand Down Expand Up @@ -460,6 +463,13 @@ where
}
Ok(())
}

pub(crate) async fn sync_clock(&self) {
let client_guard = self.client.lock().await;
if let Some(client) = &*client_guard {
client_sync_clock(client).await;
}
}
}

#[derive(Default, Debug, Deserialize)]
Expand Down
5 changes: 3 additions & 2 deletions vmm/sandbox/src/stratovirt/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ impl Hooks<StratoVirtVM> for StratoVirtHooks {
}

async fn post_start(&self, sandbox: &mut KuasarSandbox<StratoVirtVM>) -> Result<()> {
let data = &mut sandbox.data;
data.task_address = sandbox.vm.agent_socket.to_string();
sandbox.data.task_address = sandbox.vm.agent_socket.to_string();
// sync clock
sandbox.sync_clock().await;
Ok(())
}
}
8 changes: 4 additions & 4 deletions vmm/task/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 27 additions & 7 deletions vmm/task/src/sandbox_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::sync::Arc;
use std::{ops::Add, sync::Arc, time::Duration};

use async_trait::async_trait;
use containerd_shim::{error::Result, TtrpcContext, TtrpcResult};
use containerd_shim::{error::Result, Error, TtrpcContext, TtrpcResult};
use nix::{
sys::time::{TimeSpec, TimeValLike},
time::{clock_gettime, clock_settime, ClockId},
};
use tokio::sync::Mutex;
use vmm_common::{
api,
Expand Down Expand Up @@ -85,11 +89,27 @@ impl api::sandbox_ttrpc::SandboxService for SandboxService {
async fn sync_clock(
&self,
_ctx: &TtrpcContext,
_req: SyncClockPacket,
req: SyncClockPacket,
) -> TtrpcResult<SyncClockPacket> {
Err(::ttrpc::Error::RpcStatus(::ttrpc::get_status(
::ttrpc::Code::NOT_FOUND,
"/grpc.SandboxService/SyncClock is not supported".to_string(),
)))
let mut resp = req.clone();
let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
match req.Delta {
0 => {
resp.ClientArriveTime = clock_gettime(clock_id)
.map_err(Error::Nix)?
.num_nanoseconds();
resp.ServerSendTime = clock_gettime(clock_id)
.map_err(Error::Nix)?
.num_nanoseconds();
}
_ => {
let mut clock_spce = clock_gettime(clock_id).map_err(Error::Nix)?;
clock_spce = clock_spce.add(TimeSpec::from_duration(Duration::from_nanos(
req.Delta as u64,
)));
clock_settime(clock_id, clock_spce).map_err(Error::Nix)?;
}
}
Ok(resp)
}
}

0 comments on commit a138c50

Please sign in to comment.