Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offline retransmission extension with view parameter #1881

Merged
merged 33 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9b71ab8
SPAW-942 implementation draft
bartekdrobczyk Jul 5, 2024
6dcb847
SPAW-942 test coverage & cleanup
bartekdrobczyk Jul 9, 2024
f8c414d
SPAW-942 test coverage & cleanup
bartekdrobczyk Jul 11, 2024
b3e3951
Merge branch 'master' into SPAW-942_hermes_contribution
bartekdrobczyk Jul 15, 2024
cac7b3d
commit msg adjustment
bartekdrobczyk Jul 5, 2024
32fe56d
test coverage & cleanup
bartekdrobczyk Jul 9, 2024
028aaba
test coverage & cleanup
bartekdrobczyk Jul 11, 2024
4d8ee20
Merge remote-tracking branch 'origin/SPAW-942_hermes_contribution' in…
bartekdrobczyk Jul 15, 2024
c42f1af
checkstyle
bartekdrobczyk Jul 15, 2024
faabfd2
retransmission view name refactor
bartekdrobczyk Jul 16, 2024
567df51
pr adjustments
bartekdrobczyk Jul 24, 2024
315b51f
checkstyle
bartekdrobczyk Jul 24, 2024
df425ae
Merge branch 'master' into SPAW-942_hermes_contribution
bartekdrobczyk Jul 24, 2024
0a49ec6
Merge branch 'master' into SPAW-942_hermes_contribution
bartekdrobczyk Jul 29, 2024
181b183
jdk8Module registered in ObjectMapper
bartekdrobczyk Jul 30, 2024
dfda566
validation check fix in shouldReturnClientErrorWhenRequestingRetransm…
adamizydorczyk-allegro Jul 30, 2024
0ef11b0
validation adjustment
bartekdrobczyk Jul 31, 2024
2417871
validation adjustment
bartekdrobczyk Jul 31, 2024
0463929
validation testing
bartekdrobczyk Jul 31, 2024
fba4255
validation testing
bartekdrobczyk Jul 31, 2024
857e778
validation testing
bartekdrobczyk Jul 31, 2024
a7440e4
validation testing
bartekdrobczyk Jul 31, 2024
844d9ba
Merge branch 'master' into SPAW-942_hermes_contribution
bartekdrobczyk Aug 5, 2024
e9a4ef0
Merge remote-tracking branch 'origin/SPAW-942_hermes_contribution' in…
bartekdrobczyk Aug 7, 2024
77ef546
Merge branch 'master' into SPAW-942_hermes_contribution
bartekdrobczyk Aug 7, 2024
d3553fe
Merge remote-tracking branch 'origin/SPAW-942_hermes_contribution' in…
bartekdrobczyk Aug 7, 2024
3668781
pr adjustments
bartekdrobczyk Aug 7, 2024
b3d9b46
checkstyle
bartekdrobczyk Aug 7, 2024
ca62a7e
Merge branch 'master' into SPAW-942_hermes_contribution
bartekdrobczyk Aug 13, 2024
a9b3a47
Update hermes-management/src/main/java/pl/allegro/tech/hermes/managem…
bartekdrobczyk Aug 13, 2024
8d83cdf
permission method better naming
bartekdrobczyk Aug 13, 2024
c30963f
Merge remote-tracking branch 'origin/SPAW-942_hermes_contribution' in…
bartekdrobczyk Aug 13, 2024
75dbbc5
Merge branch 'master' into SPAW-942_hermes_contribution
faderskd Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.constraints.OneSourceRetransmission;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;

@OneSourceRetransmission
public class OfflineRetransmissionRequest {

private static final List<DateTimeFormatter> formatters = List.of(
Expand All @@ -24,7 +27,7 @@ public class OfflineRetransmissionRequest {
);
private static final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionRequest.class);

@NotEmpty
private final String sourceViewPath;
faderskd marked this conversation as resolved.
Show resolved Hide resolved
private final String sourceTopic;
@NotEmpty
private final String targetTopic;
Expand All @@ -35,10 +38,12 @@ public class OfflineRetransmissionRequest {

@JsonCreator
public OfflineRetransmissionRequest(
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") String startTimestamp,
@JsonProperty("endTimestamp") String endTimestamp) {
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
this.startTimestamp = initializeTimestamp(startTimestamp);
Expand All @@ -62,8 +67,12 @@ private Instant initializeTimestamp(String timestamp) {
return null;
}

public String getSourceTopic() {
return sourceTopic;
public Optional<String> getSourceViewPath() {
return Optional.ofNullable(sourceViewPath);
}

public Optional<String> getSourceTopic() {
return Optional.ofNullable(sourceTopic);
}

public String getTargetTopic() {
Expand All @@ -84,6 +93,7 @@ public Instant getEndTimestamp() {
public String toString() {
return "OfflineRetransmissionRequest{"
+ "sourceTopic='" + sourceTopic + '\''
+ ", sourceViewPath='" + sourceViewPath + '\''
+ ", targetTopic='" + targetTopic + '\''
+ ", startTimestamp=" + startTimestamp
+ ", endTimestamp=" + endTimestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.util.Optional;

public class OfflineRetransmissionTask {
private final String taskId;
Expand All @@ -16,12 +17,14 @@ public class OfflineRetransmissionTask {
@JsonCreator
public OfflineRetransmissionTask(
@JsonProperty("taskId") String taskId,
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") Instant startTimestamp,
@JsonProperty("endTimestamp") Instant endTimestamp,
@JsonProperty("createdAt") Instant createdAt) {
this(taskId, new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
targetTopic,
startTimestamp.toString(),
Expand All @@ -39,10 +42,14 @@ public String getTaskId() {
return taskId;
}

public String getSourceTopic() {
public Optional<String> getSourceTopic() {
return request.getSourceTopic();
}

public Optional<String> getSourceViewPath() {
return request.getSourceViewPath();
}

public String getTargetTopic() {
return request.getTargetTopic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static TopicName fromQualifiedName(String qualifiedName) {

int index = qualifiedName.lastIndexOf(GROUP_SEPARATOR);
if (index == -1) {
throw new IllegalArgumentException("Missing group");
throw new IllegalArgumentException("Invalid qualified name " + qualifiedName);
}

String groupName = qualifiedName.substring(0, index);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.Constraint;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({TYPE})
@Constraint(validatedBy = OneSourceRetransmissionValidator.class)
public @interface OneSourceRetransmission {
String message() default "must contain one defined source of retransmission data - source topic or source view";

Class[] groups() default {};

Class[] payload() default {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;

public class OneSourceRetransmissionValidator implements ConstraintValidator<OneSourceRetransmission, OfflineRetransmissionRequest> {

public static final String EMPTY_STRING = "";

@Override
public boolean isValid(OfflineRetransmissionRequest offlineRetransmissionRequest, ConstraintValidatorContext context) {
var sourceViewPath = offlineRetransmissionRequest.getSourceViewPath();
var sourceTopic = offlineRetransmissionRequest.getSourceTopic();

return (nonBlank(sourceViewPath.orElse(EMPTY_STRING)) && sourceTopic.isEmpty())
|| (nonBlank(sourceTopic.orElse(EMPTY_STRING)) && sourceViewPath.isEmpty());
}

private static boolean nonBlank(String value) {
return value != null && !value.isBlank();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.allegro.tech.hermes.api.constraints


import jakarta.validation.ConstraintValidatorContext
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest
import spock.lang.Specification

class OneSourceRetransmissionValidatorTest extends Specification {

OneSourceRetransmissionValidator validator = new OneSourceRetransmissionValidator()
ConstraintValidatorContext mockContext = Mock()

def "Validator should validate retransmission request when sourceViewPath is '#sourceViewPath' and sourceTopic is '#sourceTopic'"() {
given:
def request = new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
"someTargetTopic",
"2024-07-08T12:00:00",
"2024-07-08T13:00:00"
)
expect:
validator.isValid(request, mockContext) == isValid

where:
sourceViewPath | sourceTopic | isValid
null | "testTopic" | true
"testView" | null | true
null | null | false
"testView" | "testTopic" | false
"" | "" | false
" " | " " | false
"" | "testTopic" | false
"testView" | " " | false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;
import pl.allegro.tech.hermes.api.OfflineRetransmissionTask;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
import pl.allegro.tech.hermes.management.api.auth.ManagementRights;
import pl.allegro.tech.hermes.management.domain.PermissionDeniedException;
import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionService;

import java.util.List;
import java.util.Optional;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;

Expand All @@ -36,6 +36,7 @@ public class OfflineRetransmissionEndpoint {
private final OfflineRetransmissionService retransmissionService;
private final RetransmissionPermissions permissions;
private final OfflineRetransmissionAuditor auditor;
private final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionEndpoint.class);

public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmissionService,
TopicRepository topicRepository, ManagementRights managementRights) {
Expand All @@ -47,9 +48,10 @@ public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmission
@POST
@Consumes(APPLICATION_JSON)
public Response createRetransmissionTask(@Valid OfflineRetransmissionRequest request, @Context ContainerRequestContext requestContext) {
logger.info("Offline retransmission request: {}", request);
retransmissionService.validateRequest(request);
permissions.ensurePermissionsToBothTopics(request, requestContext);
OfflineRetransmissionTask task = retransmissionService.createTask(request);
var task = retransmissionService.createTask(request);
auditor.auditRetransmissionCreation(request, requestContext, task);
return Response.status(Response.Status.CREATED).build();
}
Expand All @@ -68,24 +70,30 @@ public Response deleteRetransmissionTask(@PathParam("taskId") String taskId) {
}

private static class RetransmissionPermissions {
private final Logger logger = LoggerFactory.getLogger(RetransmissionPermissions.class);
private final TopicRepository topicRepository;
private final ManagementRights managementRights;


private RetransmissionPermissions(TopicRepository topicRepository, ManagementRights managementRights) {
this.topicRepository = topicRepository;
this.managementRights = managementRights;
}

private void ensurePermissionsToBothTopics(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) {
bartekdrobczyk marked this conversation as resolved.
Show resolved Hide resolved
Topic sourceTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getSourceTopic()));
Topic targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic()));
boolean hasPermissions = managementRights.isUserAllowedToManageTopic(sourceTopic, requestContext)
&& managementRights.isUserAllowedToManageTopic(targetTopic, requestContext);
var targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic()));
var hasPermissions = validateSourceTopic(request.getSourceTopic(), requestContext) && managementRights.isUserAllowedToManageTopic(targetTopic, requestContext);
if (!hasPermissions) {
logger.info("User {} has no permissions to make retransmission {}", requestContext.getSecurityContext().getUserPrincipal(), request);
bartekdrobczyk marked this conversation as resolved.
Show resolved Hide resolved
throw new PermissionDeniedException("User needs permissions to source and target topics.");
}
}

private boolean validateSourceTopic(Optional<String> sourceTopic, ContainerRequestContext requestContext) {
return sourceTopic.isEmpty() || managementRights.isUserAllowedToManageTopic(
topicRepository.getTopicDetails(TopicName.fromQualifiedName(sourceTopic.get())),
requestContext
);
}
}

private static class OfflineRetransmissionAuditor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -42,7 +43,7 @@ public ObjectMapper objectMapper() {
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.disable(SerializationFeature.WRITE_NULL_MAP_VALUES);
mapper.registerModule(new JavaTimeModule());
mapper.registerModules(new JavaTimeModule(), new Jdk8Module()); // Jdk8Module is required for Jackson to serialize & deserialize Optional type

final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues.Std().addValue(
Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public OfflineRetransmissionService(OfflineRetransmissionRepository offlineRetra
}

public void validateRequest(OfflineRetransmissionRequest request) {
TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic());
TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic().orElse(null));
TopicName targetTopicName = TopicName.fromQualifiedName(request.getTargetTopic());

ensureTopicsExist(sourceTopicName, targetTopicName);
Expand Down Expand Up @@ -49,12 +49,11 @@ public void deleteTask(String taskId) {
}

private void ensureTopicsExist(TopicName sourceTopicName, TopicName targetTopicName) {
boolean sourceTopicExists = topicRepository.topicExists(sourceTopicName);
boolean targetTopicExists = topicRepository.topicExists(targetTopicName);
if (!sourceTopicExists) {
if (sourceTopicName != null && !topicRepository.topicExists(sourceTopicName)) {
throw new OfflineRetransmissionValidationException("Source topic does not exist");
}
if (!targetTopicExists) {

if (!topicRepository.topicExists(targetTopicName)) {
throw new OfflineRetransmissionValidationException("Target topic does not exist");
}
}
Expand Down
Loading
Loading