Skip to content

Extending the Framework

Michael Dietz edited this page Apr 24, 2023 · 6 revisions

In general, the framework consists of the following components:

  • Sensors - provide data to the pipeline (only output)
  • Transformers - modify data within the pipeline (input + output)
  • Consumers - take data from the pipeline and perform an external action (only input)

To create your own components, follow the steps below.

Creating Sensors

The first step to integrate a new sensor is to create a new Java class that extends the abstract Sensor class.

import hcm.ssj.core.Sensor;

/**
 * Skeleton of a custom sensor: a class that extends the abstract {@code Sensor} class.
 * The body is intentionally empty at this stage.
 */
public class MySensor extends Sensor
{
    
}

Afterwards, implement the abstract connect(), disconnect(), and getOptions() methods. The basic structure should look like this (you can also look at existing sensors such as Empatica, Bitalino, EstimoteBeacon, etc. for inspiration):

import hcm.ssj.core.Log;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.Sensor;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;


/**
 * Example sensor component.
 *
 * <p>Shows how to declare options, set the component name, and implement the
 * {@code connect()}, {@code disconnect()} and {@code getOptions()} methods.
 */
public class MySensor extends Sensor
{
    /**
     * Configurable values of this sensor, one field per option.
     */
    public class Options extends OptionList
    {
        // Declare your options here
        public final Option<String> myStringOption =
                new Option<>("myStringOption", "default value", String.class, "option description");
        public final Option<Boolean> myBooleanOption =
                new Option<>("myBooleanOption", true, Boolean.class, "option description");
        public final Option<Float> myFloatOption =
                new Option<>("myFloatOption", 1.0f, Float.class, "option description");
        public final Option<Integer> myIntOption =
                new Option<>("myIntOption", 1, Integer.class, "option description");

        private Options()
        {
            // NOTE(review): presumably registers the fields declared above - confirm against OptionList
            addOptions();
        }
    }

    /** Option values of this sensor instance. */
    public final Options options = new Options();

    /**
     * Creates the sensor and assigns its display name.
     */
    public MySensor()
    {
        // Custom component name (shows up in SSJ Creator)
        _name = "MySensor";
    }

    /**
     * @return the option list of this sensor
     */
    @Override
    public OptionList getOptions()
    {
        return options;
    }

    /**
     * Establishes the connection to the sensor.
     *
     * @return {@code true} if the connection was established successfully
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    protected boolean connect() throws SSJFatalException
    {
        // Option values are read through get()
        final boolean logStringOption = options.myBooleanOption.get();
        if (logStringOption)
        {
            // Messages are written with the integrated logger
            final String currentValue = options.myStringOption.get();
            Log.i("String option value: " + currentValue);
        }

        // Establish the connection to your sensor here.
        // This example has nothing to connect to, so it always reports success.
        return true;
    }

    /**
     * Closes the connection to the sensor.
     *
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    protected void disconnect() throws SSJFatalException
    {
        // Disconnect from your sensor here and clean up
    }
}

In addition to the Sensor component, which handles the connection to the sensor, at least one SensorChannel component needs to be created; it is responsible for providing the sensor data to the pipeline. A sensor commonly has multiple channels. For example, to integrate a wearable device that provides heart rate and accelerometer data via Bluetooth, you would create one sensor component that handles the Bluetooth connection and two sensor channels: one that provides the heart rate and another one for the accelerometer data.

To create a sensor channel component, simply create a new Java class that extends the abstract SensorChannel class.

import hcm.ssj.core.SensorChannel;

/**
 * Skeleton of a custom sensor channel: a class that extends the abstract
 * {@code SensorChannel} class. The body is intentionally empty at this stage.
 */
public class MySensorChannel extends SensorChannel
{

}

Afterwards, override the abstract methods. The basic structure should look like this:

import hcm.ssj.core.Cons;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.SensorChannel;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
import hcm.ssj.core.stream.Stream;


/**
 * Example sensor channel that provides three-dimensional float samples
 * (filled with random values) to the pipeline.
 *
 * <p>The sample rate is taken from the {@code sampleRate} option, so changing
 * that option actually changes how often {@link #process(Stream)} is called.
 */
public class MySensorChannel extends SensorChannel
{
    /**
     * Configurable values of this channel, one field per option.
     */
    public class Options extends OptionList
    {
        // Specify your options here
        public final Option<String> myStringOption = new Option<>("myStringOption", "default value", String.class, "option description");
        public final Option<Boolean> myBooleanOption = new Option<>("myBooleanOption", true, Boolean.class, "option description");
        public final Option<Float> myFloatOption = new Option<>("myFloatOption", 1.0f, Float.class, "option description");
        public final Option<Integer> sampleRate = new Option<>("sampleRate", 5, Integer.class, "sample rate");

        private Options()
        {
            addOptions();
        }
    }

    /** Option values of this channel instance. */
    public final Options options = new Options();

    /**
     * Creates the channel and assigns its display name.
     */
    public MySensorChannel()
    {
        // Set custom name for your component (shows up in SSJ Creator)
        _name = "MySensorChannel";
    }

    /**
     * @return the option list of this channel
     */
    @Override
    public OptionList getOptions()
    {
        return options;
    }

    /**
     * Called once at the pipeline start (before the processing loop); used for setup.
     *
     * @param stream_out the output stream of this channel
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    public void enter(Stream stream_out) throws SSJFatalException
    {
        // Example how to access the sensor class, e.g. to read data from the connection established there.
        // The local is intentionally unused here; it only demonstrates the cast.
        MySensor mySensor = ((MySensor) _sensor);
    }

    /**
     * Writes one sample into the output stream.
     *
     * @param stream_out the output stream to fill
     * @return {@code true} if data was provided successfully
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    protected boolean process(Stream stream_out) throws SSJFatalException
    {
        // Example for providing data
        float[] out = stream_out.ptrF();

        // Provide values for each sample dimension (must match getSampleDimension())
        out[0] = (float) Math.random();
        out[1] = (float) Math.random();
        out[2] = (float) Math.random();

        // Return true if data was provided successfully
        return true;
    }

    /**
     * Called once at the pipeline stop (after the processing loop); used for cleanup.
     *
     * @param stream_out the output stream of this channel
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    public void flush(Stream stream_out) throws SSJFatalException
    {
        // Nothing to clean up in this example
    }

    /**
     * Number of samples per second; process() is called this many times per second.
     *
     * @return the value of the {@code sampleRate} option (5 by default)
     */
    @Override
    protected double getSampleRate()
    {
        // Read the option instead of a hard-coded constant so the declared
        // sampleRate option is not silently ignored
        return options.sampleRate.get();
    }

    /**
     * Each sample can have multiple dimensions, e.g. accelerometer data has
     * three dimensions for x, y and z.
     *
     * @return the number of dimensions per sample (3)
     */
    @Override
    protected int getSampleDimension()
    {
        return 3;
    }

    /**
     * @return the datatype of the provided samples ({@code FLOAT})
     */
    @Override
    protected Cons.Type getSampleType()
    {
        return Cons.Type.FLOAT;
    }

    /**
     * Labels each dimension of the output samples.
     *
     * @param stream_out the output stream whose {@code desc} array is filled
     */
    @Override
    protected void describeOutput(Stream stream_out)
    {
        // One label per sample dimension
        stream_out.desc = new String[stream_out.dim];
        stream_out.desc[0] = "Acc X";
        stream_out.desc[1] = "Acc Y";
        stream_out.desc[2] = "Acc Z";
    }
}

You can now use your sensor and channel(s) like this:

// Get the pipeline instance
Pipeline pipeline = Pipeline.getInstance();

// Create the sensor and change one of its options
MySensor sensor = new MySensor();
sensor.options.myStringOption.set("custom string");

// Create the channel and change one of its options
// (the field declared in MySensorChannel.Options is named myBooleanOption)
MySensorChannel channel = new MySensorChannel();
channel.options.myBooleanOption.set(true);

// Create a consumer
Logger logger = new Logger();

// Add the components to the pipeline
pipeline.addSensor(sensor, channel);
pipeline.addConsumer(logger, channel);

pipeline.start();

Creating Transformers

The main purpose of transformers is to convert data (e.g., calculate features). To integrate your own transformer, simply create a new Java class that extends the abstract Transformer class.

import hcm.ssj.core.Transformer;

/**
 * Skeleton of a custom transformer: a class that extends the abstract
 * {@code Transformer} class. The body is intentionally empty at this stage.
 */
public class MyTransformer extends Transformer
{
    
}

Afterwards, override the abstract methods. The basic structure should look like this:

import hcm.ssj.core.Cons;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.Transformer;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
import hcm.ssj.core.stream.Stream;


/**
 * Example transformer that turns float input into boolean output:
 * every output value states whether the corresponding input value is greater than 5.
 */
public class MyTransformer extends Transformer
{
    /**
     * Configurable values of this transformer, one field per option.
     */
    public class Options extends OptionList
    {
        // Declare your options here
        public final Option<String> myStringOption =
                new Option<>("myStringOption", "default value", String.class, "option description");
        public final Option<Boolean> myBooleanOption =
                new Option<>("myBooleanOption", true, Boolean.class, "option description");
        public final Option<Float> myFloatOption =
                new Option<>("myFloatOption", 1.0f, Float.class, "option description");
        public final Option<Integer> myIntOption =
                new Option<>("myIntOption", 1, Integer.class, "option description");

        private Options()
        {
            addOptions();
        }
    }

    /** Option values of this transformer instance. */
    public final Options options = new Options();

    /**
     * Creates the transformer and assigns its display name.
     */
    public MyTransformer()
    {
        // Custom component name (shows up in SSJ Creator)
        _name = "MyTransformer";
    }

    /**
     * @return the option list of this transformer
     */
    @Override
    public OptionList getOptions()
    {
        return options;
    }

    /**
     * Setup hook, called once at the pipeline start (before the transform loop).
     */
    @Override
    public void enter(Stream[] stream_in, Stream stream_out) throws SSJFatalException
    {
        // Nothing to set up in this example
    }

    /**
     * Compares every value of the first input stream against 5 and writes the
     * result to the output stream. Input of any type other than FLOAT is ignored.
     */
    @Override
    public void transform(Stream[] stream_in, Stream stream_out) throws SSJFatalException
    {
        final Stream input = stream_in[0];

        // Only float input is handled by this example
        if (input.type != Cons.Type.FLOAT)
        {
            return;
        }

        final float[] values = input.ptrF();
        final boolean[] flags = stream_out.ptrBool();
        final int dims = input.dim;

        // Samples arrive flattened in a 1-dimensional array.
        // Two samples with three dimensions each are laid out as:
        // [s1d1, s1d2, s1d3, s2d1, s2d2, s2d3]
        for (int sample = 0; sample < input.num; sample++)
        {
            // Index of the first dimension of the current sample
            final int offset = sample * dims;

            for (int d = 0; d < dims; d++)
            {
                flags[offset + d] = values[offset + d] > 5;
            }
        }
    }

    /**
     * Cleanup hook, called once at the pipeline stop (after the transform loop).
     */
    @Override
    public void flush(Stream[] stream_in, Stream stream_out) throws SSJFatalException
    {
        // Nothing to clean up in this example
    }

    /**
     * @return the number of dimensions of an output sample; here the same as
     *         the dimension of the first input stream
     */
    @Override
    public int getSampleDimension(Stream[] stream_in)
    {
        final Stream input = stream_in[0];
        return input.dim;
    }

    /**
     * @return the datatype of the output samples ({@code BOOL})
     */
    @Override
    public Cons.Type getSampleType(Stream[] stream_in)
    {
        return Cons.Type.BOOL;
    }

    /**
     * @return the number of output samples per batch of {@code sampleNumber_in}
     *         input samples; here one output sample per input sample
     */
    @Override
    public int getSampleNumber(int sampleNumber_in)
    {
        return sampleNumber_in;
    }

    /**
     * Labels each dimension of the output samples.
     */
    @Override
    protected void describeOutput(Stream[] stream_in, Stream stream_out)
    {
        final String[] labels = new String[stream_out.dim];

        for (int d = 0; d < labels.length; d++)
        {
            labels[d] = "Dim " + d + " > 5";
        }

        stream_out.desc = labels;
    }
}

You can now use your transformer like this:

// Get the pipeline instance
Pipeline pipeline = Pipeline.getInstance();

// Placeholder for omitted setup code
// (presumably the creation of the sensorChannel that is used below)
[...]

// Create the transformer and change one of its options
MyTransformer transformer = new MyTransformer();
transformer.options.myStringOption.set("custom value");

// Create a consumer
Logger logger = new Logger();

// Add the components to the pipeline
// Placeholder for omitted registration calls
[...]
// Register the transformer together with sensorChannel, then the logger together with the transformer
pipeline.addTransformer(transformer, sensorChannel);
pipeline.addConsumer(logger, transformer);

pipeline.start();

Creating Consumers

Consumers mainly take the data from the pipeline and perform an external action (e.g., write to the filesystem, send over the network, etc.). To integrate your own consumer, simply create a new Java class that extends the abstract Consumer class.

import hcm.ssj.core.Consumer;

/**
 * Skeleton of a custom consumer: a class that extends the abstract
 * {@code Consumer} class. The body is intentionally empty at this stage.
 */
public class MyConsumer extends Consumer
{

}

Afterwards, override the abstract methods. The basic structure should look like this:

import hcm.ssj.core.Consumer;
import hcm.ssj.core.Log;
import hcm.ssj.core.SSJFatalException;
import hcm.ssj.core.event.Event;
import hcm.ssj.core.option.Option;
import hcm.ssj.core.option.OptionList;
import hcm.ssj.core.stream.Stream;


/**
 * Example consumer that logs every received sample to the console,
 * one log line per sample.
 */
public class MyConsumer extends Consumer
{
    /**
     * Configurable values of this consumer, one field per option.
     */
    public class Options extends OptionList
    {
        // Specify your options here
        public final Option<String> myStringOption = new Option<>("myStringOption", "default value", String.class, "option description");
        public final Option<Boolean> myBooleanOption = new Option<>("myBooleanOption", true, Boolean.class, "option description");
        public final Option<Float> myFloatOption = new Option<>("myFloatOption", 1.0f, Float.class, "option description");
        public final Option<Integer> myIntOption = new Option<>("myIntOption", 1, Integer.class, "option description");

        private Options()
        {
            addOptions();
        }
    }

    /** Option values of this consumer instance. */
    public final Options options = new Options();

    /**
     * Creates the consumer and assigns its display name.
     */
    public MyConsumer()
    {
        // Set custom name for your component (shows up in SSJ Creator)
        _name = "MyConsumer";
    }

    /**
     * @return the option list of this consumer
     */
    @Override
    public OptionList getOptions()
    {
        return options;
    }

    /**
     * Called once at the pipeline start (before the consume loop); used for setup.
     *
     * @param stream_in the input streams of this consumer
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    public void enter(Stream[] stream_in) throws SSJFatalException
    {
        // Nothing to set up in this example
    }

    /**
     * Logs the values of every sample of every input stream.
     *
     * <p>Each log line starts with {@code "Sample data: "} followed by the
     * sample's values, each followed by a single space. Values of sample types
     * that are not listed in the switch are not printed.
     *
     * @param stream_in the input streams to read from
     * @param trigger   not used by this example
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    protected void consume(Stream[] stream_in, Event trigger) throws SSJFatalException
    {
        for (Stream stream : stream_in)
        {
            for (int num = 0; num < stream.num; num++)
            {
                // StringBuilder avoids creating a new String for every appended value
                StringBuilder msg = new StringBuilder("Sample data: ");

                for (int dim = 0; dim < stream.dim; dim++)
                {
                    // Samples are stored flattened: [s1d1, s1d2, ..., s2d1, s2d2, ...]
                    int index = num * stream.dim + dim;

                    switch (stream.type)
                    {
                        case BYTE:
                            msg.append(stream.ptrB()[index]).append(' ');
                            break;
                        case CHAR:
                            msg.append(stream.ptrC()[index]).append(' ');
                            break;
                        case SHORT:
                            msg.append(stream.ptrS()[index]).append(' ');
                            break;
                        case INT:
                            msg.append(stream.ptrI()[index]).append(' ');
                            break;
                        case LONG:
                            msg.append(stream.ptrL()[index]).append(' ');
                            break;
                        case FLOAT:
                            msg.append(stream.ptrF()[index]).append(' ');
                            break;
                        case DOUBLE:
                            msg.append(stream.ptrD()[index]).append(' ');
                            break;
                        case BOOL:
                            msg.append(stream.ptrBool()[index]).append(' ');
                            break;
                        default:
                            // Any other sample type is skipped by this example
                            break;
                    }
                }

                Log.i(msg.toString());
            }
        }
    }

    /**
     * Called once at the pipeline stop (after the consume loop); used for cleanup.
     *
     * @param stream_in the input streams of this consumer
     * @throws SSJFatalException declared by the overridden method; not thrown by this example
     */
    @Override
    public void flush(Stream[] stream_in) throws SSJFatalException
    {
        // Nothing to clean up in this example
    }
}

You can now use your consumer like this:

// Get the pipeline instance
Pipeline pipeline = Pipeline.getInstance();

// Placeholder for omitted setup code
// (presumably the creation of the transformer that is used below)
[...]

// Create the consumer and change one of its options
MyConsumer consumer = new MyConsumer();
consumer.options.myStringOption.set("custom value");

// Add the components to the pipeline
// Placeholder for omitted registration calls
[...]
// Register the consumer together with the transformer
pipeline.addConsumer(consumer, transformer);

pipeline.start();
Clone this wiki locally