Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CARMACloudPlugin: Fix the issue to unzip the compressed TCMs #602

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,49 +205,57 @@ string CARMACloudPlugin::updateTags(string str,string tagout, string tagin)

void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket)
{
QString st;
QByteArray st;
while(socket->bytesAvailable()>0)
{
PLOG(logDEBUG) << "Bytes available." << std::endl;
auto readBytes = socket->readAll();
if (socket->headers().keys().contains(CONTENT_ENCODING_KEY) && std::string(socket->headers().constFind(CONTENT_ENCODING_KEY).value().data()) == CONTENT_ENCODING_VALUE)
{
//readBytes is compressed in gzip format
st.append(UncompressBytes(readBytes));
}else{
st.append(readBytes);
}
st.append(readBytes);
}
QByteArray array = st.toLocal8Bit();

char* _cloudUpdate = array.data(); // would be the cloud update packet, needs parsing


string tcm = _cloudUpdate;

PLOG(logINFO) << "Received TCM from cloud" << tcm << std::endl;
if(tcm.length() == 0)
if(st.size() == 0)
{
PLOG(logERROR) << "Received TCM length is zero, and skipped." << std::endl;
PLOG(logERROR) << "Received TCM is empty, and skipped." << std::endl;
return;
}
PLOG(logINFO) << "Received TCM bytes size: " << st.size()<< std::endl;

std::string tcm = "";
bool isCompressed = socket->headers().keys().contains(CONTENT_ENCODING_KEY) && std::string(socket->headers().constFind(CONTENT_ENCODING_KEY).value().data()) == CONTENT_ENCODING_VALUE;
if (isCompressed)
{
QByteArray tcmBytes = UncompressBytes(st);
if(tcmBytes.size() == 0)
{
return;
}
tcm = tcmBytes.data();
}else{
tcm = st.data();
}

//Transform carma-cloud TCM XML to J2735 compatible TCM XML by updating tags
tcm=updateTags(tcm,"<TrafficControlMessage>","<TestMessage05><body>");
tcm=updateTags(tcm,"</TrafficControlMessage>","</body></TestMessage05>");
tcm=updateTags(tcm,"TrafficControlParams","params");
tcm=updateTags(tcm,"TrafficControlGeometry","geometry");
tcm=updateTags(tcm,"TrafficControlPackage","package");

//List of tcm in string format
std::list<std::string> tcm_sl = FilterTCMs(tcm);
PLOG(logDEBUG2) << "Received TCM: " << tcm << std::endl;

for(const auto tcm_s: tcm_sl)
std::list<std::string> tcmSL = {};
if (isCompressed)
{
tcmSL = FilterTCMs(tcm);
}else{
tcmSL.push_back(tcm);
}

for(const auto tcm_s: tcmSL)
{
tsm5Message tsm5message;
tsm5EncodedMessage tsm5ENC;
tmx::message_container_type container;


std::stringstream ss;
ss << tcm_s;

Expand Down Expand Up @@ -376,7 +384,7 @@ void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM()
}
else
{
PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl;
PLOG(logDEBUG4) << "NO TCMs to broadcast." << std::endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to have a status that evaluates to the number of TCMs currently broadcasting rather than this log statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that might be a nice feature to add. This is an indicator that no more TCMs that is left to broadcast.

_tcm_broadcast_times->clear();
_tcm_broadcast_starting_time->clear();
}
Expand Down Expand Up @@ -604,13 +612,13 @@ void CARMACloudPlugin::ConvertString2Pair(std::pair<string,string> &str_pair, co
QByteArray CARMACloudPlugin::UncompressBytes(const QByteArray compressedBytes) const
{
z_stream strm;
strm.zalloc = nullptr;//Refer to zlib docs (https://zlib.net/zlib_how.html)
strm.zfree = nullptr;
strm.opaque = nullptr;
strm.zalloc = Z_NULL;//Refer to zlib docs (https://zlib.net/zlib_how.html)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace in this function looks a little weird, but could just be github's rendering.

strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = compressedBytes.size();
strm.next_in = (Byte *)compressedBytes.data();
strm.next_in = (Byte *)compressedBytes.data();
//checking input z_stream to see if there is any error, eg: invalid data etc.
auto err = inflateInit2(&strm, MAX_WBITS + 16); // gzip input
auto err = inflateInit2(&strm, MAX_WBITS+32); // gzip input https://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
QByteArray outBuf;
//MAX numbers of bytes stored in a buffer
const int BUFFER_SIZE = 4092;
Expand All @@ -623,12 +631,11 @@ QByteArray CARMACloudPlugin::UncompressBytes(const QByteArray compressedBytes) c
char buffer[BUFFER_SIZE] = {0};
strm.avail_out = BUFFER_SIZE;
strm.next_out = (Byte *)buffer;
//Uncompress finished
isDone = inflate(&strm, Z_FINISH);
outBuf.append(buffer);
} while (Z_STREAM_END != isDone); //Reach the end of stream to be uncompressed
}else{
PLOG(logWARNING) << "Error initalize stream. Err code = " << err << std::endl;
isDone = inflate(&strm, Z_NO_FLUSH);
outBuf.append(buffer, BUFFER_SIZE - strm.avail_out);
} while (Z_STREAM_END != isDone); // Reach the end of stream to be uncompressed
paulbourelly999 marked this conversation as resolved.
Show resolved Hide resolved
}else{
PLOG(logERROR) << "Error initalize stream. Err code = " << err << std::endl;
}
//Finished decompress data stream
inflateEnd(&strm);
Expand Down
Loading