Commit bd6fa86
For huge validIdentifiers sequentially delete (to not allocate the huge terms query) in addition to the Set<String>
uschindler committed Dec 1, 2020
1 parent 8015f74 commit bd6fa86
Showing 1 changed file with 58 additions and 13 deletions.
71 changes: 58 additions & 13 deletions src/de/pangaea/metadataportal/processor/DocumentProcessor.java
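In short: the delete-by-query path with a single ids query is kept for small harvests, but once validIdentifiers reaches DEFAULT_DELETE_UNSEEN_BULK_SIZE the processor now scrolls over all documents of this harvester source and issues bulk DeleteRequests for every ID not contained in the set, so the huge terms/ids query is never allocated. The following is a simplified, self-contained sketch of that scroll-and-bulk-delete loop as it can be read from the diff below; the class name ScrollDeleteSketch and the index/type/sourceField/sourceId/pageSize parameters are placeholders for illustration, not part of the commit.

    import java.util.Set;

    import org.elasticsearch.action.DocWriteResponse.Result;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.sort.FieldSortBuilder;
    import org.elasticsearch.search.sort.SortBuilders;

    final class ScrollDeleteSketch {

      /** Deletes every document of the given source whose ID is not in validIdentifiers. */
      static long deleteUnseen(Client client, String index, String type, String sourceField,
          String sourceId, Set<String> validIdentifiers, int pageSize) {
        final TimeValue keepAlive = TimeValue.timeValueMinutes(10);
        long deleted = 0L;

        // Scroll over all documents of this harvester source; sorting on _doc is the
        // cheapest order for a full scan, and _source is not fetched at all.
        SearchResponse scrollResp = client.prepareSearch(index)
            .setTypes(type)
            .setQuery(QueryBuilders.termQuery(sourceField, sourceId))
            .setFetchSource(false)
            .setSize(pageSize)
            .addSort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))
            .setScroll(keepAlive)
            .get();

        do {
          // For each scroll page, collect the IDs that were NOT seen during harvesting
          // and delete them in one bulk request.
          final BulkRequestBuilder bulk = client.prepareBulk();
          for (final SearchHit hit : scrollResp.getHits()) {
            if (!validIdentifiers.contains(hit.getId())) {
              bulk.add(new DeleteRequest(index, type, hit.getId()));
            }
          }
          if (bulk.numberOfActions() > 0) {
            for (final BulkItemResponse item : bulk.get()) {
              final DeleteResponse del = item.<DeleteResponse>getResponse();
              if (del.getResult() == Result.DELETED) {
                deleted++;
              }
            }
          }
          if (scrollResp.getScrollId() == null) break;
          scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(keepAlive).get();
        } while (scrollResp.getHits().getHits().length > 0);

        return deleted;
      }
    }

In the actual DocumentProcessor these values come from targetIndex, iconfig.root.typeName, iconfig.root.fieldnameSource, iconfig.id and DEFAULT_DELETE_UNSEEN_BULK_SIZE, as the diff shows.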
@@ -34,15 +34,21 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse.Result;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
@@ -51,6 +57,9 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;

import de.pangaea.metadataportal.config.HarvesterConfig;
import de.pangaea.metadataportal.utils.KeyValuePairs;
@@ -92,7 +101,7 @@ public final class DocumentProcessor {
public static final int DEFAULT_MAX_QUEUE = 100;
public static final int DEFAULT_CONCURRENT_BULK_REQUESTS = 1;
public static final int DEFAULT_NUM_THREADS = 1;
public static final int DEFAULT_DELETE_UNSEEN_BULK_SIZE = 1000;
public static final int DEFAULT_DELETE_UNSEEN_BULK_SIZE = 10_000;
public static final XContentType DEFAULT_CONTENT_TYPE = XContentType.CBOR;

DocumentProcessor(Client client, HarvesterConfig iconfig, String targetIndex) {
@@ -211,19 +220,55 @@ public void addDocument(MetadataDocument mdoc) throws BackgroundFailure {
*/
private void deleteUnseenDocuments(Set<String> validIdentifiers) {
log.info("Removing metadata items not seen while harvesting...");

final IdsQueryBuilder bld = QueryBuilders.idsQuery(iconfig.root.typeName);
bld.ids().addAll(validIdentifiers);
final QueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(iconfig.root.fieldnameSource, iconfig.id))
.mustNot(bld);

final BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(query)
.source(targetIndex)
.get();
long deleted = 0L;

log.info("Deleted a total number of " + response.getDeleted() + " metadata items.");
if (validIdentifiers.size() < DEFAULT_DELETE_UNSEEN_BULK_SIZE) {
final IdsQueryBuilder bld = QueryBuilders.idsQuery(iconfig.root.typeName);
bld.ids().addAll(validIdentifiers);
final QueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(iconfig.root.fieldnameSource, iconfig.id))
.mustNot(bld);

final BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(query)
.source(targetIndex)
.get();
deleted += response.getDeleted();
} else {
final TimeValue time = TimeValue.timeValueMinutes(10);
SearchResponse scrollResp = client.prepareSearch(targetIndex)
.setTypes(iconfig.root.typeName)
.setQuery(QueryBuilders.termQuery(iconfig.root.fieldnameSource, iconfig.id))
.setFetchSource(false)
.setSize(DEFAULT_DELETE_UNSEEN_BULK_SIZE)
.addSort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))
.setScroll(time)
.get();
do {
final BulkRequestBuilder bulk = client.prepareBulk();
for (final SearchHit hit : scrollResp.getHits()) {
final String id = hit.getId();
if (!validIdentifiers.contains(id)) {
bulk.add(new DeleteRequest(targetIndex, iconfig.root.typeName, id));
}
}
if (bulk.numberOfActions() > 0) {
final BulkResponse bulkResp = bulk.get();
for (BulkItemResponse resp : bulkResp) {
final DeleteResponse delResp = resp.<DeleteResponse>getResponse();
if (delResp.getResult() != Result.DELETED) {
log.warn("Metadata item with ID '" + delResp.getId() + "' was not found when we tried to delete it.");
} else {
deleted++;
}
}
}
if (scrollResp.getScrollId() == null) break;
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(time).get();
} while (scrollResp.getHits().getHits().length > 0);
}

log.info("Deleted a total number of " + deleted + " metadata items.");
}

private Runnable getRunnable(final MetadataDocument mdoc) {
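A note on the trade-off, as far as it can be read from the diff: below the 10_000-ID threshold (DEFAULT_DELETE_UNSEEN_BULK_SIZE, raised from 1000 in this commit) the single delete-by-query request remains the cheaper option because it needs only one client call. Above the threshold the scroll loop pays for a full pass over the source's documents, but memory use stays bounded by the scroll page size: the Set<String> is only consulted for membership checks and the huge ids query is never built. The explicit per-document DeleteRequests also return individual results, which is why missing documents can be logged via the Result.DELETED check, something the aggregate BulkByScrollResponse.getDeleted() count does not provide per ID.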
