This API is considered experimental and might change before it's finalized. We encourage to submit feedback on this WebSocket API.
Pulsar WebSocket API is meant to provide a simple way to interact with Pulsar from languages outside the JVM.
Through WebSocket you can publish and consume messages and use all the features available from the Java client library.
Apart from the Pulsar standalone that already has the WebSocket service enabled, there are 2 possible ways to deploy the WebSocket service:
Enable WebSocket in conf/broker.conf
:
webSocketServiceEnabled=true
This will run the service within the same HTTP server that is already running in the broker.
WebSocket service can be run as a separate component on its own.
Configuration is located at conf/websocket.conf
and the minimal parameters to
change are:
globalZookeeperServers=...
# Port to use to server HTTP request
webServicePort=8080
# Name of the pulsar cluster to connect to
clusterName=...
Then, to start the service:
$ bin/pulsar-daemon start websocket
There are 2 WebSocket endpoint, for publishing and consuming messages and all exchanges are done through JSON messages.
Open a WebSocket session to create a producer for a specific topic:
http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}/{topic}
{
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"context": "1"
}
Key | Type | Requirement | Explanation |
---|---|---|---|
payload |
String | Required | Base-64 encoded payload |
properties |
Key-Value pairs | Optional | Application defined properties |
context |
String | Optional | Application defined request identifier |
key |
String | Optional | For partitioned topics, decides the partition to use |
replicationClusters |
List | Optional | Restrict replication to these clusters |
{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}
Key | Type | Requirement | Explanation |
---|---|---|---|
result |
String | Required | ok if successful or error message |
messageId |
String | Required | Message Id assigned to the published message |
context |
String | Optional | Application defined request identifier |
Open a WebSocket session to create a producer for a specific topic:
http://{serviceUrl}:8080/ws/consumer/persistent/{property}/{cluster}/{namespace}/{topic}/{subscription}
Server will push messages on the WebSocket session:
{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785",
"context": "1"
}
Key | Type | Requirement | Explanation |
---|---|---|---|
messageId |
String | Required | Message Id |
payload |
String | Required | Base-64 encoded payload |
properties |
Key-Value pairs | Optional | Application defined properties |
publishTime |
String | Required | Publish timestamp |
context |
String | Optional | Application defined request identifier |
key |
String | Optional | Original routing key set by producer |
Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.
{
"messageId": "CAAQAw=="
}
Key | Type | Requirement | Explanation |
---|---|---|---|
messageId |
String | Required | Message Id of the processed message |
In case of error the server will close the WebSocket session using the following error codes:
Error Code | Error Message |
---|---|
1 | Failed to create producer |
2 | Failed to subscribe |
3 | Failed to de-serialize from JSON |
4 | Failed to serialize to JSON |
5 | Failed to authenticate client |
6 | Client is not authorized |
Application is responsible to re-establish a new WebSocket session after a backoff period.
import websocket, base64, json
ws = websocket.create_connection(
'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic')
# Send one message
ws.send(json.dumps({
'payload' : base64.b64encode('Hello World'),
'properties': {
'key1' : 'value1',
'key2' : 'value2'
},
'context' : 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print 'Message published successfully'
else:
print 'Failed to publish message:', response
ws.close()
import websocket, base64, json
ws = websocket.create_connection(
'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub')
while True:
msg = json.loads(ws.recv())
if not msg: break
print 'Received: ', msg, ' - payload:', base64.b64decode(msg['payload'])
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
var WebSocket = require('ws');
var ws = new WebSocket(
"ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1");
var message = {
"payload" : new Buffer("Hello World").toString('base64'),
"properties": {
"key1" : "value1",
"key2" : "value2"
},
"context" : "1"
};
ws.on('open', function() {
// Send one message
ws.send(JSON.stringify(message));
});
ws.on('message', function(message) {
console.log('received ack: %s', message);
});
var WebSocket = require('ws');
var socket = new WebSocket(
"ws://localhost:6080/pubilsh/persistent/my-property/us-west/my-ns/my-topic1/my-sub-1")
socket.onmessage = function(pckt){
var receiveMsg = pckt.data;
var ackMsg = {"messageId" : receiveMsg.messageId}
socket.send(JSON.stringify(ackMsg));
};