Skip to content

Commit

Permalink
Fix: Add mutex lock for state completion check in gRPC streaming to p…
Browse files Browse the repository at this point in the history
…revent race condition (#7617)
  • Loading branch information
pskiran1 committed Sep 17, 2024
1 parent 29c7a28 commit 9246e2c
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/grpc/stream_infer_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -537,15 +537,18 @@ ModelStreamInferHandler::Process(InferHandler::State* state, bool rpc_ok)
} else if (state->step_ == Steps::WRITEREADY) {
// Finish the state if all the transactions associated with
// the state have completed.
if (state->IsComplete()) {
state->context_->DecrementRequestCounter();
finished = Finish(state);
} else {
LOG_ERROR << "Should not print this! Decoupled should NOT write via "
"WRITEREADY!";
// Remove the state from the completion queue
std::lock_guard<std::recursive_mutex> lock(state->step_mtx_);
state->step_ = Steps::ISSUED;
std::lock_guard<std::recursive_mutex> lk1(state->context_->mu_);
{
if (state->IsComplete()) {
state->context_->DecrementRequestCounter();
finished = Finish(state);
} else {
LOG_ERROR << "Should not print this! Decoupled should NOT write via "
"WRITEREADY!";
// Remove the state from the completion queue
std::lock_guard<std::recursive_mutex> lock(state->step_mtx_);
state->step_ = Steps::ISSUED;
}
}
}
}
Expand Down

0 comments on commit 9246e2c

Please sign in to comment.