diff --git a/vmm/sandbox/src/client.rs b/vmm/sandbox/src/client.rs
index e1d284b8..de1a5411 100644
--- a/vmm/sandbox/src/client.rs
+++ b/vmm/sandbox/src/client.rs
@@ -21,9 +21,13 @@ use std::{
 
 use anyhow::anyhow;
 use containerd_sandbox::error::{Error, Result};
-use log::error;
+use log::{debug, error, warn};
 use nix::{
-    sys::socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr},
+    sys::{
+        socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr},
+        time::TimeValLike,
+    },
+    time::{clock_gettime, ClockId},
     unistd::close,
 };
 use tokio::{
@@ -36,7 +40,9 @@ use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient};
 
 use crate::network::{NetworkInterface, Route};
 
-const HVSOCK_RETRY_TIMEOUT: u64 = 10;
+const HVSOCK_RETRY_TIMEOUT_IN_MS: u64 = 10;
+const TIME_SYNC_PERIOD_IN_SECS: u64 = 60;
+const TIME_DIFF_TOLERANCE_IN_MS: u64 = 10;
 
 pub(crate) async fn new_sandbox_client(address: &str) -> Result {
     let client = new_ttrpc_client(address).await?;
@@ -188,9 +194,9 @@ async fn connect_to_hvsocket(address: &str) -> Result {
             .map_err(Error::Other)
     };
 
-    timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT), fut)
+    timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT_IN_MS), fut)
         .await
-        .map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT))?
+        .map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT_IN_MS))?
 }
 
 pub fn unix_sock(r#abstract: bool, socket_path: &str) -> Result {
@@ -256,3 +262,63 @@ pub(crate) async fn client_update_routes(
         .map_err(|e| anyhow!("failed to update routes: {}", e))?;
     Ok(())
 }
+
+/// Spawns a background task that keeps the guest's realtime clock aligned with the host's.
+///
+/// Every `TIME_SYNC_PERIOD_IN_SECS` seconds the task stamps a `SyncClockPacket` with the host
+/// time, sends it to the guest, stamps the reply on arrival and derives `Delta`, the
+/// host-minus-guest offset in nanoseconds. When `|Delta|` exceeds `TIME_DIFF_TOLERANCE_IN_MS`
+/// the packet is sent back so that the guest can apply the correction. Failures are logged
+/// and the next attempt happens one period later.
+///
+/// Returns as soon as the task has been spawned.
+pub(crate) async fn client_sync_clock(client: &SandboxServiceClient) {
+    let client = client.clone();
+    let tolerance_nanos = Duration::from_millis(TIME_DIFF_TOLERANCE_IN_MS).as_nanos() as i64;
+    let rpc_timeout_nanos = Duration::from_secs(1).as_nanos() as i64;
+    let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
+    // NOTE(review): nothing visible here ever stops this task; confirm that it is meant to
+    // keep retrying after the sandbox is gone.
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_secs(TIME_SYNC_PERIOD_IN_SECS)).await;
+            debug!("sync_clock: start sync clock from host to guest");
+
+            let mut req = SyncClockPacket::new();
+            match clock_gettime(clock_id) {
+                Ok(ts) => req.ClientSendTime = ts.num_nanoseconds(),
+                Err(e) => {
+                    warn!("failed to get current clock: {}", e);
+                    continue;
+                }
+            }
+            match client.sync_clock(with_timeout(rpc_timeout_nanos), &req).await {
+                Ok(mut p) => {
+                    match clock_gettime(clock_id) {
+                        Ok(ts) => p.ServerArriveTime = ts.num_nanoseconds(),
+                        Err(e) => {
+                            warn!("failed to get current clock: {}", e);
+                            continue;
+                        }
+                    }
+                    // Average the offsets seen on the outbound and return legs so that a
+                    // symmetric transport delay cancels out.
+                    p.Delta = ((p.ClientSendTime - p.ClientArriveTime)
+                        + (p.ServerArriveTime - p.ServerSendTime))
+                        / 2;
+                    if p.Delta.abs() > tolerance_nanos {
+                        if let Err(e) = client
+                            .sync_clock(with_timeout(rpc_timeout_nanos), &p)
+                            .await
+                        {
+                            error!("sync clock set delta failed: {:?}", e);
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!("sync clock get error: {:?}", e);
+                }
+            }
+        }
+    });
+}
diff --git a/vmm/sandbox/src/cloud_hypervisor/hooks.rs b/vmm/sandbox/src/cloud_hypervisor/hooks.rs
index 7fb55ec8..58c56507 100644
--- a/vmm/sandbox/src/cloud_hypervisor/hooks.rs
+++ b/vmm/sandbox/src/cloud_hypervisor/hooks.rs
@@ -31,8 +31,9 @@ impl Hooks for CloudHypervisorHooks {
     }
 
     async fn post_start(&self, sandbox: &mut KuasarSandbox) -> Result<()> {
-        let data = &mut sandbox.data;
-        data.task_address = sandbox.vm.agent_socket.to_string();
+        sandbox.data.task_address = sandbox.vm.agent_socket.to_string();
+        // Start the periodic host-to-guest clock sync.
+        sandbox.sync_clock().await;
         Ok(())
     }
 }
diff --git a/vmm/sandbox/src/qemu/hooks.rs b/vmm/sandbox/src/qemu/hooks.rs
index 5bb40ed9..9f296726 100644
--- a/vmm/sandbox/src/qemu/hooks.rs
+++ b/vmm/sandbox/src/qemu/hooks.rs
@@ -44,8 +44,9 @@ impl Hooks for QemuHooks {
     }
 
     async fn post_start(&self, sandbox: &mut KuasarSandbox) -> Result<()> {
-        let data = &mut sandbox.data;
-        data.task_address = sandbox.vm.agent_socket.to_string();
+        sandbox.data.task_address = sandbox.vm.agent_socket.to_string();
+        // Start the periodic host-to-guest clock sync.
+        sandbox.sync_clock().await;
         Ok(())
     }
 }
diff --git a/vmm/sandbox/src/sandbox.rs b/vmm/sandbox/src/sandbox.rs
index f14ee60f..c66d3e70 100644
--- a/vmm/sandbox/src/sandbox.rs
+++ b/vmm/sandbox/src/sandbox.rs
@@ -35,7 +35,10 @@ use tokio::{
 use vmm_common::{api::sandbox_ttrpc::SandboxServiceClient, storage::Storage, SHARED_DIR_SUFFIX};
 
 use crate::{
-    client::{client_check, client_update_interfaces, client_update_routes, new_sandbox_client},
+    client::{
+        client_check, client_sync_clock, client_update_interfaces, client_update_routes,
+        new_sandbox_client,
+    },
     container::KuasarContainer,
     network::{Network, NetworkConfig},
     utils::get_resources,
@@ -460,6 +463,15 @@ where
         }
         Ok(())
     }
+
+    /// Starts the periodic host-to-guest clock synchronization when a sandbox client is set;
+    /// does nothing otherwise.
+    pub(crate) async fn sync_clock(&self) {
+        let client_guard = self.client.lock().await;
+        if let Some(client) = &*client_guard {
+            client_sync_clock(client).await;
+        }
+    }
 }
 
 #[derive(Default, Debug, Deserialize)]
diff --git a/vmm/sandbox/src/stratovirt/hooks.rs b/vmm/sandbox/src/stratovirt/hooks.rs
index c7a8fddb..983f998a 100644
--- a/vmm/sandbox/src/stratovirt/hooks.rs
+++ b/vmm/sandbox/src/stratovirt/hooks.rs
@@ -42,8 +42,9 @@ impl Hooks for StratoVirtHooks {
     }
 
     async fn post_start(&self, sandbox: &mut KuasarSandbox) -> Result<()> {
-        let data = &mut sandbox.data;
-        data.task_address = sandbox.vm.agent_socket.to_string();
+        sandbox.data.task_address = sandbox.vm.agent_socket.to_string();
+        // Start the periodic host-to-guest clock sync.
+        sandbox.sync_clock().await;
         Ok(())
     }
 }
diff --git a/vmm/task/src/sandbox_service.rs b/vmm/task/src/sandbox_service.rs
index c279fe3e..72f60f34 100644
--- a/vmm/task/src/sandbox_service.rs
+++ b/vmm/task/src/sandbox_service.rs
@@ -14,10 +14,14 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use containerd_shim::{error::Result, TtrpcContext, TtrpcResult};
+use containerd_shim::{error::Result, Error, TtrpcContext, TtrpcResult};
+use nix::{
+    sys::time::{TimeSpec, TimeValLike},
+    time::{clock_gettime, clock_settime, ClockId},
+};
 use tokio::sync::Mutex;
 use vmm_common::{
     api,
@@ -85,11 +89,28 @@ impl api::sandbox_ttrpc::SandboxService for SandboxService {
     async fn sync_clock(
         &self,
         _ctx: &TtrpcContext,
-        _req: SyncClockPacket,
+        req: SyncClockPacket,
     ) -> TtrpcResult {
-        Err(::ttrpc::Error::RpcStatus(::ttrpc::get_status(
-            ::ttrpc::Code::NOT_FOUND,
-            "/grpc.SandboxService/SyncClock is not supported".to_string(),
-        )))
+        // `Delta` selects the mode: zero is a timestamp probe, any other value is a signed
+        // correction in nanoseconds to apply to the realtime clock.
+        let mut resp = req;
+        let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
+        match resp.Delta {
+            0 => {
+                resp.ClientArriveTime = clock_gettime(clock_id)
+                    .map_err(Error::Nix)?
+                    .num_nanoseconds();
+                resp.ServerSendTime = clock_gettime(clock_id)
+                    .map_err(Error::Nix)?
+                    .num_nanoseconds();
+            }
+            delta => {
+                // The correction is negative when this clock is ahead, so keep it signed:
+                // casting it to `u64` would wrap into an enormous forward jump.
+                let now = clock_gettime(clock_id).map_err(Error::Nix)?;
+                clock_settime(clock_id, now + TimeSpec::nanoseconds(delta)).map_err(Error::Nix)?;
+            }
+        }
+        Ok(resp)
     }
 }