Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
getroot committed Nov 7, 2024
2 parents 0e35bea + 7d1a985 commit 963b27a
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 27 deletions.
3 changes: 2 additions & 1 deletion src/projects/base/info/record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ namespace info
_schedule = "";
_segmentation_rule = "";

_session_id = 0;

_state = RecordState::Ready;
}

Expand Down Expand Up @@ -430,7 +432,6 @@ namespace info
info.AppendFormat(" tmp_path(%s)", _tmp_path.CStr());
info.AppendFormat(" file_path(%s)", _output_file_path.CStr());
info.AppendFormat(" info_path(%s)", _output_info_path.CStr());
info.AppendFormat(" bytes(%lld)", _record_bytes);
info.AppendFormat(" total_bytes(%lld)", _record_total_bytes);
info.AppendFormat(" total_time(%lld)", _record_total_time);
info.AppendFormat(" created_time(%s)", ov::Converter::ToString(_created_time).CStr());
Expand Down
8 changes: 7 additions & 1 deletion src/projects/modules/ffmpeg/ffmpeg_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ namespace ffmpeg
return true;
}

bool Writer::SendPacket(const std::shared_ptr<MediaPacket> &packet)
bool Writer::SendPacket(const std::shared_ptr<MediaPacket> &packet, uint64_t *sent_bytes)
{
if (!packet)
{
Expand Down Expand Up @@ -366,11 +366,17 @@ namespace ffmpeg
{
av_packet_unref(&av_packet);
SetState(WriterStateError);

return false;
}

_last_packet_sent_time = std::chrono::high_resolution_clock::now();

if (sent_bytes != nullptr)
{
*sent_bytes = av_packet.size;
}

int error = av_interleaved_write_frame(av_format.get(), &av_packet);
if (error != 0)
{
Expand Down
3 changes: 2 additions & 1 deletion src/projects/modules/ffmpeg/ffmpeg_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ namespace ffmpeg
bool Stop();

bool AddTrack(const std::shared_ptr<MediaTrack> &media_track);
bool SendPacket(const std::shared_ptr<MediaPacket> &packet);
bool SendPacket(const std::shared_ptr<MediaPacket> &packet, uint64_t *sent_bytes = nullptr);

std::chrono::high_resolution_clock::time_point GetLastPacketSentTime();

void SetTimestampMode(TimestampMode mode);
Expand Down
23 changes: 11 additions & 12 deletions src/projects/publishers/file/file_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@ namespace pub
auto records_info = GetRecordInfoFromFile(stream_map_config.GetPath(), info);
for (auto record : records_info)
{
record->SetByConfig(true);

auto result = RecordStart(record);
if (result->GetCode() != FilePublisher::FilePublisherStatusCode::Success)
{
logtw("FileStream(%s/%s) - Failed to start record(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record->GetId().CStr(), result->GetCode(), result->GetMessage().CStr());
logtw("FileStream(%s/%s) - Failed to start record. id(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record->GetId().CStr(), result->GetCode(), result->GetMessage().CStr());
}
}
}
Expand Down Expand Up @@ -82,7 +80,7 @@ namespace pub
auto result = RecordStop(record_info);
if (result->GetCode() != FilePublisher::FilePublisherStatusCode::Success)
{
logtw("FileStream(%s/%s) - Failed to start record(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record_info->GetId().CStr(), result->GetCode(), result->GetMessage().CStr());
logtw("FileStream(%s/%s) - Failed to stop record. id(%s) status(%d) description(%s)", GetVHostAppName().CStr(), info->GetName().CStr(), record_info->GetId().CStr(), result->GetCode(), result->GetMessage().CStr());
}
}

Expand All @@ -100,10 +98,11 @@ namespace pub
{
// State of disconnected and ready to connect
case pub::Session::SessionState::Ready:
session->Start();
break;
[[fallthrough]];
case pub::Session::SessionState::Stopped:
session->Start();
logti("Recording Started. %s", session->GetRecord()->GetInfoString().CStr());

break;
// State of Recording
case pub::Session::SessionState::Started:
Expand Down Expand Up @@ -133,6 +132,7 @@ namespace pub
{
case pub::Session::SessionState::Started:
session->Stop();
logti("Recording Stopped. %s", session->GetRecord()->GetInfoString().CStr());
break;
default:
break;
Expand All @@ -149,30 +149,26 @@ namespace pub
{
// If there is no session, create a new file(record) session.
auto session = std::static_pointer_cast<FileSession>(stream->GetSession(userdata->GetSessionId()));
if (session == nullptr)
if (session == nullptr || userdata->GetSessionId() == 0)
{
session = stream->CreateSession();
if (session == nullptr)
{
logte("Could not create session");
logte("Failed to create session");
return;
}
userdata->SetSessionId(session->GetId());

session->SetRecord(userdata);
}

if (userdata->GetEnable() == true && userdata->GetRemove() == false)
{
SessionStart(session);
logti("Recording Started. %s", userdata->GetInfoString().CStr());
}

if (userdata->GetEnable() == false || userdata->GetRemove() == true)
{
SessionStop(session);
logti("Recording Completed. %s", userdata->GetInfoString().CStr());

}
}

Expand Down Expand Up @@ -297,6 +293,8 @@ namespace pub
record->SetTransactionId(ov::Random::GenerateString(16));
record->SetEnable(true);
record->SetRemove(false);
record->SetByConfig(false);
record->SetSessionId(0);
record->SetFilePathSetByUser((record->GetFilePath().IsEmpty() != true) ? true : false);
record->SetInfoPathSetByUser((record->GetInfoPath().IsEmpty() != true) ? true : false);

Expand Down Expand Up @@ -434,6 +432,7 @@ namespace pub

record->SetId(ov::Random::GenerateString(16));
record->SetEnable(enable);
record->SetByConfig(true);
record->SetVhost(GetVHostAppName().GetVHostName());
record->SetApplication(GetVHostAppName().GetAppName());

Expand Down
12 changes: 9 additions & 3 deletions src/projects/publishers/file/file_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ namespace pub
: pub::Session(session_info, application, stream),
_writer(nullptr)
{
MonitorInstance->OnSessionConnected(*stream, PublisherType::File);
}

FileSession::~FileSession()
{
logtd("FileSession(%d) has been terminated finally", GetId());
MonitorInstance->OnSessionDisconnected(*GetStream(), PublisherType::File);
}

bool FileSession::Start()
Expand Down Expand Up @@ -154,7 +156,7 @@ namespace pub

if (ffmpeg::Conv::IsSupportCodec(output_format, track->GetCodecId()) == false)
{
logtw("%s format does not support the codec(%s)", output_format.CStr(), cmn::GetCodecIdToString(track->GetCodecId()).CStr());
logtd("%s format does not support the codec(%s)", output_format.CStr(), cmn::GetCodecIdToString(track->GetCodecId()).CStr());
continue;
}

Expand Down Expand Up @@ -376,7 +378,9 @@ namespace pub

if (_writer != nullptr)
{
bool ret = _writer->SendPacket(session_packet);
uint64_t sent_bytes = 0;

bool ret = _writer->SendPacket(session_packet, &sent_bytes);

if (ret == false)
{
Expand All @@ -390,7 +394,9 @@ namespace pub
}

GetRecord()->UpdateRecordTime();
GetRecord()->IncreaseRecordBytes(session_packet->GetData()->GetLength());
GetRecord()->IncreaseRecordBytes(sent_bytes);

MonitorInstance->IncreaseBytesOut(*GetStream(), PublisherType::File, sent_bytes);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/projects/publishers/push/push_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ namespace pub
auto pushes = GetPushesByStreamMap(stream_map.GetPath(), info);
for(auto &push : pushes)
{
push->SetByConfig(true);
StartPush(push);
}
}
Expand Down Expand Up @@ -146,9 +145,9 @@ namespace pub
return ov::Error::CreateError(PUSH_PUBLISHER_ERROR_DOMAIN, ErrorCode::FailureDuplicateKey, error_message);
}

// 녹화 활성화
push->SetEnable(true);
push->SetRemove(false);
push->SetByConfig(false);

std::unique_lock<std::shared_mutex> lock(_push_map_mutex);
_pushes[push->GetId()] = push;
Expand Down Expand Up @@ -257,7 +256,7 @@ namespace pub
{
case pub::Session::SessionState::Started:
session->Stop();
logti("Push ended. %s", push->GetInfoString().CStr());
logti("Push stopped. %s", push->GetInfoString().CStr());
break;
case pub::Session::SessionState::Ready:
[[fallthrough]];
Expand Down Expand Up @@ -320,7 +319,7 @@ namespace pub

// Find a session by session ID
auto session = stream->GetSession(push->GetSessionId());
if (session == nullptr)
if (session == nullptr || push->GetSessionId() == 0)
{
session = stream->CreatePushSession(push);
if (session == nullptr)
Expand Down Expand Up @@ -451,6 +450,7 @@ namespace pub

push->SetId(id);
push->SetEnable(enable);
push->SetByConfig(true);
push->SetVhost(GetVHostAppName().GetVHostName());
push->SetApplication(GetVHostAppName().GetAppName());
push->SetStreamName(stream_info->GetName().CStr());
Expand Down
10 changes: 8 additions & 2 deletions src/projects/publishers/push/push_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ namespace pub
_push(push),
_writer(nullptr)
{
MonitorInstance->OnSessionConnected(*stream, PublisherType::Push);
}

PushSession::~PushSession()
{
Stop();
logtd("PushSession(%d) has been terminated finally", GetId());
MonitorInstance->OnSessionDisconnected(*GetStream(), PublisherType::Push);
}

bool PushSession::Start()
Expand Down Expand Up @@ -204,7 +206,9 @@ namespace pub
return;
}

bool ret = writer->SendPacket(session_packet);
uint64_t sent_bytes = 0;

bool ret = writer->SendPacket(session_packet, &sent_bytes);
if (ret == false)
{
logte("Failed to send packet");
Expand All @@ -218,7 +222,9 @@ namespace pub
}

GetPush()->UpdatePushTime();
GetPush()->IncreasePushBytes(session_packet->GetData()->GetLength());
GetPush()->IncreasePushBytes(sent_bytes);

MonitorInstance->IncreaseBytesOut(*GetStream(), PublisherType::Push, sent_bytes);
}

std::shared_ptr<ffmpeg::Writer> PushSession::CreateWriter()
Expand Down
3 changes: 0 additions & 3 deletions src/projects/publishers/push/push_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ namespace pub
auto stream_packet = std::make_any<std::shared_ptr<MediaPacket>>(media_packet);

BroadcastPacket(stream_packet);

// TODO(Keukhan): Because the transmission size varies for each session, it needs to be improved
MonitorInstance->IncreaseBytesOut(*pub::Stream::GetSharedPtrAs<info::Stream>(), PublisherType::Push, media_packet->GetData()->GetLength() * GetSessionCount());
}

void PushStream::SendVideoFrame(const std::shared_ptr<MediaPacket> &media_packet)
Expand Down

0 comments on commit 963b27a

Please sign in to comment.