From 57ce96778fb07dc73004b6058383fa903fbee6af Mon Sep 17 00:00:00 2001 From: "Crunch (Chaz9)" Date: Sat, 2 Mar 2024 17:29:33 +0000 Subject: [PATCH] l --- .../network/internal/ReceivedPacket.h | 62 ++++++++ .../network/internal/ReceivedPacketCache.cpp | 106 +++++++++++++ .../network/internal/ReceivedPacketCache.h | 37 +++++ .../network/internal/SentPacket.h | 35 ++++ .../network/internal/SentPacketCache.cpp | 149 ++++++++++++++++++ 5 files changed, 389 insertions(+) create mode 100644 sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacket.h create mode 100644 sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.cpp create mode 100644 sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.h create mode 100644 sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacket.h create mode 100644 sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacketCache.cpp diff --git a/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacket.h b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacket.h new file mode 100644 index 00000000..662fd110 --- /dev/null +++ b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacket.h @@ -0,0 +1,62 @@ +/* +* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution. +* Copyright (C) 2017-2024 by Eukaryot +* +* Published under the GNU GPLv3 open source software license, see license.txt +* or https://www.gnu.org/licenses/gpl-3.0.en.html +*/ + +#pragma once + +#include "oxygen_netcore/network/Sockets.h" + +class NetConnection; + + +struct ReceivedPacket +{ +friend class ReceivedPacketCache; + +public: + struct Dump + { + std::vector mPackets; + }; + +public: + std::vector mContent; + SocketAddress mSenderAddress; + uint16 mLowLevelSignature = 0; + NetConnection* mConnection = nullptr; + +public: + inline void initializeWithDump(Dump* dump) + { + RMX_ASSERT(nullptr == mDump, "Expected a packet that is removed from the dump to not have a dump pointer already set"); + RMX_ASSERT(mReferenceCounter == 0, "Expected a packet that is removed from the dump to not have a reference count of zero"); + mDump = dump; + mReferenceCounter = 1; + } + + inline void incReferenceCounter() + { + RMX_ASSERT(nullptr != mDump, "Reference counting is used for an instance already returned to the dump"); + ++mReferenceCounter; + } + + inline void decReferenceCounter() + { + RMX_ASSERT(nullptr != mDump, "Reference counting is used for an instance already returned to the dump"); + RMX_ASSERT(mReferenceCounter > 0, "Trying to remove a reference when counter already is at zero"); + --mReferenceCounter; + if (mReferenceCounter == 0) + { + mDump->mPackets.push_back(this); + mDump = nullptr; // Mark as being returned + } + } + +private: + Dump* mDump = nullptr; + int mReferenceCounter = 0; +}; diff --git a/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.cpp b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.cpp new file mode 100644 index 00000000..07505583 --- /dev/null +++ b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.cpp @@ -0,0 +1,106 @@ +/* +* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution. +* Copyright (C) 2017-2024 by Eukaryot +* +* Published under the GNU GPLv3 open source software license, see license.txt +* or https://www.gnu.org/licenses/gpl-3.0.en.html +*/ + +#include "oxygen_netcore/pch.h" +#include "oxygen_netcore/network/internal/ReceivedPacketCache.h" +#include "oxygen_netcore/network/LowLevelPackets.h" + + +ReceivedPacketCache::~ReceivedPacketCache() +{ + clear(); +} + +void ReceivedPacketCache::clear() +{ + // Remove references to all received packets still in the queue + for (CacheItem& item : mQueue) + { + if (nullptr != item.mReceivedPacket) + item.mReceivedPacket->decReferenceCounter(); + } + mQueue.clear(); + mLastExtractedUniquePacketID = 0; +} + +bool ReceivedPacketCache::enqueuePacket(ReceivedPacket& receivedPacket, const lowlevel::HighLevelPacket& packet, VectorBinarySerializer& serializer, uint32 uniqueRequestID) +{ + ReceivedPacketCache::CacheItem* itemToFill = nullptr; + + // Check the unique packet ID + const uint32 uniquePacketID = packet.mUniquePacketID; + const uint32 lastEnqueuedPacketID = mLastExtractedUniquePacketID + (uint32)mQueue.size(); + if (uniquePacketID <= lastEnqueuedPacketID) + { + if (uniquePacketID <= mLastExtractedUniquePacketID) + { + // It was extracted previously already + return false; + } + + // It's part of the queue, find it + const uint32 firstQueueIndex = mLastExtractedUniquePacketID + 1; + const size_t queueIndex = (size_t)(uniquePacketID - firstQueueIndex); + if (nullptr != mQueue[queueIndex].mReceivedPacket) + { + // It already exists in the cache + return false; + } + + // Replace the empty spot with the received packet + itemToFill = &mQueue[queueIndex]; + } + else + { + // Is it the next packet that we're just waiting for? + uint32 indexAfterQueue = uniquePacketID - (lastEnqueuedPacketID + 1); + if (indexAfterQueue != 0) + { + // Create a gap... + // TODO: Check if that would create a very large gap + for (size_t k = 0; k < indexAfterQueue; ++k) + { + mQueue.emplace_back(); + } + } + + // Enqueue the packet content + mQueue.emplace_back(); + itemToFill = &mQueue.back(); + } + + // Fill the cache item accordingly + itemToFill->mPacketHeader = packet; + itemToFill->mUniqueRequestID = uniqueRequestID; + itemToFill->mHeaderSize = serializer.getReadPosition(); + itemToFill->mReceivedPacket = &receivedPacket; + + // Retain packet while it's referenced in the queue + receivedPacket.incReferenceCounter(); + + // Now the packet was enqueued, and we can't do more here right now + return true; +} + +bool ReceivedPacketCache::extractPacket(CacheItem& outExtractionResult) +{ + if (mQueue.empty()) + return false; + + CacheItem& item = mQueue.front(); + if (nullptr == item.mReceivedPacket) + return false; + + RMX_ASSERT(item.mPacketHeader.mUniquePacketID == mLastExtractedUniquePacketID + 1, "Detected a mismatch in unique packet IDs"); + // Note that the reference counter does not get decreased here, the caller of this method takes over the reference + outExtractionResult = item; + + mQueue.pop_front(); + ++mLastExtractedUniquePacketID; + return true; +} diff --git a/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.h b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.h new file mode 100644 index 00000000..2a51f076 --- /dev/null +++ b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/ReceivedPacketCache.h @@ -0,0 +1,37 @@ +/* +* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution. +* Copyright (C) 2017-2024 by Eukaryot +* +* Published under the GNU GPLv3 open source software license, see license.txt +* or https://www.gnu.org/licenses/gpl-3.0.en.html +*/ + +#pragma once + +#include "oxygen_netcore/network/internal/ReceivedPacket.h" +#include "oxygen_netcore/network/LowLevelPackets.h" + + +class ReceivedPacketCache +{ +public: + struct CacheItem + { + lowlevel::HighLevelPacket mPacketHeader; + uint32 mUniqueRequestID = 0; // Only used for "lowlevel::RequestResponsePacket" + size_t mHeaderSize = 0; + ReceivedPacket* mReceivedPacket = nullptr; + }; + +public: + ~ReceivedPacketCache(); + + void clear(); + + bool enqueuePacket(ReceivedPacket& receivedPacket, const lowlevel::HighLevelPacket& packet, VectorBinarySerializer& serializer, uint32 uniqueRequestID); + bool extractPacket(CacheItem& outExtractionResult); + +private: + uint32 mLastExtractedUniquePacketID = 0; + std::deque mQueue; +}; diff --git a/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacket.h b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacket.h new file mode 100644 index 00000000..325d9b73 --- /dev/null +++ b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacket.h @@ -0,0 +1,35 @@ +/* +* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution. +* Copyright (C) 2017-2024 by Eukaryot +* +* Published under the GNU GPLv3 open source software license, see license.txt +* or https://www.gnu.org/licenses/gpl-3.0.en.html +*/ + +#pragma once + +#include + + +struct SentPacket +{ +public: + std::vector mContent; + uint64 mInitialTimestamp = 0; + uint64 mLastSendTimestamp = 0; + int mResendCounter = 0; + +public: + inline void initializeWithPool(RentableObjectPool& pool) + { + mOwningPool = &pool; + } + + inline void returnToPool() + { + mOwningPool->returnObject(*this); + } + +private: + RentableObjectPool* mOwningPool = nullptr; +}; diff --git a/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacketCache.cpp b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacketCache.cpp new file mode 100644 index 00000000..aa334f1e --- /dev/null +++ b/sonic3air-main/Oxygen/oxygenengine/source/oxygen_netcore/network/internal/SentPacketCache.cpp @@ -0,0 +1,149 @@ +/* +* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution. +* Copyright (C) 2017-2024 by Eukaryot +* +* Published under the GNU GPLv3 open source software license, see license.txt +* or https://www.gnu.org/licenses/gpl-3.0.en.html +*/ + +#include "oxygen_netcore/pch.h" +#include "oxygen_netcore/network/internal/SentPacketCache.h" +#include "oxygen_netcore/network/LowLevelPackets.h" + + +void SentPacketCache::clear() +{ + for (SentPacket* sentPacket : mQueue) + { + if (nullptr != sentPacket) + sentPacket->returnToPool(); + } + mQueue.clear(); + mQueueStartUniquePacketID = 0; + mNextUniquePacketID = 1; +} + +uint32 SentPacketCache::getNextUniquePacketID() const +{ + return mNextUniquePacketID; +} + +void SentPacketCache::addPacket(SentPacket& sentPacket, uint64 currentTimestamp, bool isStartConnectionPacket) +{ + // Special handling if this is the first packet added + if (mQueueStartUniquePacketID == 0) + { + // First expected incoming unique packet ID can be 0 or 1: + // - On client side, ID 0 is the first one, as it's used for the StartConnectionPacket (parameter "isStartConnectionPacket" is true in that exact case) + // - On server side, no packet with ID 0 will be added, so we'd expect the first packet to use ID 1 + RMX_ASSERT(mNextUniquePacketID <= 1, "Unique packet ID differs from expected ID"); + + const uint32 uniquePacketID = isStartConnectionPacket ? 0 : 1; + mQueueStartUniquePacketID = uniquePacketID; + mNextUniquePacketID = uniquePacketID; + } + else + { + RMX_ASSERT(!isStartConnectionPacket, "When adding a isStartConnectionPacket, it must be the first one in the cache"); + } + + sentPacket.mInitialTimestamp = currentTimestamp; + sentPacket.mLastSendTimestamp = currentTimestamp; + + mQueue.push_back(&sentPacket); + ++mNextUniquePacketID; +} + +void SentPacketCache::onPacketReceiveConfirmed(uint32 uniquePacketID) +{ + // If the ID part is not part of the queue, ignore it + if (uniquePacketID < mQueueStartUniquePacketID) + return; + const size_t index = uniquePacketID - mQueueStartUniquePacketID; + if (index >= mQueue.size()) + return; + + // Also ignore if the packet already got confirmed + if (nullptr == mQueue[index]) + return; + + mQueue[index]->returnToPool(); + mQueue[index] = nullptr; + + // Remove as many items from the queue as possible + if (index == 0) + { + do + { + mQueue.pop_front(); + ++mQueueStartUniquePacketID; + } + while (!mQueue.empty() && nullptr == mQueue.front()); + } +} + +void SentPacketCache::updateResend(std::vector& outPacketsToResend, uint64 currentTimestamp) +{ + if (mQueue.empty()) + return; + + const uint64 minimumInitialTimestamp = currentTimestamp - 500; // Start resending packets after 500 ms + int timeBetweenResends = 500; + size_t remainingPacketsToConsider = 3; + + // Check the first packet in the queue, i.e. the one that's waiting for the longest time + // -> That one determines how many packets in the queue are even considered for resending, and how long to wait between resends + { + SentPacket& sentPacket = *mQueue.front(); + if (sentPacket.mInitialTimestamp > minimumInitialTimestamp) + return; + + if (sentPacket.mResendCounter < 5) + { + // Until 5th resend (2.5 seconds gone): Send with a high frequency + timeBetweenResends = 500; + remainingPacketsToConsider = 3; + } + else if (sentPacket.mResendCounter < 10) + { + // Until 10th resend (10 seconds gone): The connection seems to have some issues, reduce resending + timeBetweenResends = 1500; + remainingPacketsToConsider = 2; + } + else + { + // After that: There's serious connection problems, reduce resending to a minimum + timeBetweenResends = 2500; + remainingPacketsToConsider = 1; + } + } + + size_t index = 0; + while (true) + { + // Skip the already confirmed packets + if (nullptr != mQueue[index]) + { + SentPacket& sentPacket = *mQueue[index]; + if (currentTimestamp >= sentPacket.mLastSendTimestamp + timeBetweenResends) + { + ++sentPacket.mResendCounter; + sentPacket.mLastSendTimestamp = currentTimestamp; + + // Trigger a resend + outPacketsToResend.push_back(&sentPacket); + } + + --remainingPacketsToConsider; + if (remainingPacketsToConsider == 0) + return; + } + + // Go to the next packet in the queue + ++index; + if (index >= mQueue.size()) + return; + if (nullptr != mQueue[index] && mQueue[index]->mInitialTimestamp > minimumInitialTimestamp) + return; + } +}