diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java index 58470fdd8..b1ab06fe5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java @@ -271,7 +271,7 @@ public boolean hasNext() throws HpccFileException { if (this.rootRecordBuilder == null) { - throw new HpccFileException("RecordReader must be initialized before being used."); + throw new HpccFileException("BinaryRecordReader.hasNext(): RecordReader must be initialized before being used. rootRecordBuilder is null, hasNext() failed."); } int nextByte = -1; @@ -299,7 +299,7 @@ public boolean hasNext() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(e); + throw new HpccFileException("BinaryRecordReader.hasNext(): failed to peek at the next byte in the input stream:" + e.getMessage(),e); } return nextByte >= 0; @@ -314,7 +314,7 @@ public Object getNext() throws HpccFileException { if (this.rootRecordBuilder == null) { - throw new HpccFileException("RecordReader must be initialized before being used."); + throw new HpccFileException("BinaryRecordReader.getNext(): RecordReader must be initialized before being used, rootRecordBuilder is null."); } if (!this.hasNext()) throw new NoSuchElementException("No next record!"); @@ -325,13 +325,13 @@ public Object getNext() throws HpccFileException record = parseRecord(this.rootRecordDefinition, this.rootRecordBuilder, this.defaultLE); if (record == null) { - throw new HpccFileException("RecordContent not found, or invalid record structure. Check logs for more information."); + throw new HpccFileException("BinaryRecordReader.getNext(): RecordContent not found, or invalid record structure. Check logs for more information."); } } catch (Exception e) { - throw new HpccFileException("Failed to parse next record: " + e.getMessage(), e); + throw new HpccFileException("BinaryRecordReader.getNext(): Failed to parse next record: " + e.getMessage(), e); } this.streamPosAfterLastRecord = this.inputStream.getStreamPosition(); @@ -370,7 +370,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars if (fd.isFixed() && fd.getDataLen() > Integer.MAX_VALUE) { - throw new UnparsableContentException("Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE); + throw new UnparsableContentException("BinaryRecordReader.parseFlatField(): Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE); } // Embedded field lengths are little endian diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index a846969e0..75eb5c1e0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -43,6 +43,7 @@ public class DataPartition implements Serializable private String fileAccessBlob; private FileType fileType; private boolean isTLK; + private String fileName; public static enum FileType { @@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i * the file type */ private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, - String fileAccessBlob, FileType fileType) + String fileAccessBlob, FileType fileType) { + this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null); + } + /** + * Construct the data part, used by makeParts. + * + * @param copylocations + * locations of all copies of this file part + * @param copyPaths + * the copy paths + * @param this_part + * part number + * @param num_parts + * number of parts + * @param clearport + * port number of clear communications + * @param sslport + * port number of ssl communications + * @param filter + * the file filter object + * @param fileAccessBlob + * file access token + * @param fileType + * the file type + * @param fileName + * the file name + */ + private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, + String fileAccessBlob, FileType fileType, String fileName) { this.this_part = this_part; this.num_parts = num_parts; this.rowservicePort = clearport; this.useSSL = sslport; this.fileFilter = filter; + this.fileName=fileName; if (this.fileFilter == null) { this.fileFilter = new FileFilter(); @@ -348,6 +378,16 @@ public boolean getUseSsl() return useSSL; } + /** + * File name being read + * + * @return filename + */ + public String getFileName() + { + return fileName; + } + /** * Copy Path. * @@ -415,8 +455,7 @@ public DataPartition setFilter(FileFilter filter) public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(this.getThisPart()); - sb.append(" copy locations: {"); + sb.append("part ").append(this.getThisPart()).append(", copy locations: {"); for (int copyindex = 0; copyindex < getCopyCount(); copyindex++) { if (copyindex > 0) sb.append(", "); @@ -471,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT); } + + /** + * Creates the partitions. + * + * @param dfuparts + * the dfuparts + * @param clusterremapper + * the clusterremapper + * @param max_parts + * the max parts + * @param filter + * the filter + * @param fileAccessBlob + * the file access blob + * @param fileType + * the file type + * @return the data partition[] + * @throws HpccFileException + * the hpcc file exception + */ + public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, + String fileAccessBlob, FileType fileType) throws HpccFileException { + return createPartitions(dfuparts,clusterremapper,max_parts,filter,fileAccessBlob,fileType,null); + } + /** * Creates the partitions. * @@ -486,12 +550,14 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl * the file access blob * @param fileType * the file type + * @param fileName + * the file name * @return the data partition[] * @throws HpccFileException * the hpcc file exception */ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, - String fileAccessBlob, FileType fileType) throws HpccFileException + String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException { DataPartition[] rslt = new DataPartition[dfuparts.length]; @@ -508,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(), dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob, - fileType); + fileType,fileName); new_dp.isTLK = dfuparts[i].isTopLevelKey(); rslt[i] = new_dp; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index 8df2ba73e..98bf4d47d 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException { ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread); this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper, - /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType); + /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName()); // Check to see if this file has a TLK. The TLK will always be the last partition. // If we do have a TLK remove it from the standard list of data partitions. diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 1a4d0dc54..a1d161fe3 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -193,7 +193,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; @@ -280,8 +280,8 @@ private boolean retryRead() try { this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, - this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); + this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, + this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { @@ -434,7 +434,7 @@ public boolean hasNext() if (!retryRead()) { canReadNext = false; - log.error("Read failure for " + this.dataPartition.toString(), e); + log.error("Read failure for " + this.dataPartition.toString() +":" + e.getMessage(),e); java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); exception.initCause(e); throw exception; @@ -505,7 +505,7 @@ public void close() throws Exception long closeTimeMs = System.currentTimeMillis(); double readTimeS = (closeTimeMs - openTimeMs) / 1000.0; - log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName() + " read time: " + readTimeS + "s " + " records read: " + recordsRead); } @@ -550,8 +550,8 @@ public void report() { if (getRemoteReadMessageCount() > 0) { - log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n"); + log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n"); log.warn(getRemoteReadMessages()); } } -} +} \ No newline at end of file diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index ab3c38a6e..2399ef1e6 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -61,7 +61,6 @@ public class RowServiceInputStream extends InputStream implements IProfilable private String projectedJsonRecordDefinition = null; private java.io.DataInputStream dis = null; private java.io.DataOutputStream dos = null; - private String rowServiceVersion = ""; private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct @@ -370,7 +369,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = restartInfo.tokenBin; this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; - } + } + String prefix = "RowServiceInputStream constructor, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -389,7 +389,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co } catch (Exception e) { - prefetchException = new HpccFileException("Error while batch fetch warm starting: " + e.getMessage()); + prefetchException = new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage()); } blockingRequestFinished.set(true); @@ -734,6 +734,8 @@ private int startFetch() { return -1; } + String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + //------------------------------------------------------------------------------ // If we haven't made the connection active, activate it now and send the @@ -779,7 +781,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failure sending read ahead transaction", e); + prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); try { close(); @@ -814,7 +816,7 @@ private int startFetch() if (response.errorCode != RFCCodes.RFCStreamNoError) { - prefetchException = new HpccFileException(response.errorMessage); + prefetchException = new HpccFileException(prefix + response.errorMessage); try { close(); @@ -834,7 +836,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException(e.getMessage()); + prefetchException = new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage()); } return -1; } @@ -858,13 +860,13 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failed on remote read read retry", e); + prefetchException = new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e); return -1; } } else if (this.handle == 0) { - prefetchException = new HpccFileException("Read retry failed"); + prefetchException = new HpccFileException(prefix + "response.handle was null, Read retry failed"); try { close(); @@ -898,7 +900,7 @@ else if (this.handle == 0) } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); try { close(); @@ -911,6 +913,7 @@ else if (this.handle == 0) private void readDataInFetch() { + String prefix = "RowServiceInputStream.readDataInFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -948,7 +951,7 @@ private void readDataInFetch() bytesToRead = this.dis.available(); if (bytesToRead < 0) { - throw new IOException("Encountered unexpected end of stream mid fetch."); + throw new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); } // Either due to a bug in the JVM or due to a design issue @@ -966,7 +969,7 @@ private void readDataInFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); try { close(); @@ -990,6 +993,7 @@ private void readDataInFetch() private void finishFetch() { + String prefix = "RowServiceInputStream.finishFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1026,7 +1030,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e); try { close(); @@ -1053,7 +1057,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failure sending read ahead transaction", e); + prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); try { close(); @@ -1203,12 +1207,14 @@ private void compactBuffer() @Override public int available() throws IOException { + String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + // Do the check for closed first here to avoid data races if (this.closed.get()) { if (this.prefetchException != null) { - throw new IOException("Prefetch thread exited early exception.", this.prefetchException); + throw new IOException("Prefetch thread exited early exception:" + prefetchException.getMessage(), this.prefetchException); } int bufferLen = this.readBufferDataLen.get(); @@ -1216,7 +1222,7 @@ public int available() throws IOException if (availBytes == 0) { // this.bufferWriteMutex.release(); - throw new IOException("End of input stream."); + throw new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); } } @@ -1338,7 +1344,7 @@ public int read() throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),this.prefetchException); } // We are waiting on a single byte so hot loop @@ -1426,7 +1432,7 @@ public int read(byte[] b, int off, int len) throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),prefetchException); } int available = 0; @@ -1466,7 +1472,7 @@ public void reset() throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),prefetchException); } if (this.markPos < 0) @@ -1490,7 +1496,7 @@ public long skip(long n) throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),prefetchException); } // Have to read the data if we need to skip @@ -1550,6 +1556,7 @@ private void makeActive() throws HpccFileException { this.active.set(false); this.handle = 0; + String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; boolean needsRetry = false; do @@ -1597,11 +1604,11 @@ private void makeActive() throws HpccFileException } catch (java.net.UnknownHostException e) { - throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e); + throw new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(),e); } catch (java.io.IOException e) { - throw new HpccFileException(e); + throw new HpccFileException(prefix + " error making part active:" + e.getMessage(),e); } try @@ -1611,7 +1618,7 @@ private void makeActive() throws HpccFileException } catch (java.io.IOException e) { - throw new HpccFileException("Failed to create streams", e); + throw new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); } //------------------------------------------------------------------------------ @@ -1629,7 +1636,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read trans", e); + throw new HpccFileException(prefix+ " Failed on initial remote read transfer: " + e.getMessage(),e); } RowServiceResponse response = readResponse(); @@ -1648,7 +1655,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); } rowServiceVersion = new String(versionBytes, HPCCCharSet); @@ -1678,7 +1685,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read read trans", e); + throw new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); } if (CompileTimeConstants.PROFILE_CODE) @@ -1690,14 +1697,12 @@ private void makeActive() throws HpccFileException } catch (Exception e) { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); + log.error(prefix + ": Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + ":" + e.getMessage(),e); needsRetry = true; if (!setNextFilePartCopy()) { - throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + throw new HpccFileException(prefix + " Unsuccessfuly attempted to connect to all file part copies", e); } } } while (needsRetry); @@ -2111,6 +2116,8 @@ private String makeCloseHandleRequest() private void sendCloseFileRequest() throws IOException { + String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + if (useOldProtocol) { return; @@ -2129,7 +2136,7 @@ private void sendCloseFileRequest() throws IOException } catch (IOException e) { - throw new IOException("Failed on close file with error: ", e); + throw new IOException(prefix + " Failed on close file with error: " + e.getMessage(), e); } try @@ -2138,13 +2145,14 @@ private void sendCloseFileRequest() throws IOException } catch (HpccFileException e) { - throw new IOException("Failed to close file. Unable to read response with error: ", e); + throw new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); } } private RowServiceResponse readResponse() throws HpccFileException { RowServiceResponse response = new RowServiceResponse(); + String prefix="RowServiceInputStream.readResponse(): , file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; try { response.len = dis.readInt(); @@ -2187,7 +2195,7 @@ private RowServiceResponse readResponse() throws HpccFileException sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry"); break; case RFCCodes.DAFSERR_cmdstream_authexpired: - sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)"); + sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile) to something greater than " + this.socketOpTimeoutMs); break; default: break; @@ -2200,7 +2208,7 @@ private RowServiceResponse readResponse() throws HpccFileException if (response.len < 4) { - throw new HpccFileException("Early data termination, no handle"); + throw new HpccFileException(prefix + "Early data termination, no handle. response length < 4"); } response.handle = dis.readInt(); @@ -2208,7 +2216,7 @@ private RowServiceResponse readResponse() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read row service response: ", e); + throw new HpccFileException(prefix + "Error while attempting to read row service response: " + e.getMessage(), e); } return response; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index f6ea288cd..be85d89e8 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -438,7 +438,7 @@ private RowServiceResponse readResponse() throws HpccFileException sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry"); break; case RFCCodes.DAFSERR_cmdstream_authexpired: - sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)"); + sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile) to something greater than " + this.sockOpTimeoutMs); break; default: break; diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 896fa35c0..7da3d72aa 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -57,6 +57,8 @@ import org.junit.runners.MethodSorters; import org.junit.experimental.categories.Category; +import static org.hpccsystems.dfs.client.HpccRemoteFileReader.DEFAULT_READ_SIZE_OPTION; +import static org.hpccsystems.dfs.client.HpccRemoteFileReader.NO_RECORD_LIMIT; import static org.junit.Assert.*; @Category(org.hpccsystems.commons.annotations.RemoteTests.class) @@ -64,7 +66,11 @@ public class DFSReadWriteTest extends BaseRemoteTest { private static final String[] datasets = { "~benchmark::integer::20kb", "~unit_test::all_types::thor", "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; - private static final int[] expectedCounts = { 1250, 10000, 10000, 10000, 10000, 10000}; + private static final int[] expectedCounts = { 1250, 5600, 10000, 10000, 10000, 10000}; + + //use until standard test is working + // private static final String[] datasets = { "~benchmark::integer::20kb", "~benchmark::all_types::200kb"};//, "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; + // private static final int[] expectedCounts = { 1250, 5600 };//, 10000, 10000, 10000, 10000}; private static final Version newProtocolVersion = new Version(8,12,10); @@ -301,7 +307,8 @@ public void readResumeTest() throws Exception for (int i = 0; i < resumeInfo.size(); i++) { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[resumeFilePart.get(i)], originalRD, recordBuilder, -1, -1, true, readSizeKB, resumeInfo.get(i)); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader( + fileParts[resumeFilePart.get(i)], originalRD, recordBuilder, 1000000, -1, true, readSizeKB, resumeInfo.get(i)); if (fileReader.hasNext()) { @@ -1407,7 +1414,7 @@ public List readFile(HPCCFile file, Integer connectTimeoutMillis, bo try { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS); fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8); fileReader.getRecordReader().setStringProcessingFlags(stringProcessingFlags); fileReaders.add(fileReader); diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java index 1e3cd5859..f01883489 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java @@ -319,7 +319,7 @@ public List getTopologyGroups(String kind) throws HpccContaineri } catch (RemoteException e) { - throw new Exception("HPCCWsTopologyClient.getTopologyGroups(kind) encountered RemoteException.", e); + throw new Exception("HPCCWsTopologyClient.getTopologyGroups(kind) encountered RemoteException for topology kind " + kind, e); } if (response == null) diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java index 3c7b74424..829644a84 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java @@ -271,7 +271,7 @@ public void initWsWorkUnitsClientStub(Connection conn) } catch (AxisFault e) { - initErrMessage += "\nHPCCWsWorkUnitsClient: Could not retrieve version appropriate stub objct"; + initErrMessage += "\nHPCCWsWorkUnitsClient: Could not retrieve version appropriate stub object:" + e.getMessage(); } } else diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 402e2782d..ac571bb24 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -212,6 +212,7 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } catch (Exception e) { + e.printStackTrace(); Assert.fail("Error executing test data generation scripts with error: " + e.getMessage()); } }