Skip to content

Commit

Permalink
fix: use released pipeline pointer (#1978)
Browse files Browse the repository at this point in the history
* fix: use released pipeline pointer

* fix
  • Loading branch information
Abingcbc authored Dec 18, 2024
1 parent 95f2530 commit 1d02270
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
13 changes: 12 additions & 1 deletion core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ void ProcessorRunner::Run(uint32_t threadNo) {
sInGroupDataSizeBytes->Add(item->mEventGroup.DataSize());

shared_ptr<Pipeline>& pipeline = item->mPipeline;
if (!pipeline) {
bool hasOldPipeline = pipeline != nullptr;
if (!hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
}
if (!pipeline) {
Expand All @@ -139,6 +140,16 @@ void ProcessorRunner::Run(uint32_t threadNo) {
vector<PipelineEventGroup> eventGroupList;
eventGroupList.emplace_back(std::move(item->mEventGroup));
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
"discard data")("config", configName));
continue;
}
}

if (pipeline->IsFlushingThroughGoPipeline()) {
// TODO:
Expand Down
1 change: 1 addition & 0 deletions core/runner/sink/http/HttpSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) {
CURL* handler = msg->easy_handle;
HttpSinkRequest* request = nullptr;
curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request);
auto pipelinePlaceHolder = request->mItem->mPipeline; // keep pipeline alive
auto responseTime = chrono::system_clock::now() - request->mLastSendTime;
auto responseTimeMs = chrono::duration_cast<chrono::milliseconds>(responseTime).count();
switch (msg->data.result) {
Expand Down

0 comments on commit 1d02270

Please sign in to comment.