Skip to content

Commit

Permalink
File splitting enhancement (#311)
Browse files Browse the repository at this point in the history
* Minor changes to messaging for MDN disposition errors.

* Use find_java to look for JAVA_HOME

* Remove double square brackets for Ubuntu dash compat

* Handle cleanup error better.

* Cater for unusable or questionable file names

* Fix formatting failures

* Handle EOF exception gracefully for unreadable pending info files

* Refresh the partnership variables just before processing document.

* Use find_java for identifying Java

* Use the java executable instead of javac as it is not always installed.
Use bash shell explicitly to cater for Ubuntu mapping sh to dash

* Simplify reconstituting the mime body part.
Provide backwards compat for now.

* Extract more values to properties in preparation for automated upgrades.

* Sample properties file for property driven custom configuration.

* Upgrade notes

* Updated documentation for 3.4.1

* Add the reject_unsigned_meesages attribute as an example.

* New version and updated libraries to latest.

* Extract the file splitting functionality to a separate class.
Support running it as a thread or as a standalone app from command line

* Minor changes to logging to make more sense.

* Fix comment.

* Version change and update libraries to latest releases.

* Merge with upstream

* Update mvnw

* Fixes to accommodate RestAPI shortcomings with Java 8
  • Loading branch information
uhurusurfa authored Dec 26, 2022
1 parent 7e2a5cf commit 641eb93
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 101 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ jobs:
with:
java-version: ${{ matrix.java_version }}
distribution: 'adopt'
- name: Remove Rest API for Java 8 because of library incompatibility
if: contains(matrix.java_version, '8') == true && contains(matrix.os, 'win') == true
run: |
rmdir -Recurse -Force Server/src/main/java/org/openas2/cmd/processor/restapi
del -Force Server/src/main/java/org/openas2/cmd/processor/RestCommandProcessor.java
- name: Remove Rest API for Java 8 because of library incompatibility
if: contains(matrix.java_version, '8') == true && contains(matrix.os, 'win') == false
run: |
rm -rf Server/src/main/java/org/openas2/cmd/processor/restapi
rm -rf Server/src/main/java/org/openas2/cmd/processor/RestCommandProcessor.java
- name: Make Maven Wrapper and Java finder executable. Copy cacerts
if: contains(matrix.os, 'win') == false
run: |
Expand Down
2 changes: 1 addition & 1 deletion Remote/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>net.sf.openas2</groupId>
<artifactId>OpenAS2</artifactId>
<version>3.4.1</version>
<version>3.5.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion Server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<!-- DO NOT CHANGE THIS "groupId" WITHOUT CHANGING XMLSession.getManifestAttributes.MANIFEST_VENDOR_ID_ATTRIB -->
<groupId>net.sf.openas2</groupId>
<artifactId>OpenAS2</artifactId>
<version>3.4.1</version>
<version>3.5.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
4 changes: 2 additions & 2 deletions Server/src/main/java/CheckCertificate.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private CommandLine parseCommandLine(String[] args) {
try {
line = parser.parse(options, args);
} catch (ParseException e) {
System.out.println("Unexpected exception:" + e.getMessage());
System.out.println("Command line parsing error: " + e.getMessage());
usage(options);
}
return line;
Expand Down Expand Up @@ -323,7 +323,7 @@ public static void main(String[] args) {
CheckCertificate mgr = new CheckCertificate();
mgr.process(args);
} catch (Exception e) {
System.out.println("Unexpected exception:" + e.getMessage());
System.out.println("Processing error occurred: " + e.getMessage());
}
}
}
116 changes: 116 additions & 0 deletions Server/src/main/java/SplitCsvFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.openas2.util.FileUtil;
import java.io.File;

/**
* Class used to add the server's certificate to the KeyStore with your trusted
* certificates.
*/
public class SplitCsvFile {

public static final String SOURCE_FILE = "s";
public static final String OUT_FILENAME_PREFIX = "p";
public static final String OUTPUT_DIR = "o";
public static final String MAX_FILE_SIZE = "m";
public static final String HAS_HEADER_ROW = "h";
public static final String DEBUG = "d";
public static final String HELP_OPT = "h";

/*
* Options in this format: short-opt, long-opt, has-argument, required,
* description
*/
public String[][] opts = {
{SOURCE_FILE, "source", "true", "true", "the source file name including path"},
{MAX_FILE_SIZE, "max_size", "true", "true", "the maximum size of the files"},
{HAS_HEADER_ROW, "has_header", "false", "false", "if set, the file has a header row that wil be replicated into every file"},
{OUT_FILENAME_PREFIX, "out_file_prefix", "true", "false", "the prefix for the split file names"},
{OUTPUT_DIR, "out_dir", "true", "false", "output directory for the split files - defaults to current dir"},
{DEBUG, "debug", "true", "false", "Enabling debug logging"},
{HELP_OPT, "help", "false", "false", "print this help"}
};

private void usage(Options options) {
String header = "Splits CSV file." + "\nReads the file as a line based file creating new files that will not exceed the specified maximum size.";
String footer = "NOTE: The file is expected to contain lines separated by newline characters.";

HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(this.getClass().getName(), header, options, footer, true);
}

private CommandLine parseCommandLine(String[] args) {
// create the command line parser
CommandLineParser parser = new DefaultParser();

// create the Options
Options options = new Options();
for (String[] opt : opts) {
Option option = Option.builder(opt[0]).longOpt(opt[1]).hasArg("true".equalsIgnoreCase(opt[2])).desc(opt[4]).build();
option.setRequired("true".equalsIgnoreCase(opt[3]));
options.addOption(option);
}

// parse the command line arguments
CommandLine line = null;
try {
line = parser.parse(options, args);
} catch (ParseException e) {
System.out.println("Command line parsing error: " + e.getMessage());
usage(options);
System.exit(-1);
}
return line;
}

private void process(String[] args) throws Exception {
CommandLine options = parseCommandLine(args);
if (options == null) {
return;
}
String sourceFileName = options.getOptionValue(SOURCE_FILE);
File sourceFile = new File(sourceFileName);
if (!sourceFile.exists()) {
throw new Exception("File does not exist: " + sourceFileName);
}
long maxFileSize = Long.parseLong(options.getOptionValue(MAX_FILE_SIZE));
String outputDirName = null;
if (options.hasOption(OUTPUT_DIR)) {
outputDirName = options.getOptionValue(OUTPUT_DIR);
} else {
outputDirName = System.getProperty("user.dir");
}
String prefix = (options.hasOption(OUT_FILENAME_PREFIX)) ? options.getOptionValue(OUT_FILENAME_PREFIX) : "";
boolean hasHeaderRow = options.hasOption(HAS_HEADER_ROW);

if (options.hasOption(DEBUG) && "true".equalsIgnoreCase(options.getOptionValue(DEBUG))) {
System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.SimpleLog");
System.setProperty("org.apache.commons.logging.simplelog.showdatetime", "true");
System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.http", "DEBUG");
System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.http.wire", "ERROR");
}
try {
FileUtil.splitLineBasedFile(sourceFile, outputDirName, maxFileSize, hasHeaderRow, sourceFile.getName(), prefix);
} catch (Exception e) {
e.printStackTrace();
System.out.println("Processing error occurred: " + e.getMessage());
System.exit(-1);
}
}

public static void main(String[] args) {
try {
SplitCsvFile mgr = new SplitCsvFile();
mgr.process(args);
System.exit(0);
} catch (Exception e) {
System.out.println("Processing error occurred: " + e.getMessage());
System.exit(-1);
}
}
}
2 changes: 1 addition & 1 deletion Server/src/main/java/org/openas2/XMLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private void loadProcessorModule(Processor proc, Node moduleNode) throws OpenAS2
String partnershipName = null;
Node defaultsNode = moduleNode.getAttributes().getNamedItem("defaults");
if (defaultsNode == null) {
// If there is a format nodethen this is a generic poller module
// If there is a format node then this is a generic poller module
Node formatNode = moduleNode.getAttributes().getNamedItem("format");
if (formatNode == null) {
throw new OpenAS2Exception("Invalid poller module coniguration: " + moduleNode.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.openas2.processor.receiver;

import java.io.File;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.openas2.OpenAS2Exception;
import org.openas2.util.FileUtil;
import org.openas2.util.IOUtil;

public class FileSplitter implements Runnable {
private File sourceFile;
private String outputDir;
private long maxFileSize;
private boolean containsHeaderRow;
private String newFileBaseName;
private String filenamePrefix;

private static final Log logger = LogFactory.getLog(FileUtil.class.getSimpleName());

public FileSplitter(File sourceFile, String outputDir, long maxFileSize, boolean containsHeaderRow, String newFileBaseName, String filenamePrefix) {
this.sourceFile = sourceFile;
this.outputDir = outputDir;
this.maxFileSize = maxFileSize;
this.containsHeaderRow = containsHeaderRow;
this.newFileBaseName = newFileBaseName;
this.filenamePrefix = filenamePrefix;
}

public void run(){
if (logger.isDebugEnabled()) {
logger.debug("File splitter thread invoked for file: " + this.sourceFile.getAbsolutePath());
}
try {
FileUtil.splitLineBasedFile(this.sourceFile, this.outputDir, this.maxFileSize, this.containsHeaderRow, this.newFileBaseName, this.filenamePrefix);
if (logger.isDebugEnabled()) {
logger.debug("Successfully split the file: " + this.sourceFile.getAbsolutePath());
}
// Must have been successful so remove the original file
try {
IOUtil.deleteFile(sourceFile);
} catch (IOException e) {
throw new OpenAS2Exception("Failed to delete file after split processing: " + sourceFile.getAbsolutePath(), e);
}
} catch (OpenAS2Exception e) {
logger.error("Failed to successfully split the file: " + this.sourceFile.getAbsolutePath() + " - " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,17 @@
import org.openas2.processor.sender.SenderModule;
import org.openas2.util.AS2Util;
import org.openas2.util.IOUtil;
import org.openas2.util.StringUtil;
import org.openas2.util.Properties;

import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.activation.FileDataSource;
import javax.mail.MessagingException;
import javax.mail.internet.MimeBodyPart;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
Expand Down Expand Up @@ -90,96 +87,18 @@ protected Message processDocument(File fileToSend, String filename) throws OpenA
newFileNamePrefix = "";
}
boolean containsHeaderRow = "true".equals(msg.getPartnership().getAttribute(Partnership.PA_SPLIT_FILE_CONTAINS_HEADER_ROW));
FileReader fileReader = new FileReader(fileToSend);
BufferedReader bufferedReader = new BufferedReader(fileReader);
String preprocessDir = Properties.getProperty("storageBaseDir", fileToSend.getParent()) + File.separator + "preprocess";
// Move the file to a holding folder so it is not processed by the directory poller anymore
String movedFilePath = preprocessDir + File.separator + filename;
File movedFile = new File(movedFilePath);
try {
byte[] headerRow = new byte[0];
if (containsHeaderRow) {
try {
String headerLine = bufferedReader.readLine();
if (headerLine != null) {
headerRow = (headerLine + "\n").getBytes();
}
} catch (IOException e1) {
throw new OpenAS2Exception("Failed to read header row from input file.", e1);
}
}
long headerRowByteCount = headerRow.length;
if (fileSizeThreshold < headerRowByteCount) {
// Would just write header repeatedly so throw error
throw new OpenAS2Exception("Split file size is less than the header row size.");
}
long expectedFileCnt = Math.floorDiv(fileToSend.length(), fileSizeThreshold);
// Figure out how many digits to pad the filename with - add 1 to cater for header row
int fileCntDigits = Long.toString(expectedFileCnt).length() +1;
int fileCount = 0;
boolean notEof = true;
while (notEof) {
fileCount += 1;
long fileSize = 0;
String newFilename = newFileNamePrefix + StringUtil.padLeftZeros(Integer.toString(fileCount), fileCntDigits) + "-" + filename;
addMessageMetadata(msg, newFilename);
File pendingFile = new File(msg.getAttribute(FileAttribute.MA_PENDINGFILE));
BufferedOutputStream fos = null;
try {
try {
fos = new BufferedOutputStream(new FileOutputStream(pendingFile));
} catch (IOException e) {
throw new OpenAS2Exception("Failed to initialise output file for file splitting on file " + fileCount, e);
}
if (containsHeaderRow) {
try {
fos.write(headerRow);
} catch (IOException e) {
throw new OpenAS2Exception("Failed to write header row to output file for file splitting on file " + fileCount, e);
}
fileSize += headerRowByteCount;
}
while (fileSize < fileSizeThreshold) {
String line = null;
try {
line = bufferedReader.readLine();
} catch (IOException e) {
throw new OpenAS2Exception("Failed to write output file for file splitting on file " + fileCount, e);
}
if (line == null) {
notEof = false;
break;
}
byte[] lineBytes = (line + "\n").getBytes();
fos.write(lineBytes);
fileSize += lineBytes.length;
}
fos.flush();
fos.close();
// Update the message's partnership with any additional attributes since initial call in case dynamic variables were not set initially
getSession().getPartnershipFactory().updatePartnership(msg, true);
processDocument(pendingFile, msg);
} catch (IOException e) {
throw new OpenAS2Exception("Failed to write output file for file splitting on file " + fileCount, e);
} finally {
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
}
}
}
} finally {
try {
bufferedReader.close();
} catch (IOException e) {
throw new OpenAS2Exception("Failed to close reader for input file.", e);
}
}
// Must have been successful so remove the original file
try {
IOUtil.deleteFile(fileToSend);
} catch (IOException e) {
throw new OpenAS2Exception("Failed to delete file after split processing: " + fileToSend.getAbsolutePath(), e);
IOUtil.moveFile(fileToSend, movedFile, false);
} catch (IOException e1) {
throw new OpenAS2Exception("Failed to move file for split processing: " + fileToSend.getAbsolutePath(), e1);
}
return msg;
FileSplitter fileSplitter = new FileSplitter(movedFile, fileToSend.getParent(), fileSizeThreshold, containsHeaderRow, filename, newFileNamePrefix);
new Thread(fileSplitter).start();
return null;
} else {
addMessageMetadata(msg, filename);
File pendingFile = new File(msg.getAttribute(FileAttribute.MA_PENDINGFILE));
Expand Down
Loading

0 comments on commit 641eb93

Please sign in to comment.