Skip to content

Commit

Permalink
Allow delete by query to be "batched" by running them async
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-hotz committed Nov 12, 2024
1 parent 827539e commit 61fb7ad
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,11 @@ public void synchronizeDbWithIndex(ServiceContext context) throws Exception {
}

// remove from index metadata not in DBMS
for (String id : docs.keySet()) {
getSearchManager().deleteByQuery(String.format("+id:%s", id));
LOGGER_DATA_MANAGER.debug("- removed record ({}) from index", id);
try (BatchingDeletionSubmittor submittor = new BatchingDeletionSubmittor()) {
for (String id : docs.keySet()) {
getSearchManager().deleteByQuery(String.format("+id:%s", id), submittor);
LOGGER_DATA_MANAGER.debug("- removed record ({}) from index", id);
}
}
}

Expand Down Expand Up @@ -342,7 +344,7 @@ public void deleteMetadata(ServiceContext context, String metadataId, IDeletionS
// --- update search criteria
getSearchManager().deleteByUuid(findOne.getUuid(), submittor);
} else {
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId));
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId), submittor);
}
}

Expand Down Expand Up @@ -384,7 +386,7 @@ public void purgeMetadata(ServiceContext context, String metadataId, boolean wit
@Override
public void deleteMetadataGroup(ServiceContext context, String metadataId) throws Exception {
deleteMetadataFromDB(context, metadataId);
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId));
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId), DirectDeletionSubmittor.INSTANCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void deleteMetadata(ServiceContext context, String metadataId, IDeletionS
deleteMetadataFromDB(context, metadataId);
getSearchManager().deleteByUuid(findOne.getUuid(), submittor);
} else {
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId));
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId), submittor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public void cancelEditingSession(ServiceContext context, String id) throws Excep

// --- remove metadata
xmlSerializer.delete(id, ServiceContext.get());
searchManager.deleteByQuery(String.format("+id:%s", id));
searchManager.deleteByQuery(String.format("+id:%s", id), DirectDeletionSubmittor.INSTANCE);

// Unset METADATA_EDITING_CREATED_DRAFT flag
context.getUserSession().removeProperty(Geonet.Session.METADATA_EDITING_CREATED_DRAFT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.fao.geonet.kernel.datamanager.IMetadataIndexer;
import org.fao.geonet.kernel.datamanager.IMetadataUtils;
import org.fao.geonet.kernel.search.index.OverviewIndexFieldUpdater;
import org.fao.geonet.kernel.search.submission.DirectDeletionSubmittor;
import org.fao.geonet.kernel.search.submission.IDeletionSubmittor;
import org.fao.geonet.kernel.search.submission.batch.BatchingIndexSubmittor;
import org.fao.geonet.kernel.search.submission.IIndexSubmittor;
Expand Down Expand Up @@ -479,6 +480,18 @@ public void handleDeletionResponse(BulkResponse bulkResponse, List<String> docum
}
}

public void handleDeletionResponse(DeleteByQueryResponse deleteByQueryResponse, String query) {
if (!deleteByQueryResponse.failures().isEmpty()) {
StringBuilder stringBuilder = new StringBuilder();

deleteByQueryResponse.failures().forEach(f -> stringBuilder.append(f.toString()));

throw new RuntimeException(String.format(
"Error during removal of query %s. Errors are '%s'.", query, stringBuilder.toString()
));
}
}

private void checkIndexResponse(BulkResponse bulkItemResponses,
Map<String, String> documents) throws IOException {
if (bulkItemResponses.errors()) {
Expand Down Expand Up @@ -810,7 +823,7 @@ public Map<String, String> getFieldsValues(String id, Set<String> fields, String


public void clearIndex() throws Exception {
client.deleteByQuery(defaultIndex, "*:*");
deleteByQuery("*:*", DirectDeletionSubmittor.INSTANCE);
}

static ImmutableSet<String> docsChangeIncludedFields;
Expand Down Expand Up @@ -868,13 +881,13 @@ public ISODate getDocChangeDate(String mdId) throws Exception {
}

@Override
public void deleteByQuery(String txt) throws Exception {
client.deleteByQuery(defaultIndex, txt);
public void deleteByQuery(String query, IDeletionSubmittor submittor) throws Exception {
submittor.submitQueryToIndex(query, this);
}

@Override
public void deleteByUuid(String uuid, IDeletionSubmittor submittor) throws Exception {
submittor.submitToIndex(uuid, this);
submittor.submitUUIDToIndex(uuid, this);
}

public long getNumDocs(String query) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ boolean rebuildIndex(ServiceContext context,
/**
* deletes a document by a query.
*/
void deleteByQuery(String query) throws Exception;
void deleteByQuery(String query, IDeletionSubmittor submittor) throws Exception;

/**
* deletes a document by its uuid.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import org.fao.geonet.index.es.EsRestClient;
import org.fao.geonet.kernel.search.EsSearchManager;

Expand All @@ -15,7 +17,7 @@ public class DirectDeletionSubmittor implements IDeletionSubmittor {
private DirectDeletionSubmittor() {}

@Override
public void submitToIndex(String uuid, EsSearchManager searchManager) throws IOException {
public void submitUUIDToIndex(String uuid, EsSearchManager searchManager) throws IOException {
EsRestClient restClient = searchManager.getClient();
List<String> documents = Collections.singletonList(uuid);

Expand All @@ -24,4 +26,14 @@ public void submitToIndex(String uuid, EsSearchManager searchManager) throws IOE

searchManager.handleDeletionResponse(bulkItemResponses, documents);
}

@Override
public void submitQueryToIndex(String query, EsSearchManager searchManager) throws IOException {
EsRestClient restClient = searchManager.getClient();

DeleteByQueryRequest deleteByQueryRequest = restClient.buildDeleteByQuery(searchManager.getDefaultIndex(), query);
final DeleteByQueryResponse deleteByQueryResponse = restClient.getClient().deleteByQuery(deleteByQueryRequest);

searchManager.handleDeletionResponse(deleteByQueryResponse, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@

public interface IDeletionSubmittor {

void submitToIndex(String uuid, EsSearchManager searchManager) throws IOException;
void submitUUIDToIndex(String uuid, EsSearchManager searchManager) throws IOException;

void submitQueryToIndex(String query, EsSearchManager searchManager) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.fao.geonet.kernel.search.submission.batch;

import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import org.fao.geonet.index.es.EsRestClient;
import org.fao.geonet.kernel.search.EsSearchManager;
import org.fao.geonet.kernel.search.submission.IDeletionSubmittor;
Expand Down Expand Up @@ -30,6 +32,15 @@ private void deleteDocumentsFromIndex(List<String> toDelete) {
.thenAcceptAsync(bulkItemResponses -> searchManager.handleDeletionResponse(bulkItemResponses, toDelete));
queueFuture(currentIndexFuture);
}

private void deleteDocumentByQuery(String query) {
EsRestClient restClient = searchManager.getClient();

DeleteByQueryRequest deleteByQueryRequest = restClient.buildDeleteByQuery(searchManager.getDefaultIndex(), query);
final CompletableFuture<Void> deleteByQueryFuture = restClient.getAsyncClient().deleteByQuery(deleteByQueryRequest)
.thenAcceptAsync(response -> searchManager.handleDeletionResponse(response, query));
queueFuture(deleteByQueryFuture);
}
}

public BatchingDeletionSubmittor() {
Expand All @@ -41,7 +52,7 @@ public BatchingDeletionSubmittor(int estimatedTotalSize) {
}

@Override
public void submitToIndex(String uuid, EsSearchManager searchManager) {
public void submitUUIDToIndex(String uuid, EsSearchManager searchManager) {
if (state.closed) {
throw new IllegalStateException("Attempting to use a closed " + this.getClass().getSimpleName());
}
Expand All @@ -55,4 +66,14 @@ public void submitToIndex(String uuid, EsSearchManager searchManager) {
state.deleteDocumentsFromIndex(toDelete);
}
}

@Override
public void submitQueryToIndex(String query, EsSearchManager searchManager) {
if (state.closed) {
throw new IllegalStateException("Attempting to use a closed " + this.getClass().getSimpleName());
}

state.searchManager = searchManager;
state.deleteDocumentByQuery(query);
}
}
18 changes: 2 additions & 16 deletions index/src/main/java/org/fao/geonet/index/es/EsRestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,13 @@ public BulkRequest buildDeleteBulkRequest(String index, List<String> deletionUUI
return requestBuilder.build();
}

public void deleteByQuery(String index, String query) throws IOException {
public DeleteByQueryRequest buildDeleteByQuery(String index, String query) {
checkActivated();

DeleteByQueryRequest request = DeleteByQueryRequest.of(
return DeleteByQueryRequest.of(
b -> b.index(index)
.q(query)
.refresh(true));

final DeleteByQueryResponse deleteByQueryResponse =
client.deleteByQuery(request);


if (!deleteByQueryResponse.failures().isEmpty()) {
StringBuilder stringBuilder = new StringBuilder();

deleteByQueryResponse.failures().forEach(f -> stringBuilder.append(f.toString()));

throw new IOException(String.format(
"Error during removal. Errors are '%s'.", stringBuilder.toString()
));
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.util.BinaryData;
import co.elastic.clients.util.ContentType;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -224,11 +221,11 @@ public void deleteFeatures(String url, String typeName, EsRestClient client) {
new Object[]{url, typeName, index, indexType});
try {
long begin = System.currentTimeMillis();
client.deleteByQuery(index, String.format("+featureTypeId:\"%s\"", getIdentifier(url, typeName)));
deleteByQuery(String.format("+featureTypeId:\"%s\"", getIdentifier(url, typeName)));
LOGGER.info(" Features deleted in {} ms.", System.currentTimeMillis() - begin);

begin = System.currentTimeMillis();
client.deleteByQuery(index, String.format("+id:\"%s\"",
deleteByQuery(String.format("+id:\"%s\"",
getIdentifier(url, typeName)));
LOGGER.info(" Report deleted in {} ms.", System.currentTimeMillis() - begin);

Expand All @@ -238,6 +235,21 @@ public void deleteFeatures(String url, String typeName, EsRestClient client) {
}
}

private void deleteByQuery(String query) throws IOException {
DeleteByQueryRequest deleteByQueryRequest = client.buildDeleteByQuery(index, query);
DeleteByQueryResponse deleteByQueryResponse = client.getClient().deleteByQuery(deleteByQueryRequest);
if (!deleteByQueryResponse.failures().isEmpty()) {
StringBuilder stringBuilder = new StringBuilder();

deleteByQueryResponse.failures().forEach(f -> stringBuilder.append(f.toString()));

throw new RuntimeException(String.format(
"Error during removal of query %s. Errors are '%s'.", query, stringBuilder.toString()
));
}

}

interface TitleResolver {
void setTitle(ObjectNode objectNode, SimpleFeature simpleFeature);
}
Expand Down

0 comments on commit 61fb7ad

Please sign in to comment.