From 0297faca9fa5fcab66fcd724a769e8fb0cad2f0d Mon Sep 17 00:00:00 2001 From: Egor Duplenskii Date: Fri, 24 May 2024 12:00:45 +0200 Subject: [PATCH] [CPU] Avoid extra processing of the sync nodes in scope of Infer call (#24227) ### Details: - To reduce Infer overhead - The new flow is: 1) Define sync points from all the nodes perspective. 2) Use sync points in scope of Allocate 3) Define executable sync points from overall sync points but considering only executable nodes. 4) Keep executable sync points ordered to avoid additional logic in scope of Infer call. ### Tickets: - *ticket-id* --- src/plugins/intel_cpu/src/graph.cpp | 157 +++++++++++++++------------- src/plugins/intel_cpu/src/graph.h | 12 +-- 2 files changed, 92 insertions(+), 77 deletions(-) diff --git a/src/plugins/intel_cpu/src/graph.cpp b/src/plugins/intel_cpu/src/graph.cpp index d12fd5f978c415..b3f23d2c6cc824 100644 --- a/src/plugins/intel_cpu/src/graph.cpp +++ b/src/plugins/intel_cpu/src/graph.cpp @@ -253,6 +253,71 @@ void Graph::GroupParallelNodes() { } } +static std::vector IdentifySyncPoints(const std::vector& graphNodes) { + OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::IdentifySyncPoints"); + std::vector syncNodesInds; + + for (size_t i = 0; i < graphNodes.size(); ++i) { + const auto& node = graphNodes[i]; + + if (!node->isDynamicNode()) + continue; + + if (node->outputShapeDataDependency() || + // WA: for convolution plus sum(broadcast). Due to the fact that a convolution with sum use the same memory for second sum term and the output + // tensors (inPlace) resizing the output tensor, may lead to reallocation of this second term memory and possible data lost. The reallocation + // may happen when the second term shape is broadcasted to the output tensor shape. To avoid the data loss, we have a special processing for + // such cases inside the convolution node, but it works properly only when dynamic shapes inference, preparation and execution a called + // for this node sequentially. + (node->getType() == Type::Convolution && node->isInPlace()) || + // Due to the special handling of the internal states and initialization subgraphs, MemoryInput nodes must + // be processed as a internal dynamism node, allowing to hide the aforementioned complexity inside the + // MemoryInput::executeDynamic implementation + (node->getType() == Type::MemoryInput)) { + syncNodesInds.push_back(i); + } + } + + return syncNodesInds; +} + +static std::tuple, std::vector> ExtractExecutableNodesAndSyncPoints(const std::vector& syncNodesInds, + const std::vector& graphNodes) { + OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::ExtractExecutableNodesAndSyncPoints"); + std::unordered_map graphIdToExecutableId; + std::vector executableGraphNodes; + for (size_t i = 0; i < graphNodes.size(); i++) { + const auto& graphNode = graphNodes[i]; + if ((!graphNode->isConstant() && CPU_DEBUG_CAPS_ALWAYS_TRUE(graphNode->isExecutable())) || graphNode->isDynamicNode()) { + /* @todo + * Revise implementation. + * With current way it is possible that with debug_caps enabled + * we execute a node, which is not ready to be executed + */ + graphIdToExecutableId[i] = executableGraphNodes.size(); + executableGraphNodes.emplace_back(graphNode); + } + } + + // use set to ensure sorted unique sync entries + std::set uniqueExecutableSyncNodesInds; + for (const auto& syncNodesInd : syncNodesInds) { + auto it = graphIdToExecutableId.find(syncNodesInd); + if (it != graphIdToExecutableId.end()) { + uniqueExecutableSyncNodesInds.insert(it->second); + // since sometimes we need to run the synchronization node alone (for example in the case of internal dynamism) + // let's add another sync index after the sync point node + uniqueExecutableSyncNodesInds.insert(it->second + 1); + } + } + uniqueExecutableSyncNodesInds.insert(executableGraphNodes.size()); + // convert to a vector to reduce runtime overhead + std::vector executableSyncNodesInds(uniqueExecutableSyncNodesInds.begin(), uniqueExecutableSyncNodesInds.end()); + + return std::make_tuple(std::move(executableGraphNodes), + std::move(executableSyncNodesInds)); +} + void Graph::InitGraph(bool optimize) { DEBUG_LOG("Initializing graph with name: ", GetName()); @@ -284,9 +349,10 @@ void Graph::InitGraph(bool optimize) { SortTopologically(); - const auto hasDynNodes = ProcessDynNodes(); + const bool hasDynNodes = ProcessDynNodes(); + const auto syncNodesInds = hasDynNodes ? IdentifySyncPoints(graphNodes) : std::vector{}; - Allocate(); + Allocate(syncNodesInds); CreatePrimitivesAndExecConstants(); @@ -296,7 +362,7 @@ void Graph::InitGraph(bool optimize) { } #endif - ExtractExecutableNodes(); + std::tie(m_executableGraphNodes, m_executableSyncNodesInds) = ExtractExecutableNodesAndSyncPoints(syncNodesInds, graphNodes); status = hasDynNodes ? Status::ReadyDynamic : Status::ReadyStatic; @@ -369,24 +435,6 @@ void Graph::InitOptimalPrimitiveDescriptors() { } } -void Graph::ExtractExecutableNodes() { - OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::ExtractExecutableNodes"); - for (const auto& graphNode : graphNodes) { - if ((!graphNode->isConstant() && CPU_DEBUG_CAPS_ALWAYS_TRUE(graphNode->isExecutable())) || graphNode->isDynamicNode()) { - /* @todo - * Revise implementation. - * With current way it is possible that with debug_caps enabled - * we execute a node, which is not ready to be executed - */ - auto itr = syncNodesInds.find(graphNode.get()); - if (itr != syncNodesInds.end()) { - itr->second = executableGraphNodes.size(); - } - executableGraphNodes.emplace_back(graphNode); - } - } -} - void Graph::CreatePrimitivesAndExecConstants() const { OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::CreatePrimitivesAndExecConstants"); dnnl::stream stream(getEngine()); @@ -640,7 +688,7 @@ static edge_clusters_t findEdgeClusters(const std::vector & graphEdges) return edge_clusters; } -void Graph::AllocateWithReuse() { +void Graph::AllocateWithReuse(const std::vector& syncNodesInds) { edge_clusters_t edge_clusters = findEdgeClusters(graphEdges); size_t remaining_edge_clusters_count = edge_clusters.size(); @@ -828,19 +876,14 @@ void Graph::AllocateWithReuse() { if (!syncNodesInds.empty()) { //We have to extend the lifespan of tensors that are crossing a sync point border in order to save //the intermediate computation results from possible loss due to the tensor resize - std::vector vecIntervals = {0}; - for (const auto& item : syncNodesInds) { - vecIntervals.push_back(item.first->execIndex); - } - std::sort(vecIntervals.begin(), vecIntervals.end()); for (auto& box : undefinedBoxes) { if (-1 == box.finish) { continue; } - auto itr_upper = std::upper_bound(vecIntervals.begin(), vecIntervals.end(), box.finish, [](int y, int x) { return y <= x;}); - auto itr_lower = std::lower_bound(vecIntervals.begin(), vecIntervals.end(), box.start); + auto itr_upper = std::upper_bound(syncNodesInds.begin(), syncNodesInds.end(), box.finish, [](int y, int x) { return y <= x;}); + auto itr_lower = std::lower_bound(syncNodesInds.begin(), syncNodesInds.end(), box.start); if (itr_lower != itr_upper) { // across sections - if (itr_upper == vecIntervals.end()) { + if (itr_upper == syncNodesInds.end()) { box.finish = -1; } else { box.finish = *itr_upper; @@ -920,7 +963,7 @@ void Graph::AllocateWithReuse() { } } -void Graph::Allocate() { +void Graph::Allocate(const std::vector& syncNodesInds) { OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::Allocate"); //resolve inplace dead end nodes @@ -944,7 +987,7 @@ void Graph::Allocate() { for (auto& edge : graphEdges) edge->init(); // Allocate memory space for all edges marked with NeedAllocation - AllocateWithReuse(); + AllocateWithReuse(syncNodesInds); // Check all getters. Should work. for (auto& edge : graphEdges) edge->validate(); @@ -953,35 +996,18 @@ void Graph::Allocate() { bool Graph::ProcessDynNodes() { OV_ITT_SCOPE(FIRST_INFERENCE, itt::domains::intel_cpu_LT, "Graph::ProcessDynNodes"); - bool result = false; - for (size_t i = 0; i < graphNodes.size(); ++i) { - const auto& node = graphNodes[i]; - if (node->isDynamicNode()) { - result = true; - if (node->outputShapeDataDependency() || - // WA: for convolution plus sum(broadcast). Due to the fact that a convolution with sum use the same memory for second sum term and the output - // tensors (inPlace) resizing the output tensor, may lead to reallocation of this second term memory and possible data lost. The reallocation - // may happen when the second term shape is broadcasted to the output tensor shape. To avoid the data loss, we have a special processing for - // such cases inside the convolution node, but it works properly only when dynamic shapes inference, preparation and execution a called - // for this node sequentially. - (node->getType() == Type::Convolution && node->isInPlace()) || - // Due to the special handling of the internal states and initialization subgraphs, MemoryInput nodes must - // be processed as a internal dynamism node, allowing to hide the aforementioned complexity inside the - // MemoryInput::executeDynamic implementation - (node->getType() == Type::MemoryInput)) { - syncNodesInds.insert({node.get(), i}); - } - } - } - + const bool containsDynamicNodes = std::any_of(graphNodes.begin(), graphNodes.end(), [](const NodePtr& node) { + return node->isDynamicNode(); + }); // In case of dynamic shapes, tensors may be resized due to the shapes variations. // If the input tensor is included to memory reuse, it means that its memory manager is shared with other tensors in the graph, which in turn may cause data // loss when one of the tensors down the graph requests mem resize, while the input data have not been yet read by the consumers. To avoid such situations // we disable io mem reuse for the case of dynamic shapes. - if (result) { + if (containsDynamicNodes) { this->reuse_io_tensors = false; } - return result; + + return containsDynamicNodes; } void Graph::PushInputData(const std::size_t& index, const ov::SoPtr& input) { @@ -1097,7 +1123,7 @@ void Graph::PullOutputData(std::unordered_map>& void Graph::InferStatic(SyncInferRequest* request) { dnnl::stream stream(getEngine()); - for (const auto& node : executableGraphNodes) { + for (const auto& node : m_executableGraphNodes) { VERBOSE(node, getConfig().debugCaps.verbose); PERF(node, getConfig().collectPerfCounters); @@ -1316,27 +1342,18 @@ class UpdateNodes : public UpdateNodesBase { void Graph::InferDynamic(SyncInferRequest* request) { dnnl::stream stream(getEngine()); - std::set syncIndsWorkSet; - for (const auto& nodeIndx : syncNodesInds) { - syncIndsWorkSet.insert(nodeIndx.second); - //since sometimes we need to run the synchronization node alone (for example in the case of internal dynamism) - //let's add another sync index after the sync point node - syncIndsWorkSet.insert(nodeIndx.second + 1); - } - syncIndsWorkSet.insert(executableGraphNodes.size()); - std::unique_ptr updateNodes{}; if (parallel_get_max_threads() > 1) { - updateNodes.reset(new UpdateNodes(executableGraphNodes)); + updateNodes.reset(new UpdateNodes(m_executableGraphNodes)); } else { - updateNodes.reset(new UpdateNodesSeq(executableGraphNodes)); + updateNodes.reset(new UpdateNodesSeq(m_executableGraphNodes)); } - size_t inferCounter = 0; - for (auto stopIndx : syncIndsWorkSet) { + size_t inferCounter = 0; + for (auto stopIndx : m_executableSyncNodesInds) { updateNodes->run(stopIndx); for (; inferCounter < stopIndx; ++inferCounter) { - auto& node = executableGraphNodes[inferCounter]; + auto& node = m_executableGraphNodes[inferCounter]; VERBOSE(node, getConfig().debugCaps.verbose); PERF(node, getConfig().collectPerfCounters); diff --git a/src/plugins/intel_cpu/src/graph.h b/src/plugins/intel_cpu/src/graph.h index 876f5d571a6015..1a08446b59d9f6 100644 --- a/src/plugins/intel_cpu/src/graph.h +++ b/src/plugins/intel_cpu/src/graph.h @@ -195,7 +195,7 @@ class Graph { outputNodesMap.clear(); graphNodes.clear(); graphEdges.clear(); - syncNodesInds.clear(); + m_executableSyncNodesInds.clear(); } Status status { Status::NotReady }; @@ -223,9 +223,8 @@ class Graph { void ResolveComplexInplaceConflicts(); bool ProcessDynNodes(); void GroupParallelNodes(); - void Allocate(); - void AllocateWithReuse(); - void ExtractExecutableNodes(); + void Allocate(const std::vector& syncNodesInds); + void AllocateWithReuse(const std::vector& syncNodesInds); void ExecuteNode(const NodePtr& node, const dnnl::stream& stream) const; void CreatePrimitivesAndExecConstants() const; void InferStatic(SyncInferRequest* request); @@ -247,9 +246,8 @@ class Graph { // these node pointers (from graphNodes) are to avoid regular checking for // constantness of nodes in Infer methods and calls of // non-executable (optimized out) nodes, such as Input, Reshape, etc. - std::vector executableGraphNodes; - - std::unordered_map syncNodesInds; + std::vector m_executableGraphNodes; + std::vector m_executableSyncNodesInds; GraphContext::CPtr context;