Skip to content

Commit

Permalink
l
Browse files Browse the repository at this point in the history
  • Loading branch information
CCIGAMES committed Mar 2, 2024
1 parent 1583308 commit 57ce967
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution.
* Copyright (C) 2017-2024 by Eukaryot
*
* Published under the GNU GPLv3 open source software license, see license.txt
* or https://www.gnu.org/licenses/gpl-3.0.en.html
*/

#pragma once

#include "oxygen_netcore/network/Sockets.h"

class NetConnection;


struct ReceivedPacket
{
friend class ReceivedPacketCache;

public:
struct Dump
{
std::vector<const ReceivedPacket*> mPackets;
};

public:
std::vector<uint8> mContent;
SocketAddress mSenderAddress;
uint16 mLowLevelSignature = 0;
NetConnection* mConnection = nullptr;

public:
inline void initializeWithDump(Dump* dump)
{
RMX_ASSERT(nullptr == mDump, "Expected a packet that is removed from the dump to not have a dump pointer already set");
RMX_ASSERT(mReferenceCounter == 0, "Expected a packet that is removed from the dump to not have a reference count of zero");
mDump = dump;
mReferenceCounter = 1;
}

inline void incReferenceCounter()
{
RMX_ASSERT(nullptr != mDump, "Reference counting is used for an instance already returned to the dump");
++mReferenceCounter;
}

inline void decReferenceCounter()
{
RMX_ASSERT(nullptr != mDump, "Reference counting is used for an instance already returned to the dump");
RMX_ASSERT(mReferenceCounter > 0, "Trying to remove a reference when counter already is at zero");
--mReferenceCounter;
if (mReferenceCounter == 0)
{
mDump->mPackets.push_back(this);
mDump = nullptr; // Mark as being returned
}
}

private:
Dump* mDump = nullptr;
int mReferenceCounter = 0;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution.
* Copyright (C) 2017-2024 by Eukaryot
*
* Published under the GNU GPLv3 open source software license, see license.txt
* or https://www.gnu.org/licenses/gpl-3.0.en.html
*/

#include "oxygen_netcore/pch.h"
#include "oxygen_netcore/network/internal/ReceivedPacketCache.h"
#include "oxygen_netcore/network/LowLevelPackets.h"


ReceivedPacketCache::~ReceivedPacketCache()
{
clear();
}

void ReceivedPacketCache::clear()
{
// Remove references to all received packets still in the queue
for (CacheItem& item : mQueue)
{
if (nullptr != item.mReceivedPacket)
item.mReceivedPacket->decReferenceCounter();
}
mQueue.clear();
mLastExtractedUniquePacketID = 0;
}

bool ReceivedPacketCache::enqueuePacket(ReceivedPacket& receivedPacket, const lowlevel::HighLevelPacket& packet, VectorBinarySerializer& serializer, uint32 uniqueRequestID)
{
ReceivedPacketCache::CacheItem* itemToFill = nullptr;

// Check the unique packet ID
const uint32 uniquePacketID = packet.mUniquePacketID;
const uint32 lastEnqueuedPacketID = mLastExtractedUniquePacketID + (uint32)mQueue.size();
if (uniquePacketID <= lastEnqueuedPacketID)
{
if (uniquePacketID <= mLastExtractedUniquePacketID)
{
// It was extracted previously already
return false;
}

// It's part of the queue, find it
const uint32 firstQueueIndex = mLastExtractedUniquePacketID + 1;
const size_t queueIndex = (size_t)(uniquePacketID - firstQueueIndex);
if (nullptr != mQueue[queueIndex].mReceivedPacket)
{
// It already exists in the cache
return false;
}

// Replace the empty spot with the received packet
itemToFill = &mQueue[queueIndex];
}
else
{
// Is it the next packet that we're just waiting for?
uint32 indexAfterQueue = uniquePacketID - (lastEnqueuedPacketID + 1);
if (indexAfterQueue != 0)
{
// Create a gap...
// TODO: Check if that would create a very large gap
for (size_t k = 0; k < indexAfterQueue; ++k)
{
mQueue.emplace_back();
}
}

// Enqueue the packet content
mQueue.emplace_back();
itemToFill = &mQueue.back();
}

// Fill the cache item accordingly
itemToFill->mPacketHeader = packet;
itemToFill->mUniqueRequestID = uniqueRequestID;
itemToFill->mHeaderSize = serializer.getReadPosition();
itemToFill->mReceivedPacket = &receivedPacket;

// Retain packet while it's referenced in the queue
receivedPacket.incReferenceCounter();

// Now the packet was enqueued, and we can't do more here right now
return true;
}

bool ReceivedPacketCache::extractPacket(CacheItem& outExtractionResult)
{
if (mQueue.empty())
return false;

CacheItem& item = mQueue.front();
if (nullptr == item.mReceivedPacket)
return false;

RMX_ASSERT(item.mPacketHeader.mUniquePacketID == mLastExtractedUniquePacketID + 1, "Detected a mismatch in unique packet IDs");
// Note that the reference counter does not get decreased here, the caller of this method takes over the reference
outExtractionResult = item;

mQueue.pop_front();
++mLastExtractedUniquePacketID;
return true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution.
* Copyright (C) 2017-2024 by Eukaryot
*
* Published under the GNU GPLv3 open source software license, see license.txt
* or https://www.gnu.org/licenses/gpl-3.0.en.html
*/

#pragma once

#include "oxygen_netcore/network/internal/ReceivedPacket.h"
#include "oxygen_netcore/network/LowLevelPackets.h"


class ReceivedPacketCache
{
public:
struct CacheItem
{
lowlevel::HighLevelPacket mPacketHeader;
uint32 mUniqueRequestID = 0; // Only used for "lowlevel::RequestResponsePacket"
size_t mHeaderSize = 0;
ReceivedPacket* mReceivedPacket = nullptr;
};

public:
~ReceivedPacketCache();

void clear();

bool enqueuePacket(ReceivedPacket& receivedPacket, const lowlevel::HighLevelPacket& packet, VectorBinarySerializer& serializer, uint32 uniqueRequestID);
bool extractPacket(CacheItem& outExtractionResult);

private:
uint32 mLastExtractedUniquePacketID = 0;
std::deque<CacheItem> mQueue;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution.
* Copyright (C) 2017-2024 by Eukaryot
*
* Published under the GNU GPLv3 open source software license, see license.txt
* or https://www.gnu.org/licenses/gpl-3.0.en.html
*/

#pragma once

#include <rmxbase.h>


struct SentPacket
{
public:
std::vector<uint8> mContent;
uint64 mInitialTimestamp = 0;
uint64 mLastSendTimestamp = 0;
int mResendCounter = 0;

public:
inline void initializeWithPool(RentableObjectPool<SentPacket>& pool)
{
mOwningPool = &pool;
}

inline void returnToPool()
{
mOwningPool->returnObject(*this);
}

private:
RentableObjectPool<SentPacket>* mOwningPool = nullptr;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Part of the Oxygen Engine / Sonic 3 A.I.R. software distribution.
* Copyright (C) 2017-2024 by Eukaryot
*
* Published under the GNU GPLv3 open source software license, see license.txt
* or https://www.gnu.org/licenses/gpl-3.0.en.html
*/

#include "oxygen_netcore/pch.h"
#include "oxygen_netcore/network/internal/SentPacketCache.h"
#include "oxygen_netcore/network/LowLevelPackets.h"


void SentPacketCache::clear()
{
for (SentPacket* sentPacket : mQueue)
{
if (nullptr != sentPacket)
sentPacket->returnToPool();
}
mQueue.clear();
mQueueStartUniquePacketID = 0;
mNextUniquePacketID = 1;
}

uint32 SentPacketCache::getNextUniquePacketID() const
{
return mNextUniquePacketID;
}

void SentPacketCache::addPacket(SentPacket& sentPacket, uint64 currentTimestamp, bool isStartConnectionPacket)
{
// Special handling if this is the first packet added
if (mQueueStartUniquePacketID == 0)
{
// First expected incoming unique packet ID can be 0 or 1:
// - On client side, ID 0 is the first one, as it's used for the StartConnectionPacket (parameter "isStartConnectionPacket" is true in that exact case)
// - On server side, no packet with ID 0 will be added, so we'd expect the first packet to use ID 1
RMX_ASSERT(mNextUniquePacketID <= 1, "Unique packet ID differs from expected ID");

const uint32 uniquePacketID = isStartConnectionPacket ? 0 : 1;
mQueueStartUniquePacketID = uniquePacketID;
mNextUniquePacketID = uniquePacketID;
}
else
{
RMX_ASSERT(!isStartConnectionPacket, "When adding a isStartConnectionPacket, it must be the first one in the cache");
}

sentPacket.mInitialTimestamp = currentTimestamp;
sentPacket.mLastSendTimestamp = currentTimestamp;

mQueue.push_back(&sentPacket);
++mNextUniquePacketID;
}

void SentPacketCache::onPacketReceiveConfirmed(uint32 uniquePacketID)
{
// If the ID part is not part of the queue, ignore it
if (uniquePacketID < mQueueStartUniquePacketID)
return;
const size_t index = uniquePacketID - mQueueStartUniquePacketID;
if (index >= mQueue.size())
return;

// Also ignore if the packet already got confirmed
if (nullptr == mQueue[index])
return;

mQueue[index]->returnToPool();
mQueue[index] = nullptr;

// Remove as many items from the queue as possible
if (index == 0)
{
do
{
mQueue.pop_front();
++mQueueStartUniquePacketID;
}
while (!mQueue.empty() && nullptr == mQueue.front());
}
}

void SentPacketCache::updateResend(std::vector<SentPacket*>& outPacketsToResend, uint64 currentTimestamp)
{
if (mQueue.empty())
return;

const uint64 minimumInitialTimestamp = currentTimestamp - 500; // Start resending packets after 500 ms
int timeBetweenResends = 500;
size_t remainingPacketsToConsider = 3;

// Check the first packet in the queue, i.e. the one that's waiting for the longest time
// -> That one determines how many packets in the queue are even considered for resending, and how long to wait between resends
{
SentPacket& sentPacket = *mQueue.front();
if (sentPacket.mInitialTimestamp > minimumInitialTimestamp)
return;

if (sentPacket.mResendCounter < 5)
{
// Until 5th resend (2.5 seconds gone): Send with a high frequency
timeBetweenResends = 500;
remainingPacketsToConsider = 3;
}
else if (sentPacket.mResendCounter < 10)
{
// Until 10th resend (10 seconds gone): The connection seems to have some issues, reduce resending
timeBetweenResends = 1500;
remainingPacketsToConsider = 2;
}
else
{
// After that: There's serious connection problems, reduce resending to a minimum
timeBetweenResends = 2500;
remainingPacketsToConsider = 1;
}
}

size_t index = 0;
while (true)
{
// Skip the already confirmed packets
if (nullptr != mQueue[index])
{
SentPacket& sentPacket = *mQueue[index];
if (currentTimestamp >= sentPacket.mLastSendTimestamp + timeBetweenResends)
{
++sentPacket.mResendCounter;
sentPacket.mLastSendTimestamp = currentTimestamp;

// Trigger a resend
outPacketsToResend.push_back(&sentPacket);
}

--remainingPacketsToConsider;
if (remainingPacketsToConsider == 0)
return;
}

// Go to the next packet in the queue
++index;
if (index >= mQueue.size())
return;
if (nullptr != mQueue[index] && mQueue[index]->mInitialTimestamp > minimumInitialTimestamp)
return;
}
}

0 comments on commit 57ce967

Please sign in to comment.