-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for alternate backends, including a NATS adapter #79
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
/docs/ | ||
/lib/ | ||
/bin/ | ||
lib/ | ||
bin/ | ||
/.shards/ | ||
*.dwarf | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# Multi-Backend example | ||
|
||
This is an example Turbo+Cable app to demo the use of multiple backends with the Cable shard. | ||
|
||
## Installation | ||
|
||
To use the Redis and NATS backends, you will need to have access to running | ||
Redis and NATS servers. A package manager for your operating system can simplify | ||
the installation of them on your machine. | ||
|
||
If you don't want to install a NATS server, you can use publicly available servers. For example, there is a public NATS server available at `demo.nats.io` — just don't use it for production. 😄 | ||
|
||
Once you have Redis and NATS installed, install the Crystal dependencies: | ||
|
||
```shell | ||
shards install | ||
``` | ||
|
||
## Usage | ||
|
||
To use either backend, specify the url in the `CABLE_BACKEND_URL` environment variable: | ||
|
||
```shell | ||
CABLE_BACKEND_URL=redis:/// | ||
CABLE_BACKEND_URL=nats:/// | ||
CABLE_BACKEND_URL=nats://demo.nats.io/ | ||
``` | ||
|
||
If you would like to see the messages passing through Redis when using the Redis backend, you can use the Redis CLI with the following command: | ||
|
||
```shell | ||
redis-cli subscribe time | ||
``` | ||
|
||
If you would like to see the messages passing through NATS when using the NATS backend, you can use the [NATS CLI](https://github.com/nats-io/natscli), which may need to be installed separately from the NATS server. | ||
|
||
```shell | ||
nats sub time | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
version: 2.0 | ||
shards: | ||
base32: | ||
git: https://github.com/jgaskins/base32.git | ||
version: 0.1.1+git.commit.18f5647c42dae4de654e3003825fab43dc95b029 | ||
|
||
cable: | ||
path: ../.. | ||
version: 0.2.2 | ||
|
||
cron_parser: | ||
git: https://github.com/kostya/cron_parser.git | ||
version: 0.4.0 | ||
|
||
db: | ||
git: https://github.com/crystal-lang/crystal-db.git | ||
version: 0.12.0 | ||
|
||
future: | ||
git: https://github.com/crystal-community/future.cr.git | ||
version: 1.0.0 | ||
|
||
habitat: | ||
git: https://github.com/luckyframework/habitat.git | ||
version: 0.4.7 | ||
|
||
nats: | ||
git: https://github.com/jgaskins/nats.git | ||
version: 1.3.3 | ||
|
||
redis: | ||
git: https://github.com/jgaskins/redis.git | ||
version: 0.7.0 | ||
|
||
tasker: | ||
git: https://github.com/spider-gazelle/tasker.git | ||
version: 2.1.4 | ||
|
||
turbo: | ||
git: https://github.com/jgaskins/turbo.git | ||
version: 0.1.0+git.commit.8685616e26d7903d1559f5f3f8b96085bc10af12 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
name: multi_backend | ||
version: 0.1.0 | ||
|
||
authors: | ||
- Jamie Gaskins <jgaskins@hey.com> | ||
|
||
targets: | ||
multi_backend: | ||
main: src/multi_backend.cr | ||
|
||
dependencies: | ||
cable: | ||
path: ../.. | ||
nats: | ||
github: jgaskins/nats | ||
redis: | ||
github: jgaskins/redis | ||
turbo: | ||
github: jgaskins/turbo | ||
|
||
crystal: 1.9.2 | ||
|
||
license: MIT |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
require "turbo/cable" | ||
require "cable/backend/nats" | ||
require "cable/backend/redis/backend" | ||
|
||
module AppCable | ||
class Connection < Cable::Connection | ||
identified_by id | ||
|
||
getter id = UUID.random.to_s | ||
|
||
def connect | ||
end | ||
end | ||
end | ||
|
||
Cable.configure do |settings| | ||
settings.route = "/cable" # the URL your JS Client will connect | ||
# settings.url = "redis:///" | ||
# settings.url = ENV.fetch("NATS_URL", "nats:///") | ||
settings.url = ENV.fetch("CABLE_BACKEND_URL", "redis:///") | ||
end | ||
Comment on lines
+16
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Notice that in the settings for the demo we aren't actually setting |
||
|
||
Turbo::StreamsChannel.signing_key = "this is my signing key" | ||
|
||
spawn do | ||
loop do | ||
duration = Time.measure do | ||
Turbo::StreamsChannel.broadcast_update_to "time", | ||
message: Time.local.to_s | ||
end | ||
sleep 1.second - duration | ||
end | ||
end | ||
|
||
http = HTTP::Server.new([ | ||
HTTP::LogHandler.new, | ||
Cable::Handler(AppCable::Connection).new, | ||
]) do |context| | ||
context.response << <<-HTML | ||
<!doctype html> | ||
#{Turbo.javascript_tag} | ||
#{Turbo.cable_tag} | ||
#{Turbo::Frame.new(id: "time") { }} | ||
#{Turbo.stream_from "time"} | ||
Comment on lines
+41
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It blew my mind that this was all we needed to render in order to demo this. 🤯 |
||
HTML | ||
end | ||
|
||
http.listen 3200 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
require "nats" | ||
|
||
module Cable | ||
class NATSBackend < BackendCore | ||
register "nats" | ||
|
||
getter nats : NATS::Client do | ||
NATS::Client.new(URI.parse(Cable.settings.url)) | ||
end | ||
getter streams = Hash(String, Set(NATS::Subscription)).new { |streams, channel| | ||
streams[channel] = Set(NATS::Subscription).new | ||
} | ||
Comment on lines
+10
to
+12
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had to use The reason we had to include this is because you can't unsubscribe from a NATS subject directly. You have to unsubscribe from the subscription id since you can subscribe to the same subject multiple times. |
||
|
||
def subscribe_connection | ||
nats | ||
end | ||
|
||
def publish_connection | ||
nats | ||
end | ||
|
||
def close_subscribe_connection | ||
nats.close rescue nil | ||
end | ||
|
||
def close_publish_connection | ||
nats.close rescue nil | ||
end | ||
Comment on lines
+22
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NATS multiplexes everything over a single TCP connection (no connection pool needed), so we ignore any errors caused by closing the client twice. This might be too naive since |
||
|
||
def open_subscribe_connection(channel) | ||
nats | ||
end | ||
|
||
def publish_message(stream_identifier : String, message : String) | ||
nats.publish stream_identifier, message | ||
end | ||
|
||
def subscribe(stream_identifier : String) | ||
subscription = nats.subscribe stream_identifier, queue_group: object_id.to_s do |msg| | ||
Cable.server.fiber_channel.send({ | ||
stream_identifier, | ||
String.new(msg.body), | ||
}) | ||
end | ||
streams[stream_identifier] << subscription | ||
end | ||
|
||
def unsubscribe(stream_identifier : String) | ||
if subscriptions = streams.delete(stream_identifier) | ||
subscriptions.each do |subscription| | ||
nats.unsubscribe subscription | ||
end | ||
end | ||
end | ||
|
||
def ping_redis_subscribe | ||
nats.ping | ||
end | ||
|
||
def ping_redis_publish | ||
nats.ping | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,10 @@ | ||
require "redis" | ||
|
||
module Cable | ||
class RedisBackend < Cable::BackendCore | ||
register "redis" | ||
register "rediss" | ||
Comment on lines
+5
to
+6
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Registering which URI schemes you support is as simple as this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🐍 |
||
|
||
# connection management | ||
getter redis_subscribe : Redis::Connection = Redis::Connection.new(URI.parse(Cable.settings.url)) | ||
getter redis_publish : Redis::Client = Redis::Client.new(URI.parse(Cable.settings.url)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,5 @@ | ||
require "habitat" | ||
require "json" | ||
require "redis" | ||
require "./cable/**" | ||
|
||
# TODO: Write documentation for `Cable` | ||
|
@@ -32,8 +31,12 @@ module Cable | |
setting token : String = "token", example: "token" | ||
setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379" | ||
setting disable_sec_websocket_protocol_header : Bool = false | ||
setting backend_class : Cable::BackendCore.class = Cable::RedisBackend, example: "Cable::RedisBackend" | ||
setting redis_ping_interval : Time::Span = 15.seconds | ||
setting backend_class : Cable::BackendCore.class = Cable::RegistryBackend, example: "Cable::RedisBackend" | ||
setting backend_ping_interval : Time::Span = 15.seconds | ||
@[Deprecated("Use backend_ping_interval")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be curious to see if this shows... Since the method doesn't take annotations in to consideration, this may not actually render. That would be a nice feature for Habitat though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you're right. It looks like it collects the |
||
setting redis_ping_interval : Time::Span do | ||
backend_ping_interval | ||
end | ||
Comment on lines
+37
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't tell if Habitat actually supports the block format like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, that's interesting... At this level,
A little more complex, but that basic idea. So I guess the question is, what does Crystal consider the block here? My guess is the block is passed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, there's an open issue on it. luckyframework/habitat#54 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very likely. When I looked at the code for the What I was aiming for here was something like the block for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, methods may be fine here. I'm updating habitat issues to track some of these things too. |
||
setting restart_error_allowance : Int32 = 20 | ||
setting on_error : Proc(Exception, String, Nil) = ->(exception : Exception, message : String) do | ||
Cable::Logger.error(exception: exception) { message } | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,9 @@ | ||
module Cable | ||
abstract class BackendCore | ||
def self.register(uri_scheme : String, backend : BackendCore.class = self) | ||
::Cable::RegistryBackend.register uri_scheme, backend | ||
end | ||
|
||
# connection management | ||
abstract def subscribe_connection | ||
abstract def publish_connection | ||
|
@@ -21,4 +25,32 @@ module Cable | |
abstract def ping_redis_subscribe | ||
abstract def ping_redis_publish | ||
end | ||
|
||
class RegistryBackend < BackendCore | ||
REGISTERED_BACKENDS = {} of String => BackendCore.class | ||
|
||
def self.register(uri_scheme : String, backend : BackendCore.class = self) | ||
REGISTERED_BACKENDS[uri_scheme] = backend | ||
end | ||
|
||
@backend : BackendCore | ||
|
||
def initialize | ||
@backend = REGISTERED_BACKENDS[URI.parse(::Cable.settings.url).scheme].new | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the magic behind the automatic runtime backend selection. TODO: I want to provide a better error than a plain-old raise Cable::UnknownBackend.new("Cannot find a backend for URI scheme #{scheme.inspect}. Did you require the adapter for it?") |
||
end | ||
|
||
delegate( | ||
subscribe_connection, | ||
publish_connection, | ||
close_subscribe_connection, | ||
close_publish_connection, | ||
open_subscribe_connection, | ||
publish_message, | ||
subscribe, | ||
unsubscribe, | ||
ping_redis_subscribe, | ||
ping_redis_publish, | ||
to: @backend | ||
) | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since
Cable
adopted a bunch of the customizations I'd made to my fork, I didn't have to make a single change to myturbo/cable
integration! 💯