
Simple Apache NiFi Ingest Flow

In this workshop you’ll implement a data pipeline, using NiFi to ingest data and write it to Kudu tables.

Labs summary

  • Lab 1 - On Apache NiFi, obtain data

  • Lab 2 - On the NiFi cluster, prepare the data, process each record and save the results to Kudu.

Lab 1 - Apache NiFi: obtain data

In this lab you will run a simple Python script that simulates IoT sensor data from some hypothetical machines and sends the data to an MQTT broker (Mosquitto). The MQTT broker acts as a gateway that is connected to many different types of sensors over the MQTT protocol. Your cluster comes with an embedded MQTT broker that the simulation script publishes to. For convenience, we will use NiFi to run the script instead of running it from a shell.
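The contents of /opt/demo/simulate.py are not shown in this workshop. To give a rough idea of what such a simulator produces, here is a hypothetical Python sketch that builds one JSON reading per machine together with the MQTT topic it would be published to. The topic layout (iot/sensor_<n>), the field names and the value ranges are assumptions for illustration only, and the actual publish call is left out so the sketch runs with the standard library alone.

```python
import json
import random
import time

# Hypothetical topic layout: one MQTT topic per simulated machine, under "iot/".
TOPIC_PREFIX = "iot"


def make_reading(sensor_id: int, num_values: int = 3) -> dict:
    """Build one simulated reading (field names are illustrative only)."""
    # Timestamp in microseconds since the epoch (an assumed convention).
    reading = {"sensor_id": sensor_id, "sensor_ts": int(time.time() * 1_000_000)}
    for i in range(num_values):
        # Random integer measurement between 0 and 100 inclusive.
        reading[f"sensor_{i}"] = random.randint(0, 100)
    return reading


def make_message(sensor_id: int) -> tuple:
    """Return the (topic, JSON payload) pair a simulator would publish."""
    topic = f"{TOPIC_PREFIX}/sensor_{sensor_id}"
    return topic, json.dumps(make_reading(sensor_id))


if __name__ == "__main__":
    for sid in range(3):
        topic, payload = make_message(sid)
        # A real simulator would publish here using an MQTT client library;
        # this sketch only prints what it would send.
        print(topic, payload)
```

In a real simulator, each (topic, payload) pair would be handed to an MQTT client connected to the broker. Keeping every topic under iot/ lets a single wildcard subscription pick up all machines.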

  1. Go to Apache NiFi and add a Processor (ExecuteProcess) to the canvas.

  2. Right-click the processor and select Configure (or just double-click the processor). On the PROPERTIES tab, set the properties shown below to run the Python simulation script.

    Command:           python3
    Command Arguments: /opt/demo/simulate.py
  3. On the SCHEDULING tab, set Run Schedule to 1 sec.

    Alternatively, you can use another interval, such as 30 sec or 1 min.

  4. In the SETTINGS tab, check the "success" relationship in the AUTOMATICALLY TERMINATED RELATIONSHIPS section. Click Apply.

  5. Right-click the processor and select Start to run the simulator.

  6. After a few seconds, right-click the processor and select Stop, then look at its data provenance (right-click > View data provenance). You’ll see that the script has run a number of times and produced results.

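The same processor can also be created programmatically. The sketch below builds the JSON body that the NiFi REST API accepts when creating a processor (a POST to /nifi-api/process-groups/<process-group-id>/processors), mirroring steps 2 to 4: the Command and Command Arguments properties, a 1 sec run schedule and the auto-terminated success relationship. It assumes a NiFi 1.x-style ProcessorEntity payload; <process-group-id> and the processor name are placeholders, and the request is only printed, not sent.

```python
import json

# Fully qualified class name of ExecuteProcess in the standard NiFi bundle.
EXECUTE_PROCESS = "org.apache.nifi.processors.standard.ExecuteProcess"


def execute_process_entity(command: str, args: str, schedule: str = "1 sec") -> dict:
    """Build a ProcessorEntity body mirroring the settings from steps 2 to 4."""
    return {
        "revision": {"version": 0},  # newly created components start at revision 0
        "component": {
            "type": EXECUTE_PROCESS,
            "name": "Run simulator",  # hypothetical display name
            "position": {"x": 0.0, "y": 0.0},
            "config": {
                # PROPERTIES tab
                "properties": {"Command": command, "Command Arguments": args},
                # SCHEDULING tab
                "schedulingPeriod": schedule,
                # SETTINGS tab: automatically terminate the "success" relationship
                "autoTerminatedRelationships": ["success"],
            },
        },
    }


def processors_url(base: str, group_id: str) -> str:
    """Endpoint for creating a processor inside a given process group."""
    return f"{base.rstrip('/')}/nifi-api/process-groups/{group_id}/processors"


if __name__ == "__main__":
    body = execute_process_entity("python3", "/opt/demo/simulate.py")
    print(processors_url("http://<public_dns>:8080", "<process-group-id>"))
    print(json.dumps(body, indent=2))
```

Check the REST API documentation for your NiFi version before sending such a request; field names in the entity can differ between releases.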

Lab 2 - Configuring Edge Flow Management

Cloudera Edge Flow Management (EFM) gives you a visual overview of all MiNiFi agents in your environment and lets you update the flow configuration for each one, with version control provided by the NiFi Registry integration. In this lab, you will create the MiNiFi flow and publish it so that the MiNiFi agent picks it up.

  1. Open the EFM Web UI at http://<public_dns>:10080/efm/ui/. Make sure you see your MiNiFi agent’s heartbeat messages in the Events Monitor. Click the info icon on a heartbeat record to see its details.

  2. Select the Flow Designer tab. To build a dataflow, select the desired class (iot-1) from the table and click OPEN. Alternatively, double-click the desired class.

  3. Add a ConsumeMQTT processor to the canvas by dragging the processor icon to the canvas, selecting the ConsumeMQTT processor type and clicking the Add button. Once the processor is on the canvas, double-click it and configure it with the settings below:

    Broker URI:     tcp://edge2ai-1.dim.local:1883
    Client ID:      minifi-iot
    Topic Filter:   iot/#
    Max Queue Size: 60

    Make sure you scroll down the properties page to set the Topic Filter and Max Queue Size.

  4. Add a Remote Process Group (RPG) to the canvas and configure it as follows:

    URL:                http://edge2ai-1.dim.local:8080/nifi
    Transport Protocol: HTTP
  5. At this point you need to connect the ConsumeMQTT processor to the RPG. To do so, you first need to add an Input Port to the remote NiFi server. Open the NiFi Web UI at http://<public_dns>:8080/nifi/ and drag an Input Port to the canvas. Name it something like "from Gateway".

  6. For now, add a Funnel to the canvas to terminate the NiFi Input Port…

  7. … and set up a connection from the Input Port to it. To set up a connection, hover the mouse over the Input Port until an arrow symbol appears in the center. Click the arrow, drag it and drop it on the Funnel to connect the two elements.

  8. Right-click the Input Port and start it. Alternatively, click the Input Port to select it and then press the start ("play") button on the Operate panel.

  9. You will need the ID of the Input Port to complete the connection of the ConsumeMQTT processor to the RPG (NiFi). Double-click on the Input Port and copy its ID.

  10. Back in the Flow Designer, connect the ConsumeMQTT processor to the RPG. The connection requires an ID: paste the ID you copied from the Input Port. Make sure that there are NO SPACES!


    Double-click the connection to check its configuration.

  11. The flow is now complete, but before publishing it, create a bucket in the NiFi Registry so that all versions of your flows are stored for review and audit. Open the NiFi Registry at http://<public_dns>:18080/nifi-registry, click the wrench/spanner icon in the top-right corner and create a bucket called IoT (ATTENTION: the bucket name is CASE-SENSITIVE).

  12. You can now publish the flow for the MiNiFi agent to automatically pick up. Click Publish, add a descriptive comment for your changes and click Apply.

  13. Go back to the NiFi Registry Web UI and click on the NiFi Registry name, next to the Cloudera logo. If the flow publishing was successful, you should see the flow’s version details in the NiFi Registry.

  14. At this point, you can test the edge flow up to NiFi. Start the NiFi simulator (the ExecuteProcess processor) again and confirm that you can see the messages queued in NiFi.

  15. Once you have confirmed that the flow is working correctly, stop the simulator (stop the ExecuteProcess processor in NiFi).
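The Topic Filter iot/# configured on ConsumeMQTT subscribes to every topic under iot/. To make the wildcard semantics concrete, the following standard-library sketch implements the MQTT 3.1.1 topic-filter rules: + matches exactly one level, and # (valid only as the last level) matches the parent level and everything below it. It illustrates the specification and is not code taken from ConsumeMQTT or Mosquitto; the example topic names are hypothetical.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter (MQTT 3.1.1 rules)."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Filters that start with a wildcard never match $-prefixed topics such as $SYS/...
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent and all children.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != t_levels[i]:
            return False  # a literal level differs
    # Without '#', the filter must consume every level of the topic.
    return len(f_levels) == len(t_levels)


if __name__ == "__main__":
    for t in ["iot/sensor_1", "iot/line2/sensor_7", "iot", "factory/sensor_1"]:
        print(t, topic_matches("iot/#", t))
```

Running it prints True for the first three topics and False for factory/sensor_1, which is why the single filter iot/# is enough to receive readings from every simulated machine publishing under iot/.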

Lab 2 - Configuring the NiFi flow

In this lab, you will create a NiFi flow to receive the data from all gateways and push it to Kafka.

Creating a Process Group

Before we start building our flow, let’s create a Process Group to help organize the flows on the NiFi canvas and to enable flow version control.

  1. Open the NiFi Web UI, create a new Process Group and name it something like Process Sensor Data.

  2. We want to be able to version control the flows we will add to the Process Group. To do that, we first need to connect NiFi to the NiFi Registry. On the NiFi global menu, click "Controller Settings", navigate to the "Registry Clients" tab and add a Registry client with the following settings:

    Name: NiFi Registry
    URL:  http://edge2ai-1.dim.local:18080
  3. On the NiFi Registry Web UI, add another bucket for storing the Sensor flow we’re about to build. Call it SensorFlows.

  4. Back on the NiFi Web UI, to enable version control for the Process Group, right-click it, select Version > Start version control and enter the details below. Once you’re done, a version control tick will appear on the Process Group, indicating that version control is now enabled for it.

    Registry:  NiFi Registry
    Bucket:    SensorFlows
    Flow Name: SensorProcessGroup
  5. Let’s also enable processors in this Process Group to use schemas stored in Schema Registry. Right-click the Process Group, select Configure and navigate to the Controller Services tab. Click the + icon and add a HortonworksSchemaRegistry service. After the service is added, click the service’s cog icon, go to the Properties tab, set the following Schema Registry URL and click Apply:

    URL: http://edge2ai-1.dim.local:7788/api/v1
  6. Click the lightning bolt icon to enable the HortonworksSchemaRegistry Controller Service.

  7. Still on the Controller Services screen, let’s add two more services to handle reading and writing JSON records. Click the plus button and add the following two services:

    • JsonTreeReader, with the following properties:

      Schema Access Strategy: Use 'Schema Name' Property
      Schema Registry:        HortonworksSchemaRegistry
      Schema Name:            ${schema.name} (already set by default)
    • JsonRecordSetWriter, with the following properties:

      Schema Write Strategy:  HWX Schema Reference Attributes
      Schema Access Strategy: Use 'Schema Name' Property
      Schema Registry:        HortonworksSchemaRegistry
  8. Enable the JsonTreeReader and JsonRecordSetWriter Controller Services you just created by clicking their respective lightning bolt icons.

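Conceptually, with Schema Access Strategy set to Use 'Schema Name' Property, the reader takes the schema name from the FlowFile's schema.name attribute (the ${schema.name} default) and looks that schema up in Schema Registry before parsing the records. The sketch below mimics that lookup with an in-memory dictionary standing in for the registry. The registry contents and the SensorReading field names are hypothetical, and real NiFi readers work with Avro schemas rather than this simplified type check.

```python
import json

# Hypothetical stand-in for Schema Registry: schema name -> required fields and types.
# The real SensorReading schema lives in Schema Registry; these fields are illustrative.
REGISTRY = {
    "SensorReading": {"sensor_id": int, "sensor_ts": int},
}


def read_records(content: bytes, attributes: dict, registry: dict) -> list:
    """Parse JSON content using the schema named by the schema.name attribute."""
    name = attributes.get("schema.name")
    if name not in registry:
        # Mirrors a reader failing when the named schema cannot be found.
        raise LookupError(f"no schema named {name!r}")
    fields = registry[name]
    data = json.loads(content)
    # Accept either a single JSON object or an array of objects.
    records = data if isinstance(data, list) else [data]
    for rec in records:
        for field, ftype in fields.items():
            if not isinstance(rec.get(field), ftype):
                raise ValueError(f"field {field!r} missing or not {ftype.__name__}")
    return records


if __name__ == "__main__":
    payload = b'{"sensor_id": 1, "sensor_ts": 1700000000}'
    print(read_records(payload, {"schema.name": "SensorReading"}, REGISTRY))
```

The point of the design is that the schema travels with the FlowFile as a name, not as an embedded definition, so every processor sharing these controller services resolves the same registered schema.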

Creating the flow

  1. Double-click on the newly created process group to expand it.

  2. Inside the process group, add a new Input Port and name it "Sensor Data".

  3. We need to tell NiFi which schema should be used to read and write the Sensor data. For this we’ll use an UpdateAttribute processor to add an attribute to the FlowFile indicating the schema name.

    Add an UpdateAttribute processor by dragging the processor icon to the canvas.

  4. Double-click the UpdateAttribute processor and configure it as follows:

    1. In the SETTINGS tab:

      Name: Set Schema Name
    2. In the PROPERTIES tab:

      • Click the plus button and add the following property:

        Property Name:  schema.name
        Property Value: SensorReading