Ether provides an ultra-lightweight interface for scientists and engineers to orchestrate hardware and software resources for complex data acquisition and analysis goals. It's designed to be useful at every phase of a project; but in particular, the early phases, when resources and designs can change rapidly and are often discovered through a process of exploration and trial-and-error rather than immutable, a priori decisions.
Ether dynamically facilitates direct, local, and remote function calling between Python instances and processes as well as data management and logging. It is designed to minimize user overhead and coding skill requirements by relying on an interface based on decorators, yaml configurations, and a small number of direct calls to Ether functions.
- A computational biologist has code that operates lab equipment and runs analysis, and wants to integrate it into an automated data acquisition pipeline
- An AI engineer wants to combine multiple machine learning models into a real-time processing pipeline
- An operations engineer needs to add remote monitoring to existing industrial control systems
- A behavioral scientist wants to turn a Gymnasium environment into an interactive game with joystick control and real-time display
- Transform existing classes into distributed system components with via decorators or yaml configuration
- Automatic process lifecycle management and monitoring
- Configuration-based instance launching
- Built-in initialization and cleanup
- Type validation for messages using Pydantic
- System-wide logging of events and execution
- Automatic guarantees on data saving and annotation/metadata
Here's an example of how to build a data processing pipeline using Ether. This example shows how three independent classes can work together with minimal modification:
from ether import ether_pub, ether_sub
# Original class - just add decorators to connect it
class DataGenerator:
@ether_sub()
@ether_pub(topic="DataProcessor.process_data")
def generate_data(self, data: int = 42) -> dict:
print(f"Generating data: {data}")
return {"name": self.name, "data": data}
# Processing class - can run in a separate process
class DataProcessor:
def __init__(self, multiplier: int = 2):
self.multiplier = multiplier
@ether_sub()
@ether_pub(topic="DataCollector.collect_result")
def process_data(self, name: str, data: int = 0) -> dict:
processed_data = data * self.multiplier
return {
"result_name": name,
"value": processed_data
}
# Collection class - automatically receives results
class DataCollector:
@ether_sub()
def collect_result(self, result_name: str, value: int):
print(f"Collected result: {result_name} = {value}")
You can run these components either manually in a single process or distributed across multiple processes using a configuration file:
from ether import ether
# Initialize the messaging system
ether.init()
# Create instances as normal
generator = DataGenerator(name="generator1")
processor = DataProcessor(name="processor1", multiplier=2)
collector = DataCollector(name="collector1")
# Use normally - messages flow automatically
generator.generate_data(42)
Create a YAML configuration file (config.yaml
) to specify how components should be distributed:
instances:
generator1:
class_path: myapp.DataGenerator
processor1:
class_path: myapp.DataProcessor
kwargs:
multiplier: 2
processor2:
class_path: myapp.DataProcessor
kwargs:
multiplier: 4
collector1:
class_path: myapp.DataCollector
Then run your application:
from ether import ether
# Initialize with config - components launch automatically
ether.init(config="config.yaml")
# Send data into the pipeline
ether.pub({"data": 42}, topic="DataGenerator.generate_data")
Sometimes you may want to use Ether with classes that you can't or don't want to modify directly. Ether provides a way to apply pub/sub decorators through configuration instead of modifying source code.
The registry configuration allows you to specify which methods should be decorated with ether_pub
and ether_sub
. Here's an example:
registry:
examples.external_class_integration.DataGenerator:
methods:
generate_data:
ether_sub: {} # Empty dict for default settings
ether_pub:
topic: "data" # Publish to 'data' topic
examples.external_class_integration.DataProcessor:
methods:
process_data:
ether_sub:
topic: "data" # Subscribe to 'data' topic
ether_pub:
topic: "processed_data" # Publish to 'processed_data' topic
examples.external_class_integration.DataCollector:
methods:
collect_result:
ether_sub:
topic: "processed_data" # Subscribe to 'processed_data' topic
instances:
generator:
class_path: third_party_module.DataGenerator
kwargs:
process_id: 1
processor2x:
class_path: third_party_module.DataProcessor
kwargs:
multiplier: 2
collector:
class_path: third_party_module.DataCollector
-
The registry configuration specifies:
- Which classes to modify via the keys in
registry
- Which methods to decorate via the keys in
methods
- What decorators to apply (e.g.
ether_pub
,ether_sub
. etc.) - The
kwargs
to pass to each decorator
- Which classes to modify via the keys in
-
When Ether initializes:
from ether import ether # Load config from file ether.init(config="path/to/config.yaml") # Or use dict configuration config = { "registry": { "my.module.MyClass": { "methods": { "my_method": { "ether_pub": {"topic": "my_topic"} } } } } } ether.init(config=config)
-
The specified decorators are applied to the classes, and Ether functionality is added
-
The classes can then be used normally, either:
- Created manually:
instance = MyClass()
- Launched automatically via the
instances
configuration
- Created manually:
- No modification of source code required
- Works with third-party classes
- Configuration can be changed without code changes
- Same functionality as manual decoration
Here's how you might use Ether to automate a lab experiment:
class TemperatureSensor:
@ether_pub(topic="DataLogger.log_temperature")
def read_temperature(self) -> dict:
temp = self._hardware.get_temperature()
return {"temperature": temp, "timestamp": time.time()}
class DataLogger:
@ether_sub()
@ether_pub(topic="ExperimentController.check_temperature")
def log_temperature(self, temperature: float, timestamp: float) -> dict:
self._db.save_temperature(temperature, timestamp)
return {"temperature": temperature, "timestamp": timestamp}
class ExperimentController:
@ether_sub()
def check_temperature(self, temperature: float, timestamp: float):
if temperature > self.max_temp:
self._safety_shutdown()
- Ether provides decorators that make it easy to publish or subsribe to messaging topics.
@ether_pub
and@ether_sub
provide general decorators that define message types automatically from method inputs and return types.- Ether also provides shortcut subscribe decorators for common steps in data acqusition and analysis systems (e.g. @save, @cleanup, @startup, @log)
- Ether handles process creation and monitoring
- Messages automatically flow between components based on topics
- Messages are validated using Pydantic models