-
Notifications
You must be signed in to change notification settings - Fork 12
Extending the Framework
In general, the framework consists of the following components:
- Sensors - provide data to the pipeline (only output)
- Transformers - modify data within the pipeline (input + output)
- Consumers - take data from the pipeline and perform an external action (only input)
To create your own components you can follow the steps below.
The first step to integrate a new sensor is to create a new Java class that extends the abstract Sensor
class.
import hcm.ssj.core.Sensor;
public class MySensor extends Sensor
{
}
Afterwards, implement the abstract connect()
, disconnect()
and getOptions()
methods. The basic structure should look like this (you can also look at existing sensors such as Empatica
, Bitalino
, EstimoteBeacon
, etc... for inspiration):
import hcm.ssj.core.Log;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.Sensor;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
public class MySensor extends Sensor
{
public class Options extends OptionList
{
// Specify your options here
public final Option<String> myStringOption = new Option<>("myStringOption", "default value", String.class, "option description");
public final Option<Boolean> myBooleanOption = new Option<>("myBooleanOption", true, Boolean.class, "option description");
public final Option<Float> myFloatOption = new Option<>("myFloatOption", 1.0f, Float.class, "option description");
public final Option<Integer> myIntOption = new Option<>("myIntOption", 1, Integer.class, "option description");
private Options()
{
addOptions();
}
}
public final Options options = new Options();
public MySensor()
{
// Set custom name for your component (shows up in SSJ Creator)
_name = "MySensor";
}
@Override
public OptionList getOptions()
{
return options;
}
@Override
protected boolean connect() throws SSJFatalException
{
boolean connected = false;
// Example for accessing your options
if (options.myBooleanOption.get())
{
// Example for using the integrated logger
Log.i("String option value: " + options.myStringOption.get());
}
// Establish connection to your sensor here
// Return true if connected successfully
connected = true;
return connected;
}
@Override
protected void disconnect() throws SSJFatalException
{
// Disconnect from your sensor here and perform cleanup
}
}
In addition to the Sensor
component which handles the connection to the sensor, at least one SensorChannel
component needs to be created that is responsible for providing the sensor data to the pipeline. It is also common that a sensor can have multiple channels (e.g., if you want to integrate a wearable device that provides the heart rate and accelerometer data via bluetooth, then you would create a sensor component that handles the bluetooth connection and two sensor channels - one that provides the heart rate and another one for the accelerometer data).
To create a sensor channel component simply create a new Java class and extend the abstract SensorChannel
class.
import hcm.ssj.core.SensorChannel;
public class MySensorChannel extends SensorChannel
{
}
Afterwards, overwrite the abstract methods. A basic structure should look like this:
import hcm.ssj.core.Cons;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.SensorChannel;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
import hcm.ssj.core.stream.Stream;
public class MySensorChannel extends SensorChannel
{
public class Options extends OptionList
{
// Specify your options here
public final Option<String> myStringOption = new Option<>("myStringOption", "default value", String.class, "option description");
public final Option<Boolean> myBooleanOption = new Option<>("myBooleanOption", true, Boolean.class, "option description");
public final Option<Float> myFloatOption = new Option<>("myFloatOption", 1.0f, Float.class, "option description");
public final Option<Integer> sampleRate = new Option<>("sampleRate", 5, Integer.class, "sample rate");
private Options()
{
addOptions();
}
}
public final Options options = new Options();
public MySensorChannel()
{
// Set custom name for your component (shows up in SSJ Creator)
_name = "MySensorChannel";
}
@Override
public OptionList getOptions()
{
return options;
}
@Override
public void enter(Stream stream_out) throws SSJFatalException
{
// Called once at the pipeline start (before processing loop)
// Used for setup
// Example how to access sensor class, e.g. to read data from the connection established there
MySensor mySensor = ((MySensor) _sensor);
}
@Override
protected boolean process(Stream stream_out) throws SSJFatalException
{
// Example for providing data
float[] out = stream_out.ptrF();
// Provide values for each sample dimension
out[0] = (float) Math.random();
out[1] = (float) Math.random();
out[2] = (float) Math.random();
// Return true if data was provided successfully
return true;
}
@Override
public void flush(Stream stream_out) throws SSJFatalException
{
// Called once at the pipeline stop (after processing loop)
// Used for cleanup
}
@Override
protected double getSampleRate()
{
// Number of samples per second. process() method will be called this amount of times per second
return 5; // or use options.sampleRate.get() to make it adjustable
}
@Override
protected int getSampleDimension()
{
// Each sample can have multiple dimensions e.g., accelerometer data has 3 dimensions for x, y and z
return 3;
}
@Override
protected Cons.Type getSampleType()
{
// Datatype of the provided samples
return Cons.Type.FLOAT;
}
@Override
protected void describeOutput(Stream stream_out)
{
// Describe the dimensions of the sample
stream_out.desc = new String[stream_out.dim];
stream_out.desc[0] = "Acc X";
stream_out.desc[1] = "Acc Y";
stream_out.desc[2] = "Acc Z";
}
}
You can now use your sensor and channel(s) like this:
// Get pipeline
Pipeline pipeline = Pipeline.getInstance();
// Create sensor
MySensor sensor = new MySensor();
sensor.options.myStringOption.set("custom string");
// Create channel
MySensorChannel channel = new MySensorChannel();
channel.options.myBoolOption.set(true);
// Create consumer
Logger logger = new Logger();
// Add components to pipeline
pipeline.addSensor(sensor, channel);
pipeline.addConsumer(logger, channel);
pipeline.start();
The main purpose of transformers is to convert data (e.g., calculate features). To integrate your own transformer, simply create a new Java class that extends the abstract Transformer
class.
import hcm.ssj.core.Transformer;
public class MyTransformer extends Transformer
{
}
Afterwards, overwrite the abstract methods. A basic structure should look like this:
import hcm.ssj.core.Cons;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.Transformer;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
import hcm.ssj.core.stream.Stream;
public class MyTransformer extends Transformer
{
public class Options extends OptionList
{
// Specify your options here
public final Option<String> myStringOption = new Option<>("myStringOption", "default value", String.class, "option description");
public final Option<Boolean> myBooleanOption = new Option<>("myBooleanOption", true, Boolean.class, "option description");
public final Option<Float> myFloatOption = new Option<>("myFloatOption", 1.0f, Float.class, "option description");
public final Option<Integer> myIntOption = new Option<>("myIntOption", 1, Integer.class, "option description");
private Options()
{
addOptions();
}
}
public final Options options = new Options();
public MyTransformer()
{
// Set custom name for your component (shows up in SSJ Creator)
_name = "MyTransformer";
}
@Override
public OptionList getOptions()
{
return options;
}
@Override
public void enter(Stream[] stream_in, Stream stream_out) throws SSJFatalException
{
// Called once at the pipeline start (before transform loop)
// Used for setup
}
@Override
public void transform(Stream[] stream_in, Stream stream_out) throws SSJFatalException
{
// Perform data processing
// This example checks if each value of the received data is > 5
// Check for correct input datatype
if (stream_in[0].type == Cons.Type.FLOAT)
{
float[] in = stream_in[0].ptrF();
boolean[] out = stream_out.ptrBool();
// The sample data is received in a 1-dimensional array
// So if two samples with three dimensions are received it would look like this:
// [s1d1, s1d2, s1d3, s2d1, s2d2, s2d3]
// Loop through number of samples
for (int num = 0; num < stream_in[0].num; num++)
{
// Loop through dimensions of each sample
for (int dim = 0; dim < stream_in[0].dim; dim++)
{
// Fill output
out[num * stream_in[0].dim + dim] = in[num * stream_in[0].dim + dim] > 5;
}
}
}
}
@Override
public void flush(Stream[] stream_in, Stream stream_out) throws SSJFatalException
{
// Called once at the pipeline stop (after transform loop)
// Used for cleanup
}
@Override
public int getSampleDimension(Stream[] stream_in)
{
// Number of dimensions for the output sample
// In this example the same number of dimensions as the first input stream is returned
return stream_in[0].dim;
}
@Override
public Cons.Type getSampleType(Stream[] stream_in)
{
// Datatype of the provided samples
return Cons.Type.BOOL;
}
@Override
public int getSampleNumber(int sampleNumber_in)
{
// Number of samples returned for each batch of sampleNumber_in
// In this example the transformer provides one sample for each received sample
return sampleNumber_in;
}
@Override
protected void describeOutput(Stream[] stream_in, Stream stream_out)
{
// Describe the dimensions of the output samples
stream_out.desc = new String[stream_out.dim];
for (int i = 0; i < stream_out.dim; i++)
{
stream_out.desc[i] = "Dim " + i + " > 5";
}
}
}
You can now use your transformer like this:
// Get pipeline
Pipeline pipeline = Pipeline.getInstance();
[...]
// Create transformer
MyTransformer transformer = new MyTransformer();
transformer.options.myStringOption.set("custom value");
// Create consumer
Logger logger = new Logger();
// Add components to pipeline
[...]
pipeline.addTransformer(transformer, sensorChannel);
pipeline.addConsumer(logger, transformer);
pipeline.start();
Consumers mainly take the data from the pipeline and perform an external action (e.g., write to filesystem, send over network, etc...). To integrate your own consumer, simply create a new Java class that extends the abstract Consumer
class.
import hcm.ssj.core.Consumer;
public class MyConsumer extends Consumer
{
}
Afterwards, overwrite the abstract methods. A basic structure should look like this:
import hcm.ssj.core.Consumer;
import hcm.ssj.core.Log;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.event.Event;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
import hcm.ssj.core.stream.Stream;
public class MyConsumer extends Consumer
{
public class Options extends OptionList
{
// Specify your options here
public final Option<String> myStringOption = new Option<>("myStringOption", "default value", String.class, "option description");
public final Option<Boolean> myBooleanOption = new Option<>("myBooleanOption", true, Boolean.class, "option description");
public final Option<Float> myFloatOption = new Option<>("myFloatOption", 1.0f, Float.class, "option description");
public final Option<Integer> myIntOption = new Option<>("myIntOption", 1, Integer.class, "option description");
private Options()
{
addOptions();
}
}
public final Options options = new Options();
public MyConsumer()
{
// Set custom name for your component (shows up in SSJ Creator)
_name = "MyConsumer";
}
@Override
public OptionList getOptions()
{
return options;
}
@Override
public void enter(Stream[] stream_in) throws SSJFatalException
{
// Called once at the pipeline start (before consume loop)
// Used for setup
}
@Override
protected void consume(Stream[] stream_in, Event trigger) throws SSJFatalException
{
// Perform consumer action
// This example just logs the values to the console
for (Stream stream : stream_in)
{
for (int num = 0; num < stream.num; num++)
{
String msg = "Sample data: ";
for (int dim = 0; dim < stream.dim; dim++)
{
switch (stream.type)
{
case BYTE:
msg += stream.ptrB()[num * stream.dim + dim] + " ";
break;
case CHAR:
msg += stream.ptrC()[num * stream.dim + dim] + " ";
break;
case SHORT:
msg += stream.ptrS()[num * stream.dim + dim] + " ";
break;
case INT:
msg += stream.ptrI()[num * stream.dim + dim] + " ";
break;
case LONG:
msg += stream.ptrL()[num * stream.dim + dim] + " ";
break;
case FLOAT:
msg += stream.ptrF()[num * stream.dim + dim] + " ";
break;
case DOUBLE:
msg += stream.ptrD()[num * stream.dim + dim] + " ";
break;
case BOOL:
msg += stream.ptrBool()[num * stream.dim + dim] + " ";
break;
}
}
Log.i(msg);
}
}
}
@Override
public void flush(Stream[] stream_in) throws SSJFatalException
{
// Called once at the pipeline stop (after consume loop)
// Used for cleanup
}
}
You can now use your consumer like this:
// Get pipeline
Pipeline pipeline = Pipeline.getInstance();
[...]
// Create consumer
MyConsumer consumer = new MyConsumer();
consumer.options.myStringOption.set("custom value");
// Add components to pipeline
[...]
pipeline.addConsumer(consumer, transformer);
pipeline.start();