Skip to content

Commit

Permalink
Merge pull request #455 from mtconnect/merge_main_to_main_dev
Browse files Browse the repository at this point in the history
Merge main to main dev
  • Loading branch information
wsobel authored May 13, 2024
2 parents eb94b8f + ef30913 commit 2b5a5e8
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 5)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 0)
set(AGENT_VERSION_RC "RC2")
set(AGENT_VERSION_RC "RC4")

# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent
cmake_minimum_required(VERSION 3.23 FATAL_ERROR)
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class MTConnectAgentConan(ConanFile):
name = "mtconnect_agent"
version = "2.2"
version = "2.3"
url = "https://github.com/mtconnect/cppagent.git"
license = "Apache License 2.0"
settings = "os", "compiler", "arch", "build_type"
Expand Down
1 change: 1 addition & 0 deletions src/mtconnect/mqtt/mqtt_client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ namespace mtconnect {
m_connected = false;
if (m_handler && m_handler->m_disconnected)
m_handler->m_disconnected(shared_from_this());

if (m_running)
{
reconnect();
Expand Down
22 changes: 12 additions & 10 deletions src/mtconnect/sink/mqtt_sink/mqtt_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ namespace mtconnect {
}
}

auto seq = m_sinkContract->getCircularBuffer().getSequence();
auto seq = publishCurrent(boost::system::error_code {});
for (auto &dev : m_sinkContract->getDevices())
{
FilterSet filterSet = filterForDevice(dev);
Expand All @@ -199,10 +199,8 @@ namespace mtconnect {
sampler->observe(seq, [this](const std::string &id) {
return m_sinkContract->getDataItemById(id).get();
});
sampler->handlerCompleted();
publishSample(sampler);
}

publishCurrent(boost::system::error_code {});
}

/// @brief publish sample when observations arrive.
Expand Down Expand Up @@ -246,18 +244,20 @@ namespace mtconnect {
return end;
}

void MqttService::publishCurrent(boost::system::error_code ec)
SequenceNumber_t MqttService::publishCurrent(boost::system::error_code ec)
{
SequenceNumber_t firstSeq, seq = 0;

if (ec)
{
LOG(warning) << "Mqtt2Service::publishCurrent: " << ec.message();
return;
return 0;
}

if (!m_client->isRunning() || !m_client->isConnected())
{
LOG(warning) << "Mqtt2Service::publishCurrent: client stopped";
return;
return 0;
}

for (auto &device : m_sinkContract->getDevices())
Expand All @@ -266,7 +266,6 @@ namespace mtconnect {
LOG(debug) << "Publishing current for: " << topic;

ObservationList observations;
SequenceNumber_t firstSeq, seq;
auto filterSet = filterForDevice(device);

{
Expand All @@ -290,6 +289,8 @@ namespace mtconnect {
m_currentTimer.expires_after(m_currentInterval);
m_currentTimer.async_wait(boost::asio::bind_executor(
m_strand, boost::bind(&MqttService::publishCurrent, this, _1)));

return seq;
}

bool MqttService::publish(observation::ObservationPtr &observation)
Expand Down Expand Up @@ -327,8 +328,9 @@ namespace mtconnect {

LOG(debug) << "Publishing Asset to topic: " << topic;

auto doc = m_jsonPrinter->print(asset);

asset::AssetList list {asset};
auto doc = m_printer->printAssets(
m_instanceId, uint32_t(m_sinkContract->getAssetStorage()->getMaxAssets()), 1, list);
stringstream buffer;
buffer << doc;

Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/sink/mqtt_sink/mqtt_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ namespace mtconnect {
void pubishInitialContent();

/// @brief Publish a current using `CurrentInterval` option.
void publishCurrent(boost::system::error_code ec);
SequenceNumber_t publishCurrent(boost::system::error_code ec);

/// @brief publish sample when observations arrive.
SequenceNumber_t publishSample(std::shared_ptr<observation::AsyncObserver> sampler);
Expand Down
4 changes: 3 additions & 1 deletion src/mtconnect/sink/rest_sink/rest_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,18 @@ namespace mtconnect {

auto idHandler = [&](SessionPtr session, RequestPtr request) -> bool {
auto asset = request->parameter<string>("assetIds");
auto pretty = *request->parameter<bool>("pretty");
if (asset)
{
auto pretty = request->parameter<bool>("pretty").value_or(false);
auto printer = m_sinkContract->getPrinter(acceptFormat(request->m_accepts));

list<string> ids;
stringstream str(*asset);
string id;
while (getline(str, id, ';'))
ids.emplace_back(id);

respond(session, assetIdsRequest(printer, ids, pretty, request->m_requestId),
request->m_requestId);
}
Expand Down
15 changes: 11 additions & 4 deletions test_package/mqtt_sink_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class MqttSinkTest : public testing::Test
void TearDown() override
{
const auto agent = m_agentTestHelper->getAgent();
m_agentTestHelper->m_ioContext.run_for(500ms);
if (agent)
{
m_agentTestHelper->getAgent()->stop();
Expand Down Expand Up @@ -249,15 +248,23 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Sample)

auto handler = make_unique<ClientHandler>();
bool gotSample = false;
handler->m_receive = [&gotSample](std::shared_ptr<MqttClient> client, const std::string &topic,
bool first = true;
handler->m_receive = [&gotSample, &first](std::shared_ptr<MqttClient> client, const std::string &topic,
const std::string &payload) {
if (first)
{
first = false;
}
else
{
EXPECT_EQ("MTConnect/Sample/000", topic);

auto jdoc = json::parse(payload);
auto streams = jdoc.at("/MTConnectStreams/Streams/0/DeviceStream"_json_pointer);
EXPECT_EQ(string("LinuxCNC"), streams.at("/name"_json_pointer).get<string>());

gotSample = true;
}
};

createClient(options, std::move(handler));
Expand All @@ -268,8 +275,8 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Sample)

auto service = m_agentTestHelper->getMqttService();

ASSERT_TRUE(waitFor(60s, [&service]() { return service->isConnected(); }));
ASSERT_FALSE(gotSample);
ASSERT_TRUE(waitFor(60s, [&first]() { return !first; }));
ASSERT_FALSE(first);

m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204");
ASSERT_TRUE(waitFor(10s, [&gotSample]() { return gotSample; }));
Expand Down

0 comments on commit 2b5a5e8

Please sign in to comment.