Skip to content

Commit

Permalink
Use GetStates for reduce request
Browse files Browse the repository at this point in the history
  • Loading branch information
ipdae committed Sep 10, 2024
1 parent 7c34c64 commit 3f94ea9
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 43 deletions.
1 change: 0 additions & 1 deletion ArenaService/ArenaService/ArenaWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public async Task PrepareArenaParticipants()
var currentRoundData = await _rpcClient.GetRoundData(tip);
var participants = await _rpcClient.GetArenaParticipantsState(tip, currentRoundData);
var cacheKey = $"{currentRoundData.ChampionshipId}_{currentRoundData.Round}";
var scoreCacheKey = $"{cacheKey}_score";
var expiry = TimeSpan.FromMinutes(5);
if (participants is null)
{
Expand Down
161 changes: 119 additions & 42 deletions ArenaService/ArenaService/RpcClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using Bencodex;
using Bencodex.Types;
using Grpc.Core;
Expand Down Expand Up @@ -344,11 +345,16 @@ public async Task<List<ArenaParticipant>> GetArenaParticipants(Block block, List

var itemSlotStates = await GetItemSlotStates(block, avatarAddrList);
var runeSlotStates = await GetRuneSlotStates(block, avatarAddrList);
var avatarStates = await GetAvatarStates(block, avatarAddrList);
var allRuneStates = await GetAllRuneStates(block, avatarAddrList);
var tasks = avatarAddrAndScoresWithRank.Select(async tuple =>
{
var avatarAddr = tuple.AvatarAddr;
var runeStates = await GetRuneState(block, avatarAddr, runeListSheet);
var avatar = await GetAvatarState(block, avatarAddr);
if (!allRuneStates.TryGetValue(avatarAddr, out var runeStates))
{
runeStates = await GetRuneState(block, avatarAddr, runeListSheet);
}
var avatar = avatarStates[avatarAddr];
var itemSlotState = itemSlotStates[avatarAddr];
var runeSlotState = runeSlotStates[avatarAddr];

Expand Down Expand Up @@ -430,18 +436,23 @@ public async Task<Dictionary<Address, CollectionState>> GetCollectionStates(Bloc
return result;
}

public async Task<Dictionary<Address, IValue>> GetStates(Block block, Address accountAddress, IReadOnlyList<Address> addresses)
public async Task<Dictionary<Address, IValue>> GetStates(Block block, Address accountAddress, IReadOnlyList<Address> addresses, int chunkSize = 500)
{
var result = new ConcurrentDictionary<Address, IValue>();
var queryResult = await _service.GetBulkStateByStateRootHash(block.StateRootHash.ToByteArray(), accountAddress.ToByteArray(),
addresses.Select(i => i.ToByteArray()));
queryResult
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
.ForAll(kv =>
{
result.TryAdd(new Address(kv.Key), _codec.Decode(kv.Value));
});
var chunks = addresses
.Select((x, i) => new {Index = i, Value = x})
.GroupBy(x => x.Index / chunkSize)
.Select(x => x.Select(v => v.Value.ToByteArray()).ToList())
.ToList();
ParallelOptions options = new()
{
MaxDegreeOfParallelism = MaxDegreeOfParallelism
};
await Parallel.ForEachAsync(chunks, options, async (chunk, _) =>
{
var queryResult = await _service.GetBulkStateByStateRootHash(block.StateRootHash.ToByteArray(), accountAddress.ToByteArray(), chunk);
foreach (var kv in queryResult) result[new Address(kv.Key)] = _codec.Decode(kv.Value);
});
return result.ToDictionary(kv => kv.Key, kv => kv.Value);
}

Expand All @@ -453,14 +464,11 @@ public async Task<AllRuneState> GetRuneState(Block block, Address avatarAddress,
{
// Get legacy rune states
allRuneState = new AllRuneState();
foreach (var rune in runeListSheet.Values)
var runeAddresses = runeListSheet.Values.Select(r => RuneState.DeriveAddress(avatarAddress, r.Id)).ToList();
var runeStates = await GetRuneStates(block, runeAddresses);
foreach (var runeState in runeStates)
{
var runeAddress = RuneState.DeriveAddress(avatarAddress, rune.Id);
if (await GetLegacyState(block, runeAddress) is List rawState)
{
var runeState = new RuneState(rawState);
allRuneState.AddRuneState(runeState);
}
allRuneState.AddRuneState(runeState);
}
}
else
Expand Down Expand Up @@ -538,21 +546,23 @@ public async Task<AvatarState> GetAvatarState(Block block, Address avatarAddress
/// <param name="avatarAddresses">The list of addresses to retrieve the item slot states for.</param>
/// <returns>A dictionary of Address and <see cref="ItemSlotState"/> pairs representing the item slot states
/// for the given addresses.</returns>
public async Task<Dictionary<Address, ItemSlotState>> GetItemSlotStates(Block block, IReadOnlyList<Address> avatarAddresses)
public async Task<IDictionary<Address, ItemSlotState>> GetItemSlotStates(Block block, IReadOnlyList<Address> avatarAddresses)
{
var result = new Dictionary<Address, ItemSlotState>();
var result = new ConcurrentDictionary<Address, ItemSlotState>();
var slotAddresses = avatarAddresses.Select(a => ItemSlotState.DeriveAddress(a, BattleType.Arena)).ToList();
var values = await GetStates(block, ReservedAddresses.LegacyAccount, slotAddresses);
foreach (var address in avatarAddresses)
{
var slotAddress = ItemSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var itemSlotState = serialized is List bencoded
? new ItemSlotState(bencoded)
: new ItemSlotState(BattleType.Arena);
result.TryAdd(address, itemSlotState);
}

avatarAddresses
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
.ForAll(address =>
{
var slotAddress = ItemSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var itemSlotState = serialized is List bencoded
? new ItemSlotState(bencoded)
: new ItemSlotState(BattleType.Arena);
result.TryAdd(address, itemSlotState);
});
return result;
}

Expand All @@ -563,21 +573,88 @@ public async Task<Dictionary<Address, ItemSlotState>> GetItemSlotStates(Block bl
/// <param name="avatarAddresses">The list of avatar addresses to retrieve the rune slot states for.</param>
/// <returns>A dictionary of Address and <see cref="RuneSlotState"/> pairs representing the rune slot states
/// for the given addresses.</returns>
public async Task<Dictionary<Address, RuneSlotState>> GetRuneSlotStates(Block block, IReadOnlyList<Address> avatarAddresses)
public async Task<IDictionary<Address, RuneSlotState>> GetRuneSlotStates(Block block, IReadOnlyList<Address> avatarAddresses)
{
var result = new Dictionary<Address, RuneSlotState>();
var result = new ConcurrentDictionary<Address, RuneSlotState>();
var slotAddresses = avatarAddresses.Select(a => RuneSlotState.DeriveAddress(a, BattleType.Arena)).ToList();
var values = await GetStates(block, ReservedAddresses.LegacyAccount, slotAddresses);
foreach (var address in avatarAddresses)
{
var slotAddress = RuneSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var runeSlotState = serialized is List bencoded
? new RuneSlotState(bencoded)
: new RuneSlotState(BattleType.Arena);
result.TryAdd(address, runeSlotState);
}
avatarAddresses
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
.ForAll(address =>
{
var slotAddress = RuneSlotState.DeriveAddress(address, BattleType.Arena);
var serialized = values[slotAddress];
var runeSlotState = serialized is List bencoded
? new RuneSlotState(bencoded)
: new RuneSlotState(BattleType.Arena);
result.TryAdd(address, runeSlotState);

});
return result;
}

public async Task<IDictionary<Address, AvatarState>> GetAvatarStates(Block block,
IReadOnlyList<Address> avatarAddresses)
{
var avatarResults = await GetStates(block, Addresses.Avatar, avatarAddresses);
var inventoryResults = await GetStates(block, Addresses.Inventory, avatarAddresses);
var result = new ConcurrentDictionary<Address, AvatarState>();
avatarResults
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
.ForAll(kv =>
{
var address = kv.Key;
var avatarResult = kv.Value;
if (avatarResult is List l)
{
var avatarState = new AvatarState(l);
var inventory = inventoryResults[address] is List l2
? new Inventory(l2)
: new Inventory();
avatarState.inventory = inventory;
result.TryAdd(address, avatarState);
}
});
return result;
}

public async Task<ConcurrentBag<RuneState>> GetRuneStates(Block block, IReadOnlyList<Address> runeAddresses)
{
var result = new ConcurrentBag<RuneState>();
var runeResults = await GetStates(block, ReservedAddresses.LegacyAccount, runeAddresses);
runeResults
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
.ForAll(pair =>
{
if (pair.Value is List rawState)
{
var runeState = new RuneState(rawState);
result.Add(runeState);
}
});
return result;
}

public async Task<IDictionary<Address, AllRuneState>> GetAllRuneStates(Block block,
IReadOnlyList<Address> avatarAddresses)
{
var serializedResults = await GetStates(block, Addresses.RuneState, avatarAddresses);
var result = new ConcurrentDictionary<Address, AllRuneState>();
avatarAddresses
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
.ForAll(address =>
{
var serialized = serializedResults[address];
if (serialized is List l)
{
var allRuneState = new AllRuneState(l);
result.TryAdd(address, allRuneState);
}
});
return result;
}

Expand Down

0 comments on commit 3f94ea9

Please sign in to comment.