Skip to content

Commit

Permalink
merge duplicated functions into single impl in llmp/mod.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
BAGUVIX456 committed Dec 23, 2024
1 parent f12fa78 commit eb4b884
Showing 1 changed file with 55 additions and 67 deletions.
122 changes: 55 additions & 67 deletions libafl/src/events/llmp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,73 +346,7 @@ where
}
}

/// Handle arriving events in the client
#[cfg(not(feature = "share_objectives"))]
pub fn process<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
state: &mut S,
executor: &mut E,
manager: &mut EM,
) -> Result<usize, Error>
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S::Corpus: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
{
// TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id();
let mut count = 0;
while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert!(
tag != _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);

if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};

let event: Event<DI> = postcard::from_bytes(event_bytes)?;
log::debug!("Processor received message {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
count += 1;
}
Ok(count)
}
}

#[cfg(feature = "share_objectives")]
impl<DI, IC, ICB, S, SP> LlmpEventConverter<DI, IC, ICB, S, SP>
where
S: UsesInput
+ HasExecutions
+ HasSolutions
+ HasMetadata
+ Stoppable
+ HasCurrentTestcase
+ HasCorpus,
S::Solutions: Corpus<Input = S::Input>,
SP: ShMemProvider,
IC: InputConverter<From = S::Input, To = DI>,
ICB: InputConverter<From = DI, To = S::Input>,
DI: Input,
{
// Handle arriving events in the client
#[cfg(feature = "share_objectives")]
fn handle_in_client<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
Expand All @@ -425,7 +359,9 @@ where
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S: HasSolutions + HasCurrentTestcase,
S::Corpus: Corpus<Input = S::Input>,
S::Solutions: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
Expand Down Expand Up @@ -490,6 +426,56 @@ where
}

/// Handle arriving events in the client
#[cfg(not(feature = "share_objectives"))]
pub fn process<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
state: &mut S,
executor: &mut E,
manager: &mut EM,
) -> Result<usize, Error>
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S::Corpus: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
{
// TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id();
let mut count = 0;
while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert!(
tag != _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);

if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};

let event: Event<DI> = postcard::from_bytes(event_bytes)?;
log::debug!("Processor received message {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
count += 1;
}
Ok(count)
}

/// Handle arriving events in the client
#[cfg(feature = "share_objectives")]
pub fn process<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
Expand All @@ -500,7 +486,9 @@ where
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S: HasSolutions + HasCurrentTestcase,
S::Corpus: Corpus<Input = S::Input>,
S::Solutions: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
Expand Down

0 comments on commit eb4b884

Please sign in to comment.