Skip to content

Commit

Permalink
Revert parallel loops
Browse files Browse the repository at this point in the history
  • Loading branch information
ipdae committed Sep 11, 2024
1 parent 2633300 commit 20f56fe
Showing 1 changed file with 54 additions and 77 deletions.
131 changes: 54 additions & 77 deletions ArenaService/ArenaService/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ namespace ArenaService;

public class RpcClient: IDisposable, IActionEvaluationHubReceiver
{
private int _maxDegreeOfParallelism;

public Address Address => _privateKey.Address;
public Block PreviousTip;
public Block Tip;
Expand All @@ -42,7 +40,6 @@ public RpcClient(PrivateKey privateKey, IConfiguration configuration)
_privateKey = privateKey;
_codec = new Codec();
_rpcHost = configuration["Rpc:Host"]!;
_maxDegreeOfParallelism = int.Parse(configuration["Rpc:Degree"]!);
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -468,15 +465,11 @@ public async Task<Dictionary<Address, IValue>> GetStates(Block block, Address ac
.GroupBy(x => x.Index / chunkSize)
.Select(x => x.Select(v => v.Value.ToByteArray()).ToList())
.ToList();
ParallelOptions options = new()
{
MaxDegreeOfParallelism = _maxDegreeOfParallelism
};
await Parallel.ForEachAsync(chunks, options, async (chunk, _) =>
foreach (var chunk in chunks)
{
var queryResult = await _service.GetBulkStateByStateRootHash(block.StateRootHash.ToByteArray(), accountAddress.ToByteArray(), chunk);
foreach (var kv in queryResult) result[new Address(kv.Key)] = _codec.Decode(kv.Value);
});
}
return result.ToDictionary(kv => kv.Key, kv => kv.Value);
}

Expand Down Expand Up @@ -572,21 +565,18 @@ public async Task<AvatarState> GetAvatarState(Block block, Address avatarAddress
/// for the given addresses.</returns>
public async Task<IDictionary<Address, ItemSlotState>> GetItemSlotStates(Block block, IReadOnlyList<Address> avatarAddresses)
{
var result = new ConcurrentDictionary<Address, ItemSlotState>();
var result = new Dictionary<Address, ItemSlotState>();
var slotAddresses = avatarAddresses.Select(a => ItemSlotState.DeriveAddress(a, BattleType.Arena)).ToList();
var values = await GetStates(block, ReservedAddresses.LegacyAccount, slotAddresses);
avatarAddresses
.AsParallel()
.WithDegreeOfParallelism(_maxDegreeOfParallelism)
.ForAll(address =>
{
var slotAddress = ItemSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var itemSlotState = serialized is List bencoded
? new ItemSlotState(bencoded)
: new ItemSlotState(BattleType.Arena);
result.TryAdd(address, itemSlotState);
});
foreach (var address in avatarAddresses)
{
var slotAddress = ItemSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var itemSlotState = serialized is List bencoded
? new ItemSlotState(bencoded)
: new ItemSlotState(BattleType.Arena);
result.TryAdd(address, itemSlotState);
}
return result;
}

Expand All @@ -599,22 +589,18 @@ public async Task<IDictionary<Address, ItemSlotState>> GetItemSlotStates(Block b
/// for the given addresses.</returns>
public async Task<IDictionary<Address, RuneSlotState>> GetRuneSlotStates(Block block, IReadOnlyList<Address> avatarAddresses)
{
var result = new ConcurrentDictionary<Address, RuneSlotState>();
var result = new Dictionary<Address, RuneSlotState>();
var slotAddresses = avatarAddresses.Select(a => RuneSlotState.DeriveAddress(a, BattleType.Arena)).ToList();
var values = await GetStates(block, ReservedAddresses.LegacyAccount, slotAddresses);
avatarAddresses
.AsParallel()
.WithDegreeOfParallelism(_maxDegreeOfParallelism)
.ForAll(address =>
{
var slotAddress = RuneSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var runeSlotState = serialized is List bencoded
? new RuneSlotState(bencoded)
: new RuneSlotState(BattleType.Arena);
result.TryAdd(address, runeSlotState);

});
foreach (var address in avatarAddresses)
{
var slotAddress = RuneSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var runeSlotState = serialized is List bencoded
? new RuneSlotState(bencoded)
: new RuneSlotState(BattleType.Arena);
result.TryAdd(address, runeSlotState);
}
return result;
}

Expand All @@ -623,62 +609,53 @@ public async Task<IDictionary<Address, AvatarState>> GetAvatarStates(Block block
{
var avatarResults = await GetStates(block, Addresses.Avatar, avatarAddresses);
var inventoryResults = await GetStates(block, Addresses.Inventory, avatarAddresses);
var result = new ConcurrentDictionary<Address, AvatarState>();
avatarResults
.AsParallel()
.WithDegreeOfParallelism(_maxDegreeOfParallelism)
.ForAll(kv =>
var result = new Dictionary<Address, AvatarState>();
foreach (var kv in avatarResults)
{
var address = kv.Key;
var avatarResult = kv.Value;
if (avatarResult is List l)
{
var address = kv.Key;
var avatarResult = kv.Value;
if (avatarResult is List l)
{
var avatarState = new AvatarState(l);
var inventory = inventoryResults[address] is List l2
? new Inventory(l2)
: new Inventory();
avatarState.inventory = inventory;
result.TryAdd(address, avatarState);
}
});
var avatarState = new AvatarState(l);
var inventory = inventoryResults[address] is List l2
? new Inventory(l2)
: new Inventory();
avatarState.inventory = inventory;
result.TryAdd(address, avatarState);
}
}
return result;
}

public async Task<ConcurrentBag<RuneState>> GetRuneStates(Block block, IReadOnlyList<Address> runeAddresses)
public async Task<List<RuneState>> GetRuneStates(Block block, IReadOnlyList<Address> runeAddresses)
{
var result = new ConcurrentBag<RuneState>();
var result = new List<RuneState>();
var runeResults = await GetStates(block, ReservedAddresses.LegacyAccount, runeAddresses);
runeResults
.AsParallel()
.WithDegreeOfParallelism(_maxDegreeOfParallelism)
.ForAll(pair =>
foreach (var pair in runeResults)
{
if (pair.Value is List rawState)
{
if (pair.Value is List rawState)
{
var runeState = new RuneState(rawState);
result.Add(runeState);
}
});
var runeState = new RuneState(rawState);
result.Add(runeState);
}
}
return result;
}

public async Task<IDictionary<Address, AllRuneState>> GetAllRuneStates(Block block,
IReadOnlyList<Address> avatarAddresses)
{
var serializedResults = await GetStates(block, Addresses.RuneState, avatarAddresses);
var result = new ConcurrentDictionary<Address, AllRuneState>();
avatarAddresses
.AsParallel()
.WithDegreeOfParallelism(_maxDegreeOfParallelism)
.ForAll(address =>
var result = new Dictionary<Address, AllRuneState>();
foreach (var address in avatarAddresses)
{
var serialized = serializedResults[address];
if (serialized is List l)
{
var serialized = serializedResults[address];
if (serialized is List l)
{
var allRuneState = new AllRuneState(l);
result.TryAdd(address, allRuneState);
}
});
var allRuneState = new AllRuneState(l);
result.TryAdd(address, allRuneState);
}
}
return result;
}

Expand Down

0 comments on commit 20f56fe

Please sign in to comment.