Skip to content

Commit

Permalink
add storage datacenter to published message tracking (#1833)
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky authored Mar 5, 2024
1 parent f2cef4c commit 8c9e700
Show file tree
Hide file tree
Showing 18 changed files with 147 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ public class PublishedMessageTrace implements MessageTrace {
private final String message;
private final String cluster;
private final String extraRequestHeaders;
private final String storageDatacenter;

@JsonCreator
public PublishedMessageTrace(@JsonProperty("messageId") String messageId,
@JsonProperty("timestamp") Long timestamp,
@JsonProperty("topicName") String topicName,
@JsonProperty("status") PublishedMessageTraceStatus status,
@JsonProperty("reason") String reason,
@JsonProperty("message") String message,
@JsonProperty("cluster") String cluster,
@JsonProperty("extraRequestHeaders") String extraRequestHeaders) {
@JsonProperty("timestamp") Long timestamp,
@JsonProperty("topicName") String topicName,
@JsonProperty("status") PublishedMessageTraceStatus status,
@JsonProperty("reason") String reason,
@JsonProperty("message") String message,
@JsonProperty("cluster") String cluster,
@JsonProperty("extraRequestHeaders") String extraRequestHeaders,
@JsonProperty("storageDc") String storageDatacenter) {
this.messageId = messageId;
this.timestamp = timestamp;
this.status = status;
Expand All @@ -36,6 +38,7 @@ public PublishedMessageTrace(@JsonProperty("messageId") String messageId,
this.message = message;
this.cluster = cluster;
this.extraRequestHeaders = extraRequestHeaders;
this.storageDatacenter = storageDatacenter;
}

public String getMessageId() {
Expand Down Expand Up @@ -76,6 +79,11 @@ public String getExtraRequestHeaders() {
return extraRequestHeaders;
}

@JsonProperty("storageDc")
public String getStorageDatacenter() {
return storageDatacenter;
}

@Override
public int hashCode() {
return Objects.hash(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite
return new HandlersChainFactory(
topicsCache,
new MessageErrorProcessor(new ObjectMapper(), trackers, trackingHeadersExtractor),
new MessageEndProcessor(trackers, new BrokerListeners(), trackingHeadersExtractor),
new MessageEndProcessor(trackers, new BrokerListeners(), trackingHeadersExtractor, "dc"),
new MessageFactory(
new MessageValidators(Collections.emptyList()),
new MessageContentTypeEnforcer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class BackupMessagesLoader {
private final int maxResendRetries;
private final Duration resendSleep;
private final Duration readTopicInfoSleep;
private final String datacenter;

private final Set<Topic> topicsAvailabilityCache = new HashSet<>();
private final AtomicReference<ConcurrentLinkedQueue<Pair<Message, CachedTopic>>> toResend = new AtomicReference<>();
Expand All @@ -64,7 +65,9 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
SchemaRepository schemaRepository,
SchemaExistenceEnsurer schemaExistenceEnsurer,
Trackers trackers,
BackupMessagesLoaderParameters backupMessagesLoaderParameters) {
BackupMessagesLoaderParameters backupMessagesLoaderParameters,
String datacenter
) {
this.brokerMessageProducer = brokerMessageProducer;
this.brokerListeners = brokerListeners;
this.topicsCache = topicsCache;
Expand All @@ -75,6 +78,7 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
this.resendSleep = backupMessagesLoaderParameters.getLoadingPauseBetweenResend();
this.readTopicInfoSleep = backupMessagesLoaderParameters.getLoadingWaitForBrokerTopicInfo();
this.maxResendRetries = backupMessagesLoaderParameters.getMaxResendRetries();
this.datacenter = datacenter;
}

public void loadMessages(List<BackupMessage> messages) {
Expand Down Expand Up @@ -261,7 +265,7 @@ public void onPublished(Message message, Topic topic) {
brokerTimer.close();
cachedTopic.incrementPublished();
brokerListeners.onAcknowledge(message, topic);
trackers.get(topic).logPublished(message.getId(), topic.getName(), "", Collections.emptyMap());
trackers.get(topic).logPublished(message.getId(), topic.getName(), "", datacenter, Collections.emptyMap());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.frontend.validator.TopicMessageValidator;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.schema.SchemaExistenceEnsurer;
import pl.allegro.tech.hermes.schema.SchemaRepository;
Expand Down Expand Up @@ -53,9 +54,11 @@ public BackupMessagesLoader backupMessagesLoader(BrokerMessageProducer brokerMes
TopicsCache topicsCache,
SchemaRepository schemaRepository,
Trackers trackers,
LocalMessageStorageProperties localMessageStorageProperties) {
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider
) {
return new BackupMessagesLoader(brokerMessageProducer, brokerListeners, topicsCache, schemaRepository,
new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties);
new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties, datacenterNameProvider.getDatacenterName());
}

@Bean(initMethod = "extend")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pl.allegro.tech.hermes.frontend.publishing.preview.MessagePreviewLog;
import pl.allegro.tech.hermes.frontend.server.auth.AuthenticationConfiguration;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;

Expand Down Expand Up @@ -63,8 +64,9 @@ public ThroughputLimiter throughputLimiter(ThroughputProperties throughputProper

@Bean
public MessageEndProcessor messageEndProcessor(Trackers trackers, BrokerListeners brokerListeners,
TrackingHeadersExtractor trackingHeadersExtractor) {
return new MessageEndProcessor(trackers, brokerListeners, trackingHeadersExtractor);
TrackingHeadersExtractor trackingHeadersExtractor,
DatacenterNameProvider datacenterNameProvider) {
return new MessageEndProcessor(trackers, brokerListeners, trackingHeadersExtractor, datacenterNameProvider.getDatacenterName());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,32 @@ public class MessageEndProcessor {
private final Trackers trackers;
private final BrokerListeners brokerListeners;
private final TrackingHeadersExtractor trackingHeadersExtractor;
private final String datacenter;

public MessageEndProcessor(Trackers trackers, BrokerListeners brokerListeners, TrackingHeadersExtractor trackingHeadersExtractor) {
public MessageEndProcessor(Trackers trackers, BrokerListeners brokerListeners, TrackingHeadersExtractor trackingHeadersExtractor, String datacenter) {
this.trackers = trackers;
this.brokerListeners = brokerListeners;
this.trackingHeadersExtractor = trackingHeadersExtractor;
this.datacenter = datacenter;
}

public void sent(HttpServerExchange exchange, AttachmentContent attachment) {
trackers.get(attachment.getTopic()).logPublished(attachment.getMessageId(),
attachment.getTopic().getName(), readHostAndPort(exchange),
attachment.getTopic().getName(),
readHostAndPort(exchange),
datacenter,
trackingHeadersExtractor.extractHeadersToLog(exchange.getRequestHeaders()));
sendResponse(exchange, attachment, StatusCodes.CREATED);
attachment.getCachedTopic().incrementPublished();
}

public void delayedSent(HttpServerExchange exchange, CachedTopic cachedTopic, Message message) {
trackers.get(cachedTopic.getTopic()).logPublished(message.getId(), cachedTopic.getTopic().getName(),
readHostAndPort(exchange), trackingHeadersExtractor.extractHeadersToLog(exchange.getRequestHeaders()));
trackers.get(cachedTopic.getTopic()).logPublished(
message.getId(),
cachedTopic.getTopic().getName(),
readHostAndPort(exchange),
datacenter,
trackingHeadersExtractor.extractHeadersToLog(exchange.getRequestHeaders()));
brokerListeners.onAcknowledge(message, cachedTopic.getTopic());
cachedTopic.incrementPublished();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class BackupMessagesLoaderTest {

private final Topic topic = TopicBuilder.topic("pl.allegro.tech.hermes.test").build();

private final String datacenter = "dc1";

@Before
public void setUp() {
tempDir = Files.createTempDir();
Expand Down Expand Up @@ -104,7 +106,8 @@ public void shouldNotSendOldMessages() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);

messageRepository.save(messageOfAge(1), topic);
Expand Down Expand Up @@ -147,7 +150,8 @@ public void shouldSendAndResendMessages() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);

messageRepository.save(messageOfAge(1), topic);
Expand Down Expand Up @@ -176,7 +180,8 @@ public void shouldSendOnlyWhenBrokerTopicIsAvailable() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);
MessageRepository messageRepository = new ChronicleMapMessageRepository(
new File(tempDir.getAbsoluteFile(), "messages.dat"),
Expand Down Expand Up @@ -214,7 +219,8 @@ public void shouldSendMessageWithAllArgumentsFromBackupMessage() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);

messageRepository.save(messageOfAge(1), topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ public interface LogSchemaAware {
String SOURCE_HOSTNAME = "hostname";
String REMOTE_HOSTNAME = "remote_hostname";
String EXTRA_REQUEST_HEADERS = "extra_request_headers";
String STORAGE_DATACENTER = "storageDc";

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.REMOTE_HOSTNAME;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.SOURCE_HOSTNAME;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.STATUS;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.STORAGE_DATACENTER;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.SUBSCRIPTION;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.TIMESTAMP;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.TIMESTAMP_SECONDS;
Expand Down Expand Up @@ -142,6 +143,7 @@ private XContentBuilder prepareMapping(String indexType, Function<XContentBuilde
.startObject(SOURCE_HOSTNAME).field("type", "keyword").field("norms", false).endObject()
.startObject(REMOTE_HOSTNAME).field("type", "keyword").field("norms", false).endObject()
.startObject(REASON).field("type", "text").field("norms", false).endObject()
.startObject(STORAGE_DATACENTER).field("type", "text").field("norms", false).endObject()
.startObject(EXTRA_REQUEST_HEADERS).field("type", "text").field("norms", false).endObject();

return additionalMapping.apply(jsonBuilder).endObject().endObject().endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import pl.allegro.tech.hermes.api.PublishedMessageTraceStatus;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.TrackerElasticSearchMetrics;
import pl.allegro.tech.hermes.tracker.BatchingLogRepository;
Expand Down Expand Up @@ -51,8 +50,9 @@ public void logPublished(String messageId,
long timestamp,
String topicName,
String hostname,
String storageDatacenter,
Map<String, String> extraRequestHeaders) {
queue.offer(build(() -> document(messageId, timestamp, topicName, SUCCESS, hostname, extraRequestHeaders)));
queue.offer(build(() -> success(messageId, timestamp, topicName, hostname, storageDatacenter, extraRequestHeaders)));
}

@Override
Expand All @@ -62,7 +62,7 @@ public void logError(String messageId,
String reason,
String hostname,
Map<String, String> extraRequestHeaders) {
queue.offer(build(() -> document(messageId, timestamp, topicName, ERROR, reason, hostname, extraRequestHeaders)));
queue.offer(build(() -> error(messageId, timestamp, topicName, reason, hostname, extraRequestHeaders)));
}

@Override
Expand All @@ -71,37 +71,49 @@ public void logInflight(String messageId,
String topicName,
String hostname,
Map<String, String> extraRequestHeaders) {
queue.offer(build(() -> document(messageId, timestamp, topicName, INFLIGHT, hostname, extraRequestHeaders)));
queue.offer(build(() -> inflight(messageId, timestamp, topicName, hostname, extraRequestHeaders)));
}

@Override
public void close() {
this.elasticClient.close();
}

private XContentBuilder document(String messageId,
long timestamp,
String topicName,
PublishedMessageTraceStatus status,
String hostname,
Map<String, String> extraRequestHeaders)
private XContentBuilder success(String messageId,
long timestamp,
String topicName,
String hostname,
String storageDatacenter,
Map<String, String> extraRequestHeaders)
throws IOException {
return notEndedDocument(messageId, timestamp, topicName, status.toString(), hostname, extraRequestHeaders).endObject();
return notEndedDocument(messageId, timestamp, topicName, SUCCESS.toString(), hostname, extraRequestHeaders)
.field(STORAGE_DATACENTER, storageDatacenter)
.endObject();
}

private XContentBuilder document(String messageId,

private XContentBuilder inflight(String messageId,
long timestamp,
String topicName,
PublishedMessageTraceStatus status,
String reason,
String hostname,
Map<String, String> extraRequestHeaders)
throws IOException {
return notEndedDocument(messageId, timestamp, topicName, status.toString(), hostname, extraRequestHeaders)
return notEndedDocument(messageId, timestamp, topicName, INFLIGHT.name(), hostname, extraRequestHeaders).endObject();
}

private XContentBuilder error(String messageId,
long timestamp,
String topicName,
String reason,
String hostname,
Map<String, String> extraRequestHeaders)
throws IOException {
return notEndedDocument(messageId, timestamp, topicName, ERROR.toString(), hostname, extraRequestHeaders)
.field(REASON, reason)
.endObject();
}


protected XContentBuilder notEndedDocument(String messageId,
long timestamp,
String topicName,
Expand Down
Loading

0 comments on commit 8c9e700

Please sign in to comment.