From 3f94ea947051aa99481a56b1c00a24b8f08018b2 Mon Sep 17 00:00:00 2001 From: Yang Chun Ung Date: Tue, 10 Sep 2024 17:16:54 +0900 Subject: [PATCH] Use GetStates for reduce request --- ArenaService/ArenaService/ArenaWorker.cs | 1 - ArenaService/ArenaService/RpcClient.cs | 161 +++++++++++++++++------ 2 files changed, 119 insertions(+), 43 deletions(-) diff --git a/ArenaService/ArenaService/ArenaWorker.cs b/ArenaService/ArenaService/ArenaWorker.cs index b48f54c..79df974 100644 --- a/ArenaService/ArenaService/ArenaWorker.cs +++ b/ArenaService/ArenaService/ArenaWorker.cs @@ -66,7 +66,6 @@ public async Task PrepareArenaParticipants() var currentRoundData = await _rpcClient.GetRoundData(tip); var participants = await _rpcClient.GetArenaParticipantsState(tip, currentRoundData); var cacheKey = $"{currentRoundData.ChampionshipId}_{currentRoundData.Round}"; - var scoreCacheKey = $"{cacheKey}_score"; var expiry = TimeSpan.FromMinutes(5); if (participants is null) { diff --git a/ArenaService/ArenaService/RpcClient.cs b/ArenaService/ArenaService/RpcClient.cs index e71dc00..ea253dc 100644 --- a/ArenaService/ArenaService/RpcClient.cs +++ b/ArenaService/ArenaService/RpcClient.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Diagnostics; using Bencodex; using Bencodex.Types; using Grpc.Core; @@ -344,11 +345,16 @@ public async Task> GetArenaParticipants(Block block, List var itemSlotStates = await GetItemSlotStates(block, avatarAddrList); var runeSlotStates = await GetRuneSlotStates(block, avatarAddrList); + var avatarStates = await GetAvatarStates(block, avatarAddrList); + var allRuneStates = await GetAllRuneStates(block, avatarAddrList); var tasks = avatarAddrAndScoresWithRank.Select(async tuple => { var avatarAddr = tuple.AvatarAddr; - var runeStates = await GetRuneState(block, avatarAddr, runeListSheet); - var avatar = await GetAvatarState(block, avatarAddr); + if (!allRuneStates.TryGetValue(avatarAddr, out var runeStates)) + { + runeStates = await GetRuneState(block, avatarAddr, runeListSheet); + } + var avatar = avatarStates[avatarAddr]; var itemSlotState = itemSlotStates[avatarAddr]; var runeSlotState = runeSlotStates[avatarAddr]; @@ -430,18 +436,23 @@ public async Task> GetCollectionStates(Bloc return result; } - public async Task> GetStates(Block block, Address accountAddress, IReadOnlyList
addresses) + public async Task> GetStates(Block block, Address accountAddress, IReadOnlyList
addresses, int chunkSize = 500) { var result = new ConcurrentDictionary(); - var queryResult = await _service.GetBulkStateByStateRootHash(block.StateRootHash.ToByteArray(), accountAddress.ToByteArray(), - addresses.Select(i => i.ToByteArray())); - queryResult - .AsParallel() - .WithDegreeOfParallelism(MaxDegreeOfParallelism) - .ForAll(kv => - { - result.TryAdd(new Address(kv.Key), _codec.Decode(kv.Value)); - }); + var chunks = addresses + .Select((x, i) => new {Index = i, Value = x}) + .GroupBy(x => x.Index / chunkSize) + .Select(x => x.Select(v => v.Value.ToByteArray()).ToList()) + .ToList(); + ParallelOptions options = new() + { + MaxDegreeOfParallelism = MaxDegreeOfParallelism + }; + await Parallel.ForEachAsync(chunks, options, async (chunk, _) => + { + var queryResult = await _service.GetBulkStateByStateRootHash(block.StateRootHash.ToByteArray(), accountAddress.ToByteArray(), chunk); + foreach (var kv in queryResult) result[new Address(kv.Key)] = _codec.Decode(kv.Value); + }); return result.ToDictionary(kv => kv.Key, kv => kv.Value); } @@ -453,14 +464,11 @@ public async Task GetRuneState(Block block, Address avatarAddress, { // Get legacy rune states allRuneState = new AllRuneState(); - foreach (var rune in runeListSheet.Values) + var runeAddresses = runeListSheet.Values.Select(r => RuneState.DeriveAddress(avatarAddress, r.Id)).ToList(); + var runeStates = await GetRuneStates(block, runeAddresses); + foreach (var runeState in runeStates) { - var runeAddress = RuneState.DeriveAddress(avatarAddress, rune.Id); - if (await GetLegacyState(block, runeAddress) is List rawState) - { - var runeState = new RuneState(rawState); - allRuneState.AddRuneState(runeState); - } + allRuneState.AddRuneState(runeState); } } else @@ -538,21 +546,23 @@ public async Task GetAvatarState(Block block, Address avatarAddress /// The list of addresses to retrieve the item slot states for. /// A dictionary of Address and pairs representing the item slot states /// for the given addresses. - public async Task> GetItemSlotStates(Block block, IReadOnlyList
avatarAddresses) + public async Task> GetItemSlotStates(Block block, IReadOnlyList
avatarAddresses) { - var result = new Dictionary(); + var result = new ConcurrentDictionary(); var slotAddresses = avatarAddresses.Select(a => ItemSlotState.DeriveAddress(a, BattleType.Arena)).ToList(); var values = await GetStates(block, ReservedAddresses.LegacyAccount, slotAddresses); - foreach (var address in avatarAddresses) - { - var slotAddress = ItemSlotState.DeriveAddress(address, BattleType.Arena); - var serialized = values[slotAddress]; - var itemSlotState = serialized is List bencoded - ? new ItemSlotState(bencoded) - : new ItemSlotState(BattleType.Arena); - result.TryAdd(address, itemSlotState); - } - + avatarAddresses + .AsParallel() + .WithDegreeOfParallelism(MaxDegreeOfParallelism) + .ForAll(address => + { + var slotAddress = ItemSlotState.DeriveAddress(address, BattleType.Arena); + var serialized = values[slotAddress]; + var itemSlotState = serialized is List bencoded + ? new ItemSlotState(bencoded) + : new ItemSlotState(BattleType.Arena); + result.TryAdd(address, itemSlotState); + }); return result; } @@ -563,21 +573,88 @@ public async Task> GetItemSlotStates(Block bl /// The list of avatar addresses to retrieve the rune slot states for. /// A dictionary of Address and pairs representing the rune slot states /// for the given addresses. - public async Task> GetRuneSlotStates(Block block, IReadOnlyList
avatarAddresses) + public async Task> GetRuneSlotStates(Block block, IReadOnlyList
avatarAddresses) { - var result = new Dictionary(); + var result = new ConcurrentDictionary(); var slotAddresses = avatarAddresses.Select(a => RuneSlotState.DeriveAddress(a, BattleType.Arena)).ToList(); var values = await GetStates(block, ReservedAddresses.LegacyAccount, slotAddresses); - foreach (var address in avatarAddresses) - { - var slotAddress = RuneSlotState.DeriveAddress(address, BattleType.Arena); - var serialized = values[slotAddress]; - var runeSlotState = serialized is List bencoded - ? new RuneSlotState(bencoded) - : new RuneSlotState(BattleType.Arena); - result.TryAdd(address, runeSlotState); - } + avatarAddresses + .AsParallel() + .WithDegreeOfParallelism(MaxDegreeOfParallelism) + .ForAll(address => + { + var slotAddress = RuneSlotState.DeriveAddress(address, BattleType.Arena); + var serialized = values[slotAddress]; + var runeSlotState = serialized is List bencoded + ? new RuneSlotState(bencoded) + : new RuneSlotState(BattleType.Arena); + result.TryAdd(address, runeSlotState); + + }); + return result; + } + public async Task> GetAvatarStates(Block block, + IReadOnlyList
avatarAddresses) + { + var avatarResults = await GetStates(block, Addresses.Avatar, avatarAddresses); + var inventoryResults = await GetStates(block, Addresses.Inventory, avatarAddresses); + var result = new ConcurrentDictionary(); + avatarResults + .AsParallel() + .WithDegreeOfParallelism(MaxDegreeOfParallelism) + .ForAll(kv => + { + var address = kv.Key; + var avatarResult = kv.Value; + if (avatarResult is List l) + { + var avatarState = new AvatarState(l); + var inventory = inventoryResults[address] is List l2 + ? new Inventory(l2) + : new Inventory(); + avatarState.inventory = inventory; + result.TryAdd(address, avatarState); + } + }); + return result; + } + + public async Task> GetRuneStates(Block block, IReadOnlyList
runeAddresses) + { + var result = new ConcurrentBag(); + var runeResults = await GetStates(block, ReservedAddresses.LegacyAccount, runeAddresses); + runeResults + .AsParallel() + .WithDegreeOfParallelism(MaxDegreeOfParallelism) + .ForAll(pair => + { + if (pair.Value is List rawState) + { + var runeState = new RuneState(rawState); + result.Add(runeState); + } + }); + return result; + } + + public async Task> GetAllRuneStates(Block block, + IReadOnlyList
avatarAddresses) + { + var serializedResults = await GetStates(block, Addresses.RuneState, avatarAddresses); + var result = new ConcurrentDictionary(); + avatarAddresses + .AsParallel() + .WithDegreeOfParallelism(MaxDegreeOfParallelism) + .ForAll(address => + { + var serialized = serializedResults[address]; + if (serialized is List l) + { + var allRuneState = new AllRuneState(l); + result.TryAdd(address, allRuneState); + } + }); return result; }