A question about coroutine scheduling #111

Open

coolhuhu opened this issue Apr 23, 2024 · 0 comments

Background

I am learning coroutines. Following the examples under example/, I wrote an echo_client and an echo_server with the co_context library; the echo_client is adapted from the muduo example. I wanted to compare performance against muduo based on these programs, but they did not behave as I expected.

Problem description

My environment: an Alibaba Cloud ECS instance, 2 vCPUs / 2 GiB RAM, Ubuntu 22.04, g++ 11.4.0.

The muduo implementation is at https://github.com/chenshuo/muduo/tree/master/examples/pingpong

Running it with the benchmark script muduo provides gives:

pingpong_server: no process found
Bufsize: 16384 Threads: 1 Sessions: 1
20240423 08:05:48.429949Z 630711 INFO pid = 630711, tid = 630711 - server.cc:38
20240423 08:05:49.430760Z 630713 INFO pid = 630713, tid = 630713 - client.cc:197
20240423 08:05:49.431014Z 630713 WARN all connected - client.cc:122
20240423 08:05:59.430831Z 630713 WARN stop - client.cc:158
20240423 08:05:59.431107Z 630713 WARN all disconnected - client.cc:130
20240423 08:05:59.431125Z 630713 WARN 4261986304 total bytes read - client.cc:139
20240423 08:05:59.431132Z 630713 WARN 260131 total messages read - client.cc:140
20240423 08:05:59.431138Z 630713 WARN 16384 average message size - client.cc:141
20240423 08:05:59.431149Z 630713 WARN 406.4546875 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 10
20240423 08:06:04.435839Z 630754 INFO pid = 630754, tid = 630754 - server.cc:38
20240423 08:06:05.437393Z 630757 INFO pid = 630757, tid = 630757 - client.cc:197
20240423 08:06:05.437965Z 630757 WARN all connected - client.cc:122
20240423 08:06:15.437484Z 630757 WARN stop - client.cc:158
20240423 08:06:15.437957Z 630757 WARN all disconnected - client.cc:130
20240423 08:06:15.437972Z 630757 WARN 17275011072 total bytes read - client.cc:139
20240423 08:06:15.437974Z 630757 WARN 1054383 total messages read - client.cc:140
20240423 08:06:15.438011Z 630757 WARN 16384 average message size - client.cc:141
20240423 08:06:15.438021Z 630757 WARN 1647.4734375 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 100
20240423 08:06:20.442012Z 630820 INFO pid = 630820, tid = 630820 - server.cc:38
20240423 08:06:21.442723Z 630823 INFO pid = 630823, tid = 630823 - client.cc:197
20240423 08:06:21.447357Z 630823 WARN all connected - client.cc:122
20240423 08:06:31.443814Z 630823 WARN stop - client.cc:158
20240423 08:06:31.445412Z 630823 WARN all disconnected - client.cc:130
20240423 08:06:31.445427Z 630823 WARN 19154436096 total bytes read - client.cc:139
20240423 08:06:31.445430Z 630823 WARN 1169094 total messages read - client.cc:140
20240423 08:06:31.445431Z 630823 WARN 16384 average message size - client.cc:141
20240423 08:06:31.445446Z 630823 WARN 1826.709375 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 1000
20240423 08:06:36.449339Z 630884 INFO pid = 630884, tid = 630884 - server.cc:38
20240423 08:06:37.450658Z 630889 INFO pid = 630889, tid = 630889 - client.cc:197
20240423 08:06:37.495573Z 630889 WARN all connected - client.cc:122
20240423 08:06:47.451004Z 630889 WARN stop - client.cc:158
20240423 08:06:47.484683Z 630889 WARN all disconnected - client.cc:130
20240423 08:06:47.484710Z 630889 WARN 12080676864 total bytes read - client.cc:139
20240423 08:06:47.484859Z 630889 WARN 737346 total messages read - client.cc:140
20240423 08:06:47.484863Z 630889 WARN 16384 average message size - client.cc:141
20240423 08:06:47.484912Z 630889 WARN 1152.103125 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 10000
20240423 08:06:52.490363Z 630949 INFO pid = 630949, tid = 630949 - server.cc:38
20240423 08:06:53.490886Z 630963 INFO pid = 630963, tid = 630963 - client.cc:197
20240423 08:06:53.909448Z 630963 WARN all connected - client.cc:122
20240423 08:07:03.537228Z 630963 WARN stop - client.cc:158
20240423 08:07:03.933908Z 630963 WARN all disconnected - client.cc:130
20240423 08:07:03.934199Z 630963 WARN 10123739136 total bytes read - client.cc:139
20240423 08:07:03.934213Z 630963 WARN 617904 total messages read - client.cc:140
20240423 08:07:03.934215Z 630963 WARN 16384 average message size - client.cc:141
20240423 08:07:03.934220Z 630963 WARN 965.475 MiB/s throughput - client.cc:143

My echo_client and echo_server programs are attached at the end.
Their output:
echo_server: no process found
Bufsize: 16384 Threads: 1 Sessions: 1
Tue Apr 23 16:20:17 2024
client start...
all connected
stop.
all disconnected
4071440384 total bytes read
248501 total messages read
16384 bytes per message
388.283 MiB/s throughput
Tue Apr 23 16:20:27 2024
client end...

Bufsize: 16384 Threads: 1 Sessions: 10
Tue Apr 23 16:20:33 2024
client start...
all connected
stop.
all disconnected
15644426240 total bytes read
954860 total messages read
16384 bytes per message
1491.97 MiB/s throughput
Tue Apr 23 16:20:43 2024
client end...

Bufsize: 16384 Threads: 1 Sessions: 100
Tue Apr 23 16:20:49 2024
client start...
all connected
stop.
all disconnected
17007493120 total bytes read
1038055 total messages read
16384 bytes per message
1621.96 MiB/s throughput
Tue Apr 23 16:20:59 2024
client end...

Bufsize: 16384 Threads: 1 Sessions: 1000
Tue Apr 23 16:21:05 2024
client start...
all connected
stop.
all disconnected
8327217152 total bytes read
508253 total messages read
16384 bytes per message
794.145 MiB/s throughput
Tue Apr 23 16:21:15 2024
client end...

Bufsize: 16384 Threads: 1 Sessions: 10000
Tue Apr 23 16:21:21 2024
client start...
stop.
all disconnected
259922837504 total bytes read
15864431 total messages read
16384 bytes per message
24788.2 MiB/s throughput
Tue Apr 23 16:26:12 2024
client end...

Each run measures 10 seconds of traffic.

Comparing muduo's results with mine: at 1000 concurrent sessions, the co_context version already shows a performance drop. At 10000 sessions, Client::handleTimeout takes far too long to finish: in the last run above, almost five minutes pass between "stop." and "client end...", even though the measurement window is only 10 seconds.

My guess is that the std::mutex in Session::stop(), which is held across a co_await, is to blame, but I'm not sure and would appreciate an explanation. If the mutex is indeed the cause, how should I restructure the program?

Separately, as a coroutine beginner I'm unsure how scheduling works. In co_context:

  1. When I co_spawn a new coroutine, when does it actually get scheduled? Is there a queue-like structure underneath that resumes coroutines in FIFO order?
  2. When I co_await an operation, the current coroutine suspends and yields control. Once the operation completes, is the coroutine resumed promptly?

My implementation

The test script:

#!/bin/sh

killall echo_server
timeout=${timeout:-10}
bufsize=${bufsize:-16384}
nothreads=1

for nosessions in 1 10 100 1000 10000; do
  sleep 5
  echo "Bufsize: $bufsize Threads: $nothreads Sessions: $nosessions"
  taskset -c 0 ./echo_server 33333 & srvpid=$!
  sleep 1
  taskset -c 1 ./echo_client 127.0.0.1 33333 $nothreads $bufsize $nosessions $timeout
  kill -9 $srvpid
done
// echo_server.cpp

#include <co_context/net.hpp>
#include <cerrno>
#include <cstring>       // strerror
#include <iostream>
#include <netinet/tcp.h> // TCP_NODELAY
#include <sys/socket.h>
#include <unistd.h>
using namespace co_context;

task<> session(int sockfd)
{
    // Work on the raw fd; it is closed explicitly with lazy::close below.
    constexpr int len = 16384;
    char buf[len];

    while (true)
    {
        auto nr = co_await lazy::recv(sockfd, buf);
        if (nr <= 0) {
            co_await lazy::close(sockfd);
            co_return;
        }

        co_await lazy::send(sockfd, {buf, static_cast<size_t>(nr)});
    }
}

task<> server(const uint16_t port)
{
    /*
        Uses the low-level helpers provided by co_context.
        To keep things simple, most error handling is omitted; the
        acceptor class provided by co_context could be used directly instead.
    */
    inet_address listen_addr{port};
    const int listen_sockfd = ::socket(listen_addr.family(), SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
    if (listen_sockfd < 0) {
        std::cerr << "socket error: " << strerror(errno) << std::endl;
        co_return;
    }
    
    int optval = 1;
    if (::setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, 
        &optval, static_cast<socklen_t>(sizeof(optval))) < 0) {
        std::cerr << "setsockopt SO_REUSEADDR error: " << strerror(errno) << std::endl;
        co_return;
    }

    optval = 1;
    if (::setsockopt(
            listen_sockfd, IPPROTO_TCP, TCP_NODELAY, &optval,
            static_cast<socklen_t>(sizeof optval)
        ) < 0) {
        std::cout << "setsockopt TCP_NODELAY error: " << strerror(errno) << std::endl;
        co_return;
    }

    if (::bind(listen_sockfd, listen_addr.get_sockaddr(), listen_addr.length()) != 0) {
        std::cerr << "bind error: " << strerror(errno) << std::endl;
        co_return;
    }

    if (::listen(listen_sockfd, SOMAXCONN) != 0) {
        std::cerr << "listen error: " << strerror(errno) << std::endl;
        co_return;
    }

    for (; ;) {
        int sockfd = co_await lazy::accept(listen_sockfd, nullptr, nullptr, 0);
        if (sockfd < 0) {
            std::cerr << "accept error: " << strerror(errno) << std::endl;
            co_return;
        }
        co_spawn(session(sockfd));
    }
}


/*
    ./echo_server 33333
*/
int main(int argc, char *argv[])
{
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <port>\n";
        return 1;
    }

    uint16_t port = static_cast<uint16_t>(std::atoi(argv[1]));
    io_context ctx;
    ctx.co_spawn(server(port));
    ctx.start();
    ctx.join();
    return 0;
}
// echo_client.cpp

#include <co_context/net.hpp>
#include <atomic>
#include <chrono>
#include <cstdio>  // fprintf
#include <cstdlib> // atoi
#include <ctime>   // asctime, localtime
#include <iostream>
#include <memory>  // std::unique_ptr
#include <mutex>
#include <string>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>

using Second = int;

class Client;

class Session
{
public:
    Session(co_context::io_context *io_context,
            Client *client,
            const co_context::inet_address &addr) : io_context_(io_context),
                                                    client_(client),
                                                    serverAddr_(addr),
                                                    socket_(::socket(serverAddr_.family(), SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP)),
                                                    bytesRead_(0),
                                                    byteWritten_(0),
                                                    messagesRead_(0)
    {
    }

    co_context::task<> start();

    co_context::task<> onMessage();

    co_context::task<> stop();

    int64_t bytesRead() const { return bytesRead_; }

    int64_t messagesRead() const { return messagesRead_; }

private:
    co_context::io_context *io_context_;
    Client *client_;
    co_context::inet_address serverAddr_;
    co_context::socket socket_;
    std::vector<char> buffers_;
    int64_t bytesRead_;
    int64_t byteWritten_;
    int64_t messagesRead_;
    std::mutex mutex_;
};

class Client
{
public:
    Client(co_context::io_context *io_context,
           const co_context::inet_address &serverAddr,
           int blockSize,
           int sessionCount,
           Second timeout, int threadCount) : io_context_(io_context),
                                             addr_(serverAddr),
                                             blockSize_(blockSize),
                                             sessionCount_(sessionCount),
                                             timeout_(timeout),
                                             numConntected_(0)
    {
        if (threadCount > 1)
        {
            // TODO: multi-threaded client
        }

        /* build the message payload */
        for (int i = 0; i < blockSize; ++i)
        {
            message_.push_back(static_cast<char>(i % 128));
        }
    }

    co_context::task<> start()
    {

        for (int i = 0; i < sessionCount_; ++i)
        {
            Session *session = new Session(io_context_, this, addr_);
            io_context_->co_spawn(session->start());
            // co_await session->start();
            sessions_.emplace_back(session);
        }  

        co_await handleTimeout();
    }

    const std::string &message() const { return message_; }

    int blockSize() const { return blockSize_; }

    void onConnect()
    {
        // std::cout << numConntected_ << " connected" << std::endl;
        if (++numConntected_ == sessionCount_)
        {
            std::cout << "all connected" << std::endl;
        }
    }

    void onDisconnect()
    {
        // std::cout << numConntected_ << " disconnected" << std::endl;
        if (--numConntected_ == 0)
        {
            std::cout << "all disconnected" << std::endl;

            int64_t totalBytesRead = 0;
            int64_t totalMessagesRead = 0;

            for (const auto &session : sessions_)
            {
                totalBytesRead += session->bytesRead();
                totalMessagesRead += session->messagesRead();
            }

            std::cout << totalBytesRead << " total bytes read" << std::endl;
            std::cout << totalMessagesRead << " total messages read" << std::endl;
            std::cout << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead)
                      << " bytes per message" << std::endl;
            std::cout << static_cast<double>(totalBytesRead) / (timeout_ * 1024 * 1024)
                      << " MiB/s throughput" << std::endl;

            auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
            std::cout << std::asctime(std::localtime(&now)) << " client end..." << std::endl;
        }
    }

private:
    co_context::task<> handleTimeout()
    {
        auto deadline = std::chrono::steady_clock::now();
        deadline += std::chrono::seconds(timeout_);
        co_await co_context::timeout_at(deadline);

        std::cout << "stop." << std::endl;
        for (auto &session : sessions_)
        {
            co_await session->stop();
        }
    }

private:
    co_context::io_context *io_context_;
    const co_context::inet_address addr_;
    int blockSize_;
    int sessionCount_;
    Second timeout_;
    std::atomic<int> numConntected_;
    std::string message_;
    std::vector<std::unique_ptr<Session>> sessions_;
};


co_context::task<> Session::start()
{
    int ret = co_await socket_.connect(serverAddr_);
    if (ret < 0)
    {
        std::cerr << "connect error" << std::endl;
        co_return;
    }

    socket_.set_tcp_no_delay(true);
    client_->onConnect();

    io_context_->co_spawn(onMessage());

    // Kick off the ping-pong: send the first block; onMessage() keeps
    // echoing whatever comes back.
    // FIXME: send error is not handled
    int sn = co_await socket_.send(client_->message());
    (void)sn;
}

co_context::task<> Session::onMessage()
{
    char buf[16384];

    while (true)
    {
        int nr = co_await socket_.recv(buf);
        if (nr <= 0)
        {
            co_return;
        }
        ++messagesRead_;
        bytesRead_ += nr;

        int ns = co_await socket_.send(buf, static_cast<size_t>(nr));
        if (ns <= 0)
        {
            co_return;
        }
        byteWritten_ += ns;
    }
}

co_context::task<> Session::stop()
{
    {
        std::unique_lock<std::mutex> locker(mutex_);
        co_await socket_.shutdown_write();
    }
    
    client_->onDisconnect();
}


/*
    ./echo_client 127.0.0.1 33333 1 16384 100 10
*/
int main(int argc, char *argv[])
{
    auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::cout << std::asctime(std::localtime(&now)) << " client start..." << std::endl;

    if (argc != 7)
    {
        fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time>\n");
    }
    else
    {
        const char *ip = argv[1];
        uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
        int threadCount = atoi(argv[3]);
        int blockSize = atoi(argv[4]);
        int sessionCount = atoi(argv[5]);
        int timeout = atoi(argv[6]);

        co_context::io_context io_ctx;
        co_context::inet_address serverAddr(ip, port);
        
        Client client(&io_ctx, serverAddr, blockSize, sessionCount, timeout, threadCount);
        io_ctx.co_spawn(client.start());
        io_ctx.start();
        io_ctx.join();
    }
}