Default Java and Kafka implementation for Hagrid.
Hagrid itself is a system to easily communicate with pub/sub based systems like Kafka and RabbitMQ. It uses Protobuf to serialize and deserialize the sent data, as it is really efficient and has a concise protocol definition syntax. For us, we can use Hagrid to realize microservices and their communication with each other.
In this document we want to elaborate on how to use Hagrid and how to create your own implementation (e.g. if you want to use RabbitMQ).
Here we want to give a shortcut example on how to quickly use Hagrid.
First we have to import the Maven repository for Hagrid itself and for the Grape service which serves the HagridService
.
<dependency>
<groupId>dev.volix.rewinside.odyssey.hagrid</groupId>
<artifactId>hagrid-api</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>dev.volix.lib</groupId>
<artifactId>grape-api</artifactId>
<version>[0.0,)</version>
<scope>provided</scope>
</dependency>
Now we can access Grape and get our service by using:
Future<HagridService> future = Grape.getInstance().get(HagridService.class);
HagridService hagrid = future.get();
After that we can now start sending packets.
hagrid.communication().registerTopic("chat", new StringHagridSerdes());
hagrid.wizard().topic("chat")
.payload("Hello there!")
.send();
Just like the name suggests, topics are like a channel where specific messages flow through. That means, that data get tagged with a topic such that consumers who subscribe to that topic can all consume said data.
In our example above we used the topic chat
, which could be like a chatroom, where many receivers can consume a message.
In Hagrid topics are more like a pattern of topics and they are not specific. Which means if you listen to the topic chat
, you also listen to all subtopics of chat (e.g. chat-global
, chat-friends
, chat-friends-sports
and so on).
Some valid patterns are:
topic
: Listen to all subtopics, no matter the depth.topic-*
: Listen to only subtopics with the depth of 1.topic-subtopic-*
: Listen to only subtopics oftopic-subtopic
with depth 1.topic-*-*
: Listen to subtopics oftopic
with the depth 2.
Each topic needs to have a specific serdes (serializer and deserializer) attached to it, so that Hagrid can understand how to pack and unpack the data.
There are some default serdes, that could be helpful:
- NullHagridSerdes: No matter what data gets sent, it gets trashed. This can be used to just test connections, ping services, etc.
- StringHagridSerdes: Allows sending plain text as strings.
- MessageHagridSerdes: Key serdes of Hagrid, which allows sending Protobuf messages across the network.
To create a custom one, you simply have to implement HagridSerdes
like so:
public class StringHagridSerdes implements HagridSerdes<String> {
@Override
public Class<String> getType() {
return String.class;
}
@Override
public byte[] serialize(final String payload) {
return payload.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(final String typeUrl, final byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}
Without registering a topic, Hagrid will not subscribe to it and therefore not know when a packet goes into the topic. Additionally, you can not send a packet into the topic, as Hagrid needs to know how to serialize it.
hagrid.communication().registerTopic("chat", new StringHagridSerdes());
You can see that the method expects two parameters, first is the name of the topic and second is the serdes, that we want to use for this specific topic.
Important note: The topic must not be a topic pattern, but it will still automatically set the serdes for all subtopics as well.
If you want to have more options for the specific topic, you can use the method as follows:
hagrid.communication().registerTopic("chat", new StringHagridSerdes(),
TopicPropertes().newBuilder().receiveStalePackets(false).build())
That way you can configure the topic with different options. In this case the topic will not receive packets that have been sent while the service was inactive.
Now that we concluded how topics work, we can go on explaining how to listen to packets.
There are actually two ways to create and register a listener.
If you want the fast way, you can use the following:
hagrid.communication().registerListener(HagridListener.builder(new HagridListenerMethod<String>() {
@Override
public void listen(String payload, HagridPacket<String> req, HagridResponse response) {
// do stuff with the payload
}
}).payloadClass(String.class).topic("chat").build());
In the builder you can set many different things, but most important is the payloadClass
and the topic
. These two have to be set.
For convenience you can use an annotation and then register the enclosing class of the method, so that the method gets registered as a listener.
Here is an example:
public class SomeClass {
@HagridListens(topic = "chat")
public void onChat(String payload) {
System.out.println("Harry says: " + payload);
}
}
As you can see in the method parameter list, we can skip the last two, if we don't need them. The valid parameters are:
onFoo(T payload)
onFoo(T payload, HagridPacket<T> req)
onFoo(T payload, HagridPacket<T> req, HagridResponse res)
Now we can register our listener by using the following:
hagrid.communication().registerListeners(new SomeClass());
That way every method with the @HagridListens
annotation gets registered. Note: The methods can be private or static, it does not matter.
If you have a lot of listener methods in a single class you may want to set the topic for all of them. This can be done by annotating the enclosing class, like so:
@HagridListens(topic = "chat")
public class SomeClass {
@HagridListens
public void onChat1(String payload) {
System.out.println("Harry says: " + payload);
}
@HagridListens
public void onChat2(String payload) {
System.out.println("Harry also says: " + payload);
}
}
Now all listener methods automatically have the topic set to chat
by the class.
In Hagrid we support listening to both incoming as well as outgoing packets. Outgoing means, that if we sent a packet via our instance, the outgoing listener will trigger.
@HagridListens(topic = "chat", direction = Direction.UPSTREAM)
public void onChat(String payload) {
System.out.println("We sent following message: " + payload);
}
Important: This listener will be executed after the data has been sent.
If we have a structure such that all our listeners need to respond to the request they are getting, it can get quite tedious to handle that ourselves.
For that we can use a second annotation @HagridResponds
, to say that we want to always send a response, no matter if it fails or if we don't specify a response at all.
@HagridResponds
@HagridListens(topic = "chat")
public void onChat(String payload) {
int someNumber = Integer.parseInt(payload);
System.out.println("We received a number: " + someNumber);
}
Now if the payload is a number, everything is fine and we would respond with the status OK, otherwise we would automatically respond with INTERNAL and a message specifying the exception.
Otherwise if we want to specify a status ourselves, we can use the following:
@HagridListens(topic = "chat")
public void onChat(String payload, HagridPacket<?> req, HagridResponse res) {
try {
res.status(StatusCode.OK, "everything is fine.");
} catch (NumberFormatException ex) {
res.status(StatusCode.BAD_REQUEST, "you have not sent a number");
return;
}
System.out.println("We received a number: " + someNumber);
}
That way after the listener has been executed, it will automatically construct a needed response with the status given.
To add some kind of more detailed description of what happened, you can use something called subcodes
.
@HagridListens(topic = "chat")
public void onChat(String payload, HagridPacket<?> req, HagridResponse res) {
try {
res.status(StatusCode.OK, "everything is fine.");
} catch (NumberFormatException ex) {
// adding a subcode as an enum or an int to the status
res.status(StatusCode.BAD_REQUEST, ChatSubCode.NOT_A_NUMBER, "you have not sent a number");
return;
}
System.out.println("We received a number: " + someNumber);
}
In this case ChatSubCode
is an enum created by the application, which then gets transformed into an int by the ordinal - more specifically ordinal() + 1
as the subcode starts counting at 1.
Now the receiver of the response can also use ChatSubCode
to get a better idea of what BAD_REQUEST
means specifically without having to map the message to a specific status.
As explained above topics on Hagrid's site can be seen as a pattern. This can become useful if we e.g. have chatrooms and we want to listen to all of them.
@HagridListens(topic = "chat-room-*")
public void onChat(String payload, HagridPacket<?> req) {
System.out.printf("Someone in room %s said: %s", req.getTopic(), payload);
}
You can see, that we can listen to all subtopics of chat-room-*
with depth 1 and that we can get the actual topic from our packet via req.getTopic()
.
Now that we understand how all these things work, we can finally explain a bit more on how sending packets work.
Without expecting a response, we can just send the packet down the line. Just like we showed in the example at the beginning, we can send one like this:
hagrid.wizard().topic("chat")
.payload("Hello there!")
.send();
If we now want to respond to another packet, we can use something like this:
HagridPacket<String> question = new HagridPacket<>("How are you?");
hagrid.wizard().topic("chat")
.respondsTo(question.getId())
.payload("I'm fine!")
.send();
Of course the packet question
does not have an id at the moment, the id gets generated automatically when sending a packet. But you can see, that responding to an incoming packet is really easy!
Let's do the same but instead we want to say, that we are not fine:
hagrid.wizard().topic("chat")
.respondsTo(question.getId())
.status(StatusCode.INTERNAL)
.payload("I'm not fine ..")
.send();
That way we can give information if the request is valid and if it could be handled correctly, without modifying the payload!
All these sendings where a one way ticket, but let us have a look how we can listen for a response.
hagrid.wizard().topic("chat")
.payload("How are you?")
.sendAndWait(String.class)
.thenAccept(stringHagridPacket -> {
if(stringHagridPacket.getStatus().isOk()) {
// "That is good to hear!"
} else {
// "I hope you get better soon .."
}
});
For that we have to specify which kind of payload we wait for. For our example it is always a String
, but if the serdes of the topic is set to handle more abstract classes (e.g. CloudPacket
), then we can pass on a specific implementation of that (e.g. BroadcastMessageCloudPacket
).
If no response is being sent back, the listener will timeout. We can handle it like this:
hagrid.wizard().topic("chat")
.payload("How are you?")
.sendAndWait(String.class)
.thenAccept(stringHagridPacket -> {
if(stringHagridPacket.getStatus().isTimeout()) {
// we did not get a response
return;
}
// do something
});
In the brief example at the beginning we showed how it would look like when using Grape. But sometimes you need to access an actual implementation of the HagridService
. Or you want to create your own one.
Let us have a look at that.
To prepare using a custom implementation we definitely have to implement some custom classes first.
- HagridConnectionHandler:
ConnectionHandler
is an interface and this class is a small abstract default implementation of that. We use the connection handler to manage the connection to the external broker. - HagridSubscriber: The subscriber is the instance to subscribe to topics and to listen for packets. The
DownstreamHandler
will delegate the connection to it. - HagridPublisher: The publisher is the instance for sending packets. The
UpstreamHandler
will delegate the connection to it.
Now we can create a custom class and let it implement HagridService
. You will see that there are specific methods that need to return one of the handlers.
public class CustomHagridService implements HagridService {
private final Logger logger = Logger.getLogger(this.getClass().getName());
private final PropertiesConfig hagridConfig;
private final CustomConnectionHandler connectionHandler;
private final HagridUpstreamHandler upstreamHandler;
private final HagridDownstreamHandler downstreamHandler;
private final HagridCommunicationHandler communicationHandler;
@Override
public Logger getLogger() {
return logger;
}
@Override
public PropertiesConfig getConfiguration() {
return hagridConfig;
}
@Override
public PacketWizard wizard() {
return new HagridPacketWizard(this);
}
@Override
public ConnectionHandler connection() {
return connectionHandler;
}
@Override
public UpstreamHandler upstream() {
return upstreamHandler;
}
@Override
public DownstreamHandler downstream() {
return downstreamHandler;
}
@Override
public CommunicationHandler communication() {
return communicationHandler;
}
}
But you can see that we have default implementations for the most part. But they are of course not in the hagrid-api
Repository, but instead in the hagrid-core
. You can import it like that:
<dependency>
<groupId>dev.volix.rewinside.odyssey.hagrid</groupId>
<artifactId>hagrid-core</artifactId>
<version>1.2.0</version>
</dependency>
At default we have a Kafka implementation that one can use and is recommended to do so.
For that we just have to import the hagrid-kafka
implementation
<dependency>
<groupId>dev.volix.rewinside.odyssey.hagrid</groupId>
<artifactId>hagrid-kafka</artifactId>
<version>0.7.0</version>
</dependency>
And create a KafkaHagridService
instance.
HagridService hagrid = KafkaHagridService.create()
.withBrokerAddress("localhost:9092")
.withBrokerId("my-microservice")
.withAuth(KafkaAuth.forBasicAuth("user", "password"))
.build();
try {
hagrid.connect();
} catch (HagridConnectionException e) {
// can not connect to Kafka broker
return;
}
You can see that that way we also have to connect the service, as we do not have it prepared for us via Grape. And now we can use the service just like any other implementation. Under the hood, there are more things that happen, we do not need to care.