Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark reserved peers as explicit for gossipsub to avoid reputation decreasing #1423

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ pub fn init_sub_services(

#[cfg(feature = "p2p")]
let mut network = {
if let Some(config) = config.p2p.clone() {
if let Some(p2p_config) = config.p2p.clone() {
let p2p_db = database.clone();
let genesis = p2p_db.get_genesis()?;
let p2p_config = config.init(genesis)?;
let p2p_config = p2p_config.init(genesis)?;

Some(fuel_core_p2p::service::new_service(
config.chain_conf.transaction_parameters.chain_id,
p2p_config,
p2p_db,
importer_adapter.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ pub(crate) fn build_transport(
(transport, connection_state)
}

fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
pub fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
multiaddr
.iter()
// Safety: as is the case with `bootstrap_nodes` it is assumed that `reserved_nodes` [`Multiadr`]
Expand Down
7 changes: 6 additions & 1 deletion crates/services/p2p/src/gossipsub/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ fn initialize_peer_score_thresholds() -> PeerScoreThresholds {

/// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour
pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub {
if p2p_config.metrics {
let mut gossipsub = if p2p_config.metrics {
// Move to Metrics related feature flag
let mut p2p_registry = Registry::default();

Expand Down Expand Up @@ -208,7 +208,12 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub {
initialize_gossipsub(&mut gossipsub, p2p_config);

gossipsub
};
for peer_id in crate::config::peer_ids_set_from(&p2p_config.reserved_nodes) {
gossipsub.add_explicit_peer(&peer_id);
}

gossipsub
}

fn initialize_gossipsub(gossipsub: &mut Gossipsub, p2p_config: &Config) {
Expand Down
27 changes: 22 additions & 5 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ use fuel_core_types::{
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
fuel_tx::{
Transaction,
UniqueIdentifier,
},
fuel_types::{
BlockHeight,
ChainId,
},
services::p2p::{
peer_reputation::{
AppScore,
Expand Down Expand Up @@ -109,6 +115,7 @@ impl Debug for TaskRequest {
/// Orchestrates various p2p-related events between the inner `P2pService`
/// and the top level `NetworkService`.
pub struct Task<D> {
chain_id: ChainId,
p2p_service: FuelP2PService<PostcardCodec>,
db: Arc<D>,
next_block_height: BoxStream<BlockHeight>,
Expand All @@ -119,6 +126,7 @@ pub struct Task<D> {

impl<D> Task<D> {
pub fn new<B: BlockHeightImporter>(
chain_id: ChainId,
config: Config,
db: Arc<D>,
block_importer: Arc<B>,
Expand All @@ -135,6 +143,7 @@ impl<D> Task<D> {
p2p_service.peer_manager().reserved_peers_updates();

Self {
chain_id,
p2p_service,
db,
request_receiver,
Expand Down Expand Up @@ -192,10 +201,11 @@ where
should_continue = true;
match next_service_request {
Some(TaskRequest::BroadcastTransaction(transaction)) => {
let tx_id = transaction.id(&self.chain_id);
let broadcast = GossipsubBroadcastRequest::NewTx(transaction);
let result = self.p2p_service.publish_message(broadcast);
if let Err(e) = result {
tracing::error!("Got an error during transaction broadcasting {}", e);
tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e);
}
}
Some(TaskRequest::BroadcastBlock(block)) => {
Expand Down Expand Up @@ -478,12 +488,18 @@ impl SharedState {
}
}

pub fn new_service<D, B>(p2p_config: Config, db: D, block_importer: B) -> Service<D>
pub fn new_service<D, B>(
chain_id: ChainId,
p2p_config: Config,
db: D,
block_importer: B,
) -> Service<D>
where
D: P2pDb + 'static,
B: BlockHeightImporter,
{
Service::new(Task::new(
chain_id,
p2p_config,
Arc::new(db),
Arc::new(block_importer),
Expand Down Expand Up @@ -588,7 +604,8 @@ pub mod tests {
#[tokio::test]
async fn start_and_stop_awaits_works() {
let p2p_config = Config::default_initialized("start_stop_works");
let service = new_service(p2p_config, FakeDb, FakeBlockImporter);
let service =
new_service(ChainId::default(), p2p_config, FakeDb, FakeBlockImporter);

// Node with p2p service started
assert!(service.start_and_await().await.unwrap().started());
Expand Down
Loading