Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CARMACloudPlugin: Fix the issue to unzip the compressed TCMs #602

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions src/v2i-hub/CARMACloudPlugin/src/CARMACloudPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,49 +205,50 @@ string CARMACloudPlugin::updateTags(string str,string tagout, string tagin)

void CARMACloudPlugin::CARMAResponseHandler(QHttpEngine::Socket *socket)
{
QString st;
QByteArray st;
while(socket->bytesAvailable()>0)
{
auto readBytes = socket->readAll();
if (socket->headers().keys().contains(CONTENT_ENCODING_KEY) && std::string(socket->headers().constFind(CONTENT_ENCODING_KEY).value().data()) == CONTENT_ENCODING_VALUE)
{
//readBytes is compressed in gzip format
st.append(UncompressBytes(readBytes));
}else{
st.append(readBytes);
}
st.append(readBytes);
}
QByteArray array = st.toLocal8Bit();

char* _cloudUpdate = array.data(); // would be the cloud update packet, needs parsing


string tcm = _cloudUpdate;

PLOG(logINFO) << "Received TCM from cloud" << tcm << std::endl;
if(tcm.length() == 0)
if(st.size() == 0)
{
PLOG(logERROR) << "Received TCM length is zero, and skipped." << std::endl;
PLOG(logERROR) << "Received TCM is empty, and skipped." << std::endl;
return;
}
PLOG(logINFO) << "Received TCM bytes size: " << st.size()<< std::endl;

std::string tcm = "";
bool isCompressed = socket->headers().keys().contains(CONTENT_ENCODING_KEY) && std::string(socket->headers().constFind(CONTENT_ENCODING_KEY).value().data()) == CONTENT_ENCODING_VALUE;
if (isCompressed)
{
tcm = UncompressBytes(st).data();
}else{
tcm = st.data();
}

//Transform carma-cloud TCM XML to J2735 compatible TCM XML by updating tags
tcm=updateTags(tcm,"<TrafficControlMessage>","<TestMessage05><body>");
tcm=updateTags(tcm,"</TrafficControlMessage>","</body></TestMessage05>");
tcm=updateTags(tcm,"TrafficControlParams","params");
tcm=updateTags(tcm,"TrafficControlGeometry","geometry");
tcm=updateTags(tcm,"TrafficControlPackage","package");

//List of tcm in string format
std::list<std::string> tcm_sl = FilterTCMs(tcm);

std::list<std::string> tcm_sl = {};
if (isCompressed)
{
tcm_sl = FilterTCMs(tcm);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we only call FilterTCMs if the message was originally compressed? If so, it's probably worth at least a comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compressed file has list of TCMs wrapped inside the tags. All these are in a big XML payload. This filter tcms is to remove the list tags and only return a list a TCMs objects. There are comments in the header file:

std::list<std::string> FilterTCMs(const std::string& tcm_response) const;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if not compressed it is guranteed to be a list of a single TCM but if it is compressed it is multiple TCMs in a single XML that have to be parsed out individually? Is that correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes about compressed. For not compressed, it is one TCM at a time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So then what is this configuration parameter for

sprintf(xml_str,"<?xml version=\"1.0\" encoding=\"UTF-8\"?><TrafficControlRequest port=\"%s\" list=\"%s\"><reqid>%s</reqid><reqseq>%ld</reqseq><scale>%ld</scale>%s</TrafficControlRequest>",std::to_string(webport).c_str(),list_tcm.c_str(),reqid, reqseq,scale,bounds_str);

}else{
tcm_sl.push_back(tcm);
}

for(const auto tcm_s: tcm_sl)
{
tsm5Message tsm5message;
tsm5EncodedMessage tsm5ENC;
tmx::message_container_type container;


std::stringstream ss;
ss << tcm_s;

Expand Down Expand Up @@ -376,7 +377,7 @@ void CARMACloudPlugin::TCMAckCheckAndRebroadcastTCM()
}
else
{
PLOG(logDEBUG) << "NO TCMs to broadcast." << std::endl;
PLOG(logDEBUG4) << "NO TCMs to broadcast." << std::endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to have a status that evaluates to the number of TCMs currently broadcasting rather than this log statement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that might be a nice feature to add. This is an indicator that no more TCMs that is left to broadcast.

_tcm_broadcast_times->clear();
_tcm_broadcast_starting_time->clear();
}
Expand Down Expand Up @@ -604,13 +605,13 @@ void CARMACloudPlugin::ConvertString2Pair(std::pair<string,string> &str_pair, co
QByteArray CARMACloudPlugin::UncompressBytes(const QByteArray compressedBytes) const
{
z_stream strm;
strm.zalloc = nullptr;//Refer to zlib docs (https://zlib.net/zlib_how.html)
strm.zfree = nullptr;
strm.opaque = nullptr;
strm.zalloc = Z_NULL;//Refer to zlib docs (https://zlib.net/zlib_how.html)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace in this function looks a little weird, but could just be github's rendering.

strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = compressedBytes.size();
strm.next_in = (Byte *)compressedBytes.data();
strm.next_in = (Byte *)compressedBytes.data();
//checking input z_stream to see if there is any error, eg: invalid data etc.
auto err = inflateInit2(&strm, MAX_WBITS + 16); // gzip input
auto err = inflateInit2(&strm, MAX_WBITS+32); // gzip input https://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
QByteArray outBuf;
//MAX numbers of bytes stored in a buffer
const int BUFFER_SIZE = 4092;
Expand All @@ -623,11 +624,10 @@ QByteArray CARMACloudPlugin::UncompressBytes(const QByteArray compressedBytes) c
char buffer[BUFFER_SIZE] = {0};
strm.avail_out = BUFFER_SIZE;
strm.next_out = (Byte *)buffer;
//Uncompress finished
isDone = inflate(&strm, Z_FINISH);
outBuf.append(buffer);
} while (Z_STREAM_END != isDone); //Reach the end of stream to be uncompressed
}else{
isDone = inflate(&strm, Z_NO_FLUSH);
outBuf.append(buffer, BUFFER_SIZE - strm.avail_out);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Indentation looks off.

} while (Z_STREAM_END != isDone); // Reach the end of stream to be uncompressed
paulbourelly999 marked this conversation as resolved.
Show resolved Hide resolved
}else{
PLOG(logWARNING) << "Error initalize stream. Err code = " << err << std::endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still want to execute the last line of this method if this error happens?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems useful to know that the stream is not readable and what the actual error is. What do you suggest?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am asking whether we want the statements on L634 and L635 to execute in this error case. I have no problem with the logging except if it is an error we may want to make it error level and not warning? Just curious whether we want to stop doing everything in this case or not.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it to error type and handled it silently by ignoring it for now.

}
//Finished decompress data stream
Expand Down
Loading