Skip to content

Commit

Permalink
Merge pull request #129 from qicosmos/remove_server_mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Jan 29, 2024
2 parents 439925f + 4fd3a63 commit 07d5735
Showing 1 changed file with 58 additions and 49 deletions.
107 changes: 58 additions & 49 deletions include/rest_rpc/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ class connection : public std::enable_shared_from_this<connection>,

void response(uint64_t req_id, std::string data,
request_type req_type = request_type::req_res) {
auto len = data.size();
assert(len < MAX_BUF_LEN);

std::unique_lock<std::mutex> lock(write_mtx_);
write_queue_.emplace_back(message_type{
req_id, req_type, std::make_shared<std::string>(std::move(data))});
if (write_queue_.size() > 1) {
return;
}

write();
assert(data.size() < MAX_BUF_LEN);
auto sp_data = std::make_shared<std::string>(std::move(data));
std::weak_ptr<connection> weak = shared_from_this();
asio::post([this, weak, sp_data, req_id, req_type] {
auto conn = weak.lock();
if (conn) {
response_interal(req_id, std::move(sp_data), req_type);
}
});
}

template <typename T> void pack_and_response(uint64_t req_id, T data) {
Expand Down Expand Up @@ -196,48 +194,61 @@ class connection : public std::enable_shared_from_this<connection>,

void read_body(uint32_t func_id, std::size_t size) {
auto self(this->shared_from_this());
async_read(
size, [this, func_id, self](asio::error_code ec, std::size_t length) {
cancel_timer();
async_read(size, [this, func_id, self](asio::error_code ec,
std::size_t length) {
cancel_timer();

if (!socket_.is_open()) {
if (on_net_err_) {
(*on_net_err_)(self, "socket already closed");
}
return;
}
if (!socket_.is_open()) {
if (on_net_err_) {
(*on_net_err_)(self, "socket already closed");
}
return;
}

if (!ec) {
read_head();
if (req_type_ == request_type::req_res) {
route_result_t ret = router_.route<connection>(
func_id, nonstd::string_view{body_.data(), length},
this->shared_from_this());
if (delay_) {
delay_ = false;
} else {
response(req_id_, std::move(ret.result));
}
} else if (req_type_ == request_type::sub_pub) {
try {
msgpack_codec codec;
auto p = codec.unpack<std::tuple<std::string, std::string>>(
body_.data(), length);
callback_(std::move(std::get<0>(p)), std::move(std::get<1>(p)),
this->shared_from_this());
} catch (const std::exception &ex) {
print(ex);
if (on_net_err_) {
(*on_net_err_)(self, ex.what());
}
}
}
if (!ec) {
read_head();
if (req_type_ == request_type::req_res) {
route_result_t ret = router_.route<connection>(
func_id, nonstd::string_view{body_.data(), length},
this->shared_from_this());
if (delay_) {
delay_ = false;
} else {
response_interal(
req_id_, std::make_shared<std::string>(std::move(ret.result)));
}
} else if (req_type_ == request_type::sub_pub) {
try {
msgpack_codec codec;
auto p = codec.unpack<std::tuple<std::string, std::string>>(
body_.data(), length);
callback_(std::move(std::get<0>(p)), std::move(std::get<1>(p)),
this->shared_from_this());
} catch (const std::exception &ex) {
print(ex);
if (on_net_err_) {
(*on_net_err_)(self, ec.message());
(*on_net_err_)(self, ex.what());
}
}
});
}
} else {
if (on_net_err_) {
(*on_net_err_)(self, ec.message());
}
}
});
}

void response_interal(uint64_t req_id, std::shared_ptr<std::string> data,
request_type req_type = request_type::req_res) {
assert(data->size() < MAX_BUF_LEN);

write_queue_.emplace_back(message_type{req_id, req_type, std::move(data)});
if (write_queue_.size() > 1) {
return;
}

write();
}

void write() {
Expand Down Expand Up @@ -269,7 +280,6 @@ class connection : public std::enable_shared_from_this<connection>,
return;
}

std::unique_lock<std::mutex> lock(write_mtx_);
write_queue_.pop_front();

if (!write_queue_.empty()) {
Expand Down Expand Up @@ -414,7 +424,6 @@ class connection : public std::enable_shared_from_this<connection>,
rpc_header header_;

uint32_t write_size_ = 0;
std::mutex write_mtx_;

asio::steady_timer timer_;
std::size_t timeout_seconds_;
Expand Down

0 comments on commit 07d5735

Please sign in to comment.