Skip to content

Commit

Permalink
audio sync
Browse files Browse the repository at this point in the history
  • Loading branch information
raffaeleragni committed Jul 14, 2024
1 parent 8ea4312 commit 95e870e
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Current state is in development.
- [X] rigged (>= 0.14.0)
- [X] with morphs (>= 0.14.0)
- [X] Asset: Textures
- [ ] Asset: Audio
- [X] Asset: Audio (>= 0.14.0)
- [X] Compressed Assets
- [ ] Throttleable sync (time window queuing)
- [ ] Skippable channel for Unordered+Unreliable
Expand Down
10 changes: 7 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ use bevy_renet::renet::{
};

use crate::{
lib_priv::{sync_material_enabled, sync_mesh_enabled, PromotionState, SyncTrackerRes},
lib_priv::{
sync_audio_enabled, sync_material_enabled, sync_mesh_enabled, PromotionState,
SyncTrackerRes,
},
proto::Message,
ClientState,
};

use self::track::{
entity_created_on_client, entity_parented_on_client, entity_removed_from_client,
react_on_changed_components, react_on_changed_images, react_on_changed_materials,
react_on_changed_meshes,
react_on_changed_audios, react_on_changed_components, react_on_changed_images,
react_on_changed_materials, react_on_changed_meshes,
};

mod receiver;
Expand Down Expand Up @@ -58,6 +61,7 @@ impl Plugin for ClientSyncPlugin {
react_on_changed_materials.run_if(sync_material_enabled),
react_on_changed_images.run_if(sync_material_enabled),
react_on_changed_meshes.run_if(sync_mesh_enabled),
react_on_changed_audios.run_if(sync_audio_enabled),
receiver::poll_for_messages,
)
.chain()
Expand Down
1 change: 1 addition & 0 deletions src/client/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn client_received_a_message(
}),
Message::MeshUpdated { id, url } => sync_assets.request(SyncAssetType::Mesh, id, url),
Message::ImageUpdated { id, url } => sync_assets.request(SyncAssetType::Image, id, url),
Message::AudioUpdated { id, url } => sync_assets.request(SyncAssetType::Audio, id, url),
Message::PromoteToHost => {
info!("Client is being promoted to host");
client.send_message(
Expand Down
31 changes: 31 additions & 0 deletions src/client/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,37 @@ pub(crate) fn react_on_changed_materials(
}
}

pub(crate) fn react_on_changed_audios(
mut track: ResMut<SyncTrackerRes>,
mut sync_asset: ResMut<SyncAssetTransfer>,
mut client: ResMut<RenetClient>,
assets: Res<Assets<AudioSource>>,
mut events: EventReader<AssetEvent<AudioSource>>,
) {
for event in &mut events.read() {
match event {
AssetEvent::Added { id } | AssetEvent::Modified { id } => {
let Some(asset) = assets.get(*id) else {
continue;
};
let AssetId::Uuid { uuid: id } = id else {
continue;
};
if track.skip_network_handle_change(*id) {
continue;
}
let url = sync_asset.serve_audio(id, asset);
client.send_message(
DefaultChannel::ReliableOrdered,
bincode::serialize(&Message::AudioUpdated { id: *id, url }).unwrap(),
);
}
AssetEvent::Removed { id: _ } => {}
_ => (),
}
}
}

pub(crate) fn react_on_changed_meshes(
mut track: ResMut<SyncTrackerRes>,
mut sync_asset: ResMut<SyncAssetTransfer>,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub trait SyncComponent {
) -> &mut Self;
fn sync_materials(&mut self, enable: bool);
fn sync_meshes(&mut self, enable: bool);
fn sync_audios(&mut self, enable: bool);
}

#[derive(Resource)]
Expand Down
10 changes: 10 additions & 0 deletions src/lib_priv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub(crate) struct SyncTrackerRes {

pub(crate) sync_materials: bool,
pub(crate) sync_meshes: bool,
pub(crate) sync_audios: bool,
}

pub(crate) fn sync_material_enabled(tracker: Res<SyncTrackerRes>) -> bool {
Expand All @@ -70,6 +71,10 @@ pub(crate) fn sync_mesh_enabled(tracker: Res<SyncTrackerRes>) -> bool {
tracker.sync_meshes
}

pub(crate) fn sync_audio_enabled(tracker: Res<SyncTrackerRes>) -> bool {
tracker.sync_audios
}

impl SyncTrackerRes {
pub(crate) fn signal_component_changed(&mut self, id: Uuid, data: Box<dyn Reflect>) {
let name = data.get_represented_type_info().unwrap().type_path().into();
Expand Down Expand Up @@ -283,6 +288,11 @@ impl SyncComponent for App {
let mut tracker = self.world_mut().resource_mut::<SyncTrackerRes>();
tracker.sync_meshes = enable;
}

fn sync_audios(&mut self, enable: bool) {
let mut tracker = self.world_mut().resource_mut::<SyncTrackerRes>();
tracker.sync_audios = enable;
}
}

#[derive(Component, Debug, Clone, Reflect, Default)]
Expand Down
14 changes: 10 additions & 4 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,31 @@ pub(crate) fn log_message_received(from: Who, message: &Message) {
id,
name
)
}
},
Message::StandardMaterialUpdated { id, material: _ } => {
debug!(
"{:?} received StandardMaterialUpdated {{ uuid: {} }}",
from, id
)
}
},
Message::MeshUpdated { id, url } => {
debug!(
"{:?} received MeshUpdated {{ uuid: {} }} {{ url: {} }}",
from, id, url
)
}
},
Message::ImageUpdated { id, url } => {
debug!(
"{:?} received ImageUpdated {{ uuid: {} }} {{ url: {} }}",
from, id, url
)
}
},
Message::AudioUpdated { id, url } => {
debug!(
"{:?} received AudioUpdated {{ uuid: {} }} {{ url: {} }}",
from, id, url
)
},
Message::PromoteToHost => debug!("{:?} received PromoteToHost", from),
Message::NewHost {
ip,
Expand Down
105 changes: 103 additions & 2 deletions src/networking/assets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub(crate) fn init(app: &mut App, addr: IpAddr, port: u16, max_transfer: usize)
Update,
process_image_assets.run_if(resource_exists::<SyncAssetTransfer>),
);
app.add_systems(
Update,
process_audio_assets.run_if(resource_exists::<SyncAssetTransfer>),
);
}

fn process_mesh_assets(
Expand Down Expand Up @@ -74,8 +78,29 @@ fn process_image_assets(
}
}

fn process_audio_assets(
mut audios: ResMut<Assets<AudioSource>>,
sync: ResMut<SyncAssetTransfer>,
mut sync_tracker: ResMut<SyncTrackerRes>,
) {
let Ok(mut map) = sync.audios_to_apply.write() else {
return;
};
for (id, audio) in map.drain() {
sync_tracker.pushed_handles_from_network.insert(id);
let id: AssetId<AudioSource> = AssetId::Uuid { uuid: id };
audios.insert(
id,
AudioSource {
bytes: audio.into(),
},
);
}
}

type MeshCache = Arc<RwLock<HashMap<Uuid, Vec<u8>>>>;
type ImageCache = Arc<RwLock<HashMap<Uuid, Vec<u8>>>>;
type AudioCache = Arc<RwLock<HashMap<Uuid, Vec<u8>>>>;

#[derive(Resource)]
pub(crate) struct SyncAssetTransfer {
Expand All @@ -86,6 +111,8 @@ pub(crate) struct SyncAssetTransfer {
meshes_to_apply: MeshCache,
images: ImageCache,
images_to_apply: ImageCache,
audios: AudioCache,
audios_to_apply: AudioCache,
max_transfer: usize,
}

Expand All @@ -106,6 +133,8 @@ impl SyncAssetTransfer {
let meshes_to_apply = Arc::new(RwLock::new(HashMap::<Uuid, Vec<u8>>::new()));
let images = Arc::new(RwLock::new(HashMap::<Uuid, Vec<u8>>::new()));
let images_to_apply = Arc::new(RwLock::new(HashMap::<Uuid, Vec<u8>>::new()));
let audios = Arc::new(RwLock::new(HashMap::<Uuid, Vec<u8>>::new()));
let audios_to_apply = Arc::new(RwLock::new(HashMap::<Uuid, Vec<u8>>::new()));

let result = Self {
base_url,
Expand All @@ -116,6 +145,8 @@ impl SyncAssetTransfer {
max_transfer,
images,
images_to_apply,
audios,
audios_to_apply,
};

let (server_tx, server_rx) = channel::<Request>();
Expand All @@ -127,9 +158,10 @@ impl SyncAssetTransfer {
});
let meshes = result.meshes.clone();
let images = result.images.clone();
let audios = result.audios.clone();
result
.server_pool
.execute(move || Self::respond(server_rx, meshes, images, max_transfer));
.execute(move || Self::respond(server_rx, meshes, images, audios, max_transfer));
result
}

Expand All @@ -141,6 +173,7 @@ impl SyncAssetTransfer {
}
let meshes_to_apply = self.meshes_to_apply.clone();
let images_to_apply = self.images_to_apply.clone();
let audios_to_apply = self.audios_to_apply.clone();
debug!("Queuing request for {:?}:{} at {}", asset_type, id, url);
let max_transfer = self.max_transfer;
self.download_pool.execute(move || {
Expand Down Expand Up @@ -185,6 +218,20 @@ impl SyncAssetTransfer {
std::thread::sleep(Duration::from_millis(1));
}
}
SyncAssetType::Audio => {
let mut lock = audios_to_apply.write();
loop {
match lock {
Ok(mut map) => {
debug!("Received audio {} with size {}", id, len);
map.insert(id, bytes);
break;
}
Err(_) => lock = audios_to_apply.write(),
}
std::thread::sleep(Duration::from_millis(1));
}
}
}
}
}
Expand Down Expand Up @@ -225,7 +272,30 @@ impl SyncAssetTransfer {
format!("{}/image/{}", self.base_url, &id.to_string())
}

fn respond(rx: Receiver<Request>, meshes: MeshCache, images: ImageCache, max_size: usize) {
pub(crate) fn serve_audio(&mut self, id: &Uuid, audio: &AudioSource) -> String {
let mut lock = self.audios.write();
loop {
match lock {
Ok(mut map) => {
let bin = Vec::<u8>::from(audio.as_ref());
let audio = map.entry(*id).or_insert_with(|| bin);
debug!("Serving audio {} with size {}", id, audio.len());
break;
}
Err(_) => lock = self.meshes.write(),
}
std::thread::sleep(Duration::from_millis(1));
}
format!("{}/audio/{}", self.base_url, &id.to_string())
}

fn respond(
rx: Receiver<Request>,
meshes: MeshCache,
images: ImageCache,
audios: AudioCache,
max_size: usize,
) {
for request in rx.iter() {
let url = request.url();
let (asset_type, id) = if url.contains("/image/") {
Expand All @@ -238,6 +308,11 @@ impl SyncAssetTransfer {
continue;
};
(SyncAssetType::Mesh, id)
} else if url.contains("/audio/") {
let Some(id) = url.strip_prefix("/audio/") else {
continue;
};
(SyncAssetType::Audio, id)
} else {
continue;
};
Expand Down Expand Up @@ -297,6 +372,32 @@ impl SyncAssetTransfer {
)
.unwrap_or(());
}
SyncAssetType::Audio => {
let Ok(audiosmap) = audios.read() else {
request
.respond(Response::from_string("").with_status_code(449))
.unwrap_or(());
continue;
};
let Some(audio) = audiosmap.get(&id) else {
request
.respond(Response::from_string("").with_status_code(404))
.unwrap_or(());
continue;
};
debug!("Responding to {} with size {}", url, audio.len());
request
.respond(
Response::from_data(audio.clone())
.with_header(Header {
field: "Content-Length".parse().unwrap(),
value: AsciiString::from_ascii(audio.len().to_string())
.unwrap(),
})
.with_chunked_threshold(max_size),
)
.unwrap_or(());
}
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub type AssId = Uuid;
pub(crate) enum SyncAssetType {
Mesh,
Image,
Audio,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -44,14 +45,18 @@ pub(crate) enum Message {
id: Uuid,
url: String,
} = 8,
PromoteToHost,
AudioUpdated {
id: Uuid,
url: String,
} = 9,
PromoteToHost = 10,
NewHost {
ip: IpAddr,
port: u16,
web_port: u16,
max_transfer: usize,
},
RequestInitialSync,
} = 11,
RequestInitialSync = 12,
}

#[derive(Event)]
Expand Down
Loading

0 comments on commit 95e870e

Please sign in to comment.