Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #147 from o19s/log-failures
Browse files Browse the repository at this point in the history
#146 Logging bulk indexing failures
  • Loading branch information
epugh authored Mar 28, 2024
2 parents 9480b7d + bd9b55f commit b1fe7fa
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
45 changes: 40 additions & 5 deletions src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import com.o19s.ubi.utils.UbiUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.rest.action.RestActionListener;

import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -48,11 +51,12 @@ public class OpenSearchDataManager extends DataManager {

/**
* Gets a singleton instance of the manager.
*
* @param client An OpenSearch {@link Client}.
* @return An instance of {@link OpenSearchDataManager}.
*/
public static OpenSearchDataManager getInstance(Client client) {
if(openSearchEventManager == null) {
if (openSearchEventManager == null) {
openSearchEventManager = new OpenSearchDataManager(client);
}
return openSearchEventManager;
Expand All @@ -77,8 +81,25 @@ public void processEvents() {
.map(event -> new IndexRequest(event.getIndexName()).source(event.getEvent(), XContentType.JSON))
.forEach(eventsBulkRequest::add);

if(eventsBulkRequest.numberOfActions() > 0) {
client.bulk(eventsBulkRequest);
if (eventsBulkRequest.numberOfActions() > 0) {

LOGGER.trace("UBI bulk indexing " + eventsBulkRequest.numberOfActions() + " events.");

client.bulk(eventsBulkRequest, new RestActionListener<>(null) {

@Override
public void processResponse(final BulkResponse bulkResponse) {

if (bulkResponse.hasFailures()) {
for (final BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
LOGGER.warn("Unable to insert UBI event: " + bulkItemResponse.getFailureMessage());
}
}
}

}
});
}

} catch (Exception ex) {
Expand All @@ -102,10 +123,24 @@ public void processQueries() {
.source(buildQueryRequestMap(queryRequest), XContentType.JSON))
.forEach(queryRequestsBulkRequest::add);

LOGGER.trace("Indexing " + queryRequestsBulkRequest.numberOfActions() + " queries");
LOGGER.trace("UBI bulk indexing " + queryRequestsBulkRequest.numberOfActions() + " queries.");

if (queryRequestsBulkRequest.numberOfActions() > 0) {
client.bulk(queryRequestsBulkRequest);
client.bulk(queryRequestsBulkRequest, new RestActionListener<>(null) {

@Override
public void processResponse(final BulkResponse bulkResponse) {

if (bulkResponse.hasFailures()) {
for (final BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
LOGGER.warn("Unable to insert UBI query: " + bulkItemResponse.getFailureMessage());
}
}
}

}
});
}

} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected void processResponse(AcknowledgedResponse acknowledgedResponse) throws

private String setEventTimestamp(final String eventJson) throws JsonProcessingException {

LOGGER.info("event json: " + eventJson);
LOGGER.trace("Received UBI event json: " + eventJson);

final JsonNode rootNode = new ObjectMapper().readTree(eventJson);

Expand Down

0 comments on commit b1fe7fa

Please sign in to comment.