Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP_DDS: support automatic reconnect to micro-ROS agent #25228

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 94 additions & 10 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#if AP_DDS_ENABLED

#include <uxr/client/util/ping.h>

#include <AP_GPS/AP_GPS.h>
#include <AP_HAL/AP_HAL.h>
#include <AP_RTC/AP_RTC.h>
Expand Down Expand Up @@ -34,6 +36,7 @@ static constexpr uint16_t DELAY_LOCAL_POSE_TOPIC_MS = 33;
static constexpr uint16_t DELAY_LOCAL_VELOCITY_TOPIC_MS = 33;
static constexpr uint16_t DELAY_GEO_POSE_TOPIC_MS = 33;
static constexpr uint16_t DELAY_CLOCK_TOPIC_MS = 10;
static constexpr uint16_t DELAY_PING_MS = 500;

// Define the subscriber data members, which are static class scope.
// If these are created on the stack in the subscriber,
Expand Down Expand Up @@ -67,6 +70,18 @@ const AP_Param::GroupInfo AP_DDS_Client::var_info[] {
AP_GROUPEND
};

AP_DDS_Client::~AP_DDS_Client()
{
// close transport
if (is_using_serial) {
uxr_close_custom_transport(&serial.transport);
} else {
#if AP_DDS_UDP_ENABLED
uxr_close_custom_transport(&udp.transport);
#endif
}
}

void AP_DDS_Client::update_topic(builtin_interfaces_msg_Time& msg)
{
uint64_t utc_usec;
Expand Down Expand Up @@ -582,22 +597,83 @@ void AP_DDS_Client::on_request(uxrSession* uxr_session, uxrObjectId object_id, u
*/
void AP_DDS_Client::main_loop(void)
{
if (!init() || !create()) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "%s Creation Requests failed", msg_prefix);
if (!init_transport()) {
return;
}
GCS_SEND_TEXT(MAV_SEVERITY_INFO, "%s Initialization passed", msg_prefix);

populate_static_transforms(tx_static_transforms_topic);
write_static_transforms();

//! @todo check for request to stop task
while (true) {
hal.scheduler->delay(1);
update();
if (comm == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: transport invalid, exiting");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is set from uxr_init_session(&session, comm, key);, so perhaps better wording would be session invalid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think the wording is ok: comm is set when the transport initialises - before the session is initialised.

bool AP_DDS_Client::ddsSerialInit()
{
    // setup a framed transport for serial
    uxr_set_custom_transport_callbacks(&serial.transport, true,
                                       serial_transport_open,
                                       serial_transport_close,
                                       serial_transport_write,
                                       serial_transport_read);


    if (!uxr_init_custom_transport(&serial.transport, (void*)this)) {
        return false;
    }
    comm = &serial.transport.comm;

return;
}

// check ping
const uint64_t ping_timeout_ms{1000};
const uint8_t ping_max_attempts{10};
if (!uxr_ping_agent_attempts(comm, ping_timeout_ms, ping_max_attempts)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: No ping response, exiting");
return;
}

// create session
if (!init_session() || !create()) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "DDS Client: Creation Requests failed");
return;
}
connected = true;
GCS_SEND_TEXT(MAV_SEVERITY_INFO, "DDS Client: Initialization passed");

populate_static_transforms(tx_static_transforms_topic);
write_static_transforms();

uint64_t last_ping_ms{0};
uint8_t num_pings_missed{0};
bool had_ping_reply{false};
while (connected) {
hal.scheduler->delay(1);

// publish topics
update();

// check ping response
if (session.on_pong_flag == PONG_IN_SESSION_STATUS) {
had_ping_reply = true;
}

const auto cur_time_ms = AP_HAL::millis64();
if (cur_time_ms - last_ping_ms > DELAY_PING_MS) {
last_ping_ms = cur_time_ms;

if (had_ping_reply) {
num_pings_missed = 0;

} else {
++num_pings_missed;
}

const int ping_agent_timeout_ms{0};
const uint8_t ping_agent_attempts{1};
uxr_ping_agent_session(&session, ping_agent_timeout_ms, ping_agent_attempts);

had_ping_reply = false;
}

if (num_pings_missed > 2) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,
"DDS Client: No ping response, disconnecting");
connected = false;
}
}

// delete session if connected
if (connected) {
uxr_delete_session(&session);
}
}
}

bool AP_DDS_Client::init()
bool AP_DDS_Client::init_transport()
{
// serial init will fail if the SERIALn_PROTOCOL is not setup
bool initTransportStatus = ddsSerialInit();
Expand All @@ -615,6 +691,14 @@ bool AP_DDS_Client::init()
return false;
}

return true;
}

bool AP_DDS_Client::init_session()
{
// init session
uxr_init_session(&session, comm, key);

// Register topic callbacks
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic_trampoline, this);

Expand Down Expand Up @@ -948,7 +1032,7 @@ void AP_DDS_Client::update()
write_clock_topic();
}

connected = uxr_run_session_time(&session, 1);
status_ok = uxr_run_session_time(&session, 1);
}

#if CONFIG_HAL_BOARD != HAL_BOARD_SITL
Expand Down
17 changes: 13 additions & 4 deletions libraries/AP_DDS/AP_DDS_Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class AP_DDS_Client
HAL_Semaphore csem;

// connection parametrics
bool connected = true;
bool status_ok{false};
bool connected{false};

static void update_topic(builtin_interfaces_msg_Time& msg);
bool update_topic(sensor_msgs_msg_NavSatFix& msg, const uint8_t instance) WARN_IF_UNUSED;
Expand Down Expand Up @@ -143,17 +144,25 @@ class AP_DDS_Client
SocketAPM *socket;
} udp;
#endif
// pointer to transport's communication structure
uxrCommunication *comm{nullptr};

// client key we present
static constexpr uint32_t uniqueClientKey = 0xAAAABBBB;
static constexpr uint32_t key = 0xAAAABBBB;

public:
~AP_DDS_Client();

bool start(void);
void main_loop(void);

//! @brief Initialize the client's transport, uxr session, and IO stream(s)
//! @brief Initialize the client's transport
//! @return True on successful initialization, false on failure
bool init_transport() WARN_IF_UNUSED;

//! @brief Initialize the client's uxr session and IO stream(s)
//! @return True on successful initialization, false on failure
bool init() WARN_IF_UNUSED;
bool init_session() WARN_IF_UNUSED;

//! @brief Set up the client's participants, data read/writes,
// publishers, subscribers
Expand Down
2 changes: 1 addition & 1 deletion libraries/AP_DDS/AP_DDS_Serial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ bool AP_DDS_Client::ddsSerialInit()
if (!uxr_init_custom_transport(&serial.transport, (void*)this)) {
return false;
}
uxr_init_session(&session, &serial.transport.comm, uniqueClientKey);
comm = &serial.transport.comm;
return true;
}
2 changes: 1 addition & 1 deletion libraries/AP_DDS/AP_DDS_UDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ bool AP_DDS_Client::ddsUdpInit()
if (!uxr_init_custom_transport(&udp.transport, (void*)this)) {
return false;
}
uxr_init_session(&session, &udp.transport.comm, uniqueClientKey);
comm = &udp.transport.comm;
return true;
}
#endif // AP_DDS_UDP_ENABLED