Skip to content

Commit

Permalink
Abort dispatch if channel size grows beyond a configured threshold.
Browse files Browse the repository at this point in the history
  • Loading branch information
ebarlas committed May 9, 2024
1 parent 492ba78 commit 7619d0d
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 6 deletions.
3 changes: 2 additions & 1 deletion config/config.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"version": {
"major": 5,
"minor": 0
"minor": 1
},
"windowSize": {
"x": 1200,
"y": 600
},
"maxDispatchChannelSize": 100,
"extraLives": 2,
"highScores": 15,
"levelBonus": 1000,
Expand Down
1 change: 1 addition & 0 deletions src/game/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ void trippin::from_json(const nlohmann::json &j, Configuration &config) {
j.at("windowSize").at("x").get_to(config.windowSize.x);
j.at("windowSize").at("y").get_to(config.windowSize.y);
}
j.at("maxDispatchChannelSize").get_to(config.maxDispatchChannelSize);
j.at("extraLives").get_to(config.extraLives);
j.at("highScores").get_to(config.highScores);
j.at("levelBonus").get_to(config.levelBonus);
Expand Down
1 change: 1 addition & 0 deletions src/game/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace trippin {

Version version;
Point<int> windowSize;
int maxDispatchChannelSize;
int extraLives;
int highScores;
int levelBonus;
Expand Down
8 changes: 8 additions & 0 deletions src/game/Db.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ namespace trippin {
explicit Db(
const char *name,
int version,
int maxChannelSize,
std::function<void(const T &)> addCallback,
std::function<bool(const T &)> dispatchFn) :
name(name),
fileName(makeFileName(name, version)),
maxChannelSize(maxChannelSize),
addCallback(addCallback),
dispatchFn(dispatchFn) {
}
Expand All @@ -48,6 +50,7 @@ namespace trippin {
std::string fileName;
Channel<Event> inChannel;
Channel<T> outChannel;
const int maxChannelSize;
std::function<void(const T &)> addCallback;
std::function<bool(const T &)> dispatchFn;

Expand Down Expand Up @@ -137,6 +140,11 @@ namespace trippin {
break;
}
while (!dispatchFn(*val)) {
auto sz = outChannel.size();
if (sz >= maxChannelSize) {
SDL_Log("aborting dispatch due to full channel, iter=%lu, type=%s, size=%d", iter, name, sz);
break;
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
inChannel.put({*val, false});
Expand Down
10 changes: 6 additions & 4 deletions src/game/Game.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ namespace trippin {
};

void trippin::Game::initDbSynchronizer() {
int version = configuration.version.major;
transport = std::make_unique<Transport>(
configuration.db.host,
configuration.db.port,
configuration.version.major,
version,
configuration.highScores);
stagingArea = std::make_unique<StagingArea>(*transport);
stagingArea->start();
myScores = std::make_unique<MyScores>(configuration.version.major, 10);
myScores = std::make_unique<MyScores>(version, 10);
auto scoreAddCallback = [m = myScores.get()](const Score &s) {
m->addScore(s);
};
Expand All @@ -104,15 +105,16 @@ void trippin::Game::initDbSynchronizer() {
s.id.c_str(), s.game, s.name.c_str(), s.score, toString(result));
return result == AddResult::success || result == AddResult::clientError;
};
scoreDb = std::make_unique<Db<Score>>("scores", configuration.version.major, scoreAddCallback, scoreDispatchFn);
int mdcs = configuration.maxDispatchChannelSize;
scoreDb = std::make_unique<Db<Score>>("scores", version, mdcs, scoreAddCallback, scoreDispatchFn);
scoreDb->start();
auto logAddCallback = [](const LogEvent &e) {};
auto logDispatchFn = [t = transport.get()](const LogEvent &e) {
auto result = t->addLogEvent(e);
SDL_Log("add log event attempted, index=%d, result=%s", e.index, toString(result));
return result == AddResult::success || result == AddResult::clientError;
};
logDb = std::make_unique<Db<LogEvent>>("logs", configuration.version.major, logAddCallback, logDispatchFn);
logDb = std::make_unique<Db<LogEvent>>("logs", version, mdcs, logAddCallback, logDispatchFn);
logDb->start();
}

Expand Down
9 changes: 8 additions & 1 deletion src/net/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace trippin {
return e;
}

bool put(const T& elem) {
bool put(const T &elem) {
{
std::lock_guard lock(mutex);
if (closed) {
Expand All @@ -41,6 +41,13 @@ namespace trippin {
cv.notify_all();
}

int size() {
{
std::lock_guard lock(mutex);
return queue.size();
}
}

private:
std::queue<T> queue;
std::mutex mutex;
Expand Down

0 comments on commit 7619d0d

Please sign in to comment.