Skip to content

Commit

Permalink
Offline retransmission extension with view parameter (#1881)
Browse files Browse the repository at this point in the history
* SPAW-942 implementation draft

* SPAW-942 test coverage & cleanup

* SPAW-942 test coverage & cleanup

* commit msg adjustment

* test coverage & cleanup

* test coverage & cleanup

* checkstyle

* retransmission view name refactor

* pr adjustments

* checkstyle

* jdk8Module registered in ObjectMapper

* validation check fix in shouldReturnClientErrorWhenRequestingRetransmissionWithEmptyData test

* validation adjustment

* validation adjustment

* validation testing

* validation testing

* validation testing

* validation testing

* pr adjustments

* checkstyle

* Update hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/OfflineRetransmissionEndpoint.java

Co-authored-by: Maciej Moscicki <maciej.moscicki@allegro.com>

* permission method better naming

---------

Co-authored-by: Adam Izydorczyk <adam.izydorczyk@allegro.com>
Co-authored-by: Maciej Moscicki <maciej.moscicki@allegro.com>
Co-authored-by: Daniel Fąderski <faderskd@users.noreply.github.com>
  • Loading branch information
4 people authored Aug 19, 2024
1 parent d863e8f commit af320d6
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.constraints.OneSourceRetransmission;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;

@OneSourceRetransmission
public class OfflineRetransmissionRequest {

private static final List<DateTimeFormatter> formatters = List.of(
Expand All @@ -24,7 +27,7 @@ public class OfflineRetransmissionRequest {
);
private static final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionRequest.class);

@NotEmpty
private final String sourceViewPath;
private final String sourceTopic;
@NotEmpty
private final String targetTopic;
Expand All @@ -35,10 +38,12 @@ public class OfflineRetransmissionRequest {

@JsonCreator
public OfflineRetransmissionRequest(
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") String startTimestamp,
@JsonProperty("endTimestamp") String endTimestamp) {
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
this.startTimestamp = initializeTimestamp(startTimestamp);
Expand All @@ -62,8 +67,12 @@ private Instant initializeTimestamp(String timestamp) {
return null;
}

public String getSourceTopic() {
return sourceTopic;
public Optional<String> getSourceViewPath() {
return Optional.ofNullable(sourceViewPath);
}

public Optional<String> getSourceTopic() {
return Optional.ofNullable(sourceTopic);
}

public String getTargetTopic() {
Expand All @@ -84,6 +93,7 @@ public Instant getEndTimestamp() {
public String toString() {
return "OfflineRetransmissionRequest{"
+ "sourceTopic='" + sourceTopic + '\''
+ ", sourceViewPath='" + sourceViewPath + '\''
+ ", targetTopic='" + targetTopic + '\''
+ ", startTimestamp=" + startTimestamp
+ ", endTimestamp=" + endTimestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.util.Optional;

public class OfflineRetransmissionTask {
private final String taskId;
Expand All @@ -16,12 +17,14 @@ public class OfflineRetransmissionTask {
@JsonCreator
public OfflineRetransmissionTask(
@JsonProperty("taskId") String taskId,
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") Instant startTimestamp,
@JsonProperty("endTimestamp") Instant endTimestamp,
@JsonProperty("createdAt") Instant createdAt) {
this(taskId, new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
targetTopic,
startTimestamp.toString(),
Expand All @@ -39,10 +42,14 @@ public String getTaskId() {
return taskId;
}

public String getSourceTopic() {
public Optional<String> getSourceTopic() {
return request.getSourceTopic();
}

public Optional<String> getSourceViewPath() {
return request.getSourceViewPath();
}

public String getTargetTopic() {
return request.getTargetTopic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static TopicName fromQualifiedName(String qualifiedName) {

int index = qualifiedName.lastIndexOf(GROUP_SEPARATOR);
if (index == -1) {
throw new IllegalArgumentException("Missing group");
throw new IllegalArgumentException("Invalid qualified name " + qualifiedName);
}

String groupName = qualifiedName.substring(0, index);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.Constraint;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({TYPE})
@Constraint(validatedBy = OneSourceRetransmissionValidator.class)
public @interface OneSourceRetransmission {
String message() default "must contain one defined source of retransmission data - source topic or source view";

Class[] groups() default {};

Class[] payload() default {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;

public class OneSourceRetransmissionValidator implements ConstraintValidator<OneSourceRetransmission, OfflineRetransmissionRequest> {

public static final String EMPTY_STRING = "";

@Override
public boolean isValid(OfflineRetransmissionRequest offlineRetransmissionRequest, ConstraintValidatorContext context) {
var sourceViewPath = offlineRetransmissionRequest.getSourceViewPath();
var sourceTopic = offlineRetransmissionRequest.getSourceTopic();

return (nonBlank(sourceViewPath.orElse(EMPTY_STRING)) && sourceTopic.isEmpty())
|| (nonBlank(sourceTopic.orElse(EMPTY_STRING)) && sourceViewPath.isEmpty());
}

private static boolean nonBlank(String value) {
return value != null && !value.isBlank();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.allegro.tech.hermes.api.constraints


import jakarta.validation.ConstraintValidatorContext
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest
import spock.lang.Specification

class OneSourceRetransmissionValidatorTest extends Specification {

OneSourceRetransmissionValidator validator = new OneSourceRetransmissionValidator()
ConstraintValidatorContext mockContext = Mock()

def "Validator should validate retransmission request when sourceViewPath is '#sourceViewPath' and sourceTopic is '#sourceTopic'"() {
given:
def request = new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
"someTargetTopic",
"2024-07-08T12:00:00",
"2024-07-08T13:00:00"
)
expect:
validator.isValid(request, mockContext) == isValid

where:
sourceViewPath | sourceTopic | isValid
null | "testTopic" | true
"testView" | null | true
null | null | false
"testView" | "testTopic" | false
"" | "" | false
" " | " " | false
"" | "testTopic" | false
"testView" | " " | false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;
import pl.allegro.tech.hermes.api.OfflineRetransmissionTask;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
import pl.allegro.tech.hermes.management.api.auth.ManagementRights;
import pl.allegro.tech.hermes.management.domain.PermissionDeniedException;
import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionService;

import java.util.List;
import java.util.Optional;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;

Expand All @@ -36,6 +36,7 @@ public class OfflineRetransmissionEndpoint {
private final OfflineRetransmissionService retransmissionService;
private final RetransmissionPermissions permissions;
private final OfflineRetransmissionAuditor auditor;
private final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionEndpoint.class);

public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmissionService,
TopicRepository topicRepository, ManagementRights managementRights) {
Expand All @@ -47,9 +48,10 @@ public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmission
@POST
@Consumes(APPLICATION_JSON)
public Response createRetransmissionTask(@Valid OfflineRetransmissionRequest request, @Context ContainerRequestContext requestContext) {
logger.info("Offline retransmission request: {}", request);
retransmissionService.validateRequest(request);
permissions.ensurePermissionsToBothTopics(request, requestContext);
OfflineRetransmissionTask task = retransmissionService.createTask(request);
permissions.ensurePermissions(request, requestContext);
var task = retransmissionService.createTask(request);
auditor.auditRetransmissionCreation(request, requestContext, task);
return Response.status(Response.Status.CREATED).build();
}
Expand All @@ -68,24 +70,30 @@ public Response deleteRetransmissionTask(@PathParam("taskId") String taskId) {
}

private static class RetransmissionPermissions {
private final Logger logger = LoggerFactory.getLogger(RetransmissionPermissions.class);
private final TopicRepository topicRepository;
private final ManagementRights managementRights;


private RetransmissionPermissions(TopicRepository topicRepository, ManagementRights managementRights) {
this.topicRepository = topicRepository;
this.managementRights = managementRights;
}

private void ensurePermissionsToBothTopics(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) {
Topic sourceTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getSourceTopic()));
Topic targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic()));
boolean hasPermissions = managementRights.isUserAllowedToManageTopic(sourceTopic, requestContext)
&& managementRights.isUserAllowedToManageTopic(targetTopic, requestContext);
private void ensurePermissions(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) {
var targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic()));
var hasPermissions = validateSourceTopic(request.getSourceTopic(), requestContext) && managementRights.isUserAllowedToManageTopic(targetTopic, requestContext);
if (!hasPermissions) {
logger.info("User {} has no permissions to make offline retransmission {}", requestContext.getSecurityContext().getUserPrincipal(), request);
throw new PermissionDeniedException("User needs permissions to source and target topics.");
}
}

private boolean validateSourceTopic(Optional<String> sourceTopic, ContainerRequestContext requestContext) {
return sourceTopic.isEmpty() || managementRights.isUserAllowedToManageTopic(
topicRepository.getTopicDetails(TopicName.fromQualifiedName(sourceTopic.get())),
requestContext
);
}
}

private static class OfflineRetransmissionAuditor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -42,7 +43,7 @@ public ObjectMapper objectMapper() {
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.disable(SerializationFeature.WRITE_NULL_MAP_VALUES);
mapper.registerModule(new JavaTimeModule());
mapper.registerModules(new JavaTimeModule(), new Jdk8Module()); // Jdk8Module is required for Jackson to serialize & deserialize Optional type

final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues.Std().addValue(
Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public OfflineRetransmissionService(OfflineRetransmissionRepository offlineRetra
}

public void validateRequest(OfflineRetransmissionRequest request) {
TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic());
TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic().orElse(null));
TopicName targetTopicName = TopicName.fromQualifiedName(request.getTargetTopic());

ensureTopicsExist(sourceTopicName, targetTopicName);
Expand Down Expand Up @@ -49,12 +49,11 @@ public void deleteTask(String taskId) {
}

private void ensureTopicsExist(TopicName sourceTopicName, TopicName targetTopicName) {
boolean sourceTopicExists = topicRepository.topicExists(sourceTopicName);
boolean targetTopicExists = topicRepository.topicExists(targetTopicName);
if (!sourceTopicExists) {
if (sourceTopicName != null && !topicRepository.topicExists(sourceTopicName)) {
throw new OfflineRetransmissionValidationException("Source topic does not exist");
}
if (!targetTopicExists) {

if (!topicRepository.topicExists(targetTopicName)) {
throw new OfflineRetransmissionValidationException("Target topic does not exist");
}
}
Expand Down
Loading

0 comments on commit af320d6

Please sign in to comment.