Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/dev' into recursive_audit_scan
Browse files Browse the repository at this point in the history
  • Loading branch information
attiasas committed Oct 22, 2023
2 parents 9d4a8a2 + eae9714 commit f8795ad
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 38 deletions.
7 changes: 4 additions & 3 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type transferDelayAction func(phase phaseBase, addedDelayFiles []string) error

// Transfer files using the 'producer-consumer' mechanism and apply a delay action.
func (ftm *transferManager) doTransferWithProducerConsumer(transferAction transferActionWithProducerConsumerType, delayAction transferDelayAction) error {
ftm.pcDetails = newProducerConsumerWrapper()
// Set the producer-consumer value into the referenced value. This allow the Graceful Stop mechanism to access ftm.pcDetails when needed to stop the transfer.
*ftm.pcDetails = newProducerConsumerWrapper()
return ftm.doTransfer(ftm.pcDetails, transferAction, delayAction)
}

Expand Down Expand Up @@ -203,14 +204,14 @@ type producerConsumerWrapper struct {
errorsQueue *clientUtils.ErrorsQueue
}

func newProducerConsumerWrapper() *producerConsumerWrapper {
func newProducerConsumerWrapper() producerConsumerWrapper {
chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer.SetFinishedNotification(true)
chunkBuilderProducerConsumer.SetFinishedNotification(true)
errorsQueue := clientUtils.NewErrorsQueue(1)

return &producerConsumerWrapper{
return producerConsumerWrapper{
chunkUploaderProducerConsumer: chunkUploaderProducerConsumer,
chunkBuilderProducerConsumer: chunkBuilderProducerConsumer,
errorsQueue: errorsQueue,
Expand Down
3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func (pb *phaseBase) setStopSignal(stopSignal chan os.Signal) {
}

func createTransferPhase(i int) transferPhase {
curPhaseBase := phaseBase{phaseId: i}
// Initialize a pointer to an empty producerConsumerWrapper to allow access the real value in StopGracefully
curPhaseBase := phaseBase{phaseId: i, pcDetails: &producerConsumerWrapper{}}
switch i {
case api.Phase1:
return &fullTransferPhase{phaseBase: curPhaseBase}
Expand Down
3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/phase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
const zeroUint32 uint32 = 0

func TestStopGracefully(t *testing.T) {
pBase := &phaseBase{pcDetails: newProducerConsumerWrapper()}
pcWrapper := newProducerConsumerWrapper()
pBase := &phaseBase{pcDetails: &pcWrapper}
chunkUploaderProducerConsumer := pBase.pcDetails.chunkUploaderProducerConsumer
chunkBuilderProducerConsumer := pBase.pcDetails.chunkBuilderProducerConsumer
go func() {
Expand Down
5 changes: 4 additions & 1 deletion artifactory/commands/transferfiles/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,10 @@ func interruptIfRequested(stopSignal chan os.Signal) error {
return err
}
if exist {
stopSignal <- os.Interrupt
select {
case stopSignal <- os.Interrupt:
default:
}
}
return nil
}
Expand Down
81 changes: 49 additions & 32 deletions utils/reposnapshot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,26 @@ func (node *Node) action(action ActionOnNodeFunc) error {

// Convert node to wrapper in order to save it to file.
func (node *Node) convertToWrapper() (wrapper *NodeExportWrapper, err error) {
var children []*Node
err = node.action(func(node *Node) error {
wrapper = &NodeExportWrapper{
Name: node.name,
Completed: node.NodeStatus == Completed,
}
for i := range node.children {
converted, err := node.children[i].convertToWrapper()
if err != nil {
return err
}
wrapper.Children = append(wrapper.Children, converted)
}
children = node.children
return nil
})
if err != nil {
return
}

for i := range children {
converted, err := children[i].convertToWrapper()
if err != nil {
return nil, err
}
wrapper.Children = append(wrapper.Children, converted)
}
return
}

Expand Down Expand Up @@ -107,17 +113,19 @@ func (node *Node) getActualPath() (actualPath string, err error) {
}

// Sets node as completed, clear its contents, notifies parent to check completion.
func (node *Node) setCompleted() error {
return node.action(func(node *Node) error {
func (node *Node) setCompleted() (err error) {
var parent *Node
err = node.action(func(node *Node) error {
node.NodeStatus = Completed
node.children = nil
parent := node.parent
parent = node.parent
node.parent = nil
if parent != nil {
return parent.CheckCompleted()
}
return nil
})
if err == nil && parent != nil {
return parent.CheckCompleted()
}
return
}

// Check if node completed - if done exploring, done handling files, children are completed.
Expand Down Expand Up @@ -233,30 +241,39 @@ func (node *Node) RestartExploring() error {
// For a structure such as repo->dir1->dir2->dir3
// The initial call will be to the root, and for an input of ({"dir1","dir2"}), and the final output will be a pointer to dir2.
func (node *Node) findMatchingNode(childrenDirs []string) (matchingNode *Node, err error) {
err = node.action(func(node *Node) error {
// The node was found in the cache. Let's return it.
if len(childrenDirs) == 0 {
matchingNode = node
return nil
}
// The node was found in the cache. Let's return it.
if len(childrenDirs) == 0 {
matchingNode = node
return
}

// Check if any of the current node's children are parents of the current node.
for i := range node.children {
if node.children[i].name == childrenDirs[0] {
matchingNode, err = node.children[i].findMatchingNode(childrenDirs[1:])
return err
}
// Check if any of the current node's children are parents of the current node.
var children []*Node
err = node.action(func(node *Node) error {
children = node.children
return nil
})
if err != nil {
return
}
for i := range children {
if children[i].name == childrenDirs[0] {
matchingNode, err = children[i].findMatchingNode(childrenDirs[1:])
return
}
}

// None of the current node's children are parents of the current node.
// This means we need to start creating the searched node parents.
newNode := CreateNewNode(childrenDirs[0], node)
newNode.parent = node
// None of the current node's children are parents of the current node.
// This means we need to start creating the searched node parents.
newNode := CreateNewNode(childrenDirs[0], node)
err = node.action(func(node *Node) error {
node.children = append(node.children, newNode)
matchingNode, err = newNode.findMatchingNode(childrenDirs[1:])
return err
return nil
})
return
if err != nil {
return
}
return newNode.findMatchingNode(childrenDirs[1:])
}

func CreateNewNode(dirName string, parent *Node) *Node {
Expand Down

0 comments on commit f8795ad

Please sign in to comment.