diff --git a/src/de/pangaea/metadataportal/processor/DocumentProcessor.java b/src/de/pangaea/metadataportal/processor/DocumentProcessor.java
index df48ced..7a2a78c 100644
--- a/src/de/pangaea/metadataportal/processor/DocumentProcessor.java
+++ b/src/de/pangaea/metadataportal/processor/DocumentProcessor.java
@@ -34,15 +34,21 @@
 import org.apache.commons.logging.LogFactory;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse.Result;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -51,6 +57,9 @@
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
 
 import de.pangaea.metadataportal.config.HarvesterConfig;
 import de.pangaea.metadataportal.utils.KeyValuePairs;
@@ -92,7 +101,7 @@ public final class DocumentProcessor {
   public static final int DEFAULT_MAX_QUEUE = 100;
   public static final int DEFAULT_CONCURRENT_BULK_REQUESTS = 1;
   public static final int DEFAULT_NUM_THREADS = 1;
-  public static final int DEFAULT_DELETE_UNSEEN_BULK_SIZE = 1000;
+  public static final int DEFAULT_DELETE_UNSEEN_BULK_SIZE = 10_000;
   public static final XContentType DEFAULT_CONTENT_TYPE = XContentType.CBOR;
 
   DocumentProcessor(Client client, HarvesterConfig iconfig, String targetIndex) {
@@ -211,19 +220,79 @@ public void addDocument(MetadataDocument mdoc) throws BackgroundFailure {
    */
   private void deleteUnseenDocuments(Set validIdentifiers) {
     log.info("Removing metadata items not seen while harvesting...");
-
-    final IdsQueryBuilder bld = QueryBuilders.idsQuery(iconfig.root.typeName);
-    bld.ids().addAll(validIdentifiers);
-    final QueryBuilder query = QueryBuilders.boolQuery()
-        .filter(QueryBuilders.termQuery(iconfig.root.fieldnameSource, iconfig.id))
-        .mustNot(bld);
-
-    final BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
-      .filter(query)
-      .source(targetIndex)
-      .get();
+    long deleted = 0L;
 
-    log.info("Deleted a total number of " + response.getDeleted() + " metadata items.");
+    if (validIdentifiers.size() < DEFAULT_DELETE_UNSEEN_BULK_SIZE) {
+      // Few enough identifiers to ship them inline in a single delete-by-query request.
+      final IdsQueryBuilder bld = QueryBuilders.idsQuery(iconfig.root.typeName);
+      bld.ids().addAll(validIdentifiers);
+      final QueryBuilder query = QueryBuilders.boolQuery()
+          .filter(QueryBuilders.termQuery(iconfig.root.fieldnameSource, iconfig.id))
+          .mustNot(bld);
+
+      final BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
+        .filter(query)
+        .source(targetIndex)
+        .get();
+      deleted += response.getDeleted();
+    } else {
+      // Too many identifiers for one query: scroll over every document of this
+      // harvester source and bulk-delete those whose ID was not seen.
+      final TimeValue time = TimeValue.timeValueMinutes(10);
+      SearchResponse scrollResp = client.prepareSearch(targetIndex)
+        .setTypes(iconfig.root.typeName)
+        .setQuery(QueryBuilders.termQuery(iconfig.root.fieldnameSource, iconfig.id))
+        .setFetchSource(false)
+        .setSize(DEFAULT_DELETE_UNSEEN_BULK_SIZE)
+        .addSort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))
+        .setScroll(time)
+        .get();
+      String scrollId = scrollResp.getScrollId();
+      try {
+        do {
+          final BulkRequestBuilder bulk = client.prepareBulk();
+          for (final SearchHit hit : scrollResp.getHits()) {
+            final String id = hit.getId();
+            if (!validIdentifiers.contains(id)) {
+              bulk.add(new DeleteRequest(targetIndex, iconfig.root.typeName, id));
+            }
+          }
+          if (bulk.numberOfActions() > 0) {
+            final BulkResponse bulkResp = bulk.get();
+            for (final BulkItemResponse resp : bulkResp) {
+              if (resp.isFailed()) {
+                // getResponse() is null for failed items, so report the failure instead of dereferencing it.
+                log.warn("Failed to delete metadata item with ID '" + resp.getId() + "': " + resp.getFailureMessage());
+                continue;
+              }
+              final DeleteResponse delResp = resp.getResponse();
+              if (delResp.getResult() != Result.DELETED) {
+                log.warn("Metadata item with ID '" + delResp.getId() + "' was not found when we tried to delete it.");
+              } else {
+                deleted++;
+              }
+            }
+          }
+          if (scrollId == null) {
+            break;
+          }
+          scrollResp = client.prepareSearchScroll(scrollId).setScroll(time).get();
+          scrollId = scrollResp.getScrollId();
+        } while (scrollResp.getHits().getHits().length > 0);
+      } finally {
+        // Release the server-side scroll context right away instead of keeping it
+        // open until the keep-alive expires; best effort, must not mask an earlier failure.
+        if (scrollId != null) {
+          try {
+            client.prepareClearScroll().addScrollId(scrollId).get();
+          } catch (ElasticsearchException e) {
+            log.warn("Could not clear scroll context after removing unseen metadata items.", e);
+          }
+        }
+      }
+    }
+
+    log.info("Deleted a total number of " + deleted + " metadata items.");
   }
 
   private Runnable getRunnable(final MetadataDocument mdoc) {