-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/550 queue splitter #557
Changes from 16 commits
dffde37
761da46
2c28f27
c385935
12b48f1
57c3987
a834584
1446d05
9ab5c6f
46ffb4c
bf32a9c
3739ea3
00b1b03
4a4eecd
7f30b4b
ba3306b
4f6d4c9
d393920
318c2ae
c6597cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
{ | ||
"queue-static-split": { | ||
"description": "Simple splitter with request header", | ||
"postfixFromStatic": [ | ||
"A", | ||
"B", | ||
"C", | ||
"D" | ||
] | ||
}, | ||
"queue-header-[a-z]+": { | ||
"description": "Simple splitter with request header", | ||
"postfixDelimiter": "+", | ||
"postfixFromRequest": { | ||
"header": "x-rp-deviceid" | ||
} | ||
}, | ||
"queue-path-[a-z]+": { | ||
"description": "Simple splitter with request url matching", | ||
"postfixDelimiter": "_", | ||
"postfixFromRequest": { | ||
"url": ".*/path1/(.*)/.*" | ||
} | ||
}, | ||
"queue-header-and-path-[a-z]+": { | ||
"description": "Simple splitter with request header and url matching", | ||
"postfixDelimiter": "_", | ||
"postfixFromRequest": { | ||
"header": "x-rp-deviceid", | ||
"url": ".*/path1/(.*)/.*" | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package org.swisspush.gateleen.queue.queuing.splitter; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.core.Promise; | ||
import io.vertx.core.http.HttpServerRequest; | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public class NoOpQueueSplitter implements QueueSplitter { | ||
|
||
@Override | ||
public Future<Void> initialize() { | ||
Promise<Void> promise = Promise.promise(); | ||
promise.complete(); | ||
return promise.future(); | ||
} | ||
|
||
@Override | ||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public String convertToSubQueue(String queue, HttpServerRequest request) { | ||
return queue; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package org.swisspush.gateleen.queue.queuing.splitter; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.core.http.HttpServerRequest; | ||
|
||
/** | ||
* Interface for queues configured to be split in sub-queues. The method {@link QueueSplitter#convertToSubQueue(String, HttpServerRequest)} | ||
* evaluates the convert of the queue name in a sub-queue name. | ||
* | ||
* @author https://github.com/gcastaldi [Giannandrea Castaldi] | ||
*/ | ||
public interface QueueSplitter { | ||
|
||
public Future<Void> initialize(); | ||
|
||
/** | ||
* Convert the queue name in a sub-queue name. If not necessary maintains the initial queue name. | ||
* | ||
* @param queue | ||
* @param request | ||
* @return sub-queue name | ||
*/ | ||
String convertToSubQueue(String queue, HttpServerRequest request); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package org.swisspush.gateleen.queue.queuing.splitter; | ||
|
||
import javax.annotation.Nullable; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Container holding configuration values for {@link QueueSplitterImpl} identified | ||
* by a queue pattern. | ||
* | ||
* @author https://github.com/gcastaldi [Giannandrea Castaldi] | ||
*/ | ||
public class QueueSplitterConfiguration { | ||
|
||
private final Pattern queue; | ||
|
||
private final String postfixDelimiter; | ||
|
||
@Nullable | ||
private final List<String> postfixFromStatic; | ||
|
||
@Nullable | ||
private final String postfixFromHeader; | ||
|
||
@Nullable | ||
private final Pattern postfixFromUrl; | ||
|
||
|
||
public QueueSplitterConfiguration( | ||
Pattern queue, | ||
String postfixDelimiter, | ||
@Nullable List<String> postfixFromStatic, | ||
@Nullable String postfixFromHeader, | ||
@Nullable Pattern postfixFromUrl) { | ||
this.queue = queue; | ||
this.postfixDelimiter = postfixDelimiter; | ||
this.postfixFromStatic = postfixFromStatic; | ||
this.postfixFromHeader = postfixFromHeader; | ||
this.postfixFromUrl = postfixFromUrl; | ||
} | ||
|
||
public Pattern getQueue() { | ||
return queue; | ||
} | ||
|
||
public String getPostfixDelimiter() { | ||
return postfixDelimiter; | ||
} | ||
|
||
@Nullable | ||
public List<String> getPostfixFromStatic() { | ||
return postfixFromStatic; | ||
} | ||
|
||
@Nullable | ||
public String getPostfixFromHeader() { | ||
return postfixFromHeader; | ||
} | ||
|
||
@Nullable | ||
public Pattern getPostfixFromUrl() { | ||
return postfixFromUrl; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
QueueSplitterConfiguration that = (QueueSplitterConfiguration) o; | ||
Check warning on line 70 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java Codecov / codecov/patchgateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L70
|
||
return Objects.equals(queue, that.queue) && | ||
Objects.equals(postfixDelimiter, that.postfixDelimiter) && | ||
Objects.equals(postfixFromStatic, that.postfixFromStatic) && | ||
Objects.equals(postfixFromHeader, that.postfixFromHeader) && | ||
Objects.equals(postfixFromUrl, that.postfixFromUrl); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(queue, postfixDelimiter, postfixFromStatic, postfixFromHeader, postfixFromUrl); | ||
Check warning on line 80 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java Codecov / codecov/patchgateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L80
|
||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "QueueSplitterConfiguration{" + | ||
Check warning on line 85 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java Codecov / codecov/patchgateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L85
|
||
"queue=" + queue + | ||
", postfixDelimiter='" + postfixDelimiter + '\'' + | ||
", postfixFromStatic=" + postfixFromStatic + | ||
", postfixFromHeader='" + postfixFromHeader + '\'' + | ||
", postfixFromUrl='" + postfixFromUrl + '\'' + | ||
'}'; | ||
} | ||
|
||
public boolean isSplitStatic() { | ||
return postfixFromStatic != null && !postfixFromStatic.isEmpty(); | ||
} | ||
|
||
public boolean isSplitFromRequest() { | ||
return postfixFromHeader != null || postfixFromUrl != null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package org.swisspush.gateleen.queue.queuing.splitter; | ||
|
||
import io.vertx.core.buffer.Buffer; | ||
import io.vertx.core.json.JsonArray; | ||
import io.vertx.core.json.JsonObject; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.swisspush.gateleen.core.util.StringUtils; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.regex.Pattern; | ||
import java.util.regex.PatternSyntaxException; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Parses the splitter configuration resource in to a list of {@link QueueSplitterConfiguration} | ||
* | ||
* @author https://github.com/gcastaldi [Giannandrea Castaldi] | ||
*/ | ||
public class QueueSplitterConfigurationParser { | ||
Check warning on line 23 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java Codecov / codecov/patchgateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java#L23
|
||
|
||
private static final Logger log = LoggerFactory.getLogger(QueueSplitterConfigurationParser.class); | ||
public static final String POSTFIX_FROM_STATIC_KEY = "postfixFromStatic"; | ||
public static final String POSTFIX_FROM_REQUEST_KEY = "postfixFromRequest"; | ||
public static final String POSTFIX_FROM_HEADER_KEY = "header"; | ||
public static final String POSTFIX_FROM_URL_KEY = "url"; | ||
public static final String POSTFIX_DELIMITER_KEY = "postfixDelimiter"; | ||
public static final String DEFAULT_POSTFIX_DELIMITER = "-"; | ||
|
||
static List<QueueSplitterConfiguration> parse(Buffer configurationResourceBuffer, Map<String, Object> properties) { | ||
|
||
JsonObject config; | ||
List<QueueSplitterConfiguration> queueSplitterConfigurations = new ArrayList<>(); | ||
try { | ||
String resolvedConfiguration = StringUtils.replaceWildcardConfigs( | ||
configurationResourceBuffer.toString(StandardCharsets.UTF_8), | ||
properties | ||
); | ||
config = new JsonObject(Buffer.buffer(resolvedConfiguration)); | ||
} catch (Exception ex) { | ||
log.warn("Could not replace wildcards with environment properties for queue splitter configurations or json invalid. Here the reason: {}", | ||
ex.getMessage()); | ||
return queueSplitterConfigurations; | ||
} | ||
|
||
for (String queuePattern : config.fieldNames()) { | ||
try { | ||
Pattern pattern = Pattern.compile(queuePattern); | ||
JsonObject queueConfig = config.getJsonObject(queuePattern); | ||
JsonArray postfixFromStatic = queueConfig.getJsonArray(POSTFIX_FROM_STATIC_KEY); | ||
if (postfixFromStatic != null) { | ||
List<String> staticPostfixes = postfixFromStatic.stream().map(Object::toString).collect(Collectors.toList()); | ||
queueSplitterConfigurations.add(new QueueSplitterConfiguration( | ||
pattern, | ||
queueConfig.getString("postfixDelimiter", DEFAULT_POSTFIX_DELIMITER), | ||
staticPostfixes, | ||
null, | ||
null | ||
)); | ||
} else { | ||
JsonObject postfixFromRequest = queueConfig.getJsonObject(POSTFIX_FROM_REQUEST_KEY); | ||
String postfixFromHeader = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_HEADER_KEY) : null; | ||
String postfixFromUrl = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_URL_KEY) : null; | ||
if (postfixFromRequest != null && (postfixFromHeader != null || postfixFromUrl != null)) { | ||
queueSplitterConfigurations.add(new QueueSplitterConfiguration( | ||
pattern, | ||
queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER), | ||
null, | ||
postfixFromHeader, | ||
postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null | ||
)); | ||
} else { | ||
log.warn("Queue splitter configuration without a postfix definition"); | ||
} | ||
} | ||
} catch (PatternSyntaxException patternException) { | ||
log.warn("Queue splitter '{}' is not a valid regex pattern. Discarding this queue splitter configuration", queuePattern); | ||
Check warning on line 80 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java Codecov / codecov/patchgateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java#L79-L80
|
||
} | ||
} | ||
|
||
return queueSplitterConfigurations; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
postfixFromRequest != null
can be omitted since you already checkpostfixFromHeader
andpostfixFromUrl