Skip to content

Commit

Permalink
Job Queue (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
azahnen authored Aug 5, 2024
1 parent 10182c1 commit 1ea5ae3
Show file tree
Hide file tree
Showing 89 changed files with 1,450 additions and 59 deletions.
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cron4j = '2.2.5'
rxjava = '3.1.8'
rxjava-ext = '3.1.1'
rs = '1.0.4'
uuid = '5.0.0'

[libraries]
# auth
Expand Down Expand Up @@ -53,6 +54,9 @@ dagger-auto = { module = "io.github.azahnen:dagger-auto", version.ref = "dagger-
#s3
minio = { module = "io.minio:minio", version.ref = "minio" }

#jobs
uuid = { module = "com.fasterxml.uuid:java-uuid-generator", version.ref = "uuid" }

#services
cron4j = { module = "it.sauronsoftware.cron4j:cron4j", version.ref = "cron4j" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public interface AppContext {

URI getUri();

String getInstanceName();

default boolean isDevEnv() {
return getEnvironment() == ENV.DEVELOPMENT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.dropwizard.jetty.HttpConnectorFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -107,6 +109,11 @@ public AppConfiguration getConfiguration() {
return cfg;
}

@Override
public String getInstanceName() {
return String.format("%s@%s", name, getRealHostName());
}

@Override
public URI getUri() {
return uri;
Expand Down Expand Up @@ -156,13 +163,10 @@ public String init(String[] args, Map<String, ByteSource> baseConfigs) throws IO
String externalUrl = getConfiguration().getServerFactory().getExternalUrl();
if (Strings.isNullOrEmpty(externalUrl)) {
this.uri =
URI.create(String.format("%s://%s:%d", getScheme(), getHostName(), getApplicationPort()));
URI.create(
String.format("%s://%s:%d/", getScheme(), getHostName(), getApplicationPort()));
} else {
String uri =
externalUrl.endsWith("/")
? externalUrl.substring(0, externalUrl.length() - 1)
: externalUrl;
this.uri = URI.create(uri);
this.uri = URI.create(externalUrl.endsWith("/") ? externalUrl : externalUrl + "/");
}

return String.format(
Expand Down Expand Up @@ -365,6 +369,25 @@ private String getHostName() {
.orElse("localhost");
}

private String getRealHostName() {
String host = System.getenv("HOSTNAME");

if (Objects.nonNull(host)) {
return host;
}

try {
String result = InetAddress.getLocalHost().getHostName();
if (!Strings.isNullOrEmpty(result)) {
return result;
}
} catch (UnknownHostException e) {
// failed; try alternate means.
}

return "UNKNOWN";
}

private int getApplicationPort() {
return ((HttpConnectorFactory)
((DefaultServerFactory) getConfiguration().getServerFactory())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public enum MARKER implements MyMarker {
STACKTRACE,
DUMP,
DI,
S3;
S3,
JOBS;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class LoggingConfiguration extends DefaultLoggingFactory {
private boolean configDumps;
private boolean stackTraces;
private boolean wiring;
private boolean jobs;

public LoggingConfiguration() {
super();
Expand All @@ -112,6 +113,7 @@ public LoggingConfiguration() {
this.configDumps = false;
this.stackTraces = false;
this.wiring = false;
this.jobs = false;
}

public List<ILoggingEvent> configure(
Expand Down Expand Up @@ -162,7 +164,8 @@ public void configure(MetricRegistry metricRegistry, String name) {
s3,
configDumps,
stackTraces,
wiring));
wiring,
jobs));
}

@JsonProperty("showThirdPartyLoggers")
Expand Down Expand Up @@ -275,6 +278,16 @@ public void setWiring(boolean wiring) {
this.wiring = wiring;
}

@JsonProperty
public boolean isJobs() {
return jobs;
}

@JsonProperty
public void setJobs(boolean jobs) {
this.jobs = jobs;
}

@JsonProperty("type")
public void setType(String type) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class LoggingFilter extends TurboFilter {
private boolean configDumps;
private boolean stackTraces;
private boolean wiring;
private boolean jobs;

public LoggingFilter(
boolean showThirdPartyLoggers,
Expand All @@ -44,7 +45,8 @@ public LoggingFilter(
boolean s3,
boolean configDumps,
boolean stackTraces,
boolean wiring) {
boolean wiring,
boolean jobs) {
this.showThirdPartyLoggers = showThirdPartyLoggers;
this.apiRequests = apiRequests;
this.apiRequestUsers = apiRequestUsers;
Expand All @@ -56,6 +58,7 @@ public LoggingFilter(
this.configDumps = configDumps;
this.stackTraces = stackTraces;
this.wiring = wiring;
this.jobs = jobs;
}

@Override
Expand Down Expand Up @@ -103,6 +106,10 @@ public FilterReply decide(
return FilterReply.ACCEPT;
}

if (Objects.equals(marker, MARKER.JOBS)) {
return jobs ? FilterReply.ACCEPT : FilterReply.NEUTRAL;
}

if (Objects.isNull(marker) && (showThirdPartyLoggers || logger.getName().startsWith("de.ii"))) {
return FilterReply.NEUTRAL;
}
Expand Down Expand Up @@ -205,4 +212,12 @@ public boolean isWiring() {
public void setWiring(boolean wiring) {
this.wiring = wiring;
}

public boolean isJobs() {
return jobs;
}

public void setJobs(boolean jobs) {
this.jobs = jobs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract class AbstractVolatileComposed extends AbstractVolatile
private final boolean noHealth;
private State baseState;
private boolean ready;
private boolean initialized;

public AbstractVolatileComposed(VolatileRegistry volatileRegistry, String... capabilities) {
this(null, volatileRegistry, false, capabilities);
Expand Down Expand Up @@ -85,6 +86,7 @@ protected Tuple<State, String> volatileInit() {
}

protected void onInitComponentsAvailable() {
this.initialized = true;
Tuple<State, String> result = volatileInit();

if (Objects.nonNull(result.second())) {
Expand Down Expand Up @@ -231,7 +233,9 @@ private void checkStates() {
return;
}
if (baseState != State.AVAILABLE) {
if (ready && initComponents.values().stream().allMatch(Volatile2::isAvailable)) {
if (ready
&& !initialized
&& initComponents.values().stream().allMatch(Volatile2::isAvailable)) {
onInitComponentsAvailable();
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void delete(Path path) throws IOException {
LOGGER.trace("Deleting blob at {}", filePath);
}

Files.delete(filePath);
Files.deleteIfExists(filePath);
}

// TODO: remote sources might provide readable locals, but never writable ones
Expand All @@ -201,7 +201,7 @@ public Optional<Path> asLocalPath(Path path, boolean writable) throws IOExceptio
}

if (writable || has(path)) {
Path filePath = full(path);
Path filePath = full(path).normalize();

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Providing writable local blob at {}", filePath);
Expand Down
11 changes: 11 additions & 0 deletions xtraplatform-jobs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

maturity = 'MATURE'
maintenance = 'FULL'
description = 'Job queue.'
descriptionDe = 'Job-Queue.'

dependencies {
provided project(':xtraplatform-ops')

embeddedFlat(libs.uuid)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2024 interactive instruments GmbH
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package de.ii.xtraplatform.jobs.app;

import de.ii.xtraplatform.base.domain.AppContext;
import de.ii.xtraplatform.jobs.domain.BaseJob;
import de.ii.xtraplatform.jobs.domain.Job;
import de.ii.xtraplatform.jobs.domain.JobQueue;
import de.ii.xtraplatform.jobs.domain.JobSet;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// @Singleton
// @AutoBind
public class JobQueueRedis implements JobQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(JobQueueRedis.class);

// @Inject
JobQueueRedis(AppContext appContext) {
// TODO: if enabled, connect with jedis

// TODO: one queue per type, queue has ids, one hash for each id with details

// TODO: housekeeping might check taken list using RPOPLPUSH with same source and destination
// this way it can check for timeouts, then use a transaction with LREM, LPUSH and HMSET to
// retry
}

@Override
public synchronized void push(BaseJob job, boolean untake) {
if (job instanceof Job) {
// TODO: LPUSH xtraplatform:jobs:queue and HMSET xtraplatform:jobs:job:<id>
} else if (job instanceof JobSet) {
// TODO: HMSET xtraplatform:jobs:set:<id>
} else {
throw new IllegalArgumentException("Unknown job type: " + job.getClass());
}
}

@Override
public synchronized Optional<Job> take(String type, String executor) {
// TODO: RPOPLPUSH xtraplatform:jobs:queue xtraplatform:jobs:progress and HMSET
// xtraplatform:jobs:job:<id>

return Optional.empty();
}

@Override
public synchronized boolean done(String jobId) {
// TODO: LREM xtraplatform:jobs:progress 0 <id>

// TODO: if > 0 then HMGET xtraplatform:jobs:job:<id>

// TODO: if partOf then HINCRBY xtraplatform:jobs:set:<partOf> current 1 (if done, mark for
// removal)

// TODO: if followUps then LPUSH xtraplatform:jobs:queue <followUps>

// TODO: HDEL xtraplatform:jobs:job:<id>

return false;
}

@Override
public boolean doneSet(String jobSetId) {
// TODO: HDEL xtraplatform:jobs:set:<id>

return false;
}

@Override
public synchronized boolean error(String jobId, String error, boolean retry) {
// TODO: retry logic
return false;
}

@Override
public Collection<JobSet> getSets() {
return List.of();
}

@Override
public Map<String, Deque<Job>> getOpen() {
return Map.of();
}

@Override
public Collection<Job> getTaken() {
return List.of();
}

@Override
public Collection<Job> getFailed() {
return List.of();
}

@Override
public JobSet getSet(String setId) {
return null;
}
}
Loading

0 comments on commit 1ea5ae3

Please sign in to comment.