Skip to content

Commit

Permalink
sync code
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Aug 21, 2024
1 parent e6eebd4 commit 9d0812c
Show file tree
Hide file tree
Showing 19 changed files with 2,968 additions and 1,477 deletions.
42 changes: 19 additions & 23 deletions example/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,9 @@ async_simple::coro::Lazy<void> basic_usage() {

void use_metric() {
using namespace ylt::metric;
auto c =
std::make_shared<counter_t>("request_count", "request count",
std::vector<std::string>{"method", "url"});
auto failed = std::make_shared<gauge_t>(
"not_found_request_count", "not found request count",
std::vector<std::string>{"method", "code", "url"});
auto c = std::make_shared<counter_t>("request_count", "request count");
auto failed = std::make_shared<gauge_t>("not_found_request_count",
"not found request count");
auto total =
std::make_shared<counter_t>("total_request_count", "total request count");

Expand All @@ -448,19 +445,19 @@ void use_metric() {
summary_t::Quantiles{
{0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}});

default_metric_manager::register_metric_dynamic(c);
default_metric_manager::register_metric_dynamic(total);
default_metric_manager::register_metric_dynamic(failed);
default_metric_manager::register_metric_dynamic(h);
default_metric_manager::register_metric_dynamic(summary);
default_static_metric_manager::instance().register_metric(c);
default_static_metric_manager::instance().register_metric(total);
default_static_metric_manager::instance().register_metric(failed);
default_static_metric_manager::instance().register_metric(h);
default_static_metric_manager::instance().register_metric(summary);

std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distr(1, 100);

std::thread thd([&] {
while (true) {
c->inc({"GET", "/test"});
c->inc();
total->inc();
h->observe(distr(gen));
summary->observe(distr(gen));
Expand All @@ -473,9 +470,7 @@ void use_metric() {
server.set_default_handler(
[&](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
failed->inc({std::string(req.get_method()),
std::to_string((int)status_type::not_found),
std::string(req.get_url())});
failed->inc();
total->inc();
resp.set_status_and_content(status_type::not_found, "not found");
co_return;
Expand All @@ -484,14 +479,14 @@ void use_metric() {
server.set_http_handler<GET>(
"/get", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "ok");
c->inc({std::string(req.get_method()), std::string(req.get_url())});
c->inc();
total->inc();
});

server.set_http_handler<GET>(
"/test", [&](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "ok");
c->inc({std::string(req.get_method()), std::string(req.get_url())});
c->inc();
total->inc();
});

Expand All @@ -514,13 +509,13 @@ void metrics_example() {
"get_req_count", "get req count",
std::map<std::string, std::string>{{"url", "/get"}});
auto get_req_qps = std::make_shared<gauge_t>("get_req_qps", "get req qps");
// default_metric_manager::register_metric_static(get_req_counter,
// default_static_metric_manager::register_metric_static(get_req_counter,
// get_req_qps);
int64_t last = 0;
std::thread thd([&] {
while (true) {
std::this_thread::sleep_for(1s);
auto value = get_req_counter->value({"/get"});
auto value = get_req_counter->value();
get_req_qps->update(value - last);
last = value;
}
Expand Down Expand Up @@ -551,11 +546,12 @@ async_simple::coro::Lazy<void> use_channel() {
server.async_start();
std::this_thread::sleep_for(100ms);

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(
auto load_blancer = std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
{"127.0.0.1:9001"}, {.lba = coro_io::load_blance_algorithm::random}));

std::string url = "http://127.0.0.1:9001/";
co_await channel->send_request(
co_await load_blancer->send_request(
[&url](coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
auto data = co_await client.async_get(url);
Expand All @@ -573,7 +569,7 @@ async_simple::coro::Lazy<void> use_pool() {
server.use_metrics();
server.async_start();

auto map = default_metric_manager::metric_map_static();
auto map = default_static_metric_manager::instance().metric_map();
for (auto &[k, m] : map) {
std::cout << k << ", ";
std::cout << m->help() << "\n";
Expand Down
44 changes: 32 additions & 12 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
: coro_http_client(executor->get_asio_executor()) {}

bool init_config(const config &conf) {
config_ = conf;
if (conf.conn_timeout_duration.has_value()) {
set_conn_timeout(*conf.conn_timeout_duration);
}
Expand Down Expand Up @@ -207,6 +208,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; }

const config &get_config() { return config_; }

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(int verify_mode, const std::string &base_path,
const std::string &cert_file, const std::string &sni_hostname) {
Expand Down Expand Up @@ -875,22 +878,24 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

async_simple::coro::Lazy<void> send_file_no_chunked_with_copy(
std::string_view source, std::error_code &ec, std::size_t length) {
std::string_view source, std::error_code &ec, std::size_t length,
std::size_t offset) {
if (length <= 0) {
co_return;
}
std::string file_data;
detail::resize(file_data, std::min(max_single_part_size_, length));
detail::resize(file_data, (std::min)(max_single_part_size_, length));
coro_io::coro_file file{};
file.open(source, std::ios::in);
file.seek(offset, std::ios::cur);
if (!file.is_open()) {
ec = std::make_error_code(std::errc::bad_file_descriptor);
co_return;
}
std::size_t size;
while (length > 0) {
if (std::tie(ec, size) = co_await file.async_read(
file_data.data(), std::min(file_data.size(), length));
file_data.data(), (std::min)(file_data.size(), length));
ec) {
// bad request, file may smaller than content-length
break;
Expand Down Expand Up @@ -920,15 +925,15 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
};
async_simple::coro::Lazy<void> send_file_without_copy(
const std::filesystem::path &source, std::error_code &ec,
std::size_t length) {
std::size_t length, std::size_t offset) {
fd_guard guard(source.c_str());
if (guard.fd < 0) [[unlikely]] {
ec = std::make_error_code(std::errc::bad_file_descriptor);
co_return;
}
std::size_t actual_len = 0;
std::tie(ec, actual_len) =
co_await coro_io::async_sendfile(socket_->impl_, guard.fd, 0, length);
std::tie(ec, actual_len) = co_await coro_io::async_sendfile(
socket_->impl_, guard.fd, offset, length);
if (ec) [[unlikely]] {
co_return;
}
Expand Down Expand Up @@ -1006,7 +1011,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
template <typename S, typename Source>
async_simple::coro::Lazy<resp_data> async_upload(
S uri, http_method method, Source source /* file */,
int64_t content_length = -1,
uint64_t offset = 0 /*file offset*/,
int64_t content_length = -1 /*upload size*/,
req_content_type content_type = req_content_type::text,
std::unordered_map<std::string, std::string> headers = {}) {
std::error_code ec{};
Expand Down Expand Up @@ -1050,6 +1056,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return resp_data{std::make_error_code(std::errc::invalid_argument),
404};
}
content_length -= offset;
if (content_length < 0) {
CINATRA_LOG_ERROR << "the offset is larger than the end of file";
co_return resp_data{std::make_error_code(std::errc::invalid_argument),
404};
}
}

assert(content_length >= 0);
Expand Down Expand Up @@ -1088,6 +1100,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

if constexpr (is_stream_file) {
source->seekg(offset, std::ios::cur);
std::string file_data;
detail::resize(file_data, std::min<std::size_t>(max_single_part_size_,
content_length));
Expand Down Expand Up @@ -1116,15 +1129,17 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (!has_init_ssl_) {
#endif
co_await send_file_without_copy(std::filesystem::path{source}, ec,
content_length);
content_length, offset);
#ifdef CINATRA_ENABLE_SSL
}
else {
co_await send_file_no_chunked_with_copy(source, ec, content_length);
co_await send_file_no_chunked_with_copy(source, ec, content_length,
offset);
}
#endif
#else
co_await send_file_no_chunked_with_copy(source, ec, content_length);
co_await send_file_no_chunked_with_copy(source, ec, content_length,
offset);
#endif
}
else {
Expand Down Expand Up @@ -1718,11 +1733,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data;
}

bool is_out_buf = false;

bool is_ranges = parser_.is_resp_ranges();
if (is_ranges) {
is_keep_alive = true;
}
if (parser_.is_chunked()) {
out_buf_ = {};
is_keep_alive = true;
if (head_buf_.size() > 0) {
const char *data_ptr =
Expand All @@ -1735,6 +1753,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

if (parser_.is_multipart()) {
out_buf_ = {};
is_keep_alive = true;
if (head_buf_.size() > 0) {
const char *data_ptr =
Expand Down Expand Up @@ -1771,7 +1790,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
total_len_ = parser_.total_len();
#endif

bool is_out_buf = !out_buf_.empty();
is_out_buf = !out_buf_.empty();
if (is_out_buf) {
if (content_len > 0 && out_buf_.size() < content_len) {
out_buf_ = {};
Expand Down Expand Up @@ -2425,13 +2444,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool enable_follow_redirect_ = false;
bool enable_timeout_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(8);
std::chrono::seconds(30);
std::chrono::steady_clock::duration req_timeout_duration_ =
std::chrono::seconds(60);
bool enable_tcp_no_delay_ = true;
std::string resp_chunk_str_;
std::span<char> out_buf_;
bool should_reset_ = false;
config config_;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
Expand Down
46 changes: 24 additions & 22 deletions include/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include "cinatra/mime_types.hpp"
#include "cinatra_log_wrapper.hpp"
#include "coro_http_connection.hpp"
#include "ylt/coro_io/channel.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_io/load_blancer.hpp"
#include "ylt/metric/system_metric.hpp"

namespace cinatra {
Expand Down Expand Up @@ -185,9 +185,9 @@ class coro_http_server {
void use_metrics(bool enable_json = false,
std::string url_path = "/metrics") {
init_metrics();
using root =
ylt::metric::metric_collector_t<ylt::metric::default_metric_manager,
ylt::metric::system_metric_manager>;
using root = ylt::metric::metric_collector_t<
ylt::metric::default_static_metric_manager,
ylt::metric::system_metric_manager>;
set_http_handler<http_method::GET>(
url_path,
[enable_json](coro_http_request &req, coro_http_response &res) {
Expand Down Expand Up @@ -216,14 +216,15 @@ class coro_http_server {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
auto load_blancer =
std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
hosts, {.lba = type}, weights));
auto handler =
[this, channel, type](
[this, load_blancer, type](
coro_http_request &req,
coro_http_response &response) -> async_simple::coro::Lazy<void> {
co_await channel->send_request(
co_await load_blancer->send_request(
[this, &req, &response](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
Expand Down Expand Up @@ -255,14 +256,15 @@ class coro_http_server {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
auto load_blancer =
std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
hosts, {.lba = type}, weights));

set_http_handler<cinatra::GET>(
url_path,
[channel](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
[load_blancer](coro_http_request &req, coro_http_response &resp)
-> async_simple::coro::Lazy<void> {
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
Expand All @@ -275,7 +277,7 @@ class coro_http_server {
break;
}

co_await channel->send_request(
co_await load_blancer->send_request(
[&req, result](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
Expand Down Expand Up @@ -928,20 +930,20 @@ class coro_http_server {
using namespace ylt::metric;

cinatra_metric_conf::enable_metric = true;
default_metric_manager::create_metric_static<counter_t>(
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_total_req, "");
default_metric_manager::create_metric_static<counter_t>(
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_failed_req, "");
default_metric_manager::create_metric_static<counter_t>(
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_total_recv_bytes, "");
default_metric_manager::create_metric_static<counter_t>(
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_total_send_bytes, "");
default_metric_manager::create_metric_static<gauge_t>(
default_static_metric_manager::instance().create_metric_static<gauge_t>(
cinatra_metric_conf::server_total_fd, "");
default_metric_manager::create_metric_static<histogram_t>(
default_static_metric_manager::instance().create_metric_static<histogram_t>(
cinatra_metric_conf::server_req_latency, "",
std::vector<double>{30, 40, 50, 60, 70, 80, 90, 100, 150});
default_metric_manager::create_metric_static<histogram_t>(
default_static_metric_manager::instance().create_metric_static<histogram_t>(
cinatra_metric_conf::server_read_latency, "",
std::vector<double>{3, 5, 7, 9, 13, 18, 23, 35, 50});
#if defined(__GNUC__)
Expand Down
Loading

0 comments on commit 9d0812c

Please sign in to comment.