From a63645c453319d13d7ef8b245a9f07c10e708477 Mon Sep 17 00:00:00 2001 From: LeedDX Date: Thu, 18 Apr 2024 08:42:59 -0400 Subject: [PATCH 1/7] add exceptions --- .../dfs/client/BinaryRecordReader.java | 12 ++-- .../dfs/client/RowServiceInputStream.java | 68 +++++++++++-------- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java index 58470fdd8..b1ab06fe5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java @@ -271,7 +271,7 @@ public boolean hasNext() throws HpccFileException { if (this.rootRecordBuilder == null) { - throw new HpccFileException("RecordReader must be initialized before being used."); + throw new HpccFileException("BinaryRecordReader.hasNext(): RecordReader must be initialized before being used. rootRecordBuilder is null, hasNext() failed."); } int nextByte = -1; @@ -299,7 +299,7 @@ public boolean hasNext() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(e); + throw new HpccFileException("BinaryRecordReader.hasNext(): failed to peek at the next byte in the input stream:" + e.getMessage(),e); } return nextByte >= 0; @@ -314,7 +314,7 @@ public Object getNext() throws HpccFileException { if (this.rootRecordBuilder == null) { - throw new HpccFileException("RecordReader must be initialized before being used."); + throw new HpccFileException("BinaryRecordReader.getNext(): RecordReader must be initialized before being used, rootRecordBuilder is null."); } if (!this.hasNext()) throw new NoSuchElementException("No next record!"); @@ -325,13 +325,13 @@ public Object getNext() throws HpccFileException record = parseRecord(this.rootRecordDefinition, this.rootRecordBuilder, this.defaultLE); if (record == null) { - throw new HpccFileException("RecordContent not found, or invalid record structure. Check logs for more information."); + throw new HpccFileException("BinaryRecordReader.getNext(): RecordContent not found, or invalid record structure. 
Check logs for more information."); } } catch (Exception e) { - throw new HpccFileException("Failed to parse next record: " + e.getMessage(), e); + throw new HpccFileException("BinaryRecordReader.getNext(): Failed to parse next record: " + e.getMessage(), e); } this.streamPosAfterLastRecord = this.inputStream.getStreamPosition(); @@ -370,7 +370,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars if (fd.isFixed() && fd.getDataLen() > Integer.MAX_VALUE) { - throw new UnparsableContentException("Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE); + throw new UnparsableContentException("BinaryRecordReader.parseFlatField(): Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE); } // Embedded field lengths are little endian diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index ab3c38a6e..67d265762 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -328,6 +328,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co */ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { + this.recordDefinition = rd; this.projectedRecordDefinition = pRd; this.inFetchingMode = isFetching; @@ -371,6 +372,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; } + String prefix = "RowServiceInputStream constructor, file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -389,7 +391,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co } catch (Exception e) { - prefetchException = new HpccFileException("Error while batch fetch warm starting: " + e.getMessage()); + prefetchException = new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage()); } blockingRequestFinished.set(true); @@ -734,6 +736,8 @@ private int startFetch() { return -1; } + String prefix = "RowServiceInputStream.startFetch(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + //------------------------------------------------------------------------------ // If we haven't made the connection active, activate it now and send the @@ -779,7 +783,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failure sending read ahead transaction", e); + prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); try { close(); @@ -814,7 +818,7 @@ private int startFetch() if (response.errorCode != RFCCodes.RFCStreamNoError) { - prefetchException = new HpccFileException(response.errorMessage); + prefetchException = new HpccFileException(prefix + response.errorMessage); try { close(); @@ -834,7 +838,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException(e.getMessage()); + prefetchException = new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage()); } return -1; } @@ -858,13 
+862,13 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failed on remote read read retry", e); + prefetchException = new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e); return -1; } } else if (this.handle == 0) { - prefetchException = new HpccFileException("Read retry failed"); + prefetchException = new HpccFileException(prefix + "Read retry failed: response handle was 0"); try { close(); @@ -898,7 +902,7 @@ else if (this.handle == 0) } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); try { close(); @@ -911,6 +915,7 @@ else if (this.handle == 0) private void readDataInFetch() { + String prefix = "RowServiceInputStream.readDataInFetch(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -948,7 +953,7 @@ private void readDataInFetch() bytesToRead = this.dis.available(); if (bytesToRead < 0) { - throw new IOException("Encountered unexpected end of stream mid fetch."); + throw new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); } // Either due to a bug in the JVM or due to a design issue @@ -966,7 +971,7 @@ private void readDataInFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); try { close(); @@ -990,6 +995,7 @@ private void readDataInFetch() private void finishFetch() { + String prefix = "RowServiceInputStream.finishFetch(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1026,7 +1032,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e); try { close(); @@ -1053,7 +1059,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failure sending read ahead transaction", e); + prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); try { close(); @@ -1203,12 +1209,14 @@ private void compactBuffer() @Override public int available() throws IOException { + String prefix = "RowServiceInputStream.available(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + // Do the check for closed first here to avoid data races if (this.closed.get()) { if (this.prefetchException != null) { - throw new IOException("Prefetch thread exited early exception.", this.prefetchException); + throw new IOException("Prefetch thread exited early exception:" + prefetchException.getMessage(), this.prefetchException); } int bufferLen = this.readBufferDataLen.get(); @@ -1216,7 +1224,7 @@ public int available() throws IOException if (availBytes == 0) { // this.bufferWriteMutex.release(); - throw new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); } } @@ -1338,7 +1346,7 @@ public int read() throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); +
throw new IOException(this.prefetchException.getMessage(), this.prefetchException); } // We are waiting on a single byte so hot loop @@ -1426,7 +1434,7 @@ public int read(byte[] b, int off, int len) throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(), prefetchException); } int available = 0; @@ -1466,7 +1474,7 @@ public void reset() throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(), prefetchException); } if (this.markPos < 0) @@ -1490,7 +1498,7 @@ public long skip(long n) throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(), prefetchException); } // Have to read the data if we need to skip @@ -1550,6 +1558,7 @@ private void makeActive() throws HpccFileException { this.active.set(false); this.handle = 0; + String prefix = "RowServiceInputStream.makeActive, file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; boolean needsRetry = false; do @@ -1597,11 +1606,11 @@ private void makeActive() throws HpccFileException } catch (java.net.UnknownHostException e) { - throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e); + throw new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(), e); } catch (java.io.IOException e) { - throw new HpccFileException(e); + throw new HpccFileException(prefix + " error making part active:" + e.getMessage(), e); } try @@ -1611,7 +1620,7 @@ private void makeActive() throws HpccFileException } catch (java.io.IOException e) { - throw new HpccFileException("Failed to create streams", e); + throw new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); } //------------------------------------------------------------------------------ @@ -1629,7 +1638,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read trans", e); + throw new HpccFileException(prefix + " Failed on initial remote read transfer: " + e.getMessage(), e); } RowServiceResponse response = readResponse(); @@ -1648,7 +1657,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); } rowServiceVersion = new String(versionBytes, HPCCCharSet); @@ -1678,7 +1687,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read read trans", e); + throw new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); } if (CompileTimeConstants.PROFILE_CODE) @@ -1697,7 +1706,7 @@ private void makeActive() throws HpccFileException needsRetry = true; if (!setNextFilePartCopy()) { - throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + throw new HpccFileException(prefix + " Unsuccessfully attempted to connect to all file part copies", e); } } } while (needsRetry); @@ -2111,6 +2120,8 @@ private String makeCloseHandleRequest() private void sendCloseFileRequest() throws IOException { +
String prefix = "RowServiceInputStream.sendCloseFileRequest(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + if (useOldProtocol) { return; @@ -2129,7 +2140,7 @@ private void sendCloseFileRequest() throws IOException } catch (IOException e) { - throw new IOException("Failed on close file with error: ", e); + throw new IOException(prefix + " Failed on close file with error: " + e.getMessage(), e); } try @@ -2138,13 +2149,14 @@ private void sendCloseFileRequest() throws IOException } catch (HpccFileException e) { - throw new IOException("Failed to close file. Unable to read response with error: ", e); + throw new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); } } private RowServiceResponse readResponse() throws HpccFileException { RowServiceResponse response = new RowServiceResponse(); + String prefix="RowServiceInputStream.readResponse(): , file part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; try { response.len = dis.readInt(); @@ -2200,7 +2212,7 @@ private RowServiceResponse readResponse() throws HpccFileException if (response.len < 4) { - throw new HpccFileException("Early data termination, no handle"); + throw new HpccFileException(prefix + "Early data termination, no handle. response length < 4"); } response.handle = dis.readInt(); @@ -2208,7 +2220,7 @@ private RowServiceResponse readResponse() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read row service response: ", e); + throw new HpccFileException(prefix + "Error while attempting to read row service response: " + e.getMessage(), e); } return response; From 77350e0d18afa0c307c010cf72c985f12b7c62f9 Mon Sep 17 00:00:00 2001 From: LeedDX Date: Thu, 25 Apr 2024 11:33:47 -0400 Subject: [PATCH 2/7] add exceptions --- .../java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java | 2 +- .../java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java index 1e3cd5859..f01883489 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java @@ -319,7 +319,7 @@ public List getTopologyGroups(String kind) throws HpccContaineri } catch (RemoteException e) { - throw new Exception("HPCCWsTopologyClient.getTopologyGroups(kind) encountered RemoteException.", e); + throw new Exception("HPCCWsTopologyClient.getTopologyGroups(kind) encountered RemoteException for topology kind " + kind, e); } if (response == null) diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java index 3c7b74424..829644a84 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java @@ -271,7 +271,7 @@ public void initWsWorkUnitsClientStub(Connection conn) } catch (AxisFault e) { - initErrMessage += "\nHPCCWsWorkUnitsClient: Could not retrieve version appropriate stub objct"; + initErrMessage += "\nHPCCWsWorkUnitsClient: Could not retrieve version appropriate stub object:" + e.getMessage(); } } else From d3882985d72ec529a9a07a6cbd4ed0cfe193bede Mon Sep 17 00:00:00 2001 From: LeedDX Date: Fri, 10 
May 2024 23:49:31 -0400 Subject: [PATCH 3/7] jira-595: better error logging --- .../hpccsystems/dfs/client/DataPartition.java | 3 +- .../dfs/client/RowServiceInputStream.java | 58 ++++++++++++++----- .../dfs/client/DFSReadWriteTest.java | 16 +++-- .../hpccsystems/ws/client/BaseRemoteTest.java | 7 ++- 4 files changed, 59 insertions(+), 25 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index a846969e0..534de4b51 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -415,8 +415,7 @@ public DataPartition setFilter(FileFilter filter) public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(this.getThisPart()); - sb.append(" copy locations: {"); + sb.append("part ").append(this.getThisPart()).append(", copy locations: {"); for (int copyindex = 0; copyindex < getCopyCount(); copyindex++) { if (copyindex > 0) sb.append(", "); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 67d265762..33658c34e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -61,7 +61,7 @@ public class RowServiceInputStream extends InputStream implements IProfilable private String projectedJsonRecordDefinition = null; private java.io.DataInputStream dis = null; private java.io.DataOutputStream dos = null; - + private String fileName = null; private String rowServiceVersion = ""; private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct @@ -326,10 +326,42 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co * @throws Exception * general exception */ - public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception - { + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { + this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, socketOpTimeoutMS, null); + } + /** + * A plain socket connect to a THOR node for remote read + * + * @param dp + * the data partition to read + * @param rd + * the JSON definition for the read input and output + * @param pRd + * the projected record definition + * @param connectTimeout + * the connection timeout in milliseconds + * @param limit + * the record limit to use for reading the dataset.
-1 implies no limit + * @param createPrefetchThread + * Whether or not this input stream should handle prefetching itself or if prefetch will be called externally + * @param maxReadSizeInKB + * max readsize in kilobytes + * @param restartInfo + * information used to restart a read from a particular stream position + * @param isFetching + * Will this input stream be used to service batched fetch requests + * @param socketOpTimeoutMS + * Socket (read / write) operation timeout in milliseconds + * @param fileName + * fileName being read + * @throws Exception + * general exception + */ + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, String fileName) throws Exception + { this.recordDefinition = rd; + this.fileName = fileName; this.projectedRecordDefinition = pRd; this.inFetchingMode = isFetching; @@ -372,7 +404,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; } - String prefix = "RowServiceInputStream constructor, file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream constructor, file " + dataPart.getCopyPath(0) + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -736,7 +768,7 @@ private int startFetch() { return -1; } - String prefix = "RowServiceInputStream.startFetch(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.startFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; //------------------------------------------------------------------------------ @@ -915,7 +947,7 @@ else if (this.handle == 0) private void readDataInFetch() { - String prefix = "RowServiceInputStream.readDataInFetch(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.readDataInFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -995,7 +1027,7 @@ private void readDataInFetch() private void finishFetch() { - String prefix = "RowServiceInputStream.finishFetch(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.finishFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1209,7 +1241,7 @@ private void compactBuffer() @Override public int available() throws IOException { - String prefix = "RowServiceInputStream.available(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.available(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; // Do the check for closed first here to avoid data races if (this.closed.get()) @@ -1558,7 +1590,7 @@ private void makeActive() throws HpccFileException { this.active.set(false); this.handle = 0; - String prefix = "RowServiceInputStream.makeActive, file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.makeActive, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; boolean needsRetry = false; do @@ -1699,9 +1731,7 @@ private void makeActive() throws HpccFileException }
catch (Exception e) { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); + log.error(prefix + " Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "': " + e.getMessage(), e); needsRetry = true; if (!setNextFilePartCopy()) @@ -2120,7 +2150,7 @@ private String makeCloseHandleRequest() private void sendCloseFileRequest() throws IOException { - String prefix = "RowServiceInputStream.sendCloseFileRequest(), file part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (useOldProtocol) { @@ -2156,7 +2186,7 @@ private void sendCloseFileRequest() throws IOException private RowServiceResponse readResponse() throws HpccFileException { RowServiceResponse response = new RowServiceResponse(); - String prefix="RowServiceInputStream.readResponse(): , file part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; + String prefix="RowServiceInputStream.readResponse(): , file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; try { response.len = dis.readInt(); diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 896fa35c0..e9db9da6d 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -57,14 +57,16 @@ import org.junit.runners.MethodSorters; import org.junit.experimental.categories.Category; +import static org.hpccsystems.dfs.client.HpccRemoteFileReader.DEFAULT_READ_SIZE_OPTION; +import static org.hpccsystems.dfs.client.HpccRemoteFileReader.NO_RECORD_LIMIT; import static org.junit.Assert.*; @Category(org.hpccsystems.commons.annotations.RemoteTests.class) @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class DFSReadWriteTest extends BaseRemoteTest { - private static final String[] datasets = { "~benchmark::integer::20kb", "~unit_test::all_types::thor", "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; - private static final int[] expectedCounts = { 1250, 10000, 10000, 10000, 10000, 10000}; + private static final String[] datasets = { "~benchmark::integer::20kb", "~benchmark::all_types::200kb"};//, "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; + private static final int[] expectedCounts = { 1250, 5600 };//, 10000, 10000, 10000, 10000}; private static final Version newProtocolVersion = new Version(8,12,10); @@ -139,7 +141,7 @@ public void nullCharTests() throws Exception writeFile(records, fileName, recordDef, connTO); HPCCFile file = new HPCCFile(fileName, connString , hpccUser, hpccPass); - List readRecords = readFile(file, 10000, false, false, BinaryRecordReader.TRIM_STRINGS); + List readRecords = readFile(file, 100000, false, false, BinaryRecordReader.TRIM_STRINGS); for (int i = 0; i < records.size(); i++) { @@ -186,7 +188,7 @@ public void nullCharTests() throws Exception writeFile(records, fileName, recordDef, connTO); HPCCFile file = new HPCCFile(fileName, connString , hpccUser, hpccPass); - List readRecords = readFile(file,
100000, false, false, BinaryRecordReader.TRIM_STRINGS); for (int i = 0; i < records.size(); i++) { @@ -301,7 +303,8 @@ public void readResumeTest() throws Exception for (int i = 0; i < resumeInfo.size(); i++) { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[resumeFilePart.get(i)], originalRD, recordBuilder, -1, -1, true, readSizeKB, resumeInfo.get(i)); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader( + fileParts[resumeFilePart.get(i)], originalRD, recordBuilder, 1000000, -1, true, readSizeKB, resumeInfo.get(i)); if (fileReader.hasNext()) { @@ -1407,7 +1410,8 @@ public List readFile(HPCCFile file, Integer connectTimeoutMillis, bo try { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS, + file.getFileName()); fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8); fileReader.getRecordReader().setStringProcessingFlags(stringProcessingFlags); fileReaders.add(fileReader); diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 402e2782d..0282cd77e 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -57,10 +57,10 @@ public abstract class BaseRemoteTest protected final static String connString = System.getProperty("hpccconn", "http://localhost:8010"); protected static String thorClusterFileGroup = System.getProperty("thorgroupname"); - protected final static String thorclustername = System.getProperty("thorclustername", "thor"); + protected final static String thorclustername = System.getProperty("thorclustername", "thor_120"); protected static String roxieClusterGroup = System.getProperty("roxiegroupname"); - protected final static String roxieclustername = System.getProperty("roxieclustername", "roxie"); + protected final static String roxieclustername = System.getProperty("roxieclustername", "roxie_120"); protected final static String defaultUserName = "JunitUser"; protected static Connection connection = null; @@ -208,10 +208,11 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) // Run the generate-datasets.ecl script if present in the project resources try { - executeECLScript("generate-datasets.ecl"); + // executeECLScript("generate-datasets.ecl"); } catch (Exception e) { + e.printStackTrace(); Assert.fail("Error executing test data generation scripts with error: " + e.getMessage()); } } From c2293654d7c8e0cb8b4c8a4c5da9baa3dfb8499d Mon Sep 17 00:00:00 2001 From: LeedDX Date: Mon, 13 May 2024 09:26:18 -0400 Subject: [PATCH 4/7] jira-595: better error logging --- .../dfs/client/HpccRemoteFileReader.java | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 1a4d0dc54..fe7c73268 100644 --- 
a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -193,7 +193,36 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { + this(dp, originalRD, recBuilder, connectTimeout, limit, createPrefetchThread, readSizeKB, resumeInfo, socketOpTimeoutMs, null); + } + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. + * + * @param dp + * the part of the file, name and location + * @param originalRD + * the record definition for the dataset + * @param recBuilder + * the IRecordBuilder used to construct records + * @param connectTimeout + * the connection timeout in milliseconds, -1 for default + * @param limit + * the maximum number of records to read from the provided data partition, -1 specifies no limit + * @param createPrefetchThread + * the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically. + * @param readSizeKB + * read request size in KB, -1 specifies use default value + * @param resumeInfo + * FileReadResumeInfo data required to restart a read from a particular point in a file, null for reading from start + * @param socketOpTimeoutMs + * Socket (read / write) operation timeout in milliseconds + * @param fileName + * filename to read + * @throws Exception + * general exception + */ + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs, String fileName) throws Exception { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; @@ -223,7 +252,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, fileName); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -280,8 +309,8 @@ private boolean retryRead() try { this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, - this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); + this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, + this.readSizeKB, restartInfo, false,
this.socketOpTimeoutMs); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { @@ -434,7 +463,7 @@ public boolean hasNext() if (!retryRead()) { canReadNext = false; - log.error("Read failure for " + this.dataPartition.toString(), e); + log.error("Read failure for " + this.dataPartition.toString() +":" + e.getMessage(),e); java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); exception.initCause(e); throw exception; @@ -554,4 +583,4 @@ public void report() log.warn(getRemoteReadMessages()); } } -} +} \ No newline at end of file From 680baa37d11d9110eb2cbfafe3aea56a4d026908 Mon Sep 17 00:00:00 2001 From: LeedDX Date: Wed, 15 May 2024 09:37:15 -0400 Subject: [PATCH 5/7] jira-595: better error logging --- .../org/hpccsystems/dfs/client/RowServiceInputStream.java | 4 ++-- .../org/hpccsystems/dfs/client/RowServiceOutputStream.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 33658c34e..7e74e97cb 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -404,7 +404,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; } - String prefix = "RowServiceInputStream constructor, file " + dataPart.getCopyPath(0) + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream constructor, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -2229,7 +2229,7 @@ private RowServiceResponse readResponse() throws HpccFileException sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry"); break; case RFCCodes.DAFSERR_cmdstream_authexpired: - sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)"); + sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile) to something greater than " + this.socketOpTimeoutMs); break; default: break; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index f6ea288cd..be85d89e8 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -438,7 +438,7 @@ private RowServiceResponse readResponse() throws HpccFileException sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry"); break; case RFCCodes.DAFSERR_cmdstream_authexpired: - sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)"); + sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile) to something greater than " + this.sockOpTimeoutMs); break; default: break; From 33b18238093a93cceb9a6e367ad8b0225989c3a6 Mon Sep 17 00:00:00 2001 From: LeedDX Date: Wed, 15 May 2024 13:56:40 -0400 Subject: [PATCH 6/7] jira-595: add 
filename to datapartition --- .../hpccsystems/dfs/client/DataPartition.java | 73 ++++++++++++++++++- .../org/hpccsystems/dfs/client/HPCCFile.java | 2 +- .../dfs/client/HpccRemoteFileReader.java | 37 +--------- .../dfs/client/RowServiceInputStream.java | 54 +++----------- .../dfs/client/DFSReadWriteTest.java | 3 +- 5 files changed, 86 insertions(+), 83 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index 534de4b51..75eb5c1e0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -43,6 +43,7 @@ public class DataPartition implements Serializable private String fileAccessBlob; private FileType fileType; private boolean isTLK; + private String fileName; public static enum FileType { @@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i * the file type */ private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, - String fileAccessBlob, FileType fileType) + String fileAccessBlob, FileType fileType) { + this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null); + } + /** + * Construct the data part, used by makeParts. + * + * @param copylocations + * locations of all copies of this file part + * @param copyPaths + * the copy paths + * @param this_part + * part number + * @param num_parts + * number of parts + * @param clearport + * port number of clear communications + * @param sslport + * port number of ssl communications + * @param filter + * the file filter object + * @param fileAccessBlob + * file access token + * @param fileType + * the file type + * @param fileName + * the file name + */ + private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, + String fileAccessBlob, FileType fileType, String fileName) { this.this_part = this_part; this.num_parts = num_parts; this.rowservicePort = clearport; this.useSSL = sslport; this.fileFilter = filter; + this.fileName=fileName; if (this.fileFilter == null) { this.fileFilter = new FileFilter(); @@ -348,6 +378,16 @@ public boolean getUseSsl() return useSSL; } + /** + * File name being read + * + * @return filename + */ + public String getFileName() + { + return fileName; + } + /** * Copy Path. * @@ -470,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT); } + + /** + * Creates the partitions. + * + * @param dfuparts + * the dfuparts + * @param clusterremapper + * the clusterremapper + * @param max_parts + * the max parts + * @param filter + * the filter + * @param fileAccessBlob + * the file access blob + * @param fileType + * the file type + * @return the data partition[] + * @throws HpccFileException + * the hpcc file exception + */ + public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, + String fileAccessBlob, FileType fileType) throws HpccFileException { + return createPartitions(dfuparts,clusterremapper,max_parts,filter,fileAccessBlob,fileType,null); + } + /** * Creates the partitions. 
* @ * @param dfuparts * the dfuparts * @param clusterremapper * the clusterremapper * @param max_parts * the max parts * @param filter * the filter * @param fileAccessBlob * the file access blob * @param fileType * the file type * @param fileName * the file name * @return the data partition[] * @throws HpccFileException * the hpcc file exception */ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, - String fileAccessBlob, FileType fileType) throws HpccFileException + String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException { DataPartition[] rslt = new DataPartition[dfuparts.length]; @@ -507,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(), dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob, - fileType); + fileType,fileName); new_dp.isTLK = dfuparts[i].isTopLevelKey(); rslt[i] = new_dp; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index 8df2ba73e..98bf4d47d 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException { ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread); this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper, - /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType); + /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName()); // Check to see if this file has a TLK. The TLK will always be the last partition. // If we do have a TLK remove it from the standard list of data partitions. diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index fe7c73268..a1d161fe3 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -193,7 +193,36 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { - this(dp, originalRD, recBuilder, connectTimeout, limit, createPrefetchThread, readSizeKB, resumeInfo, socketOpTimeoutMs, null); - } - /** - * A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
- * - * @param dp - * the part of the file, name and location - * @param originalRD - * the record definition for the dataset - * @param recBuilder - * the IRecordBuilder used to construct records - * @param connectTimeout - * the connection timeout in milliseconds, -1 for default - * @param limit - * the maximum number of records to read from the provided data partition, -1 specifies no limit - * @param createPrefetchThread - * the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically. - * @param readSizeKB - * read request size in KB, -1 specifies use default value - * @param resumeInfo - * FileReadResumeInfo data required to restart a read from a particular point in a file, null for reading from start - * @param socketOpTimeoutMs - * Socket (read / write) operation timeout in milliseconds - * @param fileName - * filename to read - * @throws Exception - * general exception - */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs, String fileName) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; @@ -252,7 +223,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, fileName); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -534,7 +505,7 @@ public void close() throws Exception long closeTimeMs = System.currentTimeMillis(); double readTimeS = (closeTimeMs - openTimeMs) / 1000.0; - log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName() + " read time: " + readTimeS + "s " + " records read: " + recordsRead); } @@ -579,7 +550,7 @@ public void report() { if (getRemoteReadMessageCount() > 0) { - log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n"); + log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n"); log.warn(getRemoteReadMessages()); } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 7e74e97cb..2399ef1e6 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -61,7 +61,6 @@ public class RowServiceInputStream extends InputStream implements IProfilable private String projectedJsonRecordDefinition = null; private
java.io.DataInputStream dis = null; private java.io.DataOutputStream dos = null; - private String fileName = null; private String rowServiceVersion = ""; private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct @@ -326,42 +325,9 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co * @throws Exception * general exception */ - public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { - this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, socketOpTimeoutMS, null); - } - - /** - * A plain socket connect to a THOR node for remote read - * - * @param dp - * the data partition to read - * @param rd - * the JSON definition for the read input and output - * @param pRd - * the projected record definition - * @param connectTimeout - * the connection timeout in milliseconds - * @param limit - * the record limit to use for reading the dataset. -1 implies no limit - * @param createPrefetchThread - * Whether or not this input stream should handle prefetching itself or if prefetch will be called externally - * @param maxReadSizeInKB - * max readsize in kilobytes - * @param restartInfo - * information used to restart a read from a particular stream position - * @param isFetching - * Will this input stream be used to service batched fetch requests - * @param socketOpTimeoutMS - * Socket (read / write) operation timeout in milliseconds - * @param fileName - * fileName being read - * @throws Exception - * general exception - */ - public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, String fileName) throws Exception + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception { this.recordDefinition = rd; - this.fileName = fileName; this.projectedRecordDefinition = pRd; this.inFetchingMode = isFetching; @@ -403,8 +369,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = restartInfo.tokenBin; this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; - } - String prefix = "RowServiceInputStream constructor, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + } + String prefix = "RowServiceInputStream constructor, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -768,7 +734,7 @@ private int startFetch() { return -1; } - String prefix = "RowServiceInputStream.startFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; //------------------------------------------------------------------------------ @@ -947,7 +913,7 @@ else if (this.handle == 0) private void readDataInFetch() { - String prefix = "RowServiceInputStream.readDataInFetch(), file " + fileName + " part " +
dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.readDataInFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1027,7 +993,7 @@ private void readDataInFetch() private void finishFetch() { - String prefix = "RowServiceInputStream.finishFetch(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.finishFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1241,7 +1207,7 @@ private void compactBuffer() @Override public int available() throws IOException { - String prefix = "RowServiceInputStream.available(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; // Do the check for closed first here to avoid data races if (this.closed.get()) @@ -1590,7 +1556,7 @@ private void makeActive() throws HpccFileException { this.active.set(false); this.handle = 0; - String prefix = "RowServiceInputStream.makeActive, file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; boolean needsRetry = false; do @@ -2150,7 +2116,7 @@ private String makeCloseHandleRequest() private void sendCloseFileRequest() throws IOException { - String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (useOldProtocol) { @@ -2186,7 +2152,7 @@ private void sendCloseFileRequest() throws IOException private RowServiceResponse readResponse() throws HpccFileException { RowServiceResponse response = new RowServiceResponse(); - String prefix="RowServiceInputStream.readResponse(): , file " + fileName + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; + String prefix="RowServiceInputStream.readResponse(): , file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; try { response.len = dis.readInt(); diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index e9db9da6d..7b0634156 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -1410,8 +1410,7 @@ public List readFile(HPCCFile file, Integer connectTimeoutMillis, bo try { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS, - file.getFileName()); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true,
DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS); fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8); fileReader.getRecordReader().setStringProcessingFlags(stringProcessingFlags); fileReaders.add(fileReader); From 3d47b987a8d58fe3f684bdc8f5d7b3b74906bb40 Mon Sep 17 00:00:00 2001 From: LeedDX Date: Thu, 16 May 2024 07:49:36 -0400 Subject: [PATCH 7/7] jira-595: revert test changes --- .../org/hpccsystems/dfs/client/DFSReadWriteTest.java | 12 ++++++++---- .../org/hpccsystems/ws/client/BaseRemoteTest.java | 6 +++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 7b0634156..7da3d72aa 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -65,8 +65,12 @@ @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class DFSReadWriteTest extends BaseRemoteTest { - private static final String[] datasets = { "~benchmark::integer::20kb", "~benchmark::all_types::200kb"};//, "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; - private static final int[] expectedCounts = { 1250, 5600 };//, 10000, 10000, 10000, 10000}; + private static final String[] datasets = { "~benchmark::integer::20kb", "~unit_test::all_types::thor", "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; + private static final int[] expectedCounts = { 1250, 10000, 10000, 10000, 10000, 10000}; + + //use until standard test is working + // private static final String[] datasets = { "~benchmark::integer::20kb", "~benchmark::all_types::200kb"};//, "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; + // private static final int[] expectedCounts = { 1250, 5600 };//, 10000, 10000, 10000, 10000}; private static final Version newProtocolVersion = new Version(8,12,10); @@ -141,7 +145,7 @@ public void nullCharTests() throws Exception writeFile(records, fileName, recordDef, connTO); HPCCFile file = new HPCCFile(fileName, connString , hpccUser, hpccPass); - List readRecords = readFile(file, 100000, false, false, BinaryRecordReader.TRIM_STRINGS); + List readRecords = readFile(file, 10000, false, false, BinaryRecordReader.TRIM_STRINGS); for (int i = 0; i < records.size(); i++) { @@ -188,7 +192,7 @@ public void nullCharTests() throws Exception writeFile(records, fileName, recordDef, connTO); HPCCFile file = new HPCCFile(fileName, connString , hpccUser, hpccPass); - List readRecords = readFile(file, 100000, false, false, BinaryRecordReader.TRIM_STRINGS); + List readRecords = readFile(file, 10000, false, false, BinaryRecordReader.TRIM_STRINGS); for (int i = 0; i < records.size(); i++) { diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 0282cd77e..ac571bb24 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -57,10 +57,10 @@ public abstract class BaseRemoteTest protected final static String connString = System.getProperty("hpccconn", "http://localhost:8010"); protected static String thorClusterFileGroup = System.getProperty("thorgroupname"); - protected final static String thorclustername
= System.getProperty("thorclustername", "thor_120"); + protected final static String thorclustername = System.getProperty("thorclustername", "thor"); protected static String roxieClusterGroup = System.getProperty("roxiegroupname"); - protected final static String roxieclustername = System.getProperty("roxieclustername", "roxie_120"); + protected final static String roxieclustername = System.getProperty("roxieclustername", "roxie"); protected final static String defaultUserName = "JunitUser"; protected static Connection connection = null; @@ -208,7 +208,7 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) // Run the generate-datasets.ecl script if present in the project resources try { - // executeECLScript("generate-datasets.ecl"); + executeECLScript("generate-datasets.ecl"); } catch (Exception e) {
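
A note on the pattern the patches above apply by hand at every throw site: each message is built from a context prefix (method name, file name, part number, IP) plus the underlying exception's message, and the original exception is chained as the cause so stack traces survive the rethrow. Below is a minimal sketch of how that convention could be centralized. ErrorContext is a hypothetical helper, not an hpcc4j class; the only pieces assumed from the codebase are HpccFileException (taken here to be org.hpccsystems.commons.errors.HpccFileException, with the (String, Throwable) constructor used throughout these patches) and the prefix format itself.

    import org.hpccsystems.commons.errors.HpccFileException;

    // Hypothetical helper; centralizes the "Class.method(), file X part N on IP host:" prefix.
    final class ErrorContext
    {
        private final String prefix;

        ErrorContext(String method, String fileName, int filePart, String ip)
        {
            this.prefix = method + ", file " + fileName + " part " + filePart + " on IP " + ip + ": ";
        }

        // Wrap a caught exception: keep its message visible in the new message
        // and chain the original as the cause.
        HpccFileException wrap(String detail, Exception cause)
        {
            return new HpccFileException(prefix + detail + ": " + cause.getMessage(), cause);
        }

        // For failure states with no underlying exception, e.g. a bad response code.
        HpccFileException fail(String detail)
        {
            return new HpccFileException(prefix + detail);
        }
    }

With one ErrorContext per method, a catch block reduces to throw ctx.wrap("Failure sending read ahead transaction", e), and the prefix format cannot drift between call sites the way hand-built strings tend to (missing spaces and doubled colons are exactly what several of the hunks above are adjusting).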
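The constructor changes in patches 3, 4, and 6 follow a second reusable shape: to thread the new fileName argument through without breaking existing callers, the old signature is kept and delegates to a widened overload. Here is a self-contained sketch of that shape under hypothetical, heavily simplified signatures (the real constructors take many more parameters); the essential point is that the narrow overload forwards every caller-supplied argument unchanged and defaults only the new trailing parameter.

    // Hypothetical Reader standing in for RowServiceInputStream / HpccRemoteFileReader;
    // these are not the real hpcc4j signatures.
    class Reader
    {
        private final int socketOpTimeoutMs;
        private final String fileName; // null when the caller has no file name to report

        // Pre-existing signature, kept for source compatibility. It must pass the
        // caller's timeout through; substituting a DEFAULT_* constant here would
        // silently discard the caller's argument.
        Reader(int socketOpTimeoutMs)
        {
            this(socketOpTimeoutMs, null);
        }

        // Widened signature carrying the new trailing parameter.
        Reader(int socketOpTimeoutMs, String fileName)
        {
            this.socketOpTimeoutMs = socketOpTimeoutMs;
            this.fileName = fileName;
        }
    }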