diff --git a/snap-core/src/main/java/org/esa/snap/core/dataio/ProductIO.java b/snap-core/src/main/java/org/esa/snap/core/dataio/ProductIO.java index 4c25ec998d8..0466cfacffd 100644 --- a/snap-core/src/main/java/org/esa/snap/core/dataio/ProductIO.java +++ b/snap-core/src/main/java/org/esa/snap/core/dataio/ProductIO.java @@ -38,9 +38,9 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; @@ -504,23 +504,21 @@ private static void writeAllBands(Product product, ProgressMonitor pm) throws IO private static void writeBandsConcurrent(ProgressMonitor pm, ArrayList bandsToWrite) throws IOException { final int numBands = bandsToWrite.size(); - final int numThreads = Runtime.getRuntime().availableProcessors(); - final int threadsPerBand = numThreads / numBands; - final int executorSize = threadsPerBand == 0 ? 1 : threadsPerBand; - Semaphore semaphore = new Semaphore(numThreads); + final CountDownLatch bandsCountDown = new CountDownLatch(numBands); + + final int numThreads = Config.instance().load().preferences().getInt("snap.parallelism", Runtime.getRuntime().availableProcessors()); + final ExecutorService executor = Executors.newFixedThreadPool(numThreads); List ioExceptionCollector = Collections.unmodifiableList(new ArrayList<>()); for (Band band : bandsToWrite) { if (pm.isCanceled()) { + bandsCountDown.countDown(); break; } - ExecutorService executor = null; - semaphore.acquireUninterruptibly(); - executor = Executors.newFixedThreadPool(executorSize); pm.setSubTaskName("Writing band '" + band.getName() + "'"); ProgressMonitor subPM = SubProgressMonitor.create(pm, 1); - writeRasterDataFully(subPM, band, executor, semaphore, ioExceptionCollector); + writeRasterDataFully(subPM, band, executor, bandsCountDown, ioExceptionCollector); } - while (semaphore.availablePermits() < numThreads) { + while (bandsCountDown.getCount() > 0) { try { Thread.sleep(200); } catch (InterruptedException e) { @@ -528,11 +526,13 @@ private static void writeBandsConcurrent(ProgressMonitor pm, ArrayList ban "Method ProductIO.writeAllBands(...)' unexpected termination", e); } } + executor.shutdown(); for (IOException e : ioExceptionCollector) { SystemUtils.LOG.log(Level.SEVERE, e.getMessage(), e); } if (ioExceptionCollector.size() > 0) { - throw ioExceptionCollector.get(0); + IOException ioException = ioExceptionCollector.get(0); + throw ioException; } } @@ -553,12 +553,16 @@ private static void writeBandsSequentially(ProgressMonitor pm, ArrayList b private ProductIO() { } - private static void writeRasterDataFully(ProgressMonitor pm, Band band, ExecutorService executor, Semaphore semaphore, List ioExceptionCollector) throws IOException { + private static void writeRasterDataFully(ProgressMonitor pm, Band band, ExecutorService executor, CountDownLatch bandsCountDown, List ioExceptionCollector) throws IOException { if (band.hasRasterData()) { - band.writeRasterData(0, 0, band.getRasterWidth(), band.getRasterHeight(), band.getRasterData(), pm); - band.removeCachedImageData(); - if (semaphore != null) { - semaphore.release(); + try { + band.writeRasterData(0, 0, band.getRasterWidth(), band.getRasterHeight(), band.getRasterData(), pm); + band.removeCachedImageData(); + } finally { + pm.done(); + if (bandsCountDown != null) { + bandsCountDown.countDown(); + } } } else { final PlanarImage sourceImage = band.getSourceImage(); @@ -567,7 +571,7 @@ private static void writeRasterDataFully(ProgressMonitor pm, Band band, Executor int numTiles = tileIndices.length; pm.beginTask("Writing raster data...", numTiles); if (executor != null) { - Finisher finisher = new Finisher(pm, semaphore, executor, numTiles, band); + Finisher finisher = new Finisher(pm, bandsCountDown, numTiles, band); for (Point tileIndex : tileIndices) { executor.execute(() -> { try { @@ -620,6 +624,7 @@ private static void writeTile(PlanarImage sourceImage, Point tileIndex, Band ban * @param lvlSupport defines the level (resolution) within the level image pyramid which shall be read * @param destRect the rectangular area which shall be filled with data * @param destBuffer the buffer where to put the data + * * @throws IOException in case an error occurs during reading */ // Todo mp 2020-07-03 - https://senbox.atlassian.net/browse/SNAP-1134 @@ -639,19 +644,16 @@ public static void readLevelBandRasterData(AbstractProductReader reader, private static class Finisher { private final ProgressMonitor pm; - private final Semaphore semaphore; - private final ExecutorService executor; + private final CountDownLatch bandsCountDown; private final int work; private final Band band; private int counter; - public Finisher(ProgressMonitor pm, Semaphore semaphore, ExecutorService executor, int counter, Band band) { + public Finisher(ProgressMonitor pm, CountDownLatch bandsCountDown, int counter, Band band) { this.pm = pm; - this.semaphore = semaphore; - this.executor = executor; + this.bandsCountDown = bandsCountDown; this.work = counter; this.band = band; - } public synchronized void worked() { @@ -660,8 +662,7 @@ public synchronized void worked() { } finally { counter++; if (counter == work) { - semaphore.release(); - executor.shutdown(); + bandsCountDown.countDown(); pm.done(); band.removeCachedImageData(); }