diff --git a/pom.xml b/pom.xml
index 22e03f6..e371fc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <artifactId>costosys</artifactId>
-    <version>1.2.2</version>
+    <version>1.2.3</version>
     <name>Corpus Storage System</name>
     <description>A utility for managing documents stored in a PostgreSQL database. The documents are imported into a PostgreSQL DB as full texts with the goal to be able to retrieve the documents by their PubMedID efficiently.</description>
diff --git a/src/main/java/de/julielab/xmlData/dataBase/DataBaseConnector.java b/src/main/java/de/julielab/xmlData/dataBase/DataBaseConnector.java
index b94c1f1..0c24919 100644
--- a/src/main/java/de/julielab/xmlData/dataBase/DataBaseConnector.java
+++ b/src/main/java/de/julielab/xmlData/dataBase/DataBaseConnector.java
@@ -2395,6 +2395,7 @@ public void updateFromRowIterator(Iterator<Map<String, Object>> it, String table
         boolean wasAutoCommit = true;
         try {
             wasAutoCommit = conn.getAutoCommit();
+            LOG.trace("Retrieving mirror subsets of table {}", tableName);
             LinkedHashMap<String, Boolean> mirrorNames = getMirrorSubsetNames(conn, tableName);
             List<PreparedStatement> mirrorStatements = null;
@@ -2405,92 +2406,67 @@
                 }
             }
-            int i = 0;
-            PreparedStatement ps = conn.prepareStatement(statementString);
-            List<Map<String, String>> fields = fieldConfig.getFields();
-            String[] primaryKey = fieldConfig.getPrimaryKey();
+            final int sliceSize = 10000;
+            LOG.trace("Reading update data slice of up to {} documents. Within this slice, duplicate document IDs will be handled by only taking the last document into account.", sliceSize);
 
-            // Set<String> alreadyUpdatedIds = new HashSet<>();
-            int numAlreadyExisted = 0;
-            // This map will assemble for each primary key only the NEWEST (in
-            // XML the latest in Medline) row. Its size is an approximation of
-            // Medline blob XML files.
-            // TODO we should actually check for the PMID version and take the
-            // highest
-            Map<String, Map<String, Object>> rowsByPk = new HashMap<>(commitBatchSize * 10);
-            while (it.hasNext()) {
-                Map<String, Object> row = it.next();
-                StringBuilder rowPrimaryKey = new StringBuilder();
-                for (int j = 0; j < primaryKey.length; j++) {
-                    String keyFieldName = primaryKey[j];
-                    Object key = row.get(keyFieldName);
-                    rowPrimaryKey.append(key);
+            String[] primaryKey = fieldConfig.getPrimaryKey();
+            // This is an outer loop to help us cut the documents we get from the iterator into slices. This is very
+            // useful or even required when reading large archives from a single iterator.
+            while (it.hasNext()) {
+                // This map will assemble for each primary key only the NEWEST (in
+                // XML the latest in Medline) row. Its size is an approximation of
+                // Medline blob XML files.
+                // TODO we should actually check for the PMID version and take the highest
+                Map<String, Map<String, Object>> rowsByPk = new HashMap<>();
+                while (it.hasNext() && rowsByPk.size() < sliceSize) {
+                    Map<String, Object> row = it.next();
+                    StringBuilder rowPrimaryKey = new StringBuilder();
+                    for (int j = 0; j < primaryKey.length; j++) {
+                        String keyFieldName = primaryKey[j];
+                        Object key = row.get(keyFieldName);
+                        rowPrimaryKey.append(key);
+                    }
+                    String pk = rowPrimaryKey.toString();
+                    rowsByPk.put(pk, row);
                 }
-                String pk = rowPrimaryKey.toString();
-                if (rowsByPk.containsKey(pk))
-                    ++numAlreadyExisted;
-                rowsByPk.put(pk, row);
-            }
-            List<Map<String, Object>> cache = new ArrayList<Map<String, Object>>(commitBatchSize);
-            // while (it.hasNext()) {
-            // Map<String, Object> row = it.next();
-            for (Map<String, Object> row : rowsByPk.values()) {
-                // StringBuilder rowPrimaryKey = new StringBuilder();
-
-                for (int j = 0; j < fields.size() + primaryKey.length; j++) {
-                    // for (int j = 0; j < fields.size(); j++) {
-                    if (j < fields.size()) {
-                        Map<String, String> field = fields.get(j);
-                        String fieldName = field.get(JulieXMLConstants.NAME);
-                        setPreparedStatementParameterWithType(j + 1, ps, row.get(fieldName), null, null);
-                    } else {
-                        String key = primaryKey[j - fields.size()];
-                        Object keyValue = row.get(key);
-                        // rowPrimaryKey.append(keyValue);
-                        setPreparedStatementParameterWithType(j + 1, ps, keyValue, null, null);
+                PreparedStatement ps = conn.prepareStatement(statementString);
+                List<Map<String, String>> fields = fieldConfig.getFields();
+                List<Map<String, Object>> cache = new ArrayList<>(commitBatchSize);
+                int i = 0;
+                for (Map<String, Object> row : rowsByPk.values()) {
+
+                    for (int j = 0; j < fields.size() + primaryKey.length; j++) {
+                        if (j < fields.size()) {
+                            Map<String, String> field = fields.get(j);
+                            String fieldName = field.get(JulieXMLConstants.NAME);
+                            setPreparedStatementParameterWithType(j + 1, ps, row.get(fieldName), null, null);
+                        } else {
+                            String key = primaryKey[j - fields.size()];
+                            Object keyValue = row.get(key);
+                            setPreparedStatementParameterWithType(j + 1, ps, keyValue, null, null);
+                        }
+                    }
+                    ps.addBatch();
+                    cache.add(row);
+
+                    ++i;
+                    if (i >= commitBatchSize) {
+                        LOG.trace("Committing batch of size {}", i);
+                        executeAndCommitUpdate(tableName, conn, commit, schemaName, fieldConfig, mirrorNames,
+                                mirrorStatements, ps, cache);
+                        cache.clear();
+                        i = 0;
                     }
                 }
-                // if (alreadyUpdatedIds.add(rowPrimaryKey.toString())) {
-                ps.addBatch();
-                cache.add(row);
-
-                // } else {
-                // ps.clearParameters();
-                // LOG.warn(
-                // "Primary key {} exists multiple times in one batch of data.
-                // All but the first occurence will be ignored.",
-                // rowPrimaryKey.toString());
-                // numAlreadyExisted++;
-                // }
-
-                ++i;
-                if (i >= commitBatchSize) {
-                    LOG.trace("Committing batch of size {}", i);
+                if (i > 0) {
+                    LOG.trace("Committing last batch of size {}", i);
                     executeAndCommitUpdate(tableName, conn, commit, schemaName, fieldConfig, mirrorNames,
                             mirrorStatements, ps, cache);
-                    cache.clear();
-                    i = 0;
                 }
+                String msg = "Updated {} documents.";
+                LOG.debug(msg, rowsByPk.size());
             }
-            if (i > 0) {
-                LOG.trace("Committing last batch of size {}", i);
-                executeAndCommitUpdate(tableName, conn, commit, schemaName, fieldConfig, mirrorNames,
-                        mirrorStatements, ps, cache);
-            }
-            // LOG.info(
-            // "Updated {} documents. {} documents were skipped because there
-            // existed documents with same primary keys multiple times in the
-            // data.",
-            // alreadyUpdatedIds.size(), numAlreadyExisted);
-            String msg = "Updated {} documents. {} documents were skipped because there existed documents with same primary keys multiple times in the data. In those cases, the last occurrence of the document was inserted into the database";
-            if (numAlreadyExisted == 0)
-                LOG.debug(
-                        msg,
-                        rowsByPk.size(), numAlreadyExisted);
-            else
-                LOG.warn(msg, rowsByPk.size(), numAlreadyExisted);
         } catch (SQLException e) {
             LOG.error(
                     "SQL error while updating table {}. Database configuration is: {}. Table schema configuration is: {}",
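Note: the hunks above replace the single "read the whole iterator, then batch" pass with slice-wise processing. The following is a minimal, self-contained sketch of that slice-and-deduplicate pattern; all class, method, and variable names here are illustrative and not CoStoSys API, and the real code additionally fills a PreparedStatement batch from the slice and commits via executeAndCommitUpdate.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: demonstrates the slice-wise, "last occurrence wins"
// deduplication introduced in updateFromRowIterator above.
public class SliceUpdateSketch {

    // Drains up to sliceSize rows from the iterator; for duplicate primary
    // keys, later rows overwrite earlier ones, as in the patch.
    static Map<String, Map<String, Object>> readSlice(Iterator<Map<String, Object>> it,
                                                      String[] primaryKey, int sliceSize) {
        // LinkedHashMap keeps the input order of first key occurrence.
        Map<String, Map<String, Object>> rowsByPk = new LinkedHashMap<>();
        while (it.hasNext() && rowsByPk.size() < sliceSize) {
            Map<String, Object> row = it.next();
            // Concatenate the primary key fields into one lookup string.
            StringBuilder pk = new StringBuilder();
            for (String keyField : primaryKey)
                pk.append(row.get(keyField));
            rowsByPk.put(pk.toString(), row); // duplicate key: last row wins
        }
        return rowsByPk;
    }

    public static void main(String[] args) {
        // Hypothetical input: pmid 42 appears twice; the second row must win.
        Iterator<Map<String, Object>> it = List.<Map<String, Object>>of(
                Map.of("pmid", "42", "xml", "<old/>"),
                Map.of("pmid", "43", "xml", "<doc/>"),
                Map.of("pmid", "42", "xml", "<new/>")).iterator();
        String[] primaryKey = {"pmid"};
        while (it.hasNext()) {
            Map<String, Map<String, Object>> slice = readSlice(it, primaryKey, 10000);
            // The real code would now add slice.values() to a PreparedStatement
            // batch and commit every commitBatchSize rows.
            System.out.println("Slice holds " + slice.size() + " unique documents: " + slice.keySet());
        }
    }
}

Bounding each slice (here 10000 documents, as in the patch) keeps memory use flat when a single iterator streams a very large archive, which is the motivation stated in the added comment above.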