Skip to content

Tutorial 6: Custom kernel added to simple pipeline

Jin Heo edited this page Dec 16, 2023 · 1 revision

How to run

$ cd FlexPipe/examples
$ bash 6_add_kernel.sh

In this tutorial, along with the given kernels, we add a custom kernel and make it work as a part of the pipeline with the given kernels. The pipeline topology looks below.

|Source|[o1, B]---->[i1, B]|Intermediate|[o1, B]---->[i1, B]|Sink|  // IntermediateKernel is a custom kernel.
|      |[o2, B]---->[i1, NB]|Sink|  // o2 is branched from o1 by a kernel user

where B: blocking, NB: non-blocking

Custom Kernel

class IntermediateKernel : public flexpipe::Kernel
{
public:
  IntermediateKernel(string id) : flexpipe::Kernel(id)
  {
    setName("IntermediateKernel");
    portManager.registerInPortTag("i1", flexpipe::PortDependency::BLOCKING,
                                  flexpipe::deserializeDefault<MsgType>);
    portManager.registerOutPortTag("o1", flexpipe::sendLocalBasicCopy<MsgType>,
                                   flexpipe::serializeVector<MsgType>);
    frequencyManager.setFrequency(1);
  }

  ~IntermediateKernel() {}

  raft::kstatus run() override
  {
    ...
  }
};

The custom kernel is IntermediateKernel, and its functionality is just to pass an input to an output port. The input and output ports' data type is the same as the given kernels.

Port Activation by Kernel User

int main()
{
  raft::map pipeline;

  /* 1. Create instances of the given kernels */
  SourceKernel *sourceKernel = new SourceKernel("sourceKernel");
  SinkKernelNB *sinkKernelNB = new SinkKernelNB("sinkKernelNB");
  SinkKernelB *sinkKernelB = new SinkKernelB("sinkKernelB");

  IntermediateKernel *intermediateKernel = new IntermediateKernel("intermediateKernel");

  /* 2. Activate ports that are registered by the kernel developers */
  sourceKernel->portManager.activateOutPortAsLocal<MsgType>("o1", flexpipe::PortDependency::BLOCKING);
  sourceKernel->portManager.duplicateOutPortAsLocal<MsgType>("o1", "o2", flexpipe::PortDependency::BLOCKING);

  sinkKernelNB->portManager.activateInPortAsLocal<MsgType>("i1");
  sinkKernelB->portManager.activateInPortAsLocal<MsgType>("i1");

  intermediateKernel->portManager.activateInPortAsLocal<MsgType>("i1");
  intermediateKernel->portManager.activateOutPortAsLocal<MsgType>("o1", flexpipe::PortDependency::BLOCKING);

  /* 3. Link activated ports of the kernels */
  pipeline.link(sourceKernel, "o1", intermediateKernel, "i1", 1);
  pipeline.link(sourceKernel, "o2", sinkKernelB, "i1", 1);

  pipeline.link(intermediateKernel, "o1", sinkKernelNB, "i1", 1);

  /* 4. Run the pipeline */
  pipeline.exe();

  return 0;
}

In this example, the given and custom kernels are created together. Similar to the previous tutorials, their ports are activated and linked.