diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 7651f7a0..4e75f525 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -8,6 +8,7 @@ */ package org.opensearch.jobscheduler.spi.utils; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -77,20 +78,25 @@ public boolean lockIndexExist() { @VisibleForTesting void createLockIndex(ActionListener listener) { - if (lockIndexExist()) { - listener.onResponse(true); - } else { - final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()); - client.admin() - .indices() - .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { - if (exception instanceof ResourceAlreadyExistsException - || exception.getCause() instanceof ResourceAlreadyExistsException) { - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + if (lockIndexExist()) { + listener.onResponse(true); + } else { + final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } @@ -180,7 +186,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) { } private void updateLock(final LockModel updateLock, ActionListener listener) { - try { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME) .id(updateLock.getLockId()) .setIfSeqNo(updateLock.getSeqNo()) @@ -212,11 +218,14 @@ private void updateLock(final LockModel updateLock, ActionListener li } catch (IOException e) { logger.error("IOException occurred updating lock.", e); listener.onResponse(null); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } private void createLock(final LockModel tempLock, ActionListener listener) { - try { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId()) .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) @@ -243,25 +252,30 @@ private void createLock(final LockModel tempLock, ActionListener list } public void findLock(final String lockId, ActionListener listener) { - GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); - client.get(getRequest, ActionListener.wrap(response -> { - if (!response.isExists()) { - listener.onResponse(null); - } else { - try { - XContentParser parser = XContentType.JSON.xContent() - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); - parser.nextToken(); - listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); - } catch (IOException e) { - logger.error("IOException occurred finding lock", e); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + parser.nextToken(); + listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); + } catch (IOException e) { + logger.error("IOException occurred finding lock", e); + listener.onResponse(null); + } } - } - }, exception -> { - logger.error("Exception occurred finding lock", exception); - listener.onFailure(exception); - })); + }, exception -> { + logger.error("Exception occurred finding lock", exception); + listener.onFailure(exception); + })); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } } /** @@ -293,19 +307,24 @@ public void release(final LockModel lock, ActionListener listener) { * or not the delete was successful */ public void deleteLock(final String lockId, ActionListener listener) { - DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); - client.delete(deleteRequest, ActionListener.wrap(response -> { - listener.onResponse( - response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND - ); - }, exception -> { - if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { - logger.debug("Index is not found to delete lock. {}", exception.getMessage()); - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); + client.delete(deleteRequest, ActionListener.wrap(response -> { + listener.onResponse( + response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND + ); + }, exception -> { + if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { + logger.debug("Index is not found to delete lock. {}", exception.getMessage()); + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } } /**