Skip to content

Commit

Permalink
Merge branch 'master' into new_offset_committing_bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m authored Aug 21, 2024
2 parents 5db6818 + 8057c70 commit 1250931
Show file tree
Hide file tree
Showing 29 changed files with 1,298 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.constraints.OneSourceRetransmission;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;

@OneSourceRetransmission
public class OfflineRetransmissionRequest {

private static final List<DateTimeFormatter> formatters = List.of(
Expand All @@ -24,7 +27,7 @@ public class OfflineRetransmissionRequest {
);
private static final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionRequest.class);

@NotEmpty
private final String sourceViewPath;
private final String sourceTopic;
@NotEmpty
private final String targetTopic;
Expand All @@ -35,10 +38,12 @@ public class OfflineRetransmissionRequest {

@JsonCreator
public OfflineRetransmissionRequest(
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") String startTimestamp,
@JsonProperty("endTimestamp") String endTimestamp) {
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
this.startTimestamp = initializeTimestamp(startTimestamp);
Expand All @@ -62,8 +67,12 @@ private Instant initializeTimestamp(String timestamp) {
return null;
}

public String getSourceTopic() {
return sourceTopic;
public Optional<String> getSourceViewPath() {
return Optional.ofNullable(sourceViewPath);
}

public Optional<String> getSourceTopic() {
return Optional.ofNullable(sourceTopic);
}

public String getTargetTopic() {
Expand All @@ -84,6 +93,7 @@ public Instant getEndTimestamp() {
public String toString() {
return "OfflineRetransmissionRequest{"
+ "sourceTopic='" + sourceTopic + '\''
+ ", sourceViewPath='" + sourceViewPath + '\''
+ ", targetTopic='" + targetTopic + '\''
+ ", startTimestamp=" + startTimestamp
+ ", endTimestamp=" + endTimestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.util.Optional;

public class OfflineRetransmissionTask {
private final String taskId;
Expand All @@ -16,12 +17,14 @@ public class OfflineRetransmissionTask {
@JsonCreator
public OfflineRetransmissionTask(
@JsonProperty("taskId") String taskId,
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") Instant startTimestamp,
@JsonProperty("endTimestamp") Instant endTimestamp,
@JsonProperty("createdAt") Instant createdAt) {
this(taskId, new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
targetTopic,
startTimestamp.toString(),
Expand All @@ -39,10 +42,14 @@ public String getTaskId() {
return taskId;
}

public String getSourceTopic() {
public Optional<String> getSourceTopic() {
return request.getSourceTopic();
}

public Optional<String> getSourceViewPath() {
return request.getSourceViewPath();
}

public String getTargetTopic() {
return request.getTargetTopic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static TopicName fromQualifiedName(String qualifiedName) {

int index = qualifiedName.lastIndexOf(GROUP_SEPARATOR);
if (index == -1) {
throw new IllegalArgumentException("Missing group");
throw new IllegalArgumentException("Invalid qualified name " + qualifiedName);
}

String groupName = qualifiedName.substring(0, index);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.Constraint;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({TYPE})
@Constraint(validatedBy = OneSourceRetransmissionValidator.class)
public @interface OneSourceRetransmission {
String message() default "must contain one defined source of retransmission data - source topic or source view";

Class[] groups() default {};

Class[] payload() default {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;

public class OneSourceRetransmissionValidator implements ConstraintValidator<OneSourceRetransmission, OfflineRetransmissionRequest> {

public static final String EMPTY_STRING = "";

@Override
public boolean isValid(OfflineRetransmissionRequest offlineRetransmissionRequest, ConstraintValidatorContext context) {
var sourceViewPath = offlineRetransmissionRequest.getSourceViewPath();
var sourceTopic = offlineRetransmissionRequest.getSourceTopic();

return (nonBlank(sourceViewPath.orElse(EMPTY_STRING)) && sourceTopic.isEmpty())
|| (nonBlank(sourceTopic.orElse(EMPTY_STRING)) && sourceViewPath.isEmpty());
}

private static boolean nonBlank(String value) {
return value != null && !value.isBlank();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.allegro.tech.hermes.api.constraints


import jakarta.validation.ConstraintValidatorContext
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest
import spock.lang.Specification

class OneSourceRetransmissionValidatorTest extends Specification {

OneSourceRetransmissionValidator validator = new OneSourceRetransmissionValidator()
ConstraintValidatorContext mockContext = Mock()

def "Validator should validate retransmission request when sourceViewPath is '#sourceViewPath' and sourceTopic is '#sourceTopic'"() {
given:
def request = new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
"someTargetTopic",
"2024-07-08T12:00:00",
"2024-07-08T13:00:00"
)
expect:
validator.isValid(request, mockContext) == isValid

where:
sourceViewPath | sourceTopic | isValid
null | "testTopic" | true
"testView" | null | true
null | null | false
"testView" | "testTopic" | false
"" | "" | false
" " | " " | false
"" | "testTopic" | false
"testView" | " " | false
}

}
26 changes: 26 additions & 0 deletions hermes-console/json-server/db.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,32 @@
]
}
],
"inconsistentGroups3":[
{
"name": "pl.allegro.public.group",
"inconsistentMetadata": [],
"inconsistentTopics": [
{
"name": "pl.allegro.public.group.DummyEvent",
"inconsistentMetadata": [],
"inconsistentSubscriptions": [
{
"name": "pl.allegro.public.group.DummyEvent$foobar-service",
"inconsistentMetadata": [
{
"datacenter": "DC1"
},
{
"datacenter": "DC2",
"content": "{\n \"id\": \"foobar-service\",\n \"topicName\": \"pl.allegro.public.group.DummyEvent\",\n \"name\": \"foobar-service\",\n \"endpoint\": \"service://foobar-service/events/dummy-event\",\n \"state\": \"ACTIVE\",\n \"description\": \"Test Hermes endpoint\",\n \"subscriptionPolicy\": {\n \"rate\": 10,\n \"messageTtl\": 60,\n \"messageBackoff\": 100,\n \"requestTimeout\": 1000,\n \"socketTimeout\": 0,\n \"sendingDelay\": 0,\n \"backoffMultiplier\": 1.0,\n \"backoffMaxIntervalInSec\": 600,\n \"retryClientErrors\": true,\n \"backoffMaxIntervalMillis\": 600000\n },\n \"trackingEnabled\": false,\n \"trackingMode\": \"trackingOff\",\n \"owner\": {\n \"source\": \"Service Catalog\",\n \"id\": \"42\"\n },\n \"monitoringDetails\": {\n \"severity\": \"NON_IMPORTANT\",\n \"reaction\": \"\"\n },\n \"contentType\": \"JSON\",\n \"deliveryType\": \"SERIAL\",\n \"filters\": [\n {\n \"type\": \"avropath\",\n \"path\": \"foobar\",\n \"matcher\": \"^FOO_BAR$|^BAZ_BAR$\",\n \"matchingStrategy\": \"any\"\n },\n {\n \"type\": \"avropath\",\n \"path\": \".foo.bar.baz\",\n \"matcher\": \"true\",\n \"matchingStrategy\": \"all\"\n }\n ],\n \"mode\": \"ANYCAST\",\n \"headers\": [\n {\n \"name\": \"X-My-Header\",\n \"value\": \"boobar\"\n },\n {\n \"name\": \"X-Another-Header\",\n \"value\": \"foobar\"\n }\n ],\n \"endpointAddressResolverMetadata\": {\n \"additionalMetadata\": false,\n \"nonSupportedProperty\": 2\n },\n \"http2Enabled\": false,\n \"subscriptionIdentityHeadersEnabled\": false,\n \"autoDeleteWithTopicEnabled\": false,\n \"createdAt\": 1579507131.238,\n \"modifiedAt\": 1672140855.813\n}"
}
]
}
]
}
]
}
],
"topicNames": [
"pl.allegro.public.offer.product.ProductEventV1",
"pl.allegro.public.offer.product.ProductEventV2",
Expand Down
1 change: 1 addition & 0 deletions hermes-console/json-server/routes.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"/consistency/groups": "/consistencyGroups",
"/consistency/inconsistencies/groups?groupNames=pl.allegro.public.offer*": "/inconsistentGroups",
"/consistency/inconsistencies/groups?groupNames=pl.allegro.public.group2*": "/inconsistentGroups2",
"/consistency/inconsistencies/groups?groupNames=pl.allegro.public.group": "/inconsistentGroups3",
"/groups": "/groups",
"/owners/sources/Service%20Catalog/:id": "/topicsOwners/:id",
"/readiness/datacenters": "/readinessDatacenters",
Expand Down
16 changes: 16 additions & 0 deletions hermes-console/json-server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ server.put(
},
);

server.post(
'/consistency/sync/topics/pl.allegro.public.group.DummyEvent/subscriptions/barbaz-service*',
(req, res) => {
res.sendStatus(200);
},
);

server.post(
'/consistency/sync/topics/pl.allegro.public.group.DummyEvent*',
(req, res) => {
res.status(404).jsonp({
message: 'Group pl.allegro.public.group not found',
});
},
);

server.post('/filters/:topic', (req, res) => {
res.jsonp(filterDebug);
});
Expand Down
42 changes: 42 additions & 0 deletions hermes-console/src/api/hermes-client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,3 +464,45 @@ export function verifyFilters(
},
);
}

export function syncGroup(
groupName: string,
primaryDatacenter: string,
): ResponsePromise<void> {
return axios.post<void>(`/consistency/sync/groups/${groupName}`, null, {
params: {
primaryDatacenter: primaryDatacenter,
},
});
}

export function syncTopic(
topicQualifiedName: string,
primaryDatacenter: string,
): ResponsePromise<void> {
return axios.post<void>(
`/consistency/sync/topics/${topicQualifiedName}`,
null,
{
params: {
primaryDatacenter: primaryDatacenter,
},
},
);
}

export function syncSubscription(
topicQualifiedName: string,
subscriptionName: string,
primaryDatacenter: string,
): ResponsePromise<void> {
return axios.post<void>(
`/consistency/sync/topics/${topicQualifiedName}/subscriptions/${subscriptionName}`,
null,
{
params: {
primaryDatacenter: primaryDatacenter,
},
},
);
}
Loading

0 comments on commit 1250931

Please sign in to comment.