Tutorial 1: Simple pipeline

Jin Heo edited this page Dec 16, 2023 · 2 revisions

How to run

$ cd FlexPipe/examples
$ bash 1_local_pipeline.sh

In this tutorial, we create a pipeline from the given SourceKernel and SinkKernelB (which has a blocking input). The pipeline topology is shown below.

|Source|[o1, B]------>[i1, B]|Sink|

where B: blocking

Register-Activation Interfaces for Ports

FlexPipe enables flexible pipelining through its port interface, and the register-activation interface is at its core. The kernel developer registers the input and output ports according to the kernel's functionality. The kernel user, who builds a pipeline from the given kernels, then activates the registered ports in whatever way suits that pipeline.
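The split between the two roles can be illustrated with a small stand-alone sketch. It is not the FlexPipe implementation: ToyPortManager, registerOutPort, activateOutPort, and isActive are hypothetical names invented for this illustration, and rejecting an unregistered tag with an exception is an assumption rather than documented FlexPipe behavior.

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a port manager; NOT the FlexPipe API.
enum class Dependency { BLOCKING, NONBLOCKING };

class ToyPortManager
{
  struct OutPort
  {
    bool activated = false;
    Dependency dep = Dependency::BLOCKING;
  };
  std::map<std::string, OutPort> outPorts;

public:
  // Developer side: declare that the kernel owns an output port with this tag.
  void registerOutPort(const std::string &tag) { outPorts[tag] = OutPort{}; }

  // User side: activate a registered port and choose its dependency.
  // Assumption: activating an unregistered tag is treated as an error.
  void activateOutPort(const std::string &tag, Dependency dep)
  {
    auto it = outPorts.find(tag);
    if (it == outPorts.end())
      throw std::invalid_argument("port not registered: " + tag);
    it->second.activated = true;
    it->second.dep = dep;
  }

  // True only if the tag was registered and later activated.
  bool isActive(const std::string &tag) const
  {
    auto it = outPorts.find(tag);
    return it != outPorts.end() && it->second.activated;
  }
};
```

In this sketch a registered port stays inactive until the user activates it, which mirrors the idea that registration describes what a kernel can do and activation describes how one particular pipeline uses it.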

Port Registration by Kernel Developer

/* examples/sample_kernels.h */
class SourceKernel : public flexpipe::Kernel
{
  int msgIndex = 0;

public:
  SourceKernel(string id) : flexpipe::Kernel(id)
  {
    setName("SourceKernel");
    portManager.registerOutPortTag("o1", flexpipe::sendLocalBasicCopy<MsgType>,
                                   flexpipe::serializeVector<MsgType>);
    frequencyManager.setFrequency(1);
  }

  ~SourceKernel() {}

  raft::kstatus run() override
  {
    ...
  }
};

The source kernel registers a single output port, "o1". Whether the upstream kernel's execution is synchronized with a downstream kernel depends on the pipeline context in which the output port is connected. The output port's blocking/non-blocking behavior is therefore set by the kernel user when creating the pipeline.


/* examples/sample_kernels.h */
class SinkKernelB : public flexpipe::Kernel
{
public:
  SinkKernelB(string id) : flexpipe::Kernel(id)
  {
    setName("SinkKernelB");

    // Register the input port -- For input dependency, the kernel developer sets blocking/nonblocking.
    portManager.registerInPortTag("i1", flexpipe::PortDependency::BLOCKING,
                                  flexpipe::deserializeDefault<MsgType>);

    // Set the desired execution frequency, but the kernel user can change this by the pipeline context.
    frequencyManager.setFrequency(1);
  }

  ~SinkKernelB() {}

  raft::kstatus run() override
  {
    ...
  }
};

The sink kernel registers a single input port, "i1", as blocking. This means that the kernel's execution is driven by this blocking input. Because the kernel developer knows the kernel's functionality best, the developer sets the input dependencies through the port-registration interface.
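One way to picture an input dependency is as a readiness check that runs before each kernel iteration. The sketch below is a simplified model under stated assumptions, not FlexPipe's scheduler: ToyInPort and readyToRun are hypothetical names, and the rule "every blocking input must hold a message" is an interpretation of the description above.

```cpp
#include <vector>

// Hypothetical model of input dependencies; NOT the FlexPipe API.
enum class InputDependency { BLOCKING, NONBLOCKING };

struct ToyInPort
{
  InputDependency dep; // set by the kernel developer at registration
  bool hasMessage;     // whether a message is currently waiting on the port
};

// Assumed rule: an iteration may proceed only if every BLOCKING input
// has a message. NONBLOCKING inputs never hold the kernel back.
bool readyToRun(const std::vector<ToyInPort> &ports)
{
  for (const ToyInPort &p : ports)
  {
    if (p.dep == InputDependency::BLOCKING && !p.hasMessage)
      return false;
  }
  return true;
}
```

Under this model, a kernel like SinkKernelB with one blocking input would wait until a message arrives on "i1", while a kernel with only non-blocking inputs could run at its own frequency.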

Port Activation by Kernel User

The kernel user builds a pipeline from the given kernels. The code below shows how the pipeline is created.

/* examples/1_local_pipeline.cc */
int main()
{
  raft::map pipeline;

  /* 1. Create instances of the given kernels */
  SourceKernel *sourceKernel = new SourceKernel("sourceKernel");
  SinkKernelB *sinkKernelB = new SinkKernelB("sinkKernel");

  /* 2. Activate ports that are registered by the kernel developers */
  // For output, the port dependency is set by a kernel user who wants to configure the whole pipeline.
  sourceKernel->portManager.activateOutPortAsLocal<MsgType>("o1", flexpipe::PortDependency::BLOCKING);
  // For input, the port dependency is set by a kernel developer who knows the dependency of the kernel.
  sinkKernelB->portManager.activateInPortAsLocal<MsgType>("i1");

  /* 3. Link kernels that are connected as flexpipe */
  pipeline.link(sourceKernel, "o1", sinkKernelB, "i1", 1);

  /* 4. Run the pipeline */
  pipeline.exe();

  return 0;
}

After the kernels are created, their registered ports are activated. The kernel user sets whether the output port is blocking or non-blocking, while the input port's dependency has already been set by the kernel developer, as described above.

The activated ports are then linked, and the pipeline is executed. This local pipeline runs under the FlexPipe runtime, and its port communication is zero-copy, so the end-to-end pipeline latency stays very low even for large messages (1920x1080x3 bytes).
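To see why zero-copy matters at this message size, the following sketch contrasts a copying handoff with a pointer handoff for one 1920x1080x3-byte frame. It is a generic C++ illustration, not FlexPipe's port code: Frame, kFrameBytes, handOffByCopy, and handOffByPointer are hypothetical names, and using std::shared_ptr for the handoff is an assumption made for the example.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// One uncompressed 1920x1080 frame with 3 bytes per pixel.
constexpr std::size_t kFrameBytes = 1920u * 1080u * 3u;

using Frame = std::vector<std::uint8_t>;

// Copying handoff: the receiver gets its own duplicate of the payload,
// so all kFrameBytes bytes are copied for every message.
Frame handOffByCopy(const Frame &frame) { return frame; }

// Zero-copy handoff: only the pointer (plus a reference count) is passed,
// so sender and receiver refer to the same buffer.
std::shared_ptr<Frame> handOffByPointer(const std::shared_ptr<Frame> &frame)
{
  return frame;
}
```

With the pointer handoff, the cost per message does not grow with the frame size, which is consistent with the low latency described above for large local messages.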