From ead714f245b3e267822fa190e641e0e1925dac30 Mon Sep 17 00:00:00 2001 From: Keukhan Date: Thu, 7 Nov 2024 00:13:26 +0900 Subject: [PATCH 1/2] Fixed the issue where multiple recordings/pushes on the same output stream sometimes failed. --- src/projects/base/info/record.cpp | 3 ++- .../publishers/file/file_application.cpp | 23 +++++++++---------- .../publishers/push/push_application.cpp | 8 +++---- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/projects/base/info/record.cpp b/src/projects/base/info/record.cpp index 0d346f3fd..9ede08194 100755 --- a/src/projects/base/info/record.cpp +++ b/src/projects/base/info/record.cpp @@ -41,6 +41,8 @@ namespace info _schedule = ""; _segmentation_rule = ""; + _session_id = 0; + _state = RecordState::Ready; } @@ -430,7 +432,6 @@ namespace info info.AppendFormat(" tmp_path(%s)", _tmp_path.CStr()); info.AppendFormat(" file_path(%s)", _output_file_path.CStr()); info.AppendFormat(" info_path(%s)", _output_info_path.CStr()); - info.AppendFormat(" bytes(%lld)", _record_bytes); info.AppendFormat(" total_bytes(%lld)", _record_total_bytes); info.AppendFormat(" total_time(%lld)", _record_total_time); info.AppendFormat(" created_time(%s)", ov::Converter::ToString(_created_time).CStr()); diff --git a/src/projects/publishers/file/file_application.cpp b/src/projects/publishers/file/file_application.cpp index 930c75922..728ec7b88 100755 --- a/src/projects/publishers/file/file_application.cpp +++ b/src/projects/publishers/file/file_application.cpp @@ -48,12 +48,10 @@ namespace pub auto records_info = GetRecordInfoFromFile(stream_map_config.GetPath(), info); for (auto record : records_info) { - record->SetByConfig(true); - auto result = RecordStart(record); if (result->GetCode() != FilePublisher::FilePublisherStatusCode::Success) { - logtw("FileStream(%s/%s) - Failed to start record(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record->GetId().CStr(), result->GetCode(), result->GetMessage().CStr()); + logtw("FileStream(%s/%s) - Failed to start record. id(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record->GetId().CStr(), result->GetCode(), result->GetMessage().CStr()); } } } @@ -82,7 +80,7 @@ namespace pub auto result = RecordStop(record_info); if (result->GetCode() != FilePublisher::FilePublisherStatusCode::Success) { - logtw("FileStream(%s/%s) - Failed to start record(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record_info->GetId().CStr(), result->GetCode(), result->GetMessage().CStr()); + logtw("FileStream(%s/%s) - Failed to stop record. id(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record_info->GetId().CStr(), result->GetCode(), result->GetMessage().CStr()); } } @@ -100,10 +98,11 @@ namespace pub { // State of disconnected and ready to connect case pub::Session::SessionState::Ready: - session->Start(); - break; + [[fallthrough]]; case pub::Session::SessionState::Stopped: session->Start(); + logti("Recording Started. %s", session->GetRecord()->GetInfoString().CStr()); + break; // State of Recording case pub::Session::SessionState::Started: @@ -133,6 +132,7 @@ namespace pub { case pub::Session::SessionState::Started: session->Stop(); + logti("Recording Stopped. %s", session->GetRecord()->GetInfoString().CStr()); break; default: break; @@ -149,30 +149,26 @@ namespace pub { // If there is no session, create a new file(record) session. auto session = std::static_pointer_cast(stream->GetSession(userdata->GetSessionId())); - if (session == nullptr) + if (session == nullptr || userdata->GetSessionId() == 0) { session = stream->CreateSession(); if (session == nullptr) { - logte("Could not create session"); + logte("Failed to create session"); return; } userdata->SetSessionId(session->GetId()); - session->SetRecord(userdata); } if (userdata->GetEnable() == true && userdata->GetRemove() == false) { SessionStart(session); - logti("Recording Started. %s", userdata->GetInfoString().CStr()); } if (userdata->GetEnable() == false || userdata->GetRemove() == true) { SessionStop(session); - logti("Recording Completed. %s", userdata->GetInfoString().CStr()); - } } @@ -297,6 +293,8 @@ namespace pub record->SetTransactionId(ov::Random::GenerateString(16)); record->SetEnable(true); record->SetRemove(false); + record->SetByConfig(false); + record->SetSessionId(0); record->SetFilePathSetByUser((record->GetFilePath().IsEmpty() != true) ? true : false); record->SetInfoPathSetByUser((record->GetInfoPath().IsEmpty() != true) ? true : false); @@ -434,6 +432,7 @@ namespace pub record->SetId(ov::Random::GenerateString(16)); record->SetEnable(enable); + record->SetByConfig(true); record->SetVhost(GetVHostAppName().GetVHostName()); record->SetApplication(GetVHostAppName().GetAppName()); diff --git a/src/projects/publishers/push/push_application.cpp b/src/projects/publishers/push/push_application.cpp index 9dca88861..b79d1fbed 100755 --- a/src/projects/publishers/push/push_application.cpp +++ b/src/projects/publishers/push/push_application.cpp @@ -85,7 +85,6 @@ namespace pub auto pushes = GetPushesByStreamMap(stream_map.GetPath(), info); for(auto &push : pushes) { - push->SetByConfig(true); StartPush(push); } } @@ -146,9 +145,9 @@ namespace pub return ov::Error::CreateError(PUSH_PUBLISHER_ERROR_DOMAIN, ErrorCode::FailureDuplicateKey, error_message); } - // 녹화 활성화 push->SetEnable(true); push->SetRemove(false); + push->SetByConfig(false); std::unique_lock lock(_push_map_mutex); _pushes[push->GetId()] = push; @@ -257,7 +256,7 @@ namespace pub { case pub::Session::SessionState::Started: session->Stop(); - logti("Push ended. %s", push->GetInfoString().CStr()); + logti("Push stopped. %s", push->GetInfoString().CStr()); break; case pub::Session::SessionState::Ready: [[fallthrough]]; @@ -320,7 +319,7 @@ namespace pub // Find a session by session ID auto session = stream->GetSession(push->GetSessionId()); - if (session == nullptr) + if (session == nullptr || push->GetSessionId() == 0) { session = stream->CreatePushSession(push); if (session == nullptr) @@ -451,6 +450,7 @@ namespace pub push->SetId(id); push->SetEnable(enable); + push->SetByConfig(true); push->SetVhost(GetVHostAppName().GetVHostName()); push->SetApplication(GetVHostAppName().GetAppName()); push->SetStreamName(stream_info->GetName().CStr()); From 7d1a985be219c8d5410c4dc0b3e0264c962bcb27 Mon Sep 17 00:00:00 2001 From: Keukhan Date: Thu, 7 Nov 2024 00:18:54 +0900 Subject: [PATCH 2/2] Added Push/File publisher information to stream monitoring (#1708) --- src/projects/modules/ffmpeg/ffmpeg_writer.cpp | 8 +++++++- src/projects/modules/ffmpeg/ffmpeg_writer.h | 3 ++- src/projects/publishers/file/file_session.cpp | 12 +++++++++--- src/projects/publishers/push/push_session.cpp | 10 ++++++++-- src/projects/publishers/push/push_stream.cpp | 3 --- 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/projects/modules/ffmpeg/ffmpeg_writer.cpp b/src/projects/modules/ffmpeg/ffmpeg_writer.cpp index 2b59f70b0..6c3953850 100644 --- a/src/projects/modules/ffmpeg/ffmpeg_writer.cpp +++ b/src/projects/modules/ffmpeg/ffmpeg_writer.cpp @@ -225,7 +225,7 @@ namespace ffmpeg return true; } - bool Writer::SendPacket(const std::shared_ptr &packet) + bool Writer::SendPacket(const std::shared_ptr &packet, uint64_t *sent_bytes) { if (!packet) { @@ -366,11 +366,17 @@ namespace ffmpeg { av_packet_unref(&av_packet); SetState(WriterStateError); + return false; } _last_packet_sent_time = std::chrono::high_resolution_clock::now(); + if (sent_bytes != nullptr) + { + *sent_bytes = av_packet.size; + } + int error = av_interleaved_write_frame(av_format.get(), &av_packet); if (error != 0) { diff --git a/src/projects/modules/ffmpeg/ffmpeg_writer.h b/src/projects/modules/ffmpeg/ffmpeg_writer.h index e7f9547ae..d8eff50b6 100644 --- a/src/projects/modules/ffmpeg/ffmpeg_writer.h +++ b/src/projects/modules/ffmpeg/ffmpeg_writer.h @@ -48,7 +48,8 @@ namespace ffmpeg bool Stop(); bool AddTrack(const std::shared_ptr &media_track); - bool SendPacket(const std::shared_ptr &packet); + bool SendPacket(const std::shared_ptr &packet, uint64_t *sent_bytes = nullptr); + std::chrono::high_resolution_clock::time_point GetLastPacketSentTime(); void SetTimestampMode(TimestampMode mode); diff --git a/src/projects/publishers/file/file_session.cpp b/src/projects/publishers/file/file_session.cpp index 742481496..dca282ca3 100755 --- a/src/projects/publishers/file/file_session.cpp +++ b/src/projects/publishers/file/file_session.cpp @@ -33,11 +33,13 @@ namespace pub : pub::Session(session_info, application, stream), _writer(nullptr) { + MonitorInstance->OnSessionConnected(*stream, PublisherType::File); } FileSession::~FileSession() { logtd("FileSession(%d) has been terminated finally", GetId()); + MonitorInstance->OnSessionDisconnected(*GetStream(), PublisherType::File); } bool FileSession::Start() @@ -154,7 +156,7 @@ namespace pub if (ffmpeg::Conv::IsSupportCodec(output_format, track->GetCodecId()) == false) { - logtw("%s format does not support the codec(%s)", output_format.CStr(), cmn::GetCodecIdToString(track->GetCodecId()).CStr()); + logtd("%s format does not support the codec(%s)", output_format.CStr(), cmn::GetCodecIdToString(track->GetCodecId()).CStr()); continue; } @@ -376,7 +378,9 @@ namespace pub if (_writer != nullptr) { - bool ret = _writer->SendPacket(session_packet); + uint64_t sent_bytes = 0; + + bool ret = _writer->SendPacket(session_packet, &sent_bytes); if (ret == false) { @@ -390,7 +394,9 @@ namespace pub } GetRecord()->UpdateRecordTime(); - GetRecord()->IncreaseRecordBytes(session_packet->GetData()->GetLength()); + GetRecord()->IncreaseRecordBytes(sent_bytes); + + MonitorInstance->IncreaseBytesOut(*GetStream(), PublisherType::File, sent_bytes); } } diff --git a/src/projects/publishers/push/push_session.cpp b/src/projects/publishers/push/push_session.cpp index 3958e2ed8..8b1822fa8 100755 --- a/src/projects/publishers/push/push_session.cpp +++ b/src/projects/publishers/push/push_session.cpp @@ -34,12 +34,14 @@ namespace pub _push(push), _writer(nullptr) { + MonitorInstance->OnSessionConnected(*stream, PublisherType::Push); } PushSession::~PushSession() { Stop(); logtd("PushSession(%d) has been terminated finally", GetId()); + MonitorInstance->OnSessionDisconnected(*GetStream(), PublisherType::Push); } bool PushSession::Start() @@ -204,7 +206,9 @@ namespace pub return; } - bool ret = writer->SendPacket(session_packet); + uint64_t sent_bytes = 0; + + bool ret = writer->SendPacket(session_packet, &sent_bytes); if (ret == false) { logte("Failed to send packet"); @@ -218,7 +222,9 @@ namespace pub } GetPush()->UpdatePushTime(); - GetPush()->IncreasePushBytes(session_packet->GetData()->GetLength()); + GetPush()->IncreasePushBytes(sent_bytes); + + MonitorInstance->IncreaseBytesOut(*GetStream(), PublisherType::Push, sent_bytes); } std::shared_ptr PushSession::CreateWriter() diff --git a/src/projects/publishers/push/push_stream.cpp b/src/projects/publishers/push/push_stream.cpp index d4e10839d..d12fb3b28 100755 --- a/src/projects/publishers/push/push_stream.cpp +++ b/src/projects/publishers/push/push_stream.cpp @@ -76,9 +76,6 @@ namespace pub auto stream_packet = std::make_any>(media_packet); BroadcastPacket(stream_packet); - - // TODO(Keukhan): Because the transmission size varies for each session, it needs to be improved - MonitorInstance->IncreaseBytesOut(*pub::Stream::GetSharedPtrAs(), PublisherType::Push, media_packet->GetData()->GetLength() * GetSessionCount()); } void PushStream::SendVideoFrame(const std::shared_ptr &media_packet)