Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add static data support to bundler #179

Merged
merged 12 commits into from
Oct 10, 2024
10 changes: 6 additions & 4 deletions bundler/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bundler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ clio = { version = "0.3.5", features = ["clap-parse"] }
derive_more = { version = "1.0.0", features = ["deref", "deref_mut"] }
dotenvy = { version = "0.15.7" }
flate2 = { version = "1.0.34" }
glob = "0.3.1"
headers = { version = "0.4.0" }
humantime = { version = "2.1.0" }
opentelemetry = { version = "0.23.0" }
Expand All @@ -27,6 +28,7 @@ sqlx = { version = "0.8.2", features = [
"mysql",
] }
tar = { version = "0.4.42" }
thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] }
tower = { version = "0.5.1" }
tower-http = { version = "0.6.1", features = ["trace"] }
Expand Down
65 changes: 61 additions & 4 deletions bundler/src/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use flate2::{write::GzEncoder, Compression};
use glob::Pattern;
use schemars::{schema::RootSchema, schema_for, JsonSchema};
use serde::Serialize;
use sqlx::MySqlPool;
use std::{
collections::{hash_map::DefaultHasher, BTreeMap},
collections::{hash_map::DefaultHasher, BTreeMap, HashMap},
ffi::OsStr,
fmt::Debug,
hash::{Hash, Hasher},
};
use tar::Header;
use tokio::try_join;
use tracing::instrument;
use tracing::{instrument, trace};

use crate::permissionables::{
beamlines::Beamlines, proposals::Proposals, sessions::Sessions, subjects::Subjects,
Expand Down Expand Up @@ -74,6 +76,8 @@ where
proposals: Proposals,
/// A mapping of beamlines to their various attributes
beamlines: Beamlines,
/// A map (name to data) of static files to include in the bundle
static_data: HashMap<String, Vec<u8>>,
}

/// The prefix applied to data files in the bundle. Open Policy Agent does not support loading bundles with overlapping prefixes
Expand All @@ -90,13 +94,17 @@ where
sessions: Sessions,
proposals: Proposals,
beamlines: Beamlines,
static_data: HashMap<String, Vec<u8>>,
tpoliaw marked this conversation as resolved.
Show resolved Hide resolved
) -> Self {
let mut hasher = DefaultHasher::new();
metadata.hash(&mut hasher);
subjects.hash(&mut hasher);
sessions.hash(&mut hasher);
proposals.hash(&mut hasher);
beamlines.hash(&mut hasher);
for entry in &static_data {
entry.hash(&mut hasher);
}
let hash = hasher.finish();

Self {
Expand All @@ -110,20 +118,31 @@ where
sessions,
proposals,
beamlines,
static_data,
}
}

/// Fetches [`Subjects`] from ISPyB and constructs a [`Bundle`]
#[instrument(name = "fetch_bundle")]
pub async fn fetch(metadata: Metadata, ispyb_pool: &MySqlPool) -> Result<Self, sqlx::Error> {
pub async fn fetch(
metadata: Metadata,
static_data_directory: &[Pattern],
ispyb_pool: &MySqlPool,
) -> Result<Self, sqlx::Error> {
let (subjects, sessions, proposals, beamlines) = try_join!(
Subjects::fetch(ispyb_pool),
Sessions::fetch(ispyb_pool),
Proposals::fetch(ispyb_pool),
Beamlines::fetch(ispyb_pool),
)?;
let static_data = static_data(static_data_directory).await?;
Ok(Self::new(
metadata, subjects, sessions, proposals, beamlines,
metadata,
subjects,
sessions,
proposals,
beamlines,
static_data,
))
}

Expand Down Expand Up @@ -172,6 +191,15 @@ where
beamlines.as_slice(),
)?;

for (name, data) in &self.static_data {
let mut header = Header::from_bytes(data);
bundle_builder.append_data(
&mut header,
format!("{BUNDLE_PREFIX}/{name}/data.json"),
data.as_slice(),
)?;
}

Ok(bundle_builder.into_inner()?.finish()?)
}

Expand All @@ -185,3 +213,32 @@ where
])
}
}

/// Read static data from files that should be included in the compiled bundle
async fn static_data(patterns: &[Pattern]) -> Result<HashMap<String, Vec<u8>>, std::io::Error> {
let mut data = HashMap::new();
for pattern in patterns {
for file in glob::glob(pattern.as_str()).expect("Pattern was validated by CLI") {
let file = file.map_err(|e| e.into_error())?;
let name = file.file_stem();
let Some(name) = name.and_then(OsStr::to_str) else {
// Save having to think about non-utf8 in OPA rules
trace!("Skipping non-utf8 static file: {name:?}");
continue;
};
data.insert(name.to_string(), tokio::fs::read(&file).await?);
}
}
Ok(data)
}

/// Combination of possible errors when fetching data to create bundle
#[derive(Debug, thiserror::Error)]
pub enum BundleDataError {
/// Error fetching data from database
#[error("Error reading dynamic data: {0}")]
Sql(#[from] sqlx::Error),
/// Error fetching data from file
#[error("Error reading static data: {0}")]
Static(#[from] std::io::Error),
}
31 changes: 24 additions & 7 deletions bundler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use axum::{
use axum_extra::TypedHeader;
use clap::Parser;
use clio::ClioPath;
use glob::Pattern;
use headers::{ETag, HeaderMapExt, IfNoneMatch};
use opentelemetry_otlp::WithExportConfig;
use require_bearer::RequireBearerLayer;
Expand Down Expand Up @@ -79,9 +80,11 @@ where
/// A thread safe, mutable, wrapper around the [`BundleFile`]
type CurrentBundle = Arc<RwLock<BundleFile<NoMetadata>>>;

/// Bundler acts as a Open Policy Agent bundle server, providing permissionable data from the ISPyB database
/// Bundler acts as an Open Policy Agent bundle server, providing permissionable data from the
/// ISPyB database and static data from local files
#[derive(Debug, Parser)]
#[command(author, version, about, long_about= None)]
#[allow(clippy::large_enum_variant)]
enum Cli {
/// Run the service providing bundle data
Serve(ServeArgs),
Expand Down Expand Up @@ -110,6 +113,9 @@ struct ServeArgs {
/// The URL of the OpenTelemetry collector to send traces to
#[arg(long, env = "BUNDLER_OTEL_COLLECTOR_URL")]
otel_collector_url: Option<Url>,
/// Paths to any static data files that should be included in the bundle - can be globs
#[arg(long, env = "BUNDLER_STATIC_DATA")]
static_data: Vec<Pattern>,
}

/// Arguments to output the schema with
Expand All @@ -131,12 +137,14 @@ async fn main() {
}
}

/// Runs the service, pulling fresh bundles from ISPyB and serving them via the API
/// Runs the service, pulling fresh bundles from ISPyB/local files and serving them via the API
async fn serve(args: ServeArgs) {
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();

let ispyb_pool = connect_ispyb(args.database_url).await.unwrap();
let current_bundle = fetch_initial_bundle(&ispyb_pool).await.unwrap();
let current_bundle = fetch_initial_bundle(&args.static_data, &ispyb_pool)
.await
.unwrap();
let app = Router::new()
.route("/bundle.tar.gz", get(bundle_endpoint))
.with_state(current_bundle.clone())
Expand All @@ -154,6 +162,7 @@ async fn serve(args: ServeArgs) {
let mut tasks = tokio::task::JoinSet::new();
tasks.spawn(update_bundle(
current_bundle,
args.static_data,
ispyb_pool,
args.polling_interval.into(),
));
Expand Down Expand Up @@ -231,14 +240,18 @@ async fn connect_ispyb(database_url: Url) -> Result<MySqlPool, sqlx::Error> {
connection
}

/// Fetches the intial [`Bundle`] from ISPyB and produces the correspoinding [`BundleFile`]
/// Fetches the initial [`Bundle`] from ISPyB and any static files, and produces the corresponding
/// [`BundleFile`]
#[instrument]
async fn fetch_initial_bundle(
static_data: &[Pattern],
ispyb_pool: &MySqlPool,
) -> Result<Arc<RwLock<BundleFile<NoMetadata>>>, anyhow::Error> {
tracing::info!("Fetching initial bundle");
let bundle = Arc::new(RwLock::new(BundleFile::try_from(
Bundle::fetch(NoMetadata, ispyb_pool).await.unwrap(),
Bundle::fetch(NoMetadata, static_data, ispyb_pool)
.await
.unwrap(),
)?));
tracing::info!(
"Using bundle with revison: {}",
Expand All @@ -255,9 +268,11 @@ async fn serve_endpoints(port: u16, app: Router) {
axum::serve(listener, app).await.unwrap()
}

/// Periodically update the bundle with new data from ISPyB
/// Periodically update the bundle with new data from ISPyB and any static files matching the given
/// glob patterns.
async fn update_bundle(
current_bundle: impl AsRef<RwLock<BundleFile<NoMetadata>>>,
static_data: Vec<Pattern>,
ispyb_pool: MySqlPool,
polling_interval: Duration,
) {
Expand All @@ -267,7 +282,9 @@ async fn update_bundle(
sleep_until(next_fetch).await;
next_fetch = next_fetch.add(polling_interval);
tracing::info!("Updating bundle");
let bundle = Bundle::fetch(NoMetadata, &ispyb_pool).await.unwrap();
let bundle = Bundle::fetch(NoMetadata, &static_data, &ispyb_pool)
.await
.unwrap();
let bundle_file = BundleFile::try_from(bundle).unwrap();
let old_revision = current_bundle
.as_ref()
Expand Down
36 changes: 36 additions & 0 deletions static/admin.json
tpoliaw marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"em_admin": [],
"epsic_admin": [],
"fault_admin": [],
"gen_admin": [],
"pow_admin": [],
"mx_admin": ["i02-1", "i02-2", "i03", "i04", "i04-1", "i23", "i24"],
"saxs_admin": ["b21", "i22", "p38"],
"sm_admin": [],
"tomo_admin": [],
"xpdf_admin": ["i15", "i15-1"],

"b07_admin": ["b07"],
"b16_admin": ["b16"],
"b18_admin": ["b18"],
"b22_admin": ["b22"],
"b23_admin": ["b23"],
"b24_admin": ["b24"],
"i05_admin": ["i05"],
"i06_admin": ["i06"],
"i07_admin": ["i07"],
"i08_admin": ["i08"],
"i09_admin": ["i09"],
"i10_admin": ["i10"],
"i11_admin": ["i11"],
"i12_admin": ["i12"],
"i13_admin": ["i13"],
"i14_admin": ["i14"],
"i16_admin": ["i16"],
"i18_admin": ["i18"],
"i20_admin": ["i20"],
"i21_admin": ["i21"],
"k11_admin": ["k11"],
"p45_admin": ["p45"],
"p99_admin": ["p99"]
}
Loading