From 50db42fb80286fae7d6c68f36108c500575bf6f2 Mon Sep 17 00:00:00 2001 From: yahavi Date: Sun, 22 Oct 2023 08:40:45 +0300 Subject: [PATCH] Transfer - Graceful stop does not stop immediately --- artifactory/commands/transferfiles/manager.go | 7 ++++--- artifactory/commands/transferfiles/phase.go | 3 ++- artifactory/commands/transferfiles/phase_test.go | 3 ++- artifactory/commands/transferfiles/utils.go | 5 ++++- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/artifactory/commands/transferfiles/manager.go b/artifactory/commands/transferfiles/manager.go index e9904b592..b6777ca3b 100644 --- a/artifactory/commands/transferfiles/manager.go +++ b/artifactory/commands/transferfiles/manager.go @@ -40,7 +40,8 @@ type transferDelayAction func(phase phaseBase, addedDelayFiles []string) error // Transfer files using the 'producer-consumer' mechanism and apply a delay action. func (ftm *transferManager) doTransferWithProducerConsumer(transferAction transferActionWithProducerConsumerType, delayAction transferDelayAction) error { - ftm.pcDetails = newProducerConsumerWrapper() + // Set the producer-consumer value into the referenced value. This allow the Graceful Stop mechanism to access ftm.pcDetails when needed to stop the transfer. + *ftm.pcDetails = newProducerConsumerWrapper() return ftm.doTransfer(ftm.pcDetails, transferAction, delayAction) } @@ -203,14 +204,14 @@ type producerConsumerWrapper struct { errorsQueue *clientUtils.ErrorsQueue } -func newProducerConsumerWrapper() *producerConsumerWrapper { +func newProducerConsumerWrapper() producerConsumerWrapper { chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false) chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false) chunkUploaderProducerConsumer.SetFinishedNotification(true) chunkBuilderProducerConsumer.SetFinishedNotification(true) errorsQueue := clientUtils.NewErrorsQueue(1) - return &producerConsumerWrapper{ + return producerConsumerWrapper{ chunkUploaderProducerConsumer: chunkUploaderProducerConsumer, chunkBuilderProducerConsumer: chunkBuilderProducerConsumer, errorsQueue: errorsQueue, diff --git a/artifactory/commands/transferfiles/phase.go b/artifactory/commands/transferfiles/phase.go index d204c64d4..c9e9f2bbd 100644 --- a/artifactory/commands/transferfiles/phase.go +++ b/artifactory/commands/transferfiles/phase.go @@ -145,7 +145,8 @@ func (pb *phaseBase) setStopSignal(stopSignal chan os.Signal) { } func createTransferPhase(i int) transferPhase { - curPhaseBase := phaseBase{phaseId: i} + // Initialize a pointer to an empty producerConsumerWrapper to allow access the real value in StopGracefully + curPhaseBase := phaseBase{phaseId: i, pcDetails: &producerConsumerWrapper{}} switch i { case api.Phase1: return &fullTransferPhase{phaseBase: curPhaseBase} diff --git a/artifactory/commands/transferfiles/phase_test.go b/artifactory/commands/transferfiles/phase_test.go index dede22a24..8b1d7044a 100644 --- a/artifactory/commands/transferfiles/phase_test.go +++ b/artifactory/commands/transferfiles/phase_test.go @@ -13,7 +13,8 @@ import ( const zeroUint32 uint32 = 0 func TestStopGracefully(t *testing.T) { - pBase := &phaseBase{pcDetails: newProducerConsumerWrapper()} + pcWrapper := newProducerConsumerWrapper() + pBase := &phaseBase{pcDetails: &pcWrapper} chunkUploaderProducerConsumer := pBase.pcDetails.chunkUploaderProducerConsumer chunkBuilderProducerConsumer := pBase.pcDetails.chunkBuilderProducerConsumer go func() { diff --git a/artifactory/commands/transferfiles/utils.go b/artifactory/commands/transferfiles/utils.go index 902bd5421..14ebd3380 100644 --- a/artifactory/commands/transferfiles/utils.go +++ b/artifactory/commands/transferfiles/utils.go @@ -374,7 +374,10 @@ func interruptIfRequested(stopSignal chan os.Signal) error { return err } if exist { - stopSignal <- os.Interrupt + select { + case stopSignal <- os.Interrupt: + default: + } } return nil }