Skip to content

Commit

Permalink
Feature/Dont download existing block (#7665)
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap authored Oct 31, 2024
1 parent 173a663 commit 0fdb10a
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 35 deletions.
1 change: 0 additions & 1 deletion src/Nethermind/Nethermind.Api/IApiWithStores.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Autofac;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Find;
using Nethermind.Blockchain.Receipts;
using Nethermind.Consensus;
using Nethermind.Core;
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ public AddBlockResult SuggestBlock(Block block, BlockTreeSuggestOptions options

public Hash256? FindBlockHash(long blockNumber) => GetBlockHashOnMainOrBestDifficultyHash(blockNumber);

public bool HasBlock(long blockNumber, Hash256 blockHash) => _blockStore.HasBlock(blockNumber, blockHash);

public BlockHeader? FindHeader(Hash256? blockHash, BlockTreeLookupOptions options, long? blockNumber = null)
{
if (blockHash is null || blockHash == Keccak.Zero)
Expand Down
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ public void UpdateBeaconMainChain(BlockInfo[]? blockInfos, long clearBeaconMainC
public Block? FindBlock(long blockNumber, BlockTreeLookupOptions options) =>
_overlayTree.FindBlock(blockNumber, options) ?? _baseTree.FindBlock(blockNumber, options);

public bool HasBlock(long blockNumber, Hash256 blockHash) =>
_overlayTree.HasBlock(blockNumber, blockHash) || _baseTree.HasBlock(blockNumber, blockHash);

public BlockHeader? FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) =>
_overlayTree.FindHeader(blockHash, options, blockNumber) ?? _baseTree.FindHeader(blockHash, options, blockNumber);

Expand Down
7 changes: 7 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public void SetMetadata(byte[] key, byte[] value)
return blockDb.Get(key);
}

public bool HasBlock(long blockNumber, Hash256 blockHash)
{
Span<byte> dbKey = stackalloc byte[40];
KeyValueStoreExtensions.GetBlockNumPrefixedKey(blockNumber, blockHash, dbKey);
return blockDb.KeyExists(dbKey);
}

public void Insert(Block block, WriteFlags writeFlags = WriteFlags.None)
{
if (block.Hash is null)
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public interface IBlockStore
// These two are used by blocktree. Try not to use them...
void SetMetadata(byte[] key, byte[] value);
byte[]? GetMetadata(byte[] key);
bool HasBlock(long blockNumber, Hash256 blockHash);
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Find/IBlockFinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface IBlockFinder

Block? FindBlock(long blockNumber, BlockTreeLookupOptions options);

bool HasBlock(long blockNumber, Hash256 blockHash);

/// Find a header. blockNumber is optional, but specifying it can improve performance.
BlockHeader? FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null);

Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public void UpdateHeadBlock(Hash256 blockHash)

public Block FindBlock(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => _wrapped.FindBlock(blockHash, options, blockNumber);

public bool HasBlock(long blockNumber, Hash256 blockHash) => _wrapped.HasBlock(blockNumber, blockHash);

public BlockHeader FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => _wrapped.FindHeader(blockHash, options, blockNumber: blockNumber);

public BlockHeader FindHeader(long blockNumber, BlockTreeLookupOptions options) => _wrapped.FindHeader(blockNumber, options);
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db.Rocks/Config/DbConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class DbConfig : IDbConfig
public ulong? ReceiptsDbCompactionReadAhead { get; set; }
public ulong ReceiptsDbTargetFileSizeBase { get; set; } = (ulong)64.MiB();
public double ReceiptsDbCompressibilityHint { get; set; } = 0.35;
public bool ReceiptsDbOptimizeFiltersForHits { get; set; } = false;
public string? ReceiptsDbAdditionalRocksDbOptions { get; set; } = "compaction_pri=kOldestLargestSeqFirst";

public ulong BlocksDbWriteBufferSize { get; set; } = (ulong)64.MiB();
Expand All @@ -79,6 +80,7 @@ public class DbConfig : IDbConfig
public bool? BlocksDbUseDirectReads { get; set; }
public bool? BlocksDbUseDirectIoForFlushAndCompactions { get; set; }
public ulong? BlocksDbCompactionReadAhead { get; set; }
public bool BlocksDbOptimizeFiltersForHits { get; set; } = false;
public string? BlocksDbAdditionalRocksDbOptions { get; set; } = "compaction_pri=kOldestLargestSeqFirst";

public ulong HeadersDbWriteBufferSize { get; set; } = (ulong)8.MiB();
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db.Rocks/Config/IDbConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public interface IDbConfig : IConfig
ulong? ReceiptsDbCompactionReadAhead { get; set; }
ulong ReceiptsDbTargetFileSizeBase { get; set; }
double ReceiptsDbCompressibilityHint { get; set; }
bool ReceiptsDbOptimizeFiltersForHits { get; set; }
string? ReceiptsDbAdditionalRocksDbOptions { get; set; }

ulong BlocksDbWriteBufferSize { get; set; }
Expand All @@ -80,6 +81,7 @@ public interface IDbConfig : IConfig
bool? BlocksDbUseDirectReads { get; set; }
bool? BlocksDbUseDirectIoForFlushAndCompactions { get; set; }
ulong? BlocksDbCompactionReadAhead { get; set; }
bool BlocksDbOptimizeFiltersForHits { get; set; }
string? BlocksDbAdditionalRocksDbOptions { get; set; }

ulong HeadersDbWriteBufferSize { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public void SetMetadata(byte[] key, byte[] value)
{
return _metadataDict.TryGetValue(key, out var value) ? value : readonlyBaseBlockStore.GetMetadata(key);
}

public bool HasBlock(long blockNumber, Hash256 blockHash)
{
return _blockNumDict.ContainsKey(blockNumber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Extensions;
Expand Down Expand Up @@ -116,6 +117,29 @@ async Task HandleAndPrepareNextRequest()
req.Dispose();
}

[Test]
public async Task ShouldNotReDownloadExistingBlock()
{
_feed.InitializeFeed();

_syncingToBlockTree.Insert(_syncingFromBlockTree.FindBlock(_pivotBlock.Number - 2)!);
_syncingToBlockTree.Insert(_syncingFromBlockTree.FindBlock(_pivotBlock.Number - 4)!);

using BodiesSyncBatch req = (await _feed.PrepareRequest())!;
req.Infos
.Where((bi) => bi is not null)
.Select((bi) => bi!.BlockNumber)
.Take(4)
.Should()
.BeEquivalentTo([
_pivotBlock.Number,
_pivotBlock.Number - 1,
// Skipped
_pivotBlock.Number - 3,
// Skipped
_pivotBlock.Number - 5]);
}

[Test]
public async Task ShouldRecoverOnInsertFailure()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
Expand All @@ -10,6 +11,7 @@
using Nethermind.Core.Test.Builders;
using Nethermind.Synchronization.FastBlocks;
using NSubstitute;
using NSubstitute.Core;
using NUnit.Framework;

namespace Nethermind.Synchronization.Test.FastBlocks;
Expand Down Expand Up @@ -50,12 +52,44 @@ public void Will_not_go_below_ancient_barrier()
blockTree.FindCanonicalBlockInfo(Arg.Any<long>()).Returns(new BlockInfo(TestItem.KeccakA, 0));
SyncStatusList syncStatusList = new SyncStatusList(blockTree, 1000, null, 900);

BlockInfo?[] infos = new BlockInfo?[500];
syncStatusList.GetInfosForBatch(infos);
BlockInfo?[] infos;
syncStatusList.TryGetInfosForBatch(500, (_) => false, out infos);

infos.Count((it) => it is not null).Should().Be(101);
}

[Test]
public void Will_skip_existing_keys()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.FindCanonicalBlockInfo(Arg.Any<long>())
.Returns((Func<CallInfo, BlockInfo>)((ci) =>
{
long blockNumber = (long)ci[0];
return new BlockInfo(TestItem.KeccakA, 0)
{
BlockNumber = blockNumber
};
}));

SyncStatusList syncStatusList = new SyncStatusList(blockTree, 100000, null, 1000);

HashSet<long> needToFetchBlocks = [99999, 99995, 99950, 99000, 99001, 99003, 85000];

List<long> TryGetInfos()
{
BlockInfo?[] infos;
syncStatusList.TryGetInfosForBatch(50, (bi) => !needToFetchBlocks.Contains(bi.BlockNumber), out infos);
return infos.Where(bi => bi != null).Select((bi) => bi!.BlockNumber).ToList();
}

TryGetInfos().Should().BeEquivalentTo([99999, 99995]); // first two as it will try the first 50 only
TryGetInfos().Should().BeEquivalentTo([99950]); // Then the next 50
TryGetInfos().Should().BeEquivalentTo([99000, 99001, 99003]); // If the next 50 failed, it will try looking far back.
TryGetInfos().Should().BeEmpty(); // If it look far back enough and still does not find anything it will just return so that progress can update.
TryGetInfos().Should().BeEquivalentTo([85000]); // But as the existing blocks was already marked as inserted, it should be able to make progress on later call.
}

[Test]
public void Can_read_back_all_parallel_set_values()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Nethermind.Blockchain.Receipts;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
Expand All @@ -25,64 +26,105 @@ namespace Nethermind.Synchronization.Test;

public class ReceiptSyncFeedTests
{
private IBlockTree _syncingFromBlockTree = null!;
private IBlockTree _syncingToBlockTree = null!;
private ReceiptsSyncFeed _feed = null!;
private ISyncConfig _syncConfig = null!;
private Block _pivotBlock = null!;
private InMemoryReceiptStorage _syncingFromReceiptStore;
private IReceiptStorage _receiptStorage;

[Test]
public async Task ShouldRecoverOnInsertFailure()
[SetUp]
public void Setup()
{
InMemoryReceiptStorage syncingFromReceiptStore = new InMemoryReceiptStorage();
BlockTree syncingFromBlockTree = Build.A.BlockTree()
.WithTransactions(syncingFromReceiptStore)
_syncingFromReceiptStore = new InMemoryReceiptStorage();
_syncingFromBlockTree = Build.A.BlockTree()
.WithTransactions(_syncingFromReceiptStore)
.OfChainLength(100)
.TestObject;

BlockTree syncingTooBlockTree = Build.A.BlockTree()
_receiptStorage = Substitute.For<IReceiptStorage>();
_syncingToBlockTree = Build.A.BlockTree()
.TestObject;

for (int i = 1; i < 100; i++)
{
Block block = syncingFromBlockTree.FindBlock(i, BlockTreeLookupOptions.None)!;
syncingTooBlockTree.Insert(block.Header);
syncingTooBlockTree.Insert(block);
Block block = _syncingFromBlockTree.FindBlock(i, BlockTreeLookupOptions.None)!;
_syncingToBlockTree.Insert(block.Header);
_syncingToBlockTree.Insert(block);
}

Block pivot = syncingFromBlockTree.FindBlock(99, BlockTreeLookupOptions.None)!;
_pivotBlock = _syncingFromBlockTree.FindBlock(99, BlockTreeLookupOptions.None)!;

SyncConfig syncConfig = new()
_syncConfig = new SyncConfig()
{
FastSync = true,
PivotHash = pivot.Hash!.ToString(),
PivotNumber = pivot.Number.ToString(),
PivotHash = _pivotBlock.Hash!.ToString(),
PivotNumber = _pivotBlock.Number.ToString(),
AncientBodiesBarrier = 0,
DownloadBodiesInFastSync = true,
};

IReceiptStorage receiptStorage = Substitute.For<IReceiptStorage>();
ReceiptsSyncFeed syncFeed = new ReceiptsSyncFeed(
_feed = new ReceiptsSyncFeed(
MainnetSpecProvider.Instance,
syncingTooBlockTree,
receiptStorage,
_syncingToBlockTree,
_receiptStorage,
Substitute.For<ISyncPeerPool>(),
syncConfig,
_syncConfig,
new NullSyncReport(),
new MemDb(),
LimboLogs.Instance
);
syncFeed.InitializeFeed();
}

[TearDown]
public void TearDown()
{
_feed.Dispose();
}

[Test]
public async Task ShouldRecoverOnInsertFailure()
{
_feed.InitializeFeed();

using ReceiptsSyncBatch req = (await syncFeed.PrepareRequest())!;
req.Response = req.Infos.Take(8).Select(info => syncingFromReceiptStore.Get(info!.BlockHash)).ToPooledList(8)!;
using ReceiptsSyncBatch req = (await _feed.PrepareRequest())!;
req.Response = req.Infos.Take(8).Select(info => _syncingFromReceiptStore.Get(info!.BlockHash)).ToPooledList(8)!;

receiptStorage
_receiptStorage
.When((it) => it.Insert(Arg.Any<Block>(), Arg.Any<TxReceipt[]?>(), Arg.Any<bool>()))
.Do((callInfo) =>
{
Block block = (Block)callInfo[0];
if (block.Number == 95) throw new Exception("test exception");
});

Func<SyncResponseHandlingResult> act = () => syncFeed.HandleResponse(req);
Func<SyncResponseHandlingResult> act = () => _feed.HandleResponse(req);
act.Should().Throw<Exception>();
using ReceiptsSyncBatch req2 = (await syncFeed.PrepareRequest())!;
using ReceiptsSyncBatch req2 = (await _feed.PrepareRequest())!;
req2.Infos[0]!.BlockNumber.Should().Be(95);
}

[Test]
public async Task ShouldNotRedownloadExistingReceipts()
{
_feed.InitializeFeed();
_receiptStorage.HasBlock(Arg.Is(_pivotBlock.Number - 2), Arg.Any<Hash256>()).Returns(true);
_receiptStorage.HasBlock(Arg.Is(_pivotBlock.Number - 4), Arg.Any<Hash256>()).Returns(true);

using ReceiptsSyncBatch req = (await _feed.PrepareRequest())!;

req.Infos
.Where((bi) => bi is not null)
.Select((bi) => bi!.BlockNumber)
.Take(4)
.Should()
.BeEquivalentTo([
_pivotBlock.Number,
_pivotBlock.Number - 1,
// Skipped
_pivotBlock.Number - 3,
// Skipped
_pivotBlock.Number - 5]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Autofac.Features.AttributeFilters;
using Microsoft.Extensions.DependencyInjection;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
Expand Down Expand Up @@ -130,11 +131,20 @@ private void PostFinishCleanUp()
BodiesSyncBatch? batch = null;
if (ShouldBuildANewBatch())
{
BlockInfo?[] infos = new BlockInfo[_requestSize];
_syncStatusList.GetInfosForBatch(infos);
BlockInfo?[] infos = null;
while (!_syncStatusList.TryGetInfosForBatch(_requestSize, (info) => _blockTree.HasBlock(info.BlockNumber, info.BlockHash), out infos))
{
token.ThrowIfCancellationRequested();

// Otherwise, the progress does not update correctly
_blockTree.LowestInsertedBodyNumber = _syncStatusList.LowestInsertWithoutGaps;
UpdateSyncReport();
}

if (infos[0] is not null)
{
batch = new BodiesSyncBatch(infos);
// Used for peer allocation. It pick peer which have the at least this number
batch.MinNumber = infos[0].BlockNumber;
batch.Prioritized = true;
}
Expand Down
Loading

0 comments on commit 0fdb10a

Please sign in to comment.