Skip to content

Commit

Permalink
using code from master branch to reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
marpet committed Jun 22, 2021
1 parent d6f2d5b commit 9d86414
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions snap-core/src/main/java/org/esa/snap/core/dataio/ProductIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -504,35 +504,35 @@ private static void writeAllBands(Product product, ProgressMonitor pm) throws IO

private static void writeBandsConcurrent(ProgressMonitor pm, ArrayList<Band> bandsToWrite) throws IOException {
final int numBands = bandsToWrite.size();
final int numThreads = Runtime.getRuntime().availableProcessors();
final int threadsPerBand = numThreads / numBands;
final int executorSize = threadsPerBand == 0 ? 1 : threadsPerBand;
Semaphore semaphore = new Semaphore(numThreads);
final CountDownLatch bandsCountDown = new CountDownLatch(numBands);

final int numThreads = Config.instance().load().preferences().getInt("snap.parallelism", Runtime.getRuntime().availableProcessors());
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<IOException> ioExceptionCollector = Collections.unmodifiableList(new ArrayList<>());
for (Band band : bandsToWrite) {
if (pm.isCanceled()) {
bandsCountDown.countDown();
break;
}
ExecutorService executor = null;
semaphore.acquireUninterruptibly();
executor = Executors.newFixedThreadPool(executorSize);
pm.setSubTaskName("Writing band '" + band.getName() + "'");
ProgressMonitor subPM = SubProgressMonitor.create(pm, 1);
writeRasterDataFully(subPM, band, executor, semaphore, ioExceptionCollector);
writeRasterDataFully(subPM, band, executor, bandsCountDown, ioExceptionCollector);
}
while (semaphore.availablePermits() < numThreads) {
while (bandsCountDown.getCount() > 0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
EngineConfig.instance().logger().log(Level.WARNING,
"Method ProductIO.writeAllBands(...)' unexpected termination", e);
}
}
executor.shutdown();
for (IOException e : ioExceptionCollector) {
SystemUtils.LOG.log(Level.SEVERE, e.getMessage(), e);
}
if (ioExceptionCollector.size() > 0) {
throw ioExceptionCollector.get(0);
IOException ioException = ioExceptionCollector.get(0);
throw ioException;
}
}

Expand All @@ -553,12 +553,16 @@ private static void writeBandsSequentially(ProgressMonitor pm, ArrayList<Band> b
private ProductIO() {
}

private static void writeRasterDataFully(ProgressMonitor pm, Band band, ExecutorService executor, Semaphore semaphore, List<IOException> ioExceptionCollector) throws IOException {
private static void writeRasterDataFully(ProgressMonitor pm, Band band, ExecutorService executor, CountDownLatch bandsCountDown, List<IOException> ioExceptionCollector) throws IOException {
if (band.hasRasterData()) {
band.writeRasterData(0, 0, band.getRasterWidth(), band.getRasterHeight(), band.getRasterData(), pm);
band.removeCachedImageData();
if (semaphore != null) {
semaphore.release();
try {
band.writeRasterData(0, 0, band.getRasterWidth(), band.getRasterHeight(), band.getRasterData(), pm);
band.removeCachedImageData();
} finally {
pm.done();
if (bandsCountDown != null) {
bandsCountDown.countDown();
}
}
} else {
final PlanarImage sourceImage = band.getSourceImage();
Expand All @@ -567,7 +571,7 @@ private static void writeRasterDataFully(ProgressMonitor pm, Band band, Executor
int numTiles = tileIndices.length;
pm.beginTask("Writing raster data...", numTiles);
if (executor != null) {
Finisher finisher = new Finisher(pm, semaphore, executor, numTiles, band);
Finisher finisher = new Finisher(pm, bandsCountDown, numTiles, band);
for (Point tileIndex : tileIndices) {
executor.execute(() -> {
try {
Expand Down Expand Up @@ -620,6 +624,7 @@ private static void writeTile(PlanarImage sourceImage, Point tileIndex, Band ban
* @param lvlSupport defines the level (resolution) within the level image pyramid which shall be read
* @param destRect the rectangular area which shall be filled with data
* @param destBuffer the buffer where to put the data
*
* @throws IOException in case an error occurs during reading
*/
// Todo mp 2020-07-03 - https://senbox.atlassian.net/browse/SNAP-1134
Expand All @@ -639,19 +644,16 @@ public static void readLevelBandRasterData(AbstractProductReader reader,
private static class Finisher {

private final ProgressMonitor pm;
private final Semaphore semaphore;
private final ExecutorService executor;
private final CountDownLatch bandsCountDown;
private final int work;
private final Band band;
private int counter;

public Finisher(ProgressMonitor pm, Semaphore semaphore, ExecutorService executor, int counter, Band band) {
public Finisher(ProgressMonitor pm, CountDownLatch bandsCountDown, int counter, Band band) {
this.pm = pm;
this.semaphore = semaphore;
this.executor = executor;
this.bandsCountDown = bandsCountDown;
this.work = counter;
this.band = band;

}

public synchronized void worked() {
Expand All @@ -660,8 +662,7 @@ public synchronized void worked() {
} finally {
counter++;
if (counter == work) {
semaphore.release();
executor.shutdown();
bandsCountDown.countDown();
pm.done();
band.removeCachedImageData();
}
Expand Down

0 comments on commit 9d86414

Please sign in to comment.