From 45dca6a9266cf20d1566a0fb7187c7e9853cab27 Mon Sep 17 00:00:00 2001 From: Yahav Itzhak Date: Sun, 22 Oct 2023 09:31:09 +0300 Subject: [PATCH 1/2] Transfer - Remove recursive locks in snapshot node (#1004) --- utils/reposnapshot/node.go | 81 +++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 32 deletions(-) diff --git a/utils/reposnapshot/node.go b/utils/reposnapshot/node.go index eadeb3bb4..33d862244 100644 --- a/utils/reposnapshot/node.go +++ b/utils/reposnapshot/node.go @@ -52,20 +52,26 @@ func (node *Node) action(action ActionOnNodeFunc) error { // Convert node to wrapper in order to save it to file. func (node *Node) convertToWrapper() (wrapper *NodeExportWrapper, err error) { + var children []*Node err = node.action(func(node *Node) error { wrapper = &NodeExportWrapper{ Name: node.name, Completed: node.NodeStatus == Completed, } - for i := range node.children { - converted, err := node.children[i].convertToWrapper() - if err != nil { - return err - } - wrapper.Children = append(wrapper.Children, converted) - } + children = node.children return nil }) + if err != nil { + return + } + + for i := range children { + converted, err := children[i].convertToWrapper() + if err != nil { + return nil, err + } + wrapper.Children = append(wrapper.Children, converted) + } return } @@ -107,17 +113,19 @@ func (node *Node) getActualPath() (actualPath string, err error) { } // Sets node as completed, clear its contents, notifies parent to check completion. -func (node *Node) setCompleted() error { - return node.action(func(node *Node) error { +func (node *Node) setCompleted() (err error) { + var parent *Node + err = node.action(func(node *Node) error { node.NodeStatus = Completed node.children = nil - parent := node.parent + parent = node.parent node.parent = nil - if parent != nil { - return parent.CheckCompleted() - } return nil }) + if err == nil && parent != nil { + return parent.CheckCompleted() + } + return } // Check if node completed - if done exploring, done handling files, children are completed. @@ -233,30 +241,39 @@ func (node *Node) RestartExploring() error { // For a structure such as repo->dir1->dir2->dir3 // The initial call will be to the root, and for an input of ({"dir1","dir2"}), and the final output will be a pointer to dir2. func (node *Node) findMatchingNode(childrenDirs []string) (matchingNode *Node, err error) { - err = node.action(func(node *Node) error { - // The node was found in the cache. Let's return it. - if len(childrenDirs) == 0 { - matchingNode = node - return nil - } + // The node was found in the cache. Let's return it. + if len(childrenDirs) == 0 { + matchingNode = node + return + } - // Check if any of the current node's children are parents of the current node. - for i := range node.children { - if node.children[i].name == childrenDirs[0] { - matchingNode, err = node.children[i].findMatchingNode(childrenDirs[1:]) - return err - } + // Check if any of the current node's children are parents of the current node. + var children []*Node + err = node.action(func(node *Node) error { + children = node.children + return nil + }) + if err != nil { + return + } + for i := range children { + if children[i].name == childrenDirs[0] { + matchingNode, err = children[i].findMatchingNode(childrenDirs[1:]) + return } + } - // None of the current node's children are parents of the current node. - // This means we need to start creating the searched node parents. - newNode := CreateNewNode(childrenDirs[0], node) - newNode.parent = node + // None of the current node's children are parents of the current node. + // This means we need to start creating the searched node parents. + newNode := CreateNewNode(childrenDirs[0], node) + err = node.action(func(node *Node) error { node.children = append(node.children, newNode) - matchingNode, err = newNode.findMatchingNode(childrenDirs[1:]) - return err + return nil }) - return + if err != nil { + return + } + return newNode.findMatchingNode(childrenDirs[1:]) } func CreateNewNode(dirName string, parent *Node) *Node { From eae97142de810310bd0b59a7881897ffe75d1ca0 Mon Sep 17 00:00:00 2001 From: Yahav Itzhak Date: Sun, 22 Oct 2023 09:45:45 +0300 Subject: [PATCH 2/2] Transfer - Graceful stop does not stop immediately (#1005) --- artifactory/commands/transferfiles/manager.go | 7 ++++--- artifactory/commands/transferfiles/phase.go | 3 ++- artifactory/commands/transferfiles/phase_test.go | 3 ++- artifactory/commands/transferfiles/utils.go | 5 ++++- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/artifactory/commands/transferfiles/manager.go b/artifactory/commands/transferfiles/manager.go index e9904b592..b6777ca3b 100644 --- a/artifactory/commands/transferfiles/manager.go +++ b/artifactory/commands/transferfiles/manager.go @@ -40,7 +40,8 @@ type transferDelayAction func(phase phaseBase, addedDelayFiles []string) error // Transfer files using the 'producer-consumer' mechanism and apply a delay action. func (ftm *transferManager) doTransferWithProducerConsumer(transferAction transferActionWithProducerConsumerType, delayAction transferDelayAction) error { - ftm.pcDetails = newProducerConsumerWrapper() + // Set the producer-consumer value into the referenced value. This allow the Graceful Stop mechanism to access ftm.pcDetails when needed to stop the transfer. + *ftm.pcDetails = newProducerConsumerWrapper() return ftm.doTransfer(ftm.pcDetails, transferAction, delayAction) } @@ -203,14 +204,14 @@ type producerConsumerWrapper struct { errorsQueue *clientUtils.ErrorsQueue } -func newProducerConsumerWrapper() *producerConsumerWrapper { +func newProducerConsumerWrapper() producerConsumerWrapper { chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false) chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false) chunkUploaderProducerConsumer.SetFinishedNotification(true) chunkBuilderProducerConsumer.SetFinishedNotification(true) errorsQueue := clientUtils.NewErrorsQueue(1) - return &producerConsumerWrapper{ + return producerConsumerWrapper{ chunkUploaderProducerConsumer: chunkUploaderProducerConsumer, chunkBuilderProducerConsumer: chunkBuilderProducerConsumer, errorsQueue: errorsQueue, diff --git a/artifactory/commands/transferfiles/phase.go b/artifactory/commands/transferfiles/phase.go index d204c64d4..c9e9f2bbd 100644 --- a/artifactory/commands/transferfiles/phase.go +++ b/artifactory/commands/transferfiles/phase.go @@ -145,7 +145,8 @@ func (pb *phaseBase) setStopSignal(stopSignal chan os.Signal) { } func createTransferPhase(i int) transferPhase { - curPhaseBase := phaseBase{phaseId: i} + // Initialize a pointer to an empty producerConsumerWrapper to allow access the real value in StopGracefully + curPhaseBase := phaseBase{phaseId: i, pcDetails: &producerConsumerWrapper{}} switch i { case api.Phase1: return &fullTransferPhase{phaseBase: curPhaseBase} diff --git a/artifactory/commands/transferfiles/phase_test.go b/artifactory/commands/transferfiles/phase_test.go index dede22a24..8b1d7044a 100644 --- a/artifactory/commands/transferfiles/phase_test.go +++ b/artifactory/commands/transferfiles/phase_test.go @@ -13,7 +13,8 @@ import ( const zeroUint32 uint32 = 0 func TestStopGracefully(t *testing.T) { - pBase := &phaseBase{pcDetails: newProducerConsumerWrapper()} + pcWrapper := newProducerConsumerWrapper() + pBase := &phaseBase{pcDetails: &pcWrapper} chunkUploaderProducerConsumer := pBase.pcDetails.chunkUploaderProducerConsumer chunkBuilderProducerConsumer := pBase.pcDetails.chunkBuilderProducerConsumer go func() { diff --git a/artifactory/commands/transferfiles/utils.go b/artifactory/commands/transferfiles/utils.go index 902bd5421..14ebd3380 100644 --- a/artifactory/commands/transferfiles/utils.go +++ b/artifactory/commands/transferfiles/utils.go @@ -374,7 +374,10 @@ func interruptIfRequested(stopSignal chan os.Signal) error { return err } if exist { - stopSignal <- os.Interrupt + select { + case stopSignal <- os.Interrupt: + default: + } } return nil }