This module allows to:
-
publish Bluesky documents to a NATS JetStream
-
consume a NATS JetStream
So far, the module needs to be installed from sources.
local installation
So far, no public module is available.
-
Get the code cloned locally
-
Run
uv build
-
In your project run
uv add -f <path_to_your_dist> bluesky-nats
In case you don’t have uv
, use the respective pip
commands, which I don’t know, sorry.
The module requires a NATS JetStream-enabled server. If not explicitly specified, the Publisher will automatically publish to the NATS JetStream "bluesky". Make sure that the respective JetStream is available on the server and that your "subject" pattern matches the configuration.
A simple Docker setup for local development and testing is provided in the docker directory, with a Readme.adoc for guidance.
To use the examples, make sure to install the module with the optional dependencies in the examples
group.
Run uv sync --extra examples
, then enable the virtual environment with . .venv/bin/activate
.
The publisher.py
example will run the most basic Bluesky RunEngine example.
By invoking python examples/publisher.py
, the output should look like this:
Transient Scan ID: 1 Time: 2024-09-19 10:20:58
Persistent Unique Scan ID: '0d7b16dd-acab-4fed-8d78-c1ab45114304'
New stream: 'primary'
+-----------+------------+------------+
| seq_num | time | det1 |
+-----------+------------+------------+
| 1 | 10:20:58.1 | 5.000 |
+-----------+------------+------------+
generator count ['0d7b16dd'] (scan num: 1)
For more advanced configuration of the NATS client, a configuration builder can be used.
The keywords match the NATS connect
signature, refer to the documentation for details.
builder example
import bluesky_nats.callbacks
from bluesky_nats.nats_client import NATSClientConfig
# this will be OK
assert NATSClientConfig() == NATSClientConfig.builder().build()
# proper build
config = (
NATSClientConfig.builder()
.from_file("examples/config/config.json") # load default configuration from file from JSON
.from_file("examples/config/cluster.yaml") # overwrite ALL fields from another file, but YAML
.from_file("examples/config/cluster.toml") # overwrite yet again, TOML this timed not that this makes any sense
.set("max_reconnect_attempts", value=20) # this sets a single field manually
.set_callback("error_cb", bluesky_nats.callbacks.error_callback) # register a callback from the module
.set_callback("user_jwt_cb", lambda: print("user_jwt_callback")) # register a callback from a lambda
.build() # put it all together
)
❗
|
Subsequent calls to .from_file will overwrite all fields.
|
ℹ️
|
The callbacks have to match the expexted type. error_cb , disconnected_cb , closed_cb , reconnected_cb and discovered_server_cb must be coroutines.
|