Skip to content

Commit

Permalink
Refactoring to monitor the usage of transcoder threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keukhan committed Oct 16, 2024
1 parent f37ed41 commit 76e918f
Show file tree
Hide file tree
Showing 73 changed files with 933 additions and 913 deletions.
84 changes: 84 additions & 0 deletions src/projects/base/ovlibrary/future.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//==============================================================================
//
// OvenMediaEngine
//
// Created by getroot
// Copyright (c) 2018 AirenSoft. All rights reserved.
//
//==============================================================================
#pragma once

#include <mutex>
#include <condition_variable>

namespace ov
{

class Future
{
public:
void Stop()
{
std::unique_lock<decltype(_mutex)> lock(_mutex);

_stop_flag = true;

_condition.notify_all();
}

// template <typename T>
bool Submit(bool result)
{
std::unique_lock<decltype(_mutex)> lock(_mutex);

_result = result;
_condition.notify_all();

return _result;
}

// template <typename T>
bool Get()
{
std::unique_lock<decltype(_mutex)> lock(_mutex);

_condition.wait(lock);

if (_stop_flag)
{
return false;
}

return _result;
}

// return false : timed out, return true : signalled
// template <typename T>
bool GetFor(uint32_t timeout_delta_msec)
{
std::unique_lock<decltype(_mutex)> lock(_mutex);

while (!_stop_flag)
{
auto result = _condition.wait_for(lock, std::chrono::milliseconds(timeout_delta_msec));
if (result == std::cv_status::timeout)
{
return false;
}
}

if (_stop_flag)
{
return false;
}

return _result;
}

private:
std::mutex _mutex;
std::condition_variable _condition;
bool _stop_flag = false;
bool _result = false;
};
}
1 change: 1 addition & 0 deletions src/projects/base/ovlibrary/ovlibrary.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "./random.h"
#include "./regex.h"
#include "./semaphore.h"
#include "./future.h"
#include "./singleton.h"
#include "./stack_trace.h"
#include "./stop_watch.h"
Expand Down
36 changes: 9 additions & 27 deletions src/projects/transcoder/codec/decoder/decoder_aac.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,8 @@
#include "../../transcoder_private.h"
#include "base/info/application.h"

bool DecoderAAC::Configure(std::shared_ptr<MediaTrack> context)
bool DecoderAAC::InitCodec()
{
if (TranscodeDecoder::Configure(context) == false)
{
return false;
}

const AVCodec *_codec = ::avcodec_find_decoder(GetCodecID());
if (_codec == nullptr)
{
Expand All @@ -40,7 +35,6 @@ bool DecoderAAC::Configure(std::shared_ptr<MediaTrack> context)
return false;
}

// Create packet parser
_parser = ::av_parser_init(GetCodecID());
if (_parser == nullptr)
{
Expand All @@ -49,27 +43,17 @@ bool DecoderAAC::Configure(std::shared_ptr<MediaTrack> context)
}

_parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;

// Generates a thread that reads and encodes frames in the input_buffer queue and places them in the output queue.
try
{
_kill_flag = false;

_codec_thread = std::thread(&TranscodeDecoder::CodecThread, this);
pthread_setname_np(_codec_thread.native_handle(), ov::String::FormatString("Dec%s", avcodec_get_name(GetCodecID())).CStr());
}
catch (const std::system_error &e)
{
logte("Failed to start decoder thread");
_kill_flag = true;
return false;
}

return true;
}

void DecoderAAC::CodecThread()
{
// Initialize the codec and notify the main thread.
if(_codec_init_event.Submit(InitCodec()) == false)
{
return;
}

bool no_data_to_encode = false;

while (!_kill_flag)
Expand Down Expand Up @@ -104,7 +88,7 @@ void DecoderAAC::CodecThread()
if (_pkt_offset < _cur_data->GetLength())
{
_pkt->size = 0;

int32_t parsed_size = ::av_parser_parse2(
_parser,
_context,
Expand Down Expand Up @@ -143,7 +127,6 @@ void DecoderAAC::CodecThread()
int ret = ::avcodec_send_packet(_context, _pkt);
if (ret == AVERROR(EAGAIN))
{

}
else if (ret == AVERROR_EOF)
{
Expand Down Expand Up @@ -220,9 +203,8 @@ void DecoderAAC::CodecThread()
_frame->pkt_duration = ffmpeg::Conv::GetDurationPerFrame(cmn::MediaType::Audio, GetRefTrack(), _frame);
}


// If the decoded audio frame does not have a PTS, Increase frame duration time in PTS of previous frame
if(_frame->pts == AV_NOPTS_VALUE)
if (_frame->pts == AV_NOPTS_VALUE)
{
_frame->pts = _last_pkt_pts + _frame->pkt_duration;
}
Expand Down
2 changes: 1 addition & 1 deletion src/projects/transcoder/codec/decoder/decoder_aac.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DecoderAAC : public TranscodeDecoder

int64_t _last_pkt_pts = 0;

bool Configure(std::shared_ptr<MediaTrack> context) override;
bool InitCodec();

void CodecThread() override;

Expand Down
66 changes: 24 additions & 42 deletions src/projects/transcoder/codec/decoder/decoder_avc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,6 @@
#include "../../transcoder_private.h"
#include "base/info/application.h"

bool DecoderAVC::Configure(std::shared_ptr<MediaTrack> context)
{
if (TranscodeDecoder::Configure(context) == false)
{
return false;
}

// Create packet parser
_parser = ::av_parser_init(GetCodecID());
if (_parser == nullptr)
{
logte("Parser not found");
return false;
}
_parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;

// Initialize codec
if (InitCodec() == false)
{
return false;
}

try
{
_kill_flag = false;

_codec_thread = std::thread(&TranscodeDecoder::CodecThread, this);
pthread_setname_np(_codec_thread.native_handle(), ov::String::FormatString("Dec%s", avcodec_get_name(GetCodecID())).CStr());
}
catch (const std::system_error &e)
{
logte("Failed to start decoder thread");
_kill_flag = true;
return false;
}

return true;
}


bool DecoderAVC::InitCodec()
{
Expand Down Expand Up @@ -84,6 +45,14 @@ bool DecoderAVC::InitCodec()
return false;
}

_parser = ::av_parser_init(GetCodecID());
if (_parser == nullptr)
{
logte("Parser not found");
return false;
}
_parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;


_change_format = false;

Expand All @@ -92,10 +61,17 @@ bool DecoderAVC::InitCodec()

void DecoderAVC::UninitCodec()
{
::avcodec_close(_context);
::avcodec_free_context(&_context);

if (_context != nullptr)
{
::avcodec_free_context(&_context);
}
_context = nullptr;

if (_parser != nullptr)
{
::av_parser_close(_parser);
}
_parser = nullptr;
}

bool DecoderAVC::ReinitCodecIfNeed()
Expand All @@ -117,6 +93,12 @@ bool DecoderAVC::ReinitCodecIfNeed()

void DecoderAVC::CodecThread()
{
// Initialize the codec and notify the main thread.
if(_codec_init_event.Submit(InitCodec()) == false)
{
return;
}

while (!_kill_flag)
{
auto obj = _input_buffer.Dequeue();
Expand Down
2 changes: 0 additions & 2 deletions src/projects/transcoder/codec/decoder/decoder_avc.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class DecoderAVC : public TranscodeDecoder
return AV_CODEC_ID_H264;
}

bool Configure(std::shared_ptr<MediaTrack> context) override;

bool InitCodec();
void UninitCodec();
bool ReinitCodecIfNeed();
Expand Down
Loading

0 comments on commit 76e918f

Please sign in to comment.