Skip to content

Commit

Permalink
Remove per-task aggregator HPKE keys.
Browse files Browse the repository at this point in the history
DAP-12 removed per-task HPKE key requests from the specification. We
don't have a reason to want per-task HPKE keys, especially since they
require custom functionality, so we remove per-task HPKE key support.
With this change, all Janus deployments must use "global" HPKE keys.

Also, make a minor refactor of HpkeKeypairCache, mostly to keep all
state under a single mutex and to remove an unnecessary "view" type.
  • Loading branch information
branlwyd committed Nov 15, 2024
1 parent 6f5975e commit 1c13810
Show file tree
Hide file tree
Showing 37 changed files with 641 additions and 1,595 deletions.
280 changes: 72 additions & 208 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async fn setup_aggregate_init_test_without_sending_request<
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);

datastore.put_aggregator_task(&helper_task).await.unwrap();
let keypair = datastore.put_global_hpke_key().await.unwrap();
let keypair = datastore.put_hpke_key().await.unwrap();

let handler = AggregatorHandlerBuilder::new(
Arc::clone(&datastore),
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ mod tests {
let ephemeral_datastore = ephemeral_datastore().await;
let meter = noop_meter();
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
let keypair = datastore.put_global_hpke_key().await.unwrap();
let keypair = datastore.put_hpke_key().await.unwrap();

let aggregation_parameter = dummy::AggregationParam(7);
let prepare_init_generator = PrepareInitGenerator::new(
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ pub(crate) async fn setup_collection_job_test_case(
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);

datastore.put_aggregator_task(&role_task).await.unwrap();
datastore.put_global_hpke_key().await.unwrap();
datastore.put_hpke_key().await.unwrap();

let handler = AggregatorHandlerBuilder::new(
Arc::clone(&datastore),
Expand Down
31 changes: 2 additions & 29 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,27 +427,15 @@ where
}
}

/// Deserialization helper struct to extract a "task_id" parameter from a query string.
#[derive(Deserialize)]
struct HpkeConfigQuery {
/// The optional "task_id" parameter, in base64url-encoded form.
#[serde(default)]
task_id: Option<String>,
}

const HPKE_CONFIG_SIGNATURE_HEADER: &str = "x-hpke-config-signature";

/// API handler for the "/hpke_config" GET endpoint.
async fn hpke_config<C: Clock>(
conn: &mut Conn,
State(aggregator): State<Arc<Aggregator<C>>>,
) -> Result<(), Error> {
let query = serde_urlencoded::from_str::<HpkeConfigQuery>(conn.querystring())
.map_err(|err| Error::BadRequest(format!("couldn't parse query string: {err}")))?;
let (encoded_hpke_config_list, signature) = conn
.cancel_on_disconnect(
aggregator.handle_hpke_config(query.task_id.as_ref().map(AsRef::as_ref)),
)
.cancel_on_disconnect(aggregator.handle_hpke_config())
.await
.ok_or(Error::ClientDisconnected)??;

Expand Down Expand Up @@ -828,7 +816,6 @@ pub mod test_util {
use crate::aggregator::test_util::default_aggregator_config;
use janus_aggregator_core::{
datastore::{
models::HpkeKeyState,
test_util::{ephemeral_datastore, EphemeralDatastore},
Datastore,
},
Expand Down Expand Up @@ -883,21 +870,7 @@ pub mod test_util {
let ephemeral_datastore = ephemeral_datastore().await;
let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);

let hpke_keypair = HpkeKeypair::test();
datastore
.run_unnamed_tx(|tx| {
let hpke_keypair = hpke_keypair.clone();
Box::pin(async move {
tx.put_global_hpke_keypair(&hpke_keypair).await?;
tx.set_global_hpke_keypair_state(
hpke_keypair.config().id(),
&HpkeKeyState::Active,
)
.await
})
})
.await
.unwrap();
let hpke_keypair = datastore.put_hpke_key().await.unwrap();

let handler = AggregatorHandlerBuilder::new(
datastore.clone(),
Expand Down
116 changes: 9 additions & 107 deletions aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::aggregator::{
aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator},
empty_batch_aggregations,
http_handlers::{
test_util::{decode_response_body, take_problem_details, HttpHandlerTest},
AggregatorHandlerBuilder,
},
http_handlers::test_util::{decode_response_body, take_problem_details, HttpHandlerTest},
test_util::{
assert_task_aggregation_counter, default_aggregator_config, generate_helper_report_share,
assert_task_aggregation_counter, generate_helper_report_share,
generate_helper_report_share_for_plaintext, BATCH_AGGREGATION_SHARD_COUNT,
},
};
Expand All @@ -18,20 +15,19 @@ use janus_aggregator_core::{
ReportAggregation, ReportAggregationState, TaskAggregationCounter,
},
task::{test_util::TaskBuilder, BatchMode, VerifyKey},
test_util::noop_meter,
};
use janus_core::{
auth_tokens::AuthenticationToken,
hpke::HpkeKeypair,
report_id::ReportIdChecksumExt,
test_util::{run_vdaf, runtime::TestRuntime},
test_util::run_vdaf,
time::{Clock, MockClock, TimeExt},
vdaf::VdafInstance,
};
use janus_messages::{
batch_mode::{LeaderSelected, TimeInterval},
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
Duration, Extension, ExtensionType, HpkeCiphertext, InputShareAad, Interval,
Duration, Extension, ExtensionType, HpkeCiphertext, HpkeConfigId, InputShareAad, Interval,
PartialBatchSelector, PrepareInit, PrepareStepResult, ReportError, ReportIdChecksum,
ReportMetadata, ReportShare, Time,
};
Expand Down Expand Up @@ -257,16 +253,11 @@ async fn aggregate_init() {
// prepare_init_3 has an unknown HPKE config ID.
let (prepare_init_3, transcript_3) = prep_init_generator.next(&measurement);

let wrong_hpke_config = loop {
let hpke_config = HpkeKeypair::test().config().clone();
if helper_task.hpke_keys().contains_key(hpke_config.id()) {
continue;
}
if hpke_keypair.config().id() == hpke_config.id() {
continue;
}
break hpke_config;
};
let unused_hpke_config_id =
HpkeConfigId::from(u8::from(*hpke_keypair.config().id()).wrapping_add(1));
let wrong_hpke_config = HpkeKeypair::test_with_id(unused_hpke_config_id)
.config()
.clone();

let report_share_3 = generate_helper_report_share::<dummy::Vdaf>(
*task.id(),
Expand Down Expand Up @@ -724,95 +715,6 @@ async fn aggregate_init_batch_already_collected() {
.await;
}

#[tokio::test]
#[allow(clippy::unit_arg)]
async fn aggregate_init_with_reports_encrypted_by_task_specific_key() {
let HttpHandlerTest {
clock,
ephemeral_datastore: _ephemeral_datastore,
datastore,
hpke_keypair,
..
} = HttpHandlerTest::new().await;

let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Fake { rounds: 1 }).build();

let helper_task = task.helper_view().unwrap();
datastore.put_aggregator_task(&helper_task).await.unwrap();
let vdaf = dummy::Vdaf::new(1);
let aggregation_param = dummy::AggregationParam(0);
let prep_init_generator = PrepareInitGenerator::new(
clock.clone(),
helper_task.clone(),
helper_task.current_hpke_key().config().clone(),
vdaf.clone(),
aggregation_param,
);

// Same ID as the task to test having both keys to choose from. (skip if there is already a
// global keypair with the same ID set up by the fixture)
if helper_task.current_hpke_key().config().id() != hpke_keypair.config().id() {
let global_hpke_keypair_same_id =
HpkeKeypair::test_with_id((*helper_task.current_hpke_key().config().id()).into());
datastore
.run_unnamed_tx(|tx| {
let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
Box::pin(async move {
// Leave these in the PENDING state--they should still be decryptable.
tx.put_global_hpke_keypair(&global_hpke_keypair_same_id)
.await
})
})
.await
.unwrap();
}

// Create new handler _after_ the keys have been inserted so that they come pre-cached.
let handler = AggregatorHandlerBuilder::new(
datastore.clone(),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
default_aggregator_config(),
)
.await
.unwrap()
.build()
.unwrap();

let (prepare_init, transcript) = prep_init_generator.next(&0);

let aggregation_job_id: AggregationJobId = random();
let request = AggregationJobInitializeReq::new(
dummy::AggregationParam(0).get_encoded().unwrap(),
PartialBatchSelector::new_time_interval(),
Vec::from([prepare_init.clone()]),
);

let mut test_conn = put_aggregation_job(&task, &aggregation_job_id, &request, &handler).await;
assert_eq!(test_conn.status(), Some(Status::Ok));
let aggregate_resp: AggregationJobResp = decode_response_body(&mut test_conn).await;

// Validate response.
assert_eq!(aggregate_resp.prepare_resps().len(), 1);

let prepare_step = aggregate_resp.prepare_resps().first().unwrap();
assert_eq!(
prepare_step.report_id(),
prepare_init.report_share().metadata().id()
);
assert_matches!(prepare_step.result(), PrepareStepResult::Continue { message } => {
assert_eq!(message, &transcript.helper_prepare_transitions[0].message);
});

assert_task_aggregation_counter(
&datastore,
*task.id(),
TaskAggregationCounter::new_with_values(1),
)
.await;
}

#[tokio::test]
async fn aggregate_init_prep_init_failed() {
let HttpHandlerTest {
Expand Down
Loading

0 comments on commit 1c13810

Please sign in to comment.