diff --git a/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java b/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java index c177200..200bb7a 100644 --- a/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java +++ b/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java @@ -13,10 +13,13 @@ import com.o19s.ubi.utils.UbiUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.action.RestActionListener; import java.util.Collection; import java.util.HashMap; @@ -48,11 +51,12 @@ public class OpenSearchDataManager extends DataManager { /** * Gets a singleton instance of the manager. + * * @param client An OpenSearch {@link Client}. * @return An instance of {@link OpenSearchDataManager}. */ public static OpenSearchDataManager getInstance(Client client) { - if(openSearchEventManager == null) { + if (openSearchEventManager == null) { openSearchEventManager = new OpenSearchDataManager(client); } return openSearchEventManager; @@ -77,8 +81,25 @@ public void processEvents() { .map(event -> new IndexRequest(event.getIndexName()).source(event.getEvent(), XContentType.JSON)) .forEach(eventsBulkRequest::add); - if(eventsBulkRequest.numberOfActions() > 0) { - client.bulk(eventsBulkRequest); + if (eventsBulkRequest.numberOfActions() > 0) { + + LOGGER.trace("UBI bulk indexing " + eventsBulkRequest.numberOfActions() + " events."); + + client.bulk(eventsBulkRequest, new RestActionListener<>(null) { + + @Override + public void processResponse(final BulkResponse bulkResponse) { + + if (bulkResponse.hasFailures()) { + for (final BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { + if (bulkItemResponse.isFailed()) { + LOGGER.warn("Unable to insert UBI event: " + bulkItemResponse.getFailureMessage()); + } + } + } + + } + }); } } catch (Exception ex) { @@ -102,10 +123,24 @@ public void processQueries() { .source(buildQueryRequestMap(queryRequest), XContentType.JSON)) .forEach(queryRequestsBulkRequest::add); - LOGGER.trace("Indexing " + queryRequestsBulkRequest.numberOfActions() + " queries"); + LOGGER.trace("UBI bulk indexing " + queryRequestsBulkRequest.numberOfActions() + " queries."); if (queryRequestsBulkRequest.numberOfActions() > 0) { - client.bulk(queryRequestsBulkRequest); + client.bulk(queryRequestsBulkRequest, new RestActionListener<>(null) { + + @Override + public void processResponse(final BulkResponse bulkResponse) { + + if (bulkResponse.hasFailures()) { + for (final BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { + if (bulkItemResponse.isFailed()) { + LOGGER.warn("Unable to insert UBI query: " + bulkItemResponse.getFailureMessage()); + } + } + } + + } + }); } } catch (Exception ex) { diff --git a/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java b/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java index c0873bc..90758c7 100644 --- a/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java +++ b/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java @@ -286,7 +286,7 @@ protected void processResponse(AcknowledgedResponse acknowledgedResponse) throws private String setEventTimestamp(final String eventJson) throws JsonProcessingException { - LOGGER.info("event json: " + eventJson); + LOGGER.trace("Received UBI event json: " + eventJson); final JsonNode rootNode = new ObjectMapper().readTree(eventJson);