Skip to content

Commit

Permalink
Make .opendistro-job-scheduler-lock a System Index
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <cwperx@amazon.com>
  • Loading branch information
cwperks committed Mar 23, 2023
1 parent 4649c3d commit 06e1824
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void createBasicWatcherJob() throws Exception {
String jobId = OpenSearchRestTestCase.randomAlphaOfLength(10);
createWatcherJobJson(jobId, jobParameter);

long actualCount = waitAndCountRecords(index, 100000);
long actualCount = waitAndCountRecords(index, 120000);
Assert.assertEquals(1, actualCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.jobscheduler.spi.utils;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand Down Expand Up @@ -44,7 +45,7 @@

public final class LockService {
private static final Logger logger = LogManager.getLogger(LockService.class);
private static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock";
public static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock";

private final Client client;
private final ClusterService clusterService;
Expand Down Expand Up @@ -77,20 +78,25 @@ public boolean lockIndexExist() {

@VisibleForTesting
void createLockIndex(ActionListener<Boolean> listener) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping());
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping());
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

Expand Down Expand Up @@ -180,7 +186,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) {
}

private void updateLock(final LockModel updateLock, ActionListener<LockModel> listener) {
try {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME)
.id(updateLock.getLockId())
.setIfSeqNo(updateLock.getSeqNo())
Expand Down Expand Up @@ -212,11 +218,14 @@ private void updateLock(final LockModel updateLock, ActionListener<LockModel> li
} catch (IOException e) {
logger.error("IOException occurred updating lock.", e);
listener.onResponse(null);
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

private void createLock(final LockModel tempLock, ActionListener<LockModel> listener) {
try {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId())
.source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
Expand All @@ -240,29 +249,37 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
} catch (IOException e) {
logger.error("IOException occurred creating lock", e);
listener.onResponse(null);
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

public void findLock(final String lockId, ActionListener<LockModel> listener) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onResponse(null);
} else {
try {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onResponse(null);
} else {
try {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
listener.onResponse(null);
}
}
}
}, exception -> {
logger.error("Exception occurred finding lock", exception);
listener.onFailure(exception);
}));
}, exception -> {
logger.error("Exception occurred finding lock", exception);
listener.onFailure(exception);
}));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

/**
Expand Down Expand Up @@ -294,19 +311,24 @@ public void release(final LockModel lock, ActionListener<Boolean> listener) {
* or not the delete was successful
*/
public void deleteLock(final String lockId, ActionListener<Boolean> listener) {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND
);
}, exception -> {
if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) {
logger.debug("Index is not found to delete lock. {}", exception.getMessage());
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND
);
}, exception -> {
if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) {
logger.debug("Index is not found to delete lock. {}", exception.getMessage());
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
Expand Down Expand Up @@ -41,6 +42,7 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
Expand All @@ -61,7 +63,7 @@

import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
Expand Down Expand Up @@ -162,6 +164,15 @@ public void onIndexModule(IndexModule indexModule) {
}
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
final SystemIndexDescriptor systemIndexDescriptor = new SystemIndexDescriptor(
LockService.LOCK_INDEX_NAME,
"Job Scheduler Lock index"
);
return Collections.singletonList(systemIndexDescriptor);
}

@Override
public void loadExtensions(ExtensionLoader loader) {

Expand Down

0 comments on commit 06e1824

Please sign in to comment.