Tutorial 6: Custom kernel added to simple pipeline
Jin Heo edited this page Dec 16, 2023
$ cd FlexPipe/examples
$ bash 6_add_kernel.sh
In this tutorial, we add a custom kernel alongside the given kernels and make it work as part of the pipeline. The pipeline topology is shown below.
|Source|[o1, B]---->[i1, B]|Intermediate|[o1, B]---->[i1, B]|Sink|  // IntermediateKernel is a custom kernel.
|      |[o2, B]---->[i1, NB]|Sink|                                  // o2 is branched from o1 by a kernel user
where B: blocking, NB: non-blocking
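To make the B/NB labels concrete, here is a minimal sketch of the two read styles on a toy port. It assumes that "blocking" means the reader waits until a message arrives and "non-blocking" means the reader returns immediately when nothing is queued. `ToyPort`, `recvBlocking`, and `recvNonBlocking` are hypothetical names for illustration only and are not part of the FlexPipe API.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical toy port, NOT a FlexPipe type: a thread-safe FIFO of messages.
template <typename T>
class ToyPort
{
public:
  // Producer side: enqueue a message and wake one waiting consumer.
  void send(T msg)
  {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      queue_.push(std::move(msg));
    }
    cv_.notify_one();
  }

  // Blocking read (assumed "B" semantics): wait until a message is available.
  T recvBlocking()
  {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    T msg = std::move(queue_.front());
    queue_.pop();
    return msg;
  }

  // Non-blocking read (assumed "NB" semantics): never waits.
  // Returns false and leaves `out` untouched when the queue is empty.
  bool recvNonBlocking(T &out)
  {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::queue<T> queue_;
};
```

Under these assumptions, a kernel with a blocking input runs only when new data is present, while a kernel with a non-blocking input keeps running and simply skips the input when nothing has arrived.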
class IntermediateKernel : public flexpipe::Kernel
{
public:
  IntermediateKernel(string id) : flexpipe::Kernel(id)
  {
    setName("IntermediateKernel");
    portManager.registerInPortTag("i1", flexpipe::PortDependency::BLOCKING,
                                  flexpipe::deserializeDefault<MsgType>);
    portManager.registerOutPortTag("o1", flexpipe::sendLocalBasicCopy<MsgType>,
                                   flexpipe::serializeVector<MsgType>);
    frequencyManager.setFrequency(1);
  }

  ~IntermediateKernel() {}

  raft::kstatus run() override
  {
    ...
  }
};
The custom kernel is IntermediateKernel. It simply forwards each message received on its input port i1 to its output port o1. Both ports use the same data type (MsgType) as the given kernels.
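The body of run() is elided above, so the following is only a sketch of what a pass-through step amounts to, written against plain standard-library queues rather than the FlexPipe port API. `ToyMsg`, `ToyQueue`, and `passThroughOnce` are hypothetical names introduced for illustration.

```cpp
#include <deque>
#include <string>

// Hypothetical stand-ins, NOT FlexPipe types.
struct ToyMsg
{
  int seq;              // sequence number of the message
  std::string payload;  // message body
};
using ToyQueue = std::deque<ToyMsg>;

// One pass-through step: take the oldest message from the input queue "i1"
// and forward it unchanged to the output queue "o1".
// Returns false when there was nothing to forward.
bool passThroughOnce(ToyQueue &i1, ToyQueue &o1)
{
  if (i1.empty()) return false;
  o1.push_back(i1.front());
  i1.pop_front();
  return true;
}
```

A real kernel would presumably read from and write to its registered ports through portManager instead of touching queues directly; the sketch only captures the forward-unchanged behavior described above.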
int main()
{
  raft::map pipeline;

  /* 1. Create instances of the given kernels and the custom kernel */
  SourceKernel *sourceKernel = new SourceKernel("sourceKernel");
  SinkKernelNB *sinkKernelNB = new SinkKernelNB("sinkKernelNB");
  SinkKernelB *sinkKernelB = new SinkKernelB("sinkKernelB");
  IntermediateKernel *intermediateKernel = new IntermediateKernel("intermediateKernel");

  /* 2. Activate ports that are registered by the kernel developers */
  sourceKernel->portManager.activateOutPortAsLocal<MsgType>("o1", flexpipe::PortDependency::BLOCKING);
  sourceKernel->portManager.duplicateOutPortAsLocal<MsgType>("o1", "o2", flexpipe::PortDependency::BLOCKING);
  sinkKernelNB->portManager.activateInPortAsLocal<MsgType>("i1");
  sinkKernelB->portManager.activateInPortAsLocal<MsgType>("i1");
  intermediateKernel->portManager.activateInPortAsLocal<MsgType>("i1");
  intermediateKernel->portManager.activateOutPortAsLocal<MsgType>("o1", flexpipe::PortDependency::BLOCKING);

  /* 3. Link activated ports of the kernels */
  pipeline.link(sourceKernel, "o1", intermediateKernel, "i1", 1);
  pipeline.link(sourceKernel, "o2", sinkKernelB, "i1", 1);
  pipeline.link(intermediateKernel, "o1", sinkKernelNB, "i1", 1);

  /* 4. Run the pipeline */
  pipeline.exe();
  return 0;
}
In this example, the given kernels and the custom kernel are instantiated side by side. As in the previous tutorials, their ports are then activated and linked.
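The branching of o2 from o1 in step 2 can be pictured as a fan-out: every message the source sends is copied to each branch. The sketch below models that idea with plain queues. `ToyFanOut` and its methods are hypothetical and make no claim about how duplicateOutPortAsLocal is implemented.

```cpp
#include <deque>
#include <vector>

// Hypothetical fan-out helper, NOT a FlexPipe type: one logical output
// that copies every message to each attached branch queue.
template <typename T>
class ToyFanOut
{
public:
  // Attach a downstream queue (for example, the one behind "o1" or "o2").
  void attach(std::deque<T> *branch) { branches_.push_back(branch); }

  // Send a copy of msg to every attached branch.
  void send(const T &msg)
  {
    for (std::deque<T> *branch : branches_) branch->push_back(msg);
  }

private:
  std::vector<std::deque<T> *> branches_;
};
```

With two queues attached, a single send() leaves one copy in each, which mirrors how the intermediate kernel and the sink on o2 both receive the source's messages in the topology above.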