Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

未接続時のReq送信を修正 #403

Merged
merged 4 commits into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions client/include/webcface/internal/client_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ struct ClientData : std::enable_shared_from_this<ClientData> {
* を、
* * これが空ならその時点のsyncDataFirstを、
* * 送信する
* * 送信したら再度これを空にする
* * <del>送信したら</del> 切断時に再度これを空にする
*/
std::optional<SyncDataFirst> sync_first;

Expand All @@ -182,9 +182,8 @@ struct ClientData : std::enable_shared_from_this<ClientData> {
/*!
* 接続中の場合メッセージをキューに入れtrueを返し、
* 接続していない場合なにもせずfalseを返す
*
* 未接続の間送る必要のないデータに使う。
* Req, Callなど
*
* Call, Pingなど
*/
bool messagePushOnline(std::string &&msg) {
std::lock_guard lock(this->ws_m);
Expand All @@ -196,6 +195,22 @@ struct ClientData : std::enable_shared_from_this<ClientData> {
return false;
}
}
/*!
* sync_firstが空でなければメッセージをキューに入れtrueを返し、
* sync_firstが空ならなにもせずfalseを返す
*
* Reqはsync_first時にすべて含まれるので。
*/
bool messagePushReq(std::string &&msg) {
std::lock_guard lock(this->sync_m);
if (this->sync_first) {
this->sync_queue.push(std::move(msg));
this->ws_cond.notify_all();
return true;
} else {
return false;
}
}
/*!
* 接続中かどうかに関係なくメッセージをキューに入れる
*
Expand Down
2 changes: 1 addition & 1 deletion client/src/canvas2d.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const Canvas2D &Canvas2D::request() const {
auto data = dataLock();
auto req = data->canvas2d_store.addReq(member_, field_);
if (req) {
data->messagePushOnline(message::packSingle(
data->messagePushReq(message::packSingle(
message::Req<message::Canvas2D>{{}, member_, field_, req}));
}
return *this;
Expand Down
2 changes: 1 addition & 1 deletion client/src/canvas3d.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const Canvas3D &Canvas3D::request() const {
auto data = dataLock();
auto req = data->canvas3d_store.addReq(member_, field_);
if (req) {
data->messagePushOnline(message::packSingle(
data->messagePushReq(message::packSingle(
message::Req<message::Canvas3D>{{}, member_, field_, req}));
}
return *this;
Expand Down
6 changes: 5 additions & 1 deletion client/src/client_threading.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ void internal::wsThreadMain(const std::shared_ptr<ClientData> &data) {
}
internal::WebSocket::send(
data, data->packSyncDataFirst(*data->sync_first));
data->sync_first = std::nullopt;
}
} else {
data->do_ws_init = false;
if (last_recv) {
// syncデータがなければ、100us間隔を空ける
data->recv_ready = true;
Expand Down Expand Up @@ -156,6 +156,10 @@ void internal::wsThreadMain(const std::shared_ptr<ClientData> &data) {
if (!data->connected) {
data->self_member_id = std::nullopt;
data->sync_init_end = false;
{
std::lock_guard lock_s(data->sync_m);
data->sync_first = std::nullopt;
}
}
data->ws_cond.notify_all();
last_recv = std::chrono::steady_clock::now();
Expand Down
4 changes: 2 additions & 2 deletions client/src/image.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Image::Image(const Field &base) : Field(base) {}
const Image &Image::tryRequest() const {
auto req_id = dataLock()->image_store.addReq(member_, field_);
if (req_id) {
dataLock()->messagePushOnline(
dataLock()->messagePushReq(
message::packSingle(message::Req<message::Image>{
member_, field_, req_id, message::ImageReq{}}));
}
Expand All @@ -30,7 +30,7 @@ const Image &Image::request(std::optional<int> rows, std::optional<int> cols,
frame_rate};
auto req_id = dataLock()->image_store.addReq(member_, field_, req);
if (req_id) {
dataLock()->messagePushOnline(message::packSingle(
dataLock()->messagePushReq(message::packSingle(
message::Req<message::Image>{member_, field_, req_id, req}));
this->clear();
}
Expand Down
2 changes: 1 addition & 1 deletion client/src/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ const Log &Log::request() const {
auto data = dataLock();
auto req = data->log_store.addReq(member_);
if (req) {
data->messagePushOnline(
data->messagePushReq(
message::packSingle(message::LogReq{{}, member_}));
}
return *this;
Expand Down
2 changes: 1 addition & 1 deletion client/src/robot_model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const RobotModel &RobotModel::request() const {
auto data = dataLock();
auto req = data->robot_model_store.addReq(member_, field_);
if (req) {
data->messagePushOnline(message::packSingle(
data->messagePushReq(message::packSingle(
message::Req<message::RobotModel>{{}, member_, field_, req}));
}
return *this;
Expand Down
2 changes: 1 addition & 1 deletion client/src/text.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const Variant &Variant::request() const {
auto data = dataLock();
auto req = data->text_store.addReq(member_, field_);
if (req) {
data->messagePushOnline(message::packSingle(
data->messagePushReq(message::packSingle(
message::Req<message::Text>{{}, member_, field_, req}));
}
return *this;
Expand Down
2 changes: 1 addition & 1 deletion client/src/value.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const Value &Value::request() const {
auto data = dataLock();
auto req = data->value_store.addReq(member_, field_);
if (req) {
data->messagePushOnline(message::packSingle(
data->messagePushReq(message::packSingle(
message::Req<message::Value>{{}, member_, field_, req}));
}
return *this;
Expand Down
2 changes: 1 addition & 1 deletion client/src/view.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ const View &View::request() const {
auto data = dataLock();
auto req = data->view_store.addReq(member_, field_);
if (req) {
data->messagePushOnline(message::packSingle(
data->messagePushReq(message::packSingle(
message::Req<message::View>{{}, member_, field_, req}));
}
return *this;
Expand Down
1 change: 1 addition & 0 deletions server-store/include/webcface/server/server_ws.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class AppWrapper {
~AppWrapper() noexcept;
static void send(wsConnPtr conn, const char *msg,
std::size_t size) noexcept;
static void close(wsConnPtr conn) noexcept;
void stop() noexcept;
void run() noexcept;
const char *exception() noexcept;
Expand Down
3 changes: 3 additions & 0 deletions server-store/src/server_ws.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ void AppWrapper::send(wsConnPtr conn, const char *msg,
static_cast<crow::websocket::connection *>(conn)->send_binary(
std::string(msg, size));
}
void AppWrapper::close(wsConnPtr conn) noexcept {
static_cast<crow::websocket::connection *>(conn)->close();
}

AppWrapper::AppWrapper(const LoggerCallback &callback, const char *static_dir_s,
std::uint16_t port, const char *unix_path,
Expand Down
5 changes: 5 additions & 0 deletions server-store/src/websock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ Server::~Server() {
server_stop.store(true);
}
server_ping_wait.notify_one();
store->forEach([](const auto &cd) {
if (cd->connected()) {
AppWrapper::close(cd->con);
}
});
for (auto &app : apps) {
static_cast<AppWrapper *>(app)->stop();
}
Expand Down
63 changes: 63 additions & 0 deletions tests/client_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,69 @@ TEST_F(ClientTest, valueReq) {
.size(),
3u);
}
TEST_F(ClientTest, valueReqTiming) {
wcli_->autoReconnect(false);
// 1. 接続前、sync前
wcli_->member("a").value("1").request();
wcli_->sync();
// 2. 接続前、sync後
wcli_->member("a").value("2").request();

wait();
dummy_s = std::make_shared<DummyServer>(false);
wait();
wcli_->start();
while (!dummy_s->connected() || !wcli_->connected()) {
wait();
}

// 3. 接続後
wcli_->member("a").value("3").request();

wait();

dummy_s.reset();
while (wcli_->connected()) {
wait();
}
wait();
// 4. 切断後、sync前
wcli_->member("a").value("4").request();
wcli_->sync();
// 5. 切断後、sync後
wcli_->member("a").value("5").request();

wait();
dummy_s = std::make_shared<DummyServer>(false);
wait();
wcli_->start();
while (!dummy_s->connected() || !wcli_->connected()) {
wait();
}

// 6. 再接続後
wcli_->member("a").value("6").request();
wait();

std::array<bool, 6> req_found = {};
{
std::lock_guard lock(dummy_s->server_m);
for (const auto &m : dummy_s->recv_data) {
if (m.first ==
message::MessageKind::req + message::MessageKind::value) {
auto &obj = *static_cast<message::Req<message::Value> *>(
m.second.get());
EXPECT_EQ(obj.member.u8String(), "a");
ASSERT_GE(obj.req_id, 1u);
ASSERT_LE(obj.req_id, 6u);
EXPECT_EQ(obj.field.u8String(), std::to_string(obj.req_id));
EXPECT_FALSE(req_found.at(obj.req_id - 1));
req_found.at(obj.req_id - 1) = true;
}
}
}
EXPECT_EQ(req_found, (std::array<bool, 6>{1, 1, 1, 1, 1, 1}));
}
TEST_F(ClientTest, textSend) {
dummy_s = std::make_shared<DummyServer>(false);
wcli_->start();
Expand Down
11 changes: 11 additions & 0 deletions tests/dummy_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
#include <sys/stat.h>
#endif

static void wait() {
std::this_thread::sleep_for(
std::chrono::milliseconds(WEBCFACE_TEST_TIMEOUT));
}

using namespace webcface;

// 同じ実装がserver-internalにあるがimportもincludeもできないのでコピペしている
Expand Down Expand Up @@ -44,6 +49,12 @@ class CustomLogger : public crow::ILogHandler {

DummyServer::~DummyServer() {
try {
if (connPtr) {
reinterpret_cast<crow::websocket::connection *>(connPtr)->send_binary("");
reinterpret_cast<crow::websocket::connection *>(connPtr)->close();
wait();
connPtr = nullptr;
}
static_cast<crow::SimpleApp *>(server_)->stop();
t.join();
} catch (...) {
Expand Down
Loading