Skip to content

Commit

Permalink
kill LogMsg storage, impl save store to disk
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 6, 2023
1 parent 93c1600 commit 37be641
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 356 deletions.
77 changes: 30 additions & 47 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use nohash_hasher::IntMap;

use re_arrow_store::{DataStoreConfig, TimeInt};
Expand Down Expand Up @@ -159,33 +161,31 @@ impl EntityDb {
/// An in-memory database built from a stream of [`LogMsg`]es.
#[derive(Default)]
pub struct LogDb {
/// Messages in the order they arrived
chronological_row_ids: Vec<RowId>,
log_messages: ahash::HashMap<RowId, LogMsg>,

/// Data that was logged with [`TimePoint::timeless`].
/// We need to re-insert those in any new timelines
/// that are created after they were logged.
timeless_row_ids: Vec<RowId>,
/// All [`EntityPathOpMsg`]s ever received.
entity_op_msgs: BTreeMap<RowId, EntityPathOpMsg>,

/// Set by whomever created this [`LogDb`].
pub data_source: Option<re_smart_channel::Source>,

/// Comes in a special message, [`LogMsg::BeginRecordingMsg`].
recording_info: Option<RecordingInfo>,
recording_msg: Option<BeginRecordingMsg>,

/// Where we store the entities.
pub entity_db: EntityDb,
}

impl LogDb {
pub fn recording_msg(&self) -> Option<&BeginRecordingMsg> {
self.recording_msg.as_ref()
}

pub fn recording_info(&self) -> Option<&RecordingInfo> {
self.recording_info.as_ref()
self.recording_msg().map(|msg| &msg.info)
}

pub fn recording_id(&self) -> RecordingId {
if let Some(info) = &self.recording_info {
info.recording_id
if let Some(msg) = &self.recording_msg {
msg.info.recording_id
} else {
RecordingId::ZERO
}
Expand All @@ -203,11 +203,16 @@ impl LogDb {
self.entity_db.tree.num_timeless_messages()
}

/// Sum of the data store's timeless and temporal row counts.
pub fn len(&self) -> usize {
    let store = &self.entity_db.data_store;
    // NOTE(review): `as usize` truncates silently if the counters are wider than
    // `usize` on the target — confirm the counter types.
    let timeless = store.total_timeless_rows() as usize;
    let temporal = store.total_temporal_rows() as usize;
    timeless + temporal
}

pub fn is_empty(&self) -> bool {
self.log_messages.is_empty()
self.len() == 0
}

pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
Expand All @@ -218,38 +223,27 @@ impl LogDb {
time_point,
path_op,
} = msg;
self.entity_op_msgs.insert(*row_id, msg.clone());
self.entity_db.add_path_op(*row_id, time_point, path_op);
}
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
self.chronological_row_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

fn add_begin_recording_msg(&mut self, msg: &BeginRecordingMsg) {
self.recording_info = Some(msg.info.clone());
self.recording_msg = Some(msg.clone());
}

pub fn len(&self) -> usize {
self.log_messages.len()
}

/// In the order they arrived
pub fn chronological_log_messages(&self) -> impl Iterator<Item = &LogMsg> {
self.chronological_row_ids
.iter()
.filter_map(|id| self.get_log_msg(id))
/// Returns an iterator over all [`EntityPathOpMsg`]s that have been written to this `LogDb`.
pub fn iter_entity_op_msgs(&self) -> impl Iterator<Item = &EntityPathOpMsg> {
self.entity_op_msgs.values()
}

pub fn get_log_msg(&self, row_id: &RowId) -> Option<&LogMsg> {
self.log_messages.get(row_id)
pub fn get_entity_op_msg(&self, row_id: &RowId) -> Option<&EntityPathOpMsg> {
self.entity_op_msgs.get(row_id)
}

/// Free up some RAM by forgetting the older parts of all timelines.
Expand All @@ -263,26 +257,15 @@ impl LogDb {
let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline();

let Self {
chronological_row_ids,
log_messages,
timeless_row_ids,
entity_op_msgs,
data_source: _,
recording_info: _,
recording_msg: _,
entity_db,
} = self;

{
crate::profile_scope!("chronological_row_ids");
chronological_row_ids.retain(|row_id| !drop_row_ids.contains(row_id));
}

{
crate::profile_scope!("log_messages");
log_messages.retain(|row_id, _| !drop_row_ids.contains(row_id));
}
{
crate::profile_scope!("timeless_row_ids");
timeless_row_ids.retain(|row_id| !drop_row_ids.contains(row_id));
crate::profile_scope!("entity_op_msgs");
entity_op_msgs.retain(|row_id, _| !drop_row_ids.contains(row_id));
}

entity_db.purge(&cutoff_times, &drop_row_ids);
Expand Down
11 changes: 11 additions & 0 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ mod encoder {
}
encoder.finish()
}

/// Encodes every message yielded by `messages` into `write`, taking each message by value.
///
/// # Errors
///
/// Returns the first [`EncodeError`] raised while creating the encoder, appending a
/// message, or finishing the stream; no further messages are pulled after a failure.
pub fn encode_owned(
    messages: impl Iterator<Item = LogMsg>,
    write: impl std::io::Write,
) -> Result<(), EncodeError> {
    let mut encoder = Encoder::new(write)?;
    let mut messages = messages;
    messages.try_for_each(|message| -> Result<(), EncodeError> {
        encoder.append(&message)?;
        Ok(())
    })?;
    encoder.finish()
}
}

#[cfg(feature = "save")]
Expand Down
14 changes: 0 additions & 14 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,6 @@ pub enum LogMsg {
Goodbye(RowId),
}

impl LogMsg {
pub fn id(&self) -> RowId {
match self {
Self::BeginRecordingMsg(msg) => msg.row_id,
Self::EntityPathOpMsg(msg) => msg.row_id,
Self::Goodbye(row_id) => *row_id,
// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
Self::ArrowMsg(msg) => msg.table_id.into_row_id(),
}
}
}

impl_into_enum!(BeginRecordingMsg, LogMsg, BeginRecordingMsg);
impl_into_enum!(EntityPathOpMsg, LogMsg, EntityPathOpMsg);
impl_into_enum!(ArrowMsg, LogMsg, ArrowMsg);
Expand Down
90 changes: 33 additions & 57 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ impl App {
log_db.data_source = Some(self.rx.source().clone());
}

if let Err(err) = log_db.add(msg) {
if let Err(err) = log_db.add(&msg) {
re_log::error!("Failed to add incoming msg: {err}");
};

Expand Down Expand Up @@ -906,8 +906,6 @@ fn preview_files_being_dropped(egui_ctx: &egui::Context) {
enum PanelSelection {
#[default]
Viewport,

EventLog,
}

#[derive(Default, serde::Deserialize, serde::Serialize)]
Expand All @@ -930,8 +928,6 @@ struct AppState {
/// Which view panel is currently being shown
panel_selection: PanelSelection,

event_log_view: crate::event_log_view::EventLogView,

selection_panel: crate::selection_panel::SelectionPanel,
time_panel: crate::time_panel::TimePanel,

Expand Down Expand Up @@ -959,7 +955,6 @@ impl AppState {
selected_rec_id,
recording_configs,
panel_selection,
event_log_view,
blueprints,
selection_panel,
time_panel,
Expand Down Expand Up @@ -1004,7 +999,6 @@ impl AppState {
.entry(selected_app_id)
.or_insert_with(|| Blueprint::new(ui.ctx()))
.blueprint_panel_and_viewport(&mut ctx, ui),
PanelSelection::EventLog => event_log_view.ui(&mut ctx, ui),
});

// move time last, so we get to see the first data first!
Expand Down Expand Up @@ -1536,16 +1530,6 @@ fn main_view_selector_ui(ui: &mut egui::Ui, app: &mut App) {
{
ui.close_menu();
}
if ui
.selectable_value(
&mut app.state.panel_selection,
PanelSelection::EventLog,
"Event Log",
)
.clicked()
{
ui.close_menu();
}
});
}
}
Expand Down Expand Up @@ -1751,44 +1735,36 @@ fn save_database_to_file(
path: std::path::PathBuf,
time_selection: Option<(re_data_store::Timeline, TimeRangeF)>,
) -> impl FnOnce() -> anyhow::Result<std::path::PathBuf> {
use re_log_types::{EntityPathOpMsg, TimeInt};

let msgs = match time_selection {
// Fast path: no query, just dump everything.
None => log_db
.chronological_log_messages()
.cloned()
.collect::<Vec<_>>(),

// Query path: time to filter!
Some((timeline, range)) => {
use std::ops::RangeInclusive;
let range: RangeInclusive<TimeInt> = range.min.floor()..=range.max.ceil();
log_db
.chronological_log_messages()
.filter(|msg| {
match msg {
LogMsg::BeginRecordingMsg(_) | LogMsg::Goodbye(_) => {
true // timeless
}
LogMsg::EntityPathOpMsg(EntityPathOpMsg { time_point, .. }) => {
time_point.is_timeless() || {
let is_within_range = time_point
.get(&timeline)
.map_or(false, |t| range.contains(t));
is_within_range
}
}
LogMsg::ArrowMsg(_) => {
// TODO(john)
false
}
}
})
.cloned()
.collect::<Vec<_>>()
}
};
use re_arrow_store::TimeRange;

crate::profile_scope!("dump_messages");

let begin_rec_msg = log_db
.recording_msg()
.map(|msg| LogMsg::BeginRecordingMsg(msg.clone()));

let ent_op_msgs = log_db
.iter_entity_op_msgs()
.map(|msg| LogMsg::EntityPathOpMsg(msg.clone()))
.collect_vec();

let time_filter = time_selection.map(|(timeline, range)| {
(
timeline,
TimeRange::new(range.min.floor(), range.max.ceil()),
)
});
let data_msgs = log_db
.entity_db
.data_store
.to_data_tables(time_filter)
.map(|table| LogMsg::ArrowMsg(table.to_arrow_msg().unwrap()))
.collect_vec();

let msgs = std::iter::once(begin_rec_msg)
.flatten()
.chain(ent_op_msgs)
.chain(data_msgs);

move || {
crate::profile_scope!("save_to_file");
Expand All @@ -1797,7 +1773,7 @@ fn save_database_to_file(
let file = std::fs::File::create(path.as_path())
.with_context(|| format!("Failed to create file at {path:?}"))?;

re_log_types::encoding::encode(msgs.iter(), file)
re_log_types::encoding::encode_owned(msgs, file)
.map(|_| path)
.context("Message encode")
}
Expand All @@ -1811,7 +1787,7 @@ fn load_rrd_to_log_db(mut read: impl std::io::Read) -> anyhow::Result<LogDb> {

let mut log_db = LogDb::default();
for msg in decoder {
log_db.add(msg?)?;
log_db.add(&msg?)?;
}
Ok(log_db)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod viewer_analytics;

pub(crate) use misc::{mesh_loader, Item, TimeControl, TimeView, ViewerContext};
use re_log_types::PythonVersion;
pub(crate) use ui::{event_log_view, memory_panel, selection_panel, time_panel, UiVerbosity};
pub(crate) use ui::{memory_panel, selection_panel, time_panel, UiVerbosity};

pub use app::{App, StartupOptions};
pub use remote_viewer_app::RemoteViewerApp;
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/misc/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Item {
Item::InstancePath(space_view_id, _) => space_view_id
.map(|space_view_id| blueprint.viewport.space_view(&space_view_id).is_some())
.unwrap_or(true),
Item::RowId(row_id) => log_db.get_log_msg(row_id).is_some(),
Item::RowId(row_id) => log_db.get_entity_op_msg(row_id).is_some(),
Item::SpaceView(space_view_id) => {
blueprint.viewport.space_view(space_view_id).is_some()
}
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/ui/data_ui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum UiVerbosity {
Small,

/// At most this height
#[allow(dead_code)]
MaxHeight(f32),

/// Display a reduced set, used for hovering.
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/ui/data_ui/row_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl DataUi for RowId {
ctx.row_id_button(ui, *self);
}
UiVerbosity::All | UiVerbosity::Reduced => {
if let Some(msg) = ctx.log_db.get_log_msg(self) {
if let Some(msg) = ctx.log_db.get_entity_op_msg(self) {
msg.data_ui(ctx, ui, verbosity, query);
} else {
ctx.row_id_button(ui, *self);
Expand Down
Loading

0 comments on commit 37be641

Please sign in to comment.