Skip to content

Commit

Permalink
サーバー側の新旧logメッセージの処理を書いた
Browse files Browse the repository at this point in the history
  • Loading branch information
na-trium-144 committed Sep 26, 2024
1 parent 0fd04dd commit 2d1b38a
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 46 deletions.
3 changes: 1 addition & 2 deletions client/src/data_store2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ StrMap2<unsigned int> SyncDataStore2<T, ReqT>::transferReq() {
// }
}

// ライブラリ外からは参照できないが、testのためにexportしている
template class SyncDataStore2<std::string, int>; // test用
template class SyncDataStore2<ValueData, int>;
template class SyncDataStore2<TextData, int>;
Expand All @@ -259,6 +258,6 @@ template class SyncDataStore2<RobotModelData, int>;
template class SyncDataStore2<Canvas3DData, int>;
template class SyncDataStore2<Canvas2DData, int>;
template class SyncDataStore2<ImageData, message::ImageReq>;
template class SyncDataStore2<std::shared_ptr<std::deque<LogLineData>>, int>;
template class SyncDataStore2<std::shared_ptr<LogData>, int>;
} // namespace internal
WEBCFACE_NS_END
10 changes: 5 additions & 5 deletions server-store/include/webcface/server/member_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct MemberData {
std::chrono::system_clock::time_point last_sync_time;
//! リクエストしているmember,nameのペア
StrMap2<unsigned int> value_req, text_req, view_req, image_req,
robot_model_req, canvas3d_req, canvas2d_req;
robot_model_req, canvas3d_req, canvas2d_req, log_req;

// image_convert_thread[imageのmember][imageのfield] =
// imageを変換してthisに送るスレッド
Expand All @@ -96,10 +96,11 @@ struct MemberData {
const SharedString &field);
StrMap1<std::vector<std::shared_ptr<message::RobotLink>>> robot_model;

StrSet1 log_req;
bool hasReq(const SharedString &member);
//! 古いLogリクエスト ("default"のログを古いメッセージ形式で返す)
StrSet1 log_req_default;
//! ログ全履歴
std::shared_ptr<std::deque<message::LogLine>> log;
StrMap1<std::shared_ptr<std::deque<message::LogLine>>> log;
/*!
* \brief まだ完了していない自分へのcall呼び出しのリスト
*
Expand All @@ -123,8 +124,7 @@ struct MemberData {
const spdlog::sink_ptr &sink,
spdlog::level::level_enum level)
: sink(sink), logger_level(level), store(store), con(con),
remote_addr(remote_addr),
log(std::make_shared<std::deque<message::LogLine>>()) {
remote_addr(remote_addr) {
this->member_id = ++last_member_id;
logger = std::make_shared<spdlog::logger>(
std::to_string(member_id) + "_(unknown client)", this->sink);
Expand Down
167 changes: 128 additions & 39 deletions server-store/src/member_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,22 @@ void MemberData::onRecv(const std::string &message) {
f.first.decode(), cd->member_id);
}
}
if (!cd->log->empty()) {
this->pack(
webcface::message::LogEntry{{}, cd->member_id});
logger->trace("send log_entry of member {}",
cd->member_id);
for (const auto &f : cd->log) {
if (!f.first.startsWith(field_separator)) {
this->pack(webcface::message::Entry<
webcface::message::Log>{
{}, cd->member_id, f.first});
logger->trace("send log_entry {} of member {}",
f.first.decode(), cd->member_id);
// 古いクライアントのために古いLogEntryも送る
if (f.first == message::Log::defaultLogName()) {
this->pack(webcface::message::LogEntryDefault{
{}, cd->member_id});
logger->trace(
"send log_entry_default(obsolete) of member {}",
cd->member_id);
}
}
}
for (const auto &f : cd->func) {
if (!f.first.startsWith(field_separator)) {
Expand Down Expand Up @@ -607,42 +618,77 @@ void MemberData::onRecv(const std::string &message) {
}
break;
}
case MessageKind::log: {
auto &v = *static_cast<webcface::message::Log *>(obj.get());
v.member_id = this->member_id;
logger->debug("log {} lines", v.log->size());
if (store->keep_log >= 0 &&
this->log->size() <
static_cast<unsigned int>(store->keep_log) &&
this->log->size() + v.log->size() >=
static_cast<unsigned int>(store->keep_log)) {
logger->info("number of log lines reached {}, so the oldest "
"log will be romoved.",
store->keep_log);
case MessageKind::log:
case MessageKind::log_default: {
SharedString field;
std::shared_ptr<std::deque<message::LogLine>> log_data;
if (kind == MessageKind::log) {
auto &v = *static_cast<webcface::message::Log *>(obj.get());
logger->debug("log {}: {} lines", v.field.decode(),
v.log->size());
field = v.field;
log_data = v.log;
} else {
auto &v =
*static_cast<webcface::message::LogDefault *>(obj.get());
logger->debug("log_default(obsolete) {} lines", v.log->size());
field = message::Log::defaultLogName();
log_data = v.log;
}
if (this->log->empty()) {
if (!this->log.count(field)) {
this->log.emplace(field, log_data);
store->forEach([&](auto cd) {
if (cd->name != this->name) {
cd->pack(
webcface::message::LogEntry{{}, this->member_id});
cd->logger->trace("send log_entry of member {}",
this->member_id);
cd->pack(webcface::message::Entry<message::Log>{
{}, this->member_id, field});
cd->logger->trace("send log_entry {} of member {}",
field.decode(), this->member_id);
// 古いクライアントのために古いLogEntryも送る
if (field == message::Log::defaultLogName()) {
cd->pack(webcface::message::LogEntryDefault{
{}, this->member_id});
cd->logger->trace(
"send log_entry_default(obsolete) of member {}",
this->member_id);
}
}
});
}
for (auto &ll : *v.log) {
this->log->push_back(ll);
}
while (store->keep_log >= 0 &&
this->log->size() >
static_cast<unsigned int>(store->keep_log)) {
this->log->pop_front();
} else {
auto this_log = this->log.at(field);
if (store->keep_log >= 0 &&
this_log->size() <
static_cast<unsigned int>(store->keep_log) &&
this_log->size() + log_data->size() >=
static_cast<unsigned int>(store->keep_log)) {
logger->info(
"number of log lines reached {}, so the oldest "
"log will be romoved.",
store->keep_log);
}
for (auto &ll : *log_data) {
this_log->push_back(ll);
}
while (store->keep_log >= 0 &&
this_log->size() >
static_cast<unsigned int>(store->keep_log)) {
this_log->pop_front();
}
}
// このlogをsubscribeしてるところに送り返す
store->forEach([&](auto cd) {
if (cd->log_req.count(this->name)) {
cd->pack(v);
cd->logger->trace("send log {} lines", v.log->size());
auto req_field = findReqField(cd->log_req, this->name, field);
auto &req_id = req_field.first;
auto &sub_field = req_field.second;
if (req_id > 0) {
cd->pack(webcface::message::Res<webcface::message::Log>(
req_id, sub_field, log_data));
cd->logger->trace("send log_res req_id={} + '{}'", req_id,
sub_field.decode());
}
if (cd->log_req_default.count(this->name)) {
cd->pack(message::LogDefault{this->member_id, log_data});
cd->logger->trace("send log_default(obsolete) {} lines",
log_data->size());
}
});
break;
Expand Down Expand Up @@ -922,14 +968,55 @@ void MemberData::onRecv(const std::string &message) {
}
break;
}
case MessageKind::log_req: {
auto &s = *static_cast<webcface::message::LogReq *>(obj.get());
case MessageKind::req + MessageKind::log: {
auto &s =
*static_cast<webcface::message::Req<webcface::message::Log> *>(
obj.get());
logger->debug("request log ({}): {} from {}", s.req_id,
s.field.decode(), s.member.decode());
// 指定した値を返す
store->findAndDo(s.member, [&](auto cd) {
// if (!this->hasReq(s.member)) {
// this->pack(webcface::message::Sync{cd->member_id,
// cd->last_sync_time});
// logger->trace("send sync {}", this->member_id);
// }
for (const auto &it : cd->log) {
if (it.first == s.field ||
it.first.startsWith(s.field.u8String() +
field_separator)) {
SharedString sub_field;
if (it.first == s.field) {
} else {
sub_field = SharedString::fromU8String(
it.first.u8String().substr(
s.field.u8String().size() + 1));
}
this->pack(
webcface::message::Res<webcface::message::Log>{
s.req_id, sub_field, it.second});
logger->trace("send log_res {} lines, req_id={} + '{}'",
it.second->size(), s.req_id,
sub_field.decode());
}
}
});
log_req[s.member][s.field] = s.req_id;
break;
}
case MessageKind::log_req_default: {
auto &s =
*static_cast<webcface::message::LogReqDefault *>(obj.get());
logger->debug("request log from {}", s.member.decode());
log_req.insert(s.member);
this->log_req_default.insert(s.member);
// 指定した値を返す
store->findAndDo(s.member, [&](auto cd) {
this->pack(webcface::message::Log{cd->member_id, cd->log});
logger->trace("send log {} lines", cd->log->size());
if (cd->log.count(message::Log::defaultLogName())) {
auto log_data = cd->log.at(message::Log::defaultLogName());
this->pack(
webcface::message::LogDefault{cd->member_id, log_data});
logger->trace("send log {} lines", log_data->size());
}
});
break;
}
Expand All @@ -947,7 +1034,9 @@ void MemberData::onRecv(const std::string &message) {
case MessageKind::res + MessageKind::canvas2d:
case MessageKind::entry + MessageKind::image:
case MessageKind::res + MessageKind::image:
case MessageKind::log_entry:
case MessageKind::entry + MessageKind::log:
case MessageKind::res + MessageKind::log:
case MessageKind::log_entry_default:
case MessageKind::sync_init_end:
case MessageKind::ping_status:
if (!message_kind_warned[kind]) {
Expand Down

0 comments on commit 2d1b38a

Please sign in to comment.