[CPU] Avoid extra processing of the sync nodes in scope of Infer call (openvinotoolkit#24227)

### Details:
 - To reduce Infer overhead
 - The new flow is:
   1) Define sync points from the perspective of all the nodes.
   2) Use the sync points in scope of Allocate.
   3) Define executable sync points from the overall sync points, considering only executable nodes.
   4) Keep the executable sync points ordered to avoid additional logic in scope of the Infer call (an illustrative sketch of this flow follows the commit description below).

### Tickets:
 - *ticket-id*
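
For illustration only, here is a minimal, self-contained C++ sketch of the flow described in the details above. The `Node` struct, its flags, and the free functions below are simplified, hypothetical stand-ins invented for this sketch, not the actual `intel_cpu` plugin API from `graph.cpp`/`graph.h`.

```cpp
// Illustrative sketch only (hypothetical Node stand-in, not the actual intel_cpu classes).
#include <cstddef>
#include <cstdint>
#include <memory>
#include <set>
#include <utility>
#include <vector>

struct Node {
    bool dynamic = false;     // node has dynamic shapes
    bool executable = true;   // node remains in the executable graph
    bool needsSync = false;   // node requires a synchronization point
    void updateShapes() {}    // placeholder for shape inference / prepareParams
    void execute() {}         // placeholder for the actual kernel call
};
using NodePtr = std::shared_ptr<Node>;

// 1) Identify sync points once, from the perspective of all graph nodes.
// 2) These graph-level indices are what Allocate() consumes.
std::vector<size_t> identifySyncPoints(const std::vector<NodePtr>& nodes) {
    std::vector<size_t> syncInds;
    for (size_t i = 0; i < nodes.size(); ++i)
        if (nodes[i]->dynamic && nodes[i]->needsSync)
            syncInds.push_back(i);
    return syncInds;
}

// 3) + 4) Remap the sync points onto executable nodes only and keep them sorted,
//         so the Infer call can walk them without any extra bookkeeping.
std::pair<std::vector<NodePtr>, std::vector<size_t>>
extractExecutable(const std::vector<NodePtr>& nodes, const std::vector<size_t>& syncInds) {
    std::vector<NodePtr> execNodes;
    std::vector<size_t> graphToExec(nodes.size(), SIZE_MAX);
    for (size_t i = 0; i < nodes.size(); ++i) {
        if (nodes[i]->executable || nodes[i]->dynamic) {
            graphToExec[i] = execNodes.size();
            execNodes.push_back(nodes[i]);
        }
    }
    std::set<size_t> stops;  // set keeps the stop indices unique and sorted
    for (size_t ind : syncInds) {
        if (graphToExec[ind] != SIZE_MAX) {
            stops.insert(graphToExec[ind]);
            stops.insert(graphToExec[ind] + 1);  // run the sync node on its own
        }
    }
    stops.insert(execNodes.size());  // final stop covers the tail of the graph
    return {execNodes, std::vector<size_t>(stops.begin(), stops.end())};
}

// The dynamic Infer loop just walks the precomputed stop indices: update shapes
// up to the next sync point, then execute those nodes.
void inferDynamic(std::vector<NodePtr>& execNodes, const std::vector<size_t>& syncStops) {
    size_t counter = 0;
    for (size_t stop : syncStops) {
        for (size_t i = counter; i < stop; ++i)
            execNodes[i]->updateShapes();
        for (; counter < stop; ++counter)
            execNodes[counter]->execute();
    }
}

int main() {
    std::vector<NodePtr> graph;
    for (int i = 0; i < 6; ++i)
        graph.push_back(std::make_shared<Node>());
    graph[2]->dynamic = graph[2]->needsSync = true;  // e.g. a node with internal dynamism
    graph[4]->executable = false;                    // e.g. an optimized-out Reshape

    const auto syncInds = identifySyncPoints(graph);       // graph-level sync points
    auto [execNodes, stops] = extractExecutable(graph, syncInds);
    inferDynamic(execNodes, stops);                         // no per-Infer set building
}
```

The point of the sketch: all set building and index remapping happens once at graph initialization, so the dynamic Infer loop only walks a plain, pre-sorted vector of stop indices.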
EgorDuplensky authored May 24, 2024
1 parent cfb05bf commit 0297fac
Showing 2 changed files with 92 additions and 77 deletions.
157 changes: 87 additions & 70 deletions src/plugins/intel_cpu/src/graph.cpp
@@ -253,6 +253,71 @@ void Graph::GroupParallelNodes() {
}
}

static std::vector<size_t> IdentifySyncPoints(const std::vector<NodePtr>& graphNodes) {
OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::IdentifySyncPoints");
std::vector<size_t> syncNodesInds;

for (size_t i = 0; i < graphNodes.size(); ++i) {
const auto& node = graphNodes[i];

if (!node->isDynamicNode())
continue;

if (node->outputShapeDataDependency() ||
// WA for convolution plus sum (broadcast): since a convolution with sum uses the same memory (inPlace) for the second sum term and the output
// tensor, resizing the output tensor may lead to reallocation of the second term's memory and possible data loss. The reallocation
// may happen when the second term shape is broadcast to the output tensor shape. To avoid the data loss, there is special processing for
// such cases inside the convolution node, but it works properly only when dynamic shape inference, preparation and execution are called
// for this node sequentially.
(node->getType() == Type::Convolution && node->isInPlace()) ||
// Due to the special handling of the internal states and initialization subgraphs, MemoryInput nodes must
// be processed as internal dynamism nodes, which allows hiding the aforementioned complexity inside the
// MemoryInput::executeDynamic implementation
(node->getType() == Type::MemoryInput)) {
syncNodesInds.push_back(i);
}
}

return syncNodesInds;
}

static std::tuple<std::vector<NodePtr>, std::vector<size_t>> ExtractExecutableNodesAndSyncPoints(const std::vector<size_t>& syncNodesInds,
const std::vector<NodePtr>& graphNodes) {
OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::ExtractExecutableNodesAndSyncPoints");
std::unordered_map<size_t, size_t> graphIdToExecutableId;
std::vector<NodePtr> executableGraphNodes;
for (size_t i = 0; i < graphNodes.size(); i++) {
const auto& graphNode = graphNodes[i];
if ((!graphNode->isConstant() && CPU_DEBUG_CAPS_ALWAYS_TRUE(graphNode->isExecutable())) || graphNode->isDynamicNode()) {
/* @todo
* Revise implementation.
* With the current approach it is possible that, with debug_caps enabled,
* we execute a node which is not ready to be executed
*/
graphIdToExecutableId[i] = executableGraphNodes.size();
executableGraphNodes.emplace_back(graphNode);
}
}

// use set to ensure sorted unique sync entries
std::set<size_t> uniqueExecutableSyncNodesInds;
for (const auto& syncNodesInd : syncNodesInds) {
auto it = graphIdToExecutableId.find(syncNodesInd);
if (it != graphIdToExecutableId.end()) {
uniqueExecutableSyncNodesInds.insert(it->second);
// since sometimes we need to run the synchronization node alone (for example in the case of internal dynamism)
// let's add another sync index after the sync point node
uniqueExecutableSyncNodesInds.insert(it->second + 1);
}
}
uniqueExecutableSyncNodesInds.insert(executableGraphNodes.size());
// convert to a vector to reduce runtime overhead
std::vector<size_t> executableSyncNodesInds(uniqueExecutableSyncNodesInds.begin(), uniqueExecutableSyncNodesInds.end());

return std::make_tuple(std::move(executableGraphNodes),
std::move(executableSyncNodesInds));
}

void Graph::InitGraph(bool optimize) {
DEBUG_LOG("Initializing graph with name: ", GetName());

@@ -284,9 +349,10 @@ void Graph::InitGraph(bool optimize) {

SortTopologically();

const auto hasDynNodes = ProcessDynNodes();
const bool hasDynNodes = ProcessDynNodes();
const auto syncNodesInds = hasDynNodes ? IdentifySyncPoints(graphNodes) : std::vector<size_t>{};

Allocate();
Allocate(syncNodesInds);

CreatePrimitivesAndExecConstants();

@@ -296,7 +362,7 @@
}
#endif

ExtractExecutableNodes();
std::tie(m_executableGraphNodes, m_executableSyncNodesInds) = ExtractExecutableNodesAndSyncPoints(syncNodesInds, graphNodes);

status = hasDynNodes ? Status::ReadyDynamic : Status::ReadyStatic;

@@ -369,24 +435,6 @@ void Graph::InitOptimalPrimitiveDescriptors() {
}
}

void Graph::ExtractExecutableNodes() {
OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::ExtractExecutableNodes");
for (const auto& graphNode : graphNodes) {
if ((!graphNode->isConstant() && CPU_DEBUG_CAPS_ALWAYS_TRUE(graphNode->isExecutable())) || graphNode->isDynamicNode()) {
/* @todo
* Revise implementation.
* With current way it is possible that with debug_caps enabled
* we execute a node, which is not ready to be executed
*/
auto itr = syncNodesInds.find(graphNode.get());
if (itr != syncNodesInds.end()) {
itr->second = executableGraphNodes.size();
}
executableGraphNodes.emplace_back(graphNode);
}
}
}

void Graph::CreatePrimitivesAndExecConstants() const {
OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::CreatePrimitivesAndExecConstants");
dnnl::stream stream(getEngine());
@@ -640,7 +688,7 @@ static edge_clusters_t findEdgeClusters(const std::vector<EdgePtr> & graphEdges)
return edge_clusters;
}

void Graph::AllocateWithReuse() {
void Graph::AllocateWithReuse(const std::vector<size_t>& syncNodesInds) {
edge_clusters_t edge_clusters = findEdgeClusters(graphEdges);

size_t remaining_edge_clusters_count = edge_clusters.size();
@@ -828,19 +876,14 @@ void Graph::AllocateWithReuse() {
if (!syncNodesInds.empty()) {
//We have to extend the lifespan of tensors that are crossing a sync point border in order to save
//the intermediate computation results from possible loss due to the tensor resize
std::vector<int> vecIntervals = {0};
for (const auto& item : syncNodesInds) {
vecIntervals.push_back(item.first->execIndex);
}
std::sort(vecIntervals.begin(), vecIntervals.end());
for (auto& box : undefinedBoxes) {
if (-1 == box.finish) {
continue;
}
auto itr_upper = std::upper_bound(vecIntervals.begin(), vecIntervals.end(), box.finish, [](int y, int x) { return y <= x;});
auto itr_lower = std::lower_bound(vecIntervals.begin(), vecIntervals.end(), box.start);
auto itr_upper = std::upper_bound(syncNodesInds.begin(), syncNodesInds.end(), box.finish, [](int y, int x) { return y <= x;});
auto itr_lower = std::lower_bound(syncNodesInds.begin(), syncNodesInds.end(), box.start);
if (itr_lower != itr_upper) { // across sections
if (itr_upper == vecIntervals.end()) {
if (itr_upper == syncNodesInds.end()) {
box.finish = -1;
} else {
box.finish = *itr_upper;
@@ -920,7 +963,7 @@
}
}

void Graph::Allocate() {
void Graph::Allocate(const std::vector<size_t>& syncNodesInds) {
OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::Allocate");

//resolve inplace dead end nodes
@@ -944,7 +987,7 @@
for (auto& edge : graphEdges) edge->init();

// Allocate memory space for all edges marked with NeedAllocation
AllocateWithReuse();
AllocateWithReuse(syncNodesInds);

// Check all getters. Should work.
for (auto& edge : graphEdges) edge->validate();
@@ -953,35 +996,18 @@
bool Graph::ProcessDynNodes() {
OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::ProcessDynNodes");

bool result = false;
for (size_t i = 0; i < graphNodes.size(); ++i) {
const auto& node = graphNodes[i];
if (node->isDynamicNode()) {
result = true;
if (node->outputShapeDataDependency() ||
// WA: for convolution plus sum(broadcast). Due to the fact that a convolution with sum use the same memory for second sum term and the output
// tensors (inPlace) resizing the output tensor, may lead to reallocation of this second term memory and possible data lost. The reallocation
// may happen when the second term shape is broadcasted to the output tensor shape. To avoid the data loss, we have a special processing for
// such cases inside the convolution node, but it works properly only when dynamic shapes inference, preparation and execution a called
// for this node sequentially.
(node->getType() == Type::Convolution && node->isInPlace()) ||
// Due to the special handling of the internal states and initialization subgraphs, MemoryInput nodes must
// be processed as a internal dynamism node, allowing to hide the aforementioned complexity inside the
// MemoryInput::executeDynamic implementation
(node->getType() == Type::MemoryInput)) {
syncNodesInds.insert({node.get(), i});
}
}
}

const bool containsDynamicNodes = std::any_of(graphNodes.begin(), graphNodes.end(), [](const NodePtr& node) {
return node->isDynamicNode();
});
// In case of dynamic shapes, tensors may be resized due to shape variations.
// If an input tensor is included in memory reuse, it means that its memory manager is shared with other tensors in the graph, which in turn may cause data
// loss when one of the tensors down the graph requests a mem resize while the input data has not yet been read by the consumers. To avoid such situations
// we disable io mem reuse for the case of dynamic shapes.
if (result) {
if (containsDynamicNodes) {
this->reuse_io_tensors = false;
}
return result;

return containsDynamicNodes;
}

void Graph::PushInputData(const std::size_t& index, const ov::SoPtr<ITensor>& input) {
@@ -1097,7 +1123,7 @@ void Graph::PullOutputData(std::unordered_map<std::size_t, ov::SoPtr<ITensor>>&
void Graph::InferStatic(SyncInferRequest* request) {
dnnl::stream stream(getEngine());

for (const auto& node : executableGraphNodes) {
for (const auto& node : m_executableGraphNodes) {
VERBOSE(node, getConfig().debugCaps.verbose);
PERF(node, getConfig().collectPerfCounters);

@@ -1316,27 +1342,18 @@ class UpdateNodes : public UpdateNodesBase {
void Graph::InferDynamic(SyncInferRequest* request) {
dnnl::stream stream(getEngine());

std::set<size_t> syncIndsWorkSet;
for (const auto& nodeIndx : syncNodesInds) {
syncIndsWorkSet.insert(nodeIndx.second);
//since sometimes we need to run the synchronization node alone (for example in the case of internal dynamism)
//let's add another sync index after the sync point node
syncIndsWorkSet.insert(nodeIndx.second + 1);
}
syncIndsWorkSet.insert(executableGraphNodes.size());

std::unique_ptr<IUpdateNodes> updateNodes{};
if (parallel_get_max_threads() > 1) {
updateNodes.reset(new UpdateNodes(executableGraphNodes));
updateNodes.reset(new UpdateNodes(m_executableGraphNodes));
} else {
updateNodes.reset(new UpdateNodesSeq(executableGraphNodes));
updateNodes.reset(new UpdateNodesSeq(m_executableGraphNodes));
}
size_t inferCounter = 0;

for (auto stopIndx : syncIndsWorkSet) {
size_t inferCounter = 0;
for (auto stopIndx : m_executableSyncNodesInds) {
updateNodes->run(stopIndx);
for (; inferCounter < stopIndx; ++inferCounter) {
auto& node = executableGraphNodes[inferCounter];
auto& node = m_executableGraphNodes[inferCounter];
VERBOSE(node, getConfig().debugCaps.verbose);
PERF(node, getConfig().collectPerfCounters);

12 changes: 5 additions & 7 deletions src/plugins/intel_cpu/src/graph.h
@@ -195,7 +195,7 @@ class Graph {
outputNodesMap.clear();
graphNodes.clear();
graphEdges.clear();
syncNodesInds.clear();
m_executableSyncNodesInds.clear();
}
Status status { Status::NotReady };

@@ -223,9 +223,8 @@
void ResolveComplexInplaceConflicts();
bool ProcessDynNodes();
void GroupParallelNodes();
void Allocate();
void AllocateWithReuse();
void ExtractExecutableNodes();
void Allocate(const std::vector<size_t>& syncNodesInds);
void AllocateWithReuse(const std::vector<size_t>& syncNodesInds);
void ExecuteNode(const NodePtr& node, const dnnl::stream& stream) const;
void CreatePrimitivesAndExecConstants() const;
void InferStatic(SyncInferRequest* request);
Expand All @@ -247,9 +246,8 @@ class Graph {
// these node pointers (from graphNodes) are to avoid regular checking for
// constantness of nodes in Infer methods and calls of
// non-executable (optimized out) nodes, such as Input, Reshape, etc.
std::vector<NodePtr> executableGraphNodes;

std::unordered_map<Node*, size_t> syncNodesInds;
std::vector<NodePtr> m_executableGraphNodes;
std::vector<size_t> m_executableSyncNodesInds;

GraphContext::CPtr context;

