Skip to content

Commit

Permalink
add async workflow profile
Browse files Browse the repository at this point in the history
  • Loading branch information
zizdlp committed Oct 8, 2023
1 parent c653edb commit 7854a82
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 10 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
LOOP ?= 1
LOOP ?= 100
LENGTH ?= 25000

build:
bazel build //profile/workflow:sync_server
bazel build //profile/workflow:sync_client
bazel build //profile/workflow:async_server
bazel build //profile/workflow:async_client
benchmark:
python benchmark.py --loop=$(LOOP) --length=$(LENGTH)
.PHONY: workflow_sync_server workflow_sync_client benchmark
18 changes: 13 additions & 5 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def benchmark(server, client, length=None, loop=None,port=None):
server_process = subprocess.Popen(server_args)

# 等待一段时间确保 sync_server 完全启动
time.sleep(5)
time.sleep(3)

# 启动 sync_client 进程
# 准备参数列表
Expand All @@ -35,7 +35,15 @@ def benchmark(server, client, length=None, loop=None,port=None):
parser.add_argument("--port", type=int, help="port parameter")

args = parser.parse_args()

server = "./bazel-bin/profile/workflow/sync_server"
client = "./bazel-bin/profile/workflow/sync_client"
benchmark(server, client, args.length, args.loop,args.port)
server_lists=[
"./bazel-bin/profile/workflow/sync_server",
"./bazel-bin/profile/workflow/async_server"
]
client_lists=[
"./bazel-bin/profile/workflow/sync_client",
"./bazel-bin/profile/workflow/async_client"
]
for i in range(len(server_lists)):
server = server_lists[i]
client = client_lists[i]
benchmark(server, client, args.length, args.loop,args.port)
2 changes: 1 addition & 1 deletion examples/profile_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void process(WFHttpTask *server_task)
{
call_count+=1;
if(call_count%100==0){
std::cout<<"call_count is:"<<call_count<<std::endl;
// std::cout<<"call_count is:"<<call_count<<std::endl;
}
protocol::HttpRequest *req = server_task->get_req();
protocol::HttpResponse *resp = server_task->get_resp();
Expand Down
19 changes: 19 additions & 0 deletions profile/workflow/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,23 @@ cc_binary(
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/strings:str_format",
],
)

cc_binary(
name = 'async_server',
srcs = ['async_server.cc'],
deps = ["@workflow//:http",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/strings:str_format",
],
)
cc_binary(
name = 'async_client',
srcs = ['async_client.cc'],
deps = ["@workflow//:http",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/strings:str_format",
],
)
81 changes: 81 additions & 0 deletions profile/workflow/async_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

#include <signal.h>
#include <workflow/HttpMessage.h>
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_format.h"

ABSL_FLAG(std::string, target, "http://localhost", "Server address");
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
ABSL_FLAG(uint16_t, loop, 1, "client call loop times");
ABSL_FLAG(uint32_t, length, 25000, "send data bytes each time");

static int counter = 0;
static WFFacilities::WaitGroup* wait_group;
void sig_handler(int signo){
wait_group->done();
}
void callback(WFHttpTask *httpTask){
int state = httpTask->get_state();
int error = httpTask->get_error();
switch (state){
case WFT_STATE_SYS_ERROR:
fprintf(stderr, "system error: %s\n", strerror(error));
break;
case WFT_STATE_DNS_ERROR:
fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
break;
case WFT_STATE_SUCCESS:
break;
}
if (state != WFT_STATE_SUCCESS){
fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
return;
}
counter+=1;
wait_group->done();
}
int main(int argc, char *argv[]){
absl::ParseCommandLine(argc, argv);

std::string target = absl::GetFlag(FLAGS_target);
uint16_t port = absl::GetFlag(FLAGS_port);
int loop = absl::GetFlag(FLAGS_loop);
int length = absl::GetFlag(FLAGS_length);

std::string server_address = absl::StrFormat("%s:%d",target, port);
std::string send_data(length, 'a');

WFFacilities::WaitGroup wait_group_instance(loop);
wait_group = &wait_group_instance;



// std::cout<<"=== workflow sync server is:"<<server_address<<" ==="<<std::endl;
auto s = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
for(int i =0;i<loop;++i){
auto httpTask = WFTaskFactory::create_http_task(server_address, 0, 0,callback);
protocol::HttpRequest *req = httpTask->get_req();

req->set_method("POST");
req->append_output_body_nocopy(send_data.c_str(), length); /* nocopy */
req->add_header_pair("Accept", "*/*");
req->add_header_pair("User-Agent", "TestAgent");
req->add_header_pair("Connection", "close");
httpTask->start();
}
wait_group->wait();
auto e = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
std::cout << "=== workflow async;"<<"port:"<<port<<";loop:"<<loop<<";length:"<<length<<";time:" << e - s << "us ===" << std::endl;
return 0;
}
77 changes: 77 additions & 0 deletions profile/workflow/async_server.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFServer.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFFacilities.h"

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_format.h"

#include <iostream>

ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");

int call_count = 0;


void process(WFHttpTask *server_task)
{
call_count+=1;
if(call_count%100==0){
// std::cout<<"call_count is:"<<call_count<<std::endl;
}
protocol::HttpRequest *req = server_task->get_req();
protocol::HttpResponse *resp = server_task->get_resp();
long long seq = server_task->get_task_seq();
protocol::HttpHeaderCursor cursor(req);

/* Set status line if you like. */
resp->set_http_version("HTTP/1.1");
resp->set_status_code("200");
resp->set_reason_phrase("OK");
if (seq == 9) /* no more than 10 requests on the same connection. */
resp->add_header_pair("Connection", "close");

}

static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
wait_group.done();
}

int main(int argc, char *argv[])
{
absl::ParseCommandLine(argc, argv);
uint16_t port = absl::GetFlag(FLAGS_port);


signal(SIGINT, sig_handler);

WFHttpServer server(process);
if (server.start(port) == 0)
{
wait_group.wait();
server.stop();
}
else
{
perror("Cannot start server");
exit(1);
}

return 0;
}

4 changes: 2 additions & 2 deletions profile/workflow/sync_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ int main(int argc, char *argv[]){

std::string server_address = absl::StrFormat("%s:%d",target, port);
std::string send_data(length, 'a');
std::cout<<"=== workflow sync server is:"<<server_address<<" ==="<<std::endl;
// std::cout<<"=== workflow sync server is:"<<server_address<<" ==="<<std::endl;
auto s = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
for(int i =0;i<loop;++i){
Expand All @@ -68,6 +68,6 @@ int main(int argc, char *argv[]){
}
auto e = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
std::cout << "=== workflow sync;loop:"<<loop<<";length:"<<length<<";time:" << e - s << "us ===" << std::endl;
std::cout << "=== workflow sync;"<<"port:"<<port<<";loop:"<<loop<<";length:"<<length<<";time:" << e - s << "us ===" << std::endl;
return 0;
}
2 changes: 1 addition & 1 deletion profile/workflow/sync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void process(WFHttpTask *server_task)
{
call_count+=1;
if(call_count%100==0){
std::cout<<"call_count is:"<<call_count<<std::endl;
// std::cout<<"call_count is:"<<call_count<<std::endl;
}
protocol::HttpRequest *req = server_task->get_req();
protocol::HttpResponse *resp = server_task->get_resp();
Expand Down

0 comments on commit 7854a82

Please sign in to comment.