Skip to content

Commit

Permalink
fix: add codes to address this issue. (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
halajohn authored Nov 14, 2024
1 parent 48d5389 commit 304b32b
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=StandaloneTest.BasicC"
"--gtest_filter=ExtensionTest.NoAudioFrameDest"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
92 changes: 48 additions & 44 deletions core/src/ten_runtime/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,63 +451,67 @@ static bool ten_extension_determine_out_msg_dest_from_graph(
// Fetch the destinations from the graph.
ten_msg_dest_info_t *msg_dest_info =
ten_extension_get_msg_dests_from_graph(self, msg);
*result_return_policy = msg_dest_info->policy;
ten_list_t *dests = &msg_dest_info->dest;

if (dests && ten_list_size(dests) > 0) {
ten_list_foreach (dests, iter) {
bool need_to_clone_msg = need_to_clone_msg_when_sending(msg, iter.index);

ten_shared_ptr_t *curr_msg = NULL;
if (need_to_clone_msg) {
curr_msg = ten_msg_clone(msg, NULL);
} else {
curr_msg = msg;
}
if (msg_dest_info) {
*result_return_policy = msg_dest_info->policy;
ten_list_t *dests = &msg_dest_info->dest;

if (dests && ten_list_size(dests) > 0) {
ten_list_foreach (dests, iter) {
bool need_to_clone_msg =
need_to_clone_msg_when_sending(msg, iter.index);

ten_shared_ptr_t *curr_msg = NULL;
if (need_to_clone_msg) {
curr_msg = ten_msg_clone(msg, NULL);
} else {
curr_msg = msg;
}

ten_extension_info_t *dest_extension_info =
ten_smart_ptr_get_data(ten_smart_ptr_listnode_get(iter.node));
TEN_ASSERT(dest_extension_info, "Should not happen.");
ten_extension_info_t *dest_extension_info =
ten_smart_ptr_get_data(ten_smart_ptr_listnode_get(iter.node));
TEN_ASSERT(dest_extension_info, "Should not happen.");

// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: The graph-related information of the extension remains
// unchanged during the lifecycle of engine/graph, allowing safe
// cross-thread access.
TEN_ASSERT(ten_extension_info_check_integrity(dest_extension_info, false),
"Invalid use of extension_info %p.", dest_extension_info);
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: The graph-related information of the extension remains
// unchanged during the lifecycle of engine/graph, allowing safe
// cross-thread access.
TEN_ASSERT(
ten_extension_info_check_integrity(dest_extension_info, false),
"Invalid use of extension_info %p.", dest_extension_info);

ten_msg_clear_and_set_dest_from_extension_info(curr_msg,
dest_extension_info);
ten_msg_clear_and_set_dest_from_extension_info(curr_msg,
dest_extension_info);

ten_list_push_smart_ptr_back(result_msgs, curr_msg);
ten_list_push_smart_ptr_back(result_msgs, curr_msg);

if (need_to_clone_msg) {
ten_shared_ptr_destroy(curr_msg);
if (need_to_clone_msg) {
ten_shared_ptr_destroy(curr_msg);
}
}

return true;
}
}

return true;
} else {
// Graph doesn't specify how to route the messages.
// Graph doesn't specify how to route the messages.

const char *msg_name = ten_msg_get_name(msg);
const char *msg_name = ten_msg_get_name(msg);

if (err) {
ten_error_set(err, TEN_ERRNO_INVALID_GRAPH,
"Failed to find destination of a message (%s) from graph.",
msg_name);
if (err) {
ten_error_set(err, TEN_ERRNO_INVALID_GRAPH,
"Failed to find destination of a message (%s) from graph.",
msg_name);
} else {
if (ten_msg_is_cmd_and_result(msg)) {
TEN_LOGE("Failed to find destination of a command (%s) from graph.",
msg_name);
} else {
if (ten_msg_is_cmd_and_result(msg)) {
TEN_LOGE("Failed to find destination of a command (%s) from graph.",
msg_name);
} else {
// The amount of the data-like messages might be huge, so we don't dump
// error logs here to prevent log flooding.
}
// The amount of the data-like messages might be huge, so we don't
// dump error logs here to prevent log flooding.
}

return false;
}

return false;
}

typedef enum TEN_EXTENSION_DETERMINE_OUT_MSGS_RESULT {
Expand Down
178 changes: 178 additions & 0 deletions tests/ten_runtime/smoke/extension_test/dest/no_audio_frame_dest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#include <memory>
#include <nlohmann/json.hpp>
#include <string>

#include "gtest/gtest.h"
#include "include_internal/ten_runtime/binding/cpp/ten.h"
#include "ten_runtime/binding/cpp/internal/ten_env.h"
#include "ten_utils/lang/cpp/lib/error.h"
#include "ten_utils/lib/thread.h"
#include "tests/common/client/cpp/msgpack_tcp.h"
#include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h"

namespace {

class test_extension_1 : public ten::extension_t {
public:
explicit test_extension_1(const std::string &name) : ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
if (std::string(cmd->get_name()) == "hello_world") {
hello_world_cmd = std::move(cmd);

ten::error_t err;

auto audio_frame_no_dest =
ten::audio_frame_t::create("audio_frame_no_dest");
bool rc = ten_env.send_audio_frame(std::move(audio_frame_no_dest), &err);
ASSERT_EQ(rc, false);
ASSERT_EQ(err.is_success(), false);

TEN_ENV_LOG_ERROR(ten_env, err.errmsg());

auto audio_frame = ten::audio_frame_t::create("audio_frame");
rc = ten_env.send_audio_frame(std::move(audio_frame));
ASSERT_EQ(rc, true);
} else if (std::string(cmd->get_name()) == "audio_frame_ack") {
auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property("detail", "hello world, too");
bool rc = ten_env.return_result(std::move(cmd_result),
std::move(hello_world_cmd));
ASSERT_EQ(rc, true);
}
}

private:
std::unique_ptr<ten::cmd_t> hello_world_cmd;
};

class test_extension_2 : public ten::extension_t {
public:
explicit test_extension_2(const std::string &name) : ten::extension_t(name) {}

void on_audio_frame(
ten::ten_env_t &ten_env,
std::unique_ptr<ten::audio_frame_t> audio_frame) override {
auto cmd = ten::cmd_t::create("audio_frame_ack");
ten_env.send_cmd(std::move(cmd));
}
};

class test_app : public ten::app_t {
public:
void on_configure(ten::ten_env_t &ten_env) override {
bool rc = ten_env.init_property_from_json(
// clang-format off
R"({
"_ten": {
"uri": "msgpack://127.0.0.1:8001/",
"log_level": 2
}
})"
// clang-format on
,
nullptr);
ASSERT_EQ(rc, true);

ten_env.on_configure_done();
}
};

void *test_app_thread_main(TEN_UNUSED void *args) {
auto *app = new test_app();
app->run();
delete app;

return nullptr;
}

TEN_CPP_REGISTER_ADDON_AS_EXTENSION(no_audio_frame_dest__test_extension_1,
test_extension_1);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(no_audio_frame_dest__test_extension_2,
test_extension_2);

} // namespace

TEST(ExtensionTest, NoAudioFrameDest) { // NOLINT
// Start app.
auto *app_thread =
ten_thread_create("app thread", test_app_thread_main, nullptr);

// Create a client and connect to the app.
auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/");

// Send graph.
nlohmann::json resp = client->send_json_and_recv_resp_in_json(
R"({
"_ten": {
"type": "start_graph",
"seq_id": "55",
"nodes": [{
"type": "extension",
"name": "test_extension_1",
"addon": "no_audio_frame_dest__test_extension_1",
"extension_group": "basic_extension_group",
"app": "msgpack://127.0.0.1:8001/"
},{
"type": "extension",
"name": "test_extension_2",
"addon": "no_audio_frame_dest__test_extension_2",
"extension_group": "basic_extension_group",
"app": "msgpack://127.0.0.1:8001/"
}],
"connections": [{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "basic_extension_group",
"extension": "test_extension_1",
"audio_frame": [{
"name": "audio_frame",
"dest": [{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "basic_extension_group",
"extension": "test_extension_2"
}]
}]
},{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "basic_extension_group",
"extension": "test_extension_2",
"cmd": [{
"name": "audio_frame_ack",
"dest": [{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "basic_extension_group",
"extension": "test_extension_1"
}]
}]
}]
}
})"_json);
ten_test::check_status_code_is(resp, TEN_STATUS_CODE_OK);

// Send a user-defined 'hello world' command.
resp = client->send_json_and_recv_resp_in_json(
R"({
"_ten": {
"name": "hello_world",
"seq_id": "137",
"dest": [{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "basic_extension_group",
"extension": "test_extension_1"
}]
}
})"_json);
ten_test::check_result_is(resp, "137", TEN_STATUS_CODE_OK,
"hello world, too");

delete client;

ten_thread_join(app_thread, -1);
}

0 comments on commit 304b32b

Please sign in to comment.