Skip to content

Commit

Permalink
Refactor multipart handling in C++
Browse files Browse the repository at this point in the history
  • Loading branch information
tuokri committed Dec 19, 2023
1 parent 89b3d51 commit fcdfaef
Showing 1 changed file with 59 additions and 43 deletions.
102 changes: 59 additions & 43 deletions tests/umb_echo_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ void print_header(const Header& header)
std::cout << std::format("type: {}\n", mt_str);
}

// TODO: unify error codes!
// https://www.boost.org/doc/libs/1_76_0/libs/system/doc/html/system.html
boost::asio::experimental::coro<void() noexcept, std::expected<Header, Error>>
read_header(
tcp::socket& socket,
Expand Down Expand Up @@ -333,62 +335,75 @@ handle_multi_part_msg(
}

const auto bytes_out = msg->to_bytes();
// Only count the number of payload bytes left here.
auto bytes_left = bytes_out.size() - umb::g_header_size;
auto it_bytes_out = bytes_out.cbegin();

const auto num_hdr_bytes = static_cast<size_t>(std::ceil(
static_cast<float>(bytes_out.size() - umb::g_header_size) /
static_cast<float>(umb::g_packet_size)) * umb::g_header_size);
const auto num_parts_out = static_cast<size_t>(std::ceil(
static_cast<float>(num_hdr_bytes + (bytes_out.size() - umb::g_header_size)) /
static_cast<float>(umb::g_packet_size)));
const auto total_bytes_to_send = bytes_out.size() + num_hdr_bytes - umb::g_header_size;

// Cache message type values, will be reused for all headers.
const auto mt0 = bytes_out[2];
const auto mt1 = bytes_out[3];

std::array<umb::byte, umb::g_packet_size> send_buf{};
umb::byte send_size = 0;
umb::byte send_part = 0;
unsigned num_from_buf = 0;

// Send full parts while we have enough bytes left for them.
while ((bytes_left + umb::g_header_size) > umb::g_packet_size)
{
send_size = static_cast<umb::byte>(umb::g_packet_size);
send_buf[0] = send_size;
send_buf[1] = send_part++;
send_buf[2] = mt0;
send_buf[3] = mt1;

num_from_buf = send_size - umb::g_header_size;
std::copy_n(it_bytes_out, num_from_buf, send_buf.begin() + umb::g_header_size);
it_bytes_out += send_size;

unsigned bytes_sent = 0;
unsigned offset = umb::g_header_size;
std::cout << std::format(
"sending {} bytes, send_part: {}, bytes_left: {}, num_from_buf: {}\n",
send_size, send_part, bytes_left, num_from_buf);
// TODO: check ec.
co_await async_write(
socket,
boost::asio::buffer(send_buf, send_size),
deferred);

std::cout << std::format(
"bytes_out: {}, num_hdr_bytes: {}, num_parts_out: {}, total_bytes_to_send: {}\n",
bytes_out.size(), num_hdr_bytes, num_parts_out, total_bytes_to_send);
bytes_left -= send_size;
}

// TODO: rethink this. We want a neat way of doing this... Is this neat?
for (size_t part_out = 0; part_out < num_parts_out; ++part_out)
// Send the final non-full part.
if ((bytes_left > 0) && (bytes_left <= (umb::g_packet_size - umb::g_header_size)))
{
if (part_out == (num_parts_out - 1))
{
part_out = umb::g_part_multi_part_end;
}
send_size = static_cast<umb::byte>(bytes_left) + umb::g_header_size;
send_part = umb::g_part_multi_part_end;
send_buf[0] = send_size;
send_buf[1] = send_part;
send_buf[2] = mt0;
send_buf[3] = mt1;

const auto num_to_send = std::min(umb::g_packet_size, total_bytes_to_send - bytes_sent);
const auto num_to_take_from_buffer = num_to_send - umb::g_header_size;
std::cout << std::format(
"sending {} bytes, offset: {}, part_out: {}, num_to_take_from_buffer: {}\n",
num_to_send, offset, part_out, num_to_take_from_buffer);

// Header.
auto it_send_buf = send_buf.begin();
*it_send_buf++ = num_to_send;
*it_send_buf++ = part_out;
*it_send_buf++ = mt0;
*it_send_buf++ = mt1;

// Grab the bytes we need for this part from the buffer containing
// all message bytes for the multipart message.
const std::span<const umb::byte> send_data{bytes_out.cbegin() + offset,
bytes_out.cend()};
std::copy_n(send_data.cbegin(), num_to_take_from_buffer, it_send_buf);
num_from_buf = send_size - umb::g_header_size;
std::copy_n(it_bytes_out, num_from_buf, send_buf.begin() + umb::g_header_size);
it_bytes_out += send_size;

std::cout << std::format(
"sending {} bytes, send_part: {}, bytes_left: {}, num_from_buf: {}\n",
send_size, send_part, bytes_left, num_from_buf);
// TODO: check ec.
co_await async_write(
socket,
boost::asio::buffer(send_buf, num_to_send),
boost::asio::buffer(send_buf, send_size),
deferred);

bytes_sent += num_to_send;
offset += num_to_send - umb::g_header_size;
}
else
{
std::cout << std::format(
"ERROR: invalid number of bytes left for last part: {}!\n",
bytes_left);
}

// TODO: check bytes left is 0 here.
}

// TODO: close connection on bad data, error, etc.?
Expand Down Expand Up @@ -421,8 +436,9 @@ awaitable<void> echo(tcp::socket socket)
{
const auto err = result.error();
// TODO: error messages.
std::cout << std::format("error: {}\n", static_cast<int>(err));
continue; // TODO: close conn?
std::cout << std::format("error: {}, closing connection\n",
static_cast<int>(err));
break;
}

const auto header = *result;
Expand Down

0 comments on commit fcdfaef

Please sign in to comment.