Message Queue
Experimental and not yet released !!!
This recipe uses Cassandra to implement a durable message queue. The implementation combines the distributed row lock recipe with two layers of sharding (yes, sharding) to achieve a high-throughput durable queue. Each layer solves a different problem. The first layer is a time-based rolling shard: at any given time most of the traffic goes to one shard. This minimizes the impact of a growing wide row and allows compaction to clean up. The shard is rolling (i.e. mod) so that clients do not have to go back and check old shards that may still contain data. The second layer distributes load across the cluster, which mitigates hot spots and reduces lock contention when there are multiple clients.
The first step to using the queue is to create a MessageQueue instance. Multiple producers and consumers may be created from a single MessageQueue instance.
The following example shows how to create a message queue with 50 rolling time shards of 30 seconds and 20 concurrency shards within each time shard. That's a total of 1000 rows being used by the message queue. Notice that the message queue uses a keyspace client that was previously created.
CountingQueueStats stats = new CountingQueueStats();
MessageQueue queue = new ShardedDistributedMessageQueue.Builder()
    .withConsistencyLevel(ConsistencyLevel.CL_QUORUM)
    .withColumnFamily("Queue")
    .withKeyspace(keyspace)
    .withStats(stats)
    .withBuckets(50, 30, TimeUnit.SECONDS)
    .withShardCount(20)
    .withPollInterval(100L, TimeUnit.MILLISECONDS)
    .build();
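The arithmetic behind `withBuckets(50, 30, TimeUnit.SECONDS)` and `withShardCount(20)` can be sketched in plain Java. The class and method names below (`ShardMath`, `totalRows`, `timeShard`) are hypothetical and not part of Astyanax. The sketch assumes the rolling time shard is obtained by integer division followed by a modulo, which fits the "rolling (i.e. mod)" description but may differ in detail from what the recipe does internally.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the Astyanax API.
final class ShardMath {
    private ShardMath() {}

    // One row per (time shard, concurrency shard) pair.
    static int totalRows(int timeShardCount, int concurrencyShardCount) {
        return timeShardCount * concurrencyShardCount;
    }

    // Assumed rolling ("mod") time shard: find the bucket a timestamp falls in,
    // then wrap it around the number of time shards.
    static int timeShard(long timestampMillis, int timeShardCount, long duration, TimeUnit unit) {
        long bucketMillis = unit.toMillis(duration);
        return (int) ((timestampMillis / bucketMillis) % timeShardCount);
    }

    public static void main(String[] args) {
        System.out.println("rows = " + totalRows(50, 20));
        System.out.println("current time shard = "
            + timeShard(System.currentTimeMillis(), 50, 30, TimeUnit.SECONDS));
    }
}
```

Under this assumption, with 50 shards of 30 seconds each, a given time shard is revisited every 25 minutes.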
As a convenience the API provides a method to create the underlying column families used by the recipe.
queue.createStorage();
'Creating' the queue adds metadata to the queue column family which can be used by other clients to determine the number of shards and other queue parameters. Call this only once when setting up the queue.
queue.createQueue();
Messages may be inserted one at a time or in bulk. Note that a row doesn't need to be locked when producing messages.
MessageProducer producer = queue.createProducer();
String messageId = producer.sendMessage(new Message().addParameter("data", "Hello world!"));
MessageQueueDispatcher provides a basic implementation of threads that read from the queue and process messages from a thread pool.
MessageQueueDispatcher dispatcher = new MessageQueueDispatcher.Builder()
    .withBatchSize(5)
    .withCallback(new Function<Message, Boolean>() {
        @Override
        public synchronized Boolean apply(Message message) {
            // Return true to 'ack' the message
            // Return false to not 'ack' which will result in the message timing out
            // Throw any exception to put the message into a poison queue
            return true;
        }
    })
    .withMessageQueue(queue)
    .withConsumerCount(1)
    .withThreadCount(2)
    .build();
dispatcher.start();
// When ready to shut down call
dispatcher.stop();
Instead of relying on the callback Function passed to the message dispatcher, it is also possible to name a specific handler class on the message. When the message is processed, the dispatcher will create an instance of this class and pass the message as a parameter.
public class HelloWorldFunction implements Function<Message, Boolean> {
    @Override
    public Boolean apply(@Nullable Message input) {
        System.out.println("Hello world!");
        return true;
    }
}
Message m = new Message()
    .setKey("HelloWorld")
    .setTaskClass(HelloWorldFunction.class.getCanonicalName());
String messageId = producer.sendMessage(m);
Messages may be consumed in arbitrary batch sizes. The current implementation will attempt to read up to batchSize messages from the most recent shards while frequently checking older shards for lingering messages. Once consumed, the messages must be acknowledged so that the timeout event may be cancelled.
Messages are consumed using the following sequence:
- Lock a shard
- Read N messages
- Submit a single mutation which releases the lock, deletes the messages, and inserts a timeout message for each message.
- Process the message
- ACK the message by deleting the timeout column
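The sequence above can be illustrated with an in-memory stand-in for a single shard. Everything here (`InMemoryShard`, `send`, `read`, `ack`) is hypothetical and only mimics the shape of the protocol: a `ReentrantLock` plays the role of the row lock, and a set of unacknowledged messages plays the role of the timeout columns. It is a sketch of the idea, not of how the recipe talks to Cassandra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory model of one shard; not the Astyanax implementation.
final class InMemoryShard {
    private final ReentrantLock lock = new ReentrantLock();
    private final ConcurrentLinkedDeque<String> messages = new ConcurrentLinkedDeque<>();
    private final Set<String> timeouts = ConcurrentHashMap.newKeySet();

    // Producing does not take the shard lock.
    void send(String message) {
        messages.addLast(message);
    }

    // Steps 1-3: lock the shard, read up to n messages, and in the same critical
    // section remove them from the queue and record a timeout entry for each.
    List<String> read(int n) {
        lock.lock();
        try {
            List<String> batch = new ArrayList<>();
            String m;
            while (batch.size() < n && (m = messages.pollFirst()) != null) {
                batch.add(m);
                timeouts.add(m);
            }
            return batch;
        } finally {
            lock.unlock();
        }
    }

    // Step 5: ack by deleting the timeout entry.
    void ack(String message) {
        timeouts.remove(message);
    }

    int pending() { return messages.size(); }
    int unacked() { return timeouts.size(); }

    public static void main(String[] args) {
        InMemoryShard shard = new InMemoryShard();
        shard.send("a");
        shard.send("b");
        shard.send("c");
        for (String m : shard.read(2)) {
            // Step 4: process the message, then ack it.
            System.out.println("processing " + m);
            shard.ack(m);
        }
        System.out.println("pending=" + shard.pending() + " unacked=" + shard.unacked());
    }
}
```

A message that is read but never acked stays in the timeout set, which corresponds to the timeout event firing later in the real queue.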
The following example reads a block of up to 100 messages and acks the entire set with one call. Messages can be ack'd individually as well.
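// 'consumer' is assumed to have been created from the queue beforehand
Collection<Message> messages = null;
try {
    messages = consumer.readMessages(100);
    for (Message message : messages) {
        // Do something with the message
    }
}
finally {
    // Ack the entire batch with one call; skipped if the read itself failed
    if (messages != null)
        consumer.ackMessage(messages);
}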
The call to produce a message returns a message id that may be used to remove the message from the queue without having to consume it from the queue first.
String messageId = producer.sendMessage(new Message().setData("The quick brown fox jumped over the lazy cow"));
queue.deleteMessage(messageId);
The call to produce a message returns a message id that may be used to read the message from the queue without having to consume it from the queue first.
String messageId = producer.sendMessage(new Message().setData("The quick brown fox jumped over the lazy cow"));
Message message = queue.readMessage(messageId);
Sometimes it's nice to be able to assign a key to messages in the queue. This makes it possible to check whether a recurring task already exists in the queue and to read or cancel a message without having to know the specific message id (which depends on the execution time and shard). Keep in mind that using a lookup key adds overhead, since that information must be maintained in a separate column family that is updated or accessed for each operation.
The following code will enqueue an event that has key "HelloWorld":
MessageProducer producer = queue.createProducer();
String messageId = producer.sendMessage(new Message()
    .setKey("HelloWorld")
    .addParameter("data", "Hello world!"));
It is now possible to access the event by key instead of the messageId:
Message message = queue.readMessageByKey("HelloWorld");
You can also delete the message by key. This will delete the lookup as well as the queue event.
queue.deleteMessageByKey("HelloWorld");
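To make the overhead of key lookups concrete, here is a minimal in-memory sketch. The class `KeyedQueueSketch` and its methods are hypothetical; the two maps merely stand in for the queue column family and the separate lookup column family, and the id format is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: two maps stand in for the queue and lookup column families.
final class KeyedQueueSketch {
    private final Map<String, String> messagesById = new HashMap<>(); // queue data
    private final Map<String, String> idsByKey = new HashMap<>();     // key -> message id
    private int nextId = 0;

    // Sending with a key costs an extra write to the lookup map.
    String send(String key, String body) {
        String id = "msg-" + (nextId++); // made-up id format for the sketch
        messagesById.put(id, body);
        if (key != null) {
            idsByKey.put(key, id);
        }
        return id;
    }

    // Reading by key costs an extra lookup before the message itself is read.
    String readByKey(String key) {
        String id = idsByKey.get(key);
        return id == null ? null : messagesById.get(id);
    }

    // Deleting by key removes both the lookup entry and the queued message.
    boolean deleteByKey(String key) {
        String id = idsByKey.remove(key);
        return id != null && messagesById.remove(id) != null;
    }

    boolean containsKey(String key) {
        return idsByKey.containsKey(key);
    }

    public static void main(String[] args) {
        KeyedQueueSketch queue = new KeyedQueueSketch();
        queue.send("HelloWorld", "Hello world!");
        System.out.println(queue.readByKey("HelloWorld"));
        System.out.println(queue.deleteByKey("HelloWorld"));
    }
}
```

Every keyed operation touches both structures, which is the extra cost the paragraph above warns about.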
To run a message once, either don't specify a trigger or specify a RunOnceTrigger:
Message m = new Message()
    .setTrigger(new RunOnceTrigger.Builder()
        .withDelay(10, TimeUnit.MINUTES)
        .build()
    );
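To run a message repeatedly, specify a RepeatingTrigger with a delay, an interval and a repeat count:
Message m = new Message()
    .setTrigger(new RepeatingTrigger.Builder()
        .withDelay(10, TimeUnit.MINUTES)
        .withInterval(10, TimeUnit.MINUTES)
        .withRepeatCount(5)
        .build()
    );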
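As a rough illustration of what the delay, interval and repeat count in the RepeatingTrigger example imply, the sketch below computes a list of fire times. `RepeatSchedule` and `fireTimes` are hypothetical names. The sketch assumes the first execution happens after the delay, each later execution follows the previous one by the interval, and the repeat count is the total number of executions; the trigger's real semantics may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the semantics are an assumption, not taken from Astyanax.
final class RepeatSchedule {
    private RepeatSchedule() {}

    // Assumed schedule: first fire at now + delay, then every interval,
    // for repeatCount executions in total.
    static List<Long> fireTimes(long nowMillis, long delayMillis, long intervalMillis, int repeatCount) {
        List<Long> times = new ArrayList<>();
        long next = nowMillis + delayMillis;
        for (int i = 0; i < repeatCount; i++) {
            times.add(next);
            next += intervalMillis;
        }
        return times;
    }

    public static void main(String[] args) {
        long tenMinutes = TimeUnit.MINUTES.toMillis(10);
        List<Long> times = fireTimes(0L, tenMinutes, tenMinutes, 5);
        for (long t : times) {
            System.out.println("fires at +" + TimeUnit.MILLISECONDS.toMinutes(t) + " min");
        }
    }
}
```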
You can define a CronTrigger such as the following, which uses the Quartz cron parser to determine the next execution time.
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import org.quartz.CronExpression;
import com.netflix.astyanax.recipes.scheduler.Trigger;
public class CronTrigger extends AbstractTrigger {
    public static class Builder {
        private CronTrigger trigger = new CronTrigger();
        public Builder withExpression(String expression) {
            trigger.expression = expression;
            return this;
        }
        public CronTrigger build() {
            trigger.setTriggerTime(trigger.getNextCronFromNow());
            return trigger;
        }
    }
    private String expression;
    @Override
    public Trigger nextTrigger() {
        CronTrigger nextTrigger = new CronTrigger();
        nextTrigger.expression = expression;
        nextTrigger.setTriggerTime(getNextCronFromNow());
        nextTrigger.setExecutionCount(getExecutionCount() + 1);
        return nextTrigger;
    }
    private long getNextCronFromNow() {
        try {
            Calendar cal = Calendar.getInstance();
            Date currTime = cal.getTime();
            CronExpression cronExpression = new CronExpression(expression);
            return cronExpression.getNextValidTimeAfter(currTime).getTime();
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }
    public String getExpression() {
        return expression;
    }
    public void setExpression(String expression) {
        this.expression = expression;
    }
}
Here's an example of how to define a cron trigger that runs every 5 seconds:
Message m = new Message()
    .setTrigger(new CronTrigger.Builder()
        .withExpression("*/5 * * * * ?")
        .build()
    );
Each shard is implemented as a separate wide row with the shard name as the row key and columns representing items in the queue.
The row key format is {QueueName}:{RollingTimeShard}:{ConcurrencyShard}.
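The row key layout can be sketched by enumerating the keys for a given configuration. `ShardRowKeys` and its methods are hypothetical, and rendering both shard numbers as plain decimal integers is an assumption; only the `{QueueName}:{RollingTimeShard}:{ConcurrencyShard}` shape comes from the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the decimal rendering of shard numbers is an assumption.
final class ShardRowKeys {
    private ShardRowKeys() {}

    // {QueueName}:{RollingTimeShard}:{ConcurrencyShard}
    static String rowKey(String queueName, int timeShard, int concurrencyShard) {
        return queueName + ":" + timeShard + ":" + concurrencyShard;
    }

    // One row key per (time shard, concurrency shard) pair.
    static List<String> allRowKeys(String queueName, int timeShards, int concurrencyShards) {
        List<String> keys = new ArrayList<>(timeShards * concurrencyShards);
        for (int t = 0; t < timeShards; t++) {
            for (int c = 0; c < concurrencyShards; c++) {
                keys.add(rowKey(queueName, t, c));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = allRowKeys("Queue", 50, 20);
        System.out.println(keys.size() + " rows, first = " + keys.get(0));
    }
}
```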
The column name is a composite, and the queue relies on the sort order of the composite comparator type to achieve several key features. The structure is {MessageType}{Priority}{TimeUUID}{State}. MessageType takes one of the following values:
- Metadata (0) - Metadata is used to store any queue configuration information and to notify clients of configuration changes (this isn't implemented yet).
- Lock (1) - Row lock columns exist on the same row to take advantage of row level isolation and to avoid making separate calls for releasing the lock and updating the queue.
- Message (2) - This is the actual message.
This structure makes it possible to read both the lock columns and the message columns in a single call to Cassandra, thereby reducing overhead. All of the lock columns will be at the top of the response, followed by the actual messages.
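The effect of the composite ordering can be sketched with a plain Java comparator. `QueueColumn` is a hypothetical model of the column name: a `long` stands in for the TimeUUID, and each component is assumed to sort in ascending order, left to right. Only the message type values 0, 1 and 2 come from the list above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the composite column name {MessageType}{Priority}{TimeUUID}{State}.
final class QueueColumn {
    static final int METADATA = 0;
    static final int LOCK = 1;
    static final int MESSAGE = 2;

    final int messageType;
    final int priority;
    final long time; // stand-in for the TimeUUID component
    final int state;

    QueueColumn(int messageType, int priority, long time, int state) {
        this.messageType = messageType;
        this.priority = priority;
        this.time = time;
        this.state = state;
    }

    // Compare component by component, left to right (assumed ascending).
    static final Comparator<QueueColumn> ORDER =
        Comparator.<QueueColumn>comparingInt(c -> c.messageType)
            .thenComparingInt(c -> c.priority)
            .thenComparingLong(c -> c.time)
            .thenComparingInt(c -> c.state);

    // Returns a sorted copy, leaving the input list untouched.
    static List<QueueColumn> sorted(List<QueueColumn> columns) {
        List<QueueColumn> copy = new ArrayList<>(columns);
        copy.sort(ORDER);
        return copy;
    }

    public static void main(String[] args) {
        List<QueueColumn> columns = new ArrayList<>();
        columns.add(new QueueColumn(MESSAGE, 0, 5L, 0));
        columns.add(new QueueColumn(LOCK, 0, 9L, 0));
        columns.add(new QueueColumn(MESSAGE, 0, 1L, 0));
        for (QueueColumn c : sorted(columns)) {
            System.out.println("type=" + c.messageType + " time=" + c.time);
        }
    }
}
```

Because the message type is the first component, a single slice of the row returns the lock columns before any message columns, regardless of their timestamps.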
Priority makes it possible to inject high priority messages for processing ahead of lower priority messages. When using priorities it is important to understand that, due to limitations of the queue data model, high priority messages cannot be scheduled for delayed execution; only priority 0 messages may be. Also, the timeout event for a high priority message is scheduled at priority 0, so the message will lose its high priority should processing time out.
TimeUUID provides both uniqueness and time-based sorting of messages in the queue. For lock columns it is primarily used to guarantee unique ids for clients trying to take the lock. Time ordering provides both FIFO-like semantics and delayed message execution.
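How time ordering can yield both FIFO-like reads and delayed execution is sketched below with a sorted map. `TimeOrderedQueue` is hypothetical, and the sketch assumes a consumer only takes entries whose time is not later than "now". A `long` key replaces the TimeUUID, so unlike a real TimeUUID it does not guarantee uniqueness; the sketch assumes distinct times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: entries are sorted by trigger time, oldest first.
final class TimeOrderedQueue {
    // Key is the trigger time in milliseconds; assumed distinct per message.
    private final TreeMap<Long, String> columns = new TreeMap<>();

    // A delayed message is simply stored under a future trigger time.
    void add(long triggerTimeMillis, String message) {
        columns.put(triggerTimeMillis, message);
    }

    // Remove and return, oldest first, the messages whose trigger time has passed.
    List<String> pollDue(long nowMillis) {
        Map<Long, String> due = columns.headMap(nowMillis, true);
        List<String> result = new ArrayList<>(due.values());
        due.clear(); // the view is backed by the map, so this removes the entries
        return result;
    }

    int size() {
        return columns.size();
    }

    public static void main(String[] args) {
        TimeOrderedQueue queue = new TimeOrderedQueue();
        queue.add(300L, "delayed");
        queue.add(100L, "first");
        queue.add(200L, "second");
        System.out.println(queue.pollDue(200L));
        System.out.println(queue.pollDue(300L));
    }
}
```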
State is used by the locking algorithm.