Version 1.2.3. Updates of large collections now need less memory.
Until this version, updates exhaustively read the complete iterator, i.e. all
documents contained in one file, in order to avoid duplicate document IDs in
the same update batch, which can lead to primary key violation errors.
Updates are now processed in slices of up to 10k documents.
khituras committed Dec 6, 2018
1 parent 0c3f08d commit 863ce99
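The change can be sketched as follows. This is a minimal, simplified illustration of the slicing idea, not the actual DataBaseConnector code: the class name SliceUpdateSketch, the "pmid" key, and the println stand-in for the batch commit are assumptions made for the example. Instead of draining the whole iterator into one in-memory map, rows are read in slices of at most 10,000 documents and deduplicated by primary key within each slice before being written.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the slicing idea; names and the "pmid" key are illustrative.
public class SliceUpdateSketch {

    static final int SLICE_SIZE = 10_000;

    // Reads the iterator in slices instead of draining it completely,
    // so memory usage is bounded by the slice size, not the archive size.
    static void updateFromRowIterator(Iterator<Map<String, Object>> it) {
        while (it.hasNext()) {
            // Within one slice the last occurrence of a primary key wins,
            // so a single update batch never contains duplicate IDs.
            Map<String, Map<String, Object>> rowsByPk = new LinkedHashMap<>();
            while (it.hasNext() && rowsByPk.size() < SLICE_SIZE) {
                Map<String, Object> row = it.next();
                rowsByPk.put(String.valueOf(row.get("pmid")), row);
            }
            // In the real DataBaseConnector the rows of the slice are bound to a
            // PreparedStatement here and committed in batches of commitBatchSize.
            System.out.println("Updating slice of " + rowsByPk.size() + " documents");
        }
    }

    public static void main(String[] args) {
        // Hypothetical input rows: a "pmid" primary key plus some payload.
        List<Map<String, Object>> rows = List.of(
                Map.of("pmid", "1", "xml", "<doc>old</doc>"),
                Map.of("pmid", "2", "xml", "<doc/>"),
                Map.of("pmid", "1", "xml", "<doc>new</doc>")); // duplicate ID
        updateFromRowIterator(rows.iterator());
    }
}

Because duplicates are collapsed within a slice, a single update batch never contains the same primary key twice, which is what the previous full read of the iterator was guarding against, while memory usage is now bounded by the slice size instead of the archive size.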
Showing 2 changed files with 53 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -2,7 +2,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>costosys</artifactId>
- <version>1.2.2</version>
+ <version>1.2.3</version>
<name>Corpus Storage System</name>
<description>A utility for managing documents stored in a PostgreSQL database. The documents are imported into a
PostgreSQL DB as full texts with the goal to be able to retrieve the documents by their PubMedID efficiently.
128 changes: 52 additions & 76 deletions src/main/java/de/julielab/xmlData/dataBase/DataBaseConnector.java
@@ -2395,6 +2395,7 @@ public void updateFromRowIterator(Iterator<Map<String, Object>> it, String table
boolean wasAutoCommit = true;
try {
wasAutoCommit = conn.getAutoCommit();
LOG.trace("Retrieving mirror subsets of table {}", tableName);
LinkedHashMap<String, Boolean> mirrorNames = getMirrorSubsetNames(conn, tableName);

List<PreparedStatement> mirrorStatements = null;
@@ -2405,92 +2406,67 @@ public void updateFromRowIterator(Iterator<Map<String, Object>> it, String table
}
}

int i = 0;
PreparedStatement ps = conn.prepareStatement(statementString);
List<Map<String, String>> fields = fieldConfig.getFields();
String[] primaryKey = fieldConfig.getPrimaryKey();
final int sliceSize = 10000;
LOG.trace("Reading update data slice up to {} documents. Within this slice, duplicate document IDs will be handled by only taking the last document into account.", sliceSize);

// Set<String> alreadyUpdatedIds = new HashSet<>();
int numAlreadyExisted = 0;
// This map will assemble for each primary key only the NEWEST (in
// XML the latest in Medline) row. Its size is an approximation of
// Medline blob XML files.
// TODO we should actually check for the PMID version and take the
// highest
Map<String, Map<String, Object>> rowsByPk = new HashMap<>(commitBatchSize * 10);
while (it.hasNext()) {
Map<String, Object> row = it.next();
StringBuilder rowPrimaryKey = new StringBuilder();
for (int j = 0; j < primaryKey.length; j++) {
String keyFieldName = primaryKey[j];
Object key = row.get(keyFieldName);
rowPrimaryKey.append(key);
String[] primaryKey = fieldConfig.getPrimaryKey();
// This is an outer loop to help us cut the documents we get from the iterator in slices. This is very
// useful or even required when reading large archives from a single iterator.
while(it.hasNext()) {
// This map will assemble for each primary key only the NEWEST (in
// XML the latest in Medline) row. Its size is an approximation of
// Medline blob XML files.
// TODO we should actually check for the PMID version and take the highest
Map<String, Map<String, Object>> rowsByPk = new HashMap<>();
while (it.hasNext() && rowsByPk.size() < sliceSize) {
Map<String, Object> row = it.next();
StringBuilder rowPrimaryKey = new StringBuilder();
for (int j = 0; j < primaryKey.length; j++) {
String keyFieldName = primaryKey[j];
Object key = row.get(keyFieldName);
rowPrimaryKey.append(key);

}
String pk = rowPrimaryKey.toString();
rowsByPk.put(pk, row);
}
String pk = rowPrimaryKey.toString();
if (rowsByPk.containsKey(pk))
++numAlreadyExisted;
rowsByPk.put(pk, row);
}

List<Map<String, Object>> cache = new ArrayList<Map<String, Object>>(commitBatchSize);
// while (it.hasNext()) {
// Map<String, Object> row = it.next();
for (Map<String, Object> row : rowsByPk.values()) {
// StringBuilder rowPrimaryKey = new StringBuilder();

for (int j = 0; j < fields.size() + primaryKey.length; j++) {
// for (int j = 0; j < fields.size(); j++) {
if (j < fields.size()) {
Map<String, String> field = fields.get(j);
String fieldName = field.get(JulieXMLConstants.NAME);
setPreparedStatementParameterWithType(j + 1, ps, row.get(fieldName), null, null);
} else {
String key = primaryKey[j - fields.size()];
Object keyValue = row.get(key);
// rowPrimaryKey.append(keyValue);
setPreparedStatementParameterWithType(j + 1, ps, keyValue, null, null);
PreparedStatement ps = conn.prepareStatement(statementString);
List<Map<String, String>> fields = fieldConfig.getFields();
List<Map<String, Object>> cache = new ArrayList<>(commitBatchSize);
int i = 0;
for (Map<String, Object> row : rowsByPk.values()) {

for (int j = 0; j < fields.size() + primaryKey.length; j++) {
if (j < fields.size()) {
Map<String, String> field = fields.get(j);
String fieldName = field.get(JulieXMLConstants.NAME);
setPreparedStatementParameterWithType(j + 1, ps, row.get(fieldName), null, null);
} else {
String key = primaryKey[j - fields.size()];
Object keyValue = row.get(key);
setPreparedStatementParameterWithType(j + 1, ps, keyValue, null, null);
}
}
ps.addBatch();
cache.add(row);

++i;
if (i >= commitBatchSize) {
LOG.trace("Committing batch of size {}", i);
executeAndCommitUpdate(tableName, conn, commit, schemaName, fieldConfig, mirrorNames,
mirrorStatements, ps, cache);
cache.clear();
i = 0;
}
}
// if (alreadyUpdatedIds.add(rowPrimaryKey.toString())) {
ps.addBatch();
cache.add(row);

// } else {
// ps.clearParameters();
// LOG.warn(
// "Primary key {} exists multiple times in one batch of data.
// All but the first occurence will be ignored.",
// rowPrimaryKey.toString());
// numAlreadyExisted++;
// }

++i;
if (i >= commitBatchSize) {
LOG.trace("Committing batch of size {}", i);
if (i > 0) {
LOG.trace("Committing last batch of size {}", i);
executeAndCommitUpdate(tableName, conn, commit, schemaName, fieldConfig, mirrorNames,
mirrorStatements, ps, cache);
cache.clear();
i = 0;
}
String msg = "Updated {} documents.";
}
if (i > 0) {
LOG.trace("Committing last batch of size {}", i);
executeAndCommitUpdate(tableName, conn, commit, schemaName, fieldConfig, mirrorNames,
mirrorStatements, ps, cache);
}
// LOG.info(
// "Updated {} documents. {} documents were skipped because there
// existed documents with same primary keys multiple times in the
// data.",
// alreadyUpdatedIds.size(), numAlreadyExisted);
String msg = "Updated {} documents. {} documents were skipped because there existed documents with same primary keys multiple times in the data. In those cases, the last occurrence of the document was inserted into the database";
if (numAlreadyExisted == 0)
LOG.debug(
msg,
rowsByPk.size(), numAlreadyExisted);
else
LOG.warn(msg, rowsByPk.size(), numAlreadyExisted);
} catch (SQLException e) {
LOG.error(
"SQL error while updating table {}. Database configuration is: {}. Table schema configuration is: {}",
