Skip to content

Latest commit

 

History

History
269 lines (196 loc) · 7.4 KB

WebSocket.md

File metadata and controls

269 lines (196 loc) · 7.4 KB

Pulsar WebSocket API

This API is considered experimental and might change before it's finalized. We encourage to submit feedback on this WebSocket API.

Pulsar WebSocket API is meant to provide a simple way to interact with Pulsar from languages outside the JVM.

Through WebSocket you can publish and consume messages and use all the features available from the Java client library.

Running WebSocket service

Apart from the Pulsar standalone that already has the WebSocket service enabled, there are 2 possible ways to deploy the WebSocket service:

Embedded with Pulsar broker

Enable WebSocket in conf/broker.conf:

webSocketServiceEnabled=true

This will run the service within the same HTTP server that is already running in the broker.

WebSocket as a separate component

WebSocket service can be run as a separate component on its own.

Configuration is located at conf/websocket.conf and the minimal parameters to change are:

globalZookeeperServers=...

# Port to use to server HTTP request
webServicePort=8080

# Name of the pulsar cluster to connect to
clusterName=...

Then, to start the service:

$ bin/pulsar-daemon start websocket

API Reference

There are 2 WebSocket endpoint, for publishing and consuming messages and all exchanges are done through JSON messages.

Producer

Open a WebSocket session to create a producer for a specific topic:

http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}/{topic}
Publishing a message
{
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "context": "1"
}
Key Type Requirement Explanation
payload String Required Base-64 encoded payload
properties Key-Value pairs Optional Application defined properties
context String Optional Application defined request identifier
key String Optional For partitioned topics, decides the
partition to use
replicationClusters List Optional Restrict replication to these clusters
Acknowledgement from server
{
   "result": "ok",
   "messageId": "CAAQAw==",
   "context": "1"
 }
Key Type Requirement Explanation
result String Required ok if successful or error message
messageId String Required Message Id assigned to the published message
context String Optional Application defined request identifier

Consumer

Open a WebSocket session to create a producer for a specific topic:

http://{serviceUrl}:8080/ws/consumer/persistent/{property}/{cluster}/{namespace}/{topic}/{subscription}
Receiving messages

Server will push messages on the WebSocket session:

{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785",
  "context": "1"
}
Key Type Requirement Explanation
messageId String Required Message Id
payload String Required Base-64 encoded payload
properties Key-Value pairs Optional Application defined properties
publishTime String Required Publish timestamp
context String Optional Application defined request identifier
key String Optional Original routing key set by producer
Acknowledging the message

Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.

{
  "messageId": "CAAQAw=="
}
Key Type Requirement Explanation
messageId String Required Message Id of the processed message

Error codes

In case of error the server will close the WebSocket session using the following error codes:

Different Possible Error Types

Error Code Error Message
1 Failed to create producer
2 Failed to subscribe
3 Failed to de-serialize from JSON
4 Failed to serialize to JSON
5 Failed to authenticate client
6 Client is not authorized

Application is responsible to re-establish a new WebSocket session after a backoff period.

Examples

Python

Python producer

import websocket, base64, json

ws = websocket.create_connection(
   'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic')

# Send one message
ws.send(json.dumps({
    'payload' : base64.b64encode('Hello World'),
    'properties': {
       'key1' : 'value1',
       'key2' : 'value2'
    },
    'context' : 5
}))

response =  json.loads(ws.recv())
if response['result'] == 'ok':
    print 'Message published successfully'
else:
    print 'Failed to publish message:', response
ws.close()

Python consumer

import websocket, base64, json

ws = websocket.create_connection(
   'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub')

while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print 'Received: ', msg, ' - payload:', base64.b64decode(msg['payload'])

    # Acknowledge successful processing
    ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

NodeJS

NodeJS producer

var WebSocket = require('ws');
var ws = new WebSocket(
	  "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1");

var message = {
  "payload" : new Buffer("Hello World").toString('base64'),
  "properties": {
    "key1" : "value1",
    "key2" : "value2"
  },
  "context" : "1"
};

ws.on('open', function() {
  // Send one message
  ws.send(JSON.stringify(message));
});

ws.on('message', function(message) {
  console.log('received ack: %s', message);
});

NodeJS consumer

var WebSocket = require('ws');
var socket = new WebSocket(
	"ws://localhost:6080/pubilsh/persistent/my-property/us-west/my-ns/my-topic1/my-sub-1")

socket.onmessage = function(pckt){
	var receiveMsg = pckt.data;
	var ackMsg = {"messageId" : receiveMsg.messageId}
	socket.send(JSON.stringify(ackMsg));      
};