Skip to content

Commit

Permalink
Merge branch 'master' into minor_ui_improvements_august_2024
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m authored Aug 21, 2024
2 parents 1a6b9ed + af320d6 commit 75d83a6
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.constraints.OneSourceRetransmission;
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;

@OneSourceRetransmission
public class OfflineRetransmissionRequest {

private static final List<DateTimeFormatter> formatters = List.of(
Expand All @@ -24,7 +27,7 @@ public class OfflineRetransmissionRequest {
);
private static final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionRequest.class);

@NotEmpty
private final String sourceViewPath;
private final String sourceTopic;
@NotEmpty
private final String targetTopic;
Expand All @@ -35,10 +38,12 @@ public class OfflineRetransmissionRequest {

@JsonCreator
public OfflineRetransmissionRequest(
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") String startTimestamp,
@JsonProperty("endTimestamp") String endTimestamp) {
this.sourceViewPath = sourceViewPath;
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
this.startTimestamp = initializeTimestamp(startTimestamp);
Expand All @@ -62,8 +67,12 @@ private Instant initializeTimestamp(String timestamp) {
return null;
}

public String getSourceTopic() {
return sourceTopic;
public Optional<String> getSourceViewPath() {
return Optional.ofNullable(sourceViewPath);
}

public Optional<String> getSourceTopic() {
return Optional.ofNullable(sourceTopic);
}

public String getTargetTopic() {
Expand All @@ -84,6 +93,7 @@ public Instant getEndTimestamp() {
public String toString() {
return "OfflineRetransmissionRequest{"
+ "sourceTopic='" + sourceTopic + '\''
+ ", sourceViewPath='" + sourceViewPath + '\''
+ ", targetTopic='" + targetTopic + '\''
+ ", startTimestamp=" + startTimestamp
+ ", endTimestamp=" + endTimestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer;

import java.time.Instant;
import java.util.Optional;

public class OfflineRetransmissionTask {
private final String taskId;
Expand All @@ -16,12 +17,14 @@ public class OfflineRetransmissionTask {
@JsonCreator
public OfflineRetransmissionTask(
@JsonProperty("taskId") String taskId,
@JsonProperty("sourceViewPath") String sourceViewPath,
@JsonProperty("sourceTopic") String sourceTopic,
@JsonProperty("targetTopic") String targetTopic,
@JsonProperty("startTimestamp") Instant startTimestamp,
@JsonProperty("endTimestamp") Instant endTimestamp,
@JsonProperty("createdAt") Instant createdAt) {
this(taskId, new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
targetTopic,
startTimestamp.toString(),
Expand All @@ -39,10 +42,14 @@ public String getTaskId() {
return taskId;
}

public String getSourceTopic() {
public Optional<String> getSourceTopic() {
return request.getSourceTopic();
}

public Optional<String> getSourceViewPath() {
return request.getSourceViewPath();
}

public String getTargetTopic() {
return request.getTargetTopic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static TopicName fromQualifiedName(String qualifiedName) {

int index = qualifiedName.lastIndexOf(GROUP_SEPARATOR);
if (index == -1) {
throw new IllegalArgumentException("Missing group");
throw new IllegalArgumentException("Invalid qualified name " + qualifiedName);
}

String groupName = qualifiedName.substring(0, index);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.Constraint;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({TYPE})
@Constraint(validatedBy = OneSourceRetransmissionValidator.class)
public @interface OneSourceRetransmission {
String message() default "must contain one defined source of retransmission data - source topic or source view";

Class[] groups() default {};

Class[] payload() default {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pl.allegro.tech.hermes.api.constraints;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;

public class OneSourceRetransmissionValidator implements ConstraintValidator<OneSourceRetransmission, OfflineRetransmissionRequest> {

public static final String EMPTY_STRING = "";

@Override
public boolean isValid(OfflineRetransmissionRequest offlineRetransmissionRequest, ConstraintValidatorContext context) {
var sourceViewPath = offlineRetransmissionRequest.getSourceViewPath();
var sourceTopic = offlineRetransmissionRequest.getSourceTopic();

return (nonBlank(sourceViewPath.orElse(EMPTY_STRING)) && sourceTopic.isEmpty())
|| (nonBlank(sourceTopic.orElse(EMPTY_STRING)) && sourceViewPath.isEmpty());
}

private static boolean nonBlank(String value) {
return value != null && !value.isBlank();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.allegro.tech.hermes.api.constraints


import jakarta.validation.ConstraintValidatorContext
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest
import spock.lang.Specification

class OneSourceRetransmissionValidatorTest extends Specification {

OneSourceRetransmissionValidator validator = new OneSourceRetransmissionValidator()
ConstraintValidatorContext mockContext = Mock()

def "Validator should validate retransmission request when sourceViewPath is '#sourceViewPath' and sourceTopic is '#sourceTopic'"() {
given:
def request = new OfflineRetransmissionRequest(
sourceViewPath,
sourceTopic,
"someTargetTopic",
"2024-07-08T12:00:00",
"2024-07-08T13:00:00"
)
expect:
validator.isValid(request, mockContext) == isValid

where:
sourceViewPath | sourceTopic | isValid
null | "testTopic" | true
"testView" | null | true
null | null | false
"testView" | "testTopic" | false
"" | "" | false
" " | " " | false
"" | "testTopic" | false
"testView" | " " | false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;
import pl.allegro.tech.hermes.api.OfflineRetransmissionTask;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
import pl.allegro.tech.hermes.management.api.auth.ManagementRights;
import pl.allegro.tech.hermes.management.domain.PermissionDeniedException;
import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionService;

import java.util.List;
import java.util.Optional;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;

Expand All @@ -36,6 +36,7 @@ public class OfflineRetransmissionEndpoint {
private final OfflineRetransmissionService retransmissionService;
private final RetransmissionPermissions permissions;
private final OfflineRetransmissionAuditor auditor;
private final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionEndpoint.class);

public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmissionService,
TopicRepository topicRepository, ManagementRights managementRights) {
Expand All @@ -47,9 +48,10 @@ public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmission
@POST
@Consumes(APPLICATION_JSON)
public Response createRetransmissionTask(@Valid OfflineRetransmissionRequest request, @Context ContainerRequestContext requestContext) {
logger.info("Offline retransmission request: {}", request);
retransmissionService.validateRequest(request);
permissions.ensurePermissionsToBothTopics(request, requestContext);
OfflineRetransmissionTask task = retransmissionService.createTask(request);
permissions.ensurePermissions(request, requestContext);
var task = retransmissionService.createTask(request);
auditor.auditRetransmissionCreation(request, requestContext, task);
return Response.status(Response.Status.CREATED).build();
}
Expand All @@ -68,24 +70,30 @@ public Response deleteRetransmissionTask(@PathParam("taskId") String taskId) {
}

private static class RetransmissionPermissions {
private final Logger logger = LoggerFactory.getLogger(RetransmissionPermissions.class);
private final TopicRepository topicRepository;
private final ManagementRights managementRights;


private RetransmissionPermissions(TopicRepository topicRepository, ManagementRights managementRights) {
this.topicRepository = topicRepository;
this.managementRights = managementRights;
}

private void ensurePermissionsToBothTopics(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) {
Topic sourceTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getSourceTopic()));
Topic targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic()));
boolean hasPermissions = managementRights.isUserAllowedToManageTopic(sourceTopic, requestContext)
&& managementRights.isUserAllowedToManageTopic(targetTopic, requestContext);
private void ensurePermissions(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) {
var targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic()));
var hasPermissions = validateSourceTopic(request.getSourceTopic(), requestContext) && managementRights.isUserAllowedToManageTopic(targetTopic, requestContext);
if (!hasPermissions) {
logger.info("User {} has no permissions to make offline retransmission {}", requestContext.getSecurityContext().getUserPrincipal(), request);
throw new PermissionDeniedException("User needs permissions to source and target topics.");
}
}

private boolean validateSourceTopic(Optional<String> sourceTopic, ContainerRequestContext requestContext) {
return sourceTopic.isEmpty() || managementRights.isUserAllowedToManageTopic(
topicRepository.getTopicDetails(TopicName.fromQualifiedName(sourceTopic.get())),
requestContext
);
}
}

private static class OfflineRetransmissionAuditor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -42,7 +43,7 @@ public ObjectMapper objectMapper() {
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.disable(SerializationFeature.WRITE_NULL_MAP_VALUES);
mapper.registerModule(new JavaTimeModule());
mapper.registerModules(new JavaTimeModule(), new Jdk8Module()); // Jdk8Module is required for Jackson to serialize & deserialize Optional type

final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues.Std().addValue(
Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public OfflineRetransmissionService(OfflineRetransmissionRepository offlineRetra
}

public void validateRequest(OfflineRetransmissionRequest request) {
TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic());
TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic().orElse(null));
TopicName targetTopicName = TopicName.fromQualifiedName(request.getTargetTopic());

ensureTopicsExist(sourceTopicName, targetTopicName);
Expand Down Expand Up @@ -49,12 +49,11 @@ public void deleteTask(String taskId) {
}

private void ensureTopicsExist(TopicName sourceTopicName, TopicName targetTopicName) {
boolean sourceTopicExists = topicRepository.topicExists(sourceTopicName);
boolean targetTopicExists = topicRepository.topicExists(targetTopicName);
if (!sourceTopicExists) {
if (sourceTopicName != null && !topicRepository.topicExists(sourceTopicName)) {
throw new OfflineRetransmissionValidationException("Source topic does not exist");
}
if (!targetTopicExists) {

if (!topicRepository.topicExists(targetTopicName)) {
throw new OfflineRetransmissionValidationException("Target topic does not exist");
}
}
Expand Down
Loading

0 comments on commit 75d83a6

Please sign in to comment.