Data Ingestion with Apache Nifi

The goal of this tutorial is to design simple flows with basic data ingestion tasks, in order to understand the concepts and techniques of big data ingestion and how they are implemented in Apache Nifi. It is important to examine the model and configuration of ingestion tasks to understand the common concepts. A second goal is to assess whether you can use Apache Nifi for your own work in big data and data science.

Setup

Apache Nifi

You can download Apache Nifi and install it on your machine. Check the documentation to see whether a minimum configuration is required for your installation.

Note: the following instructions have been tested with nifi-1.24.0 and nifi-2.0.0-M1.

Create a test user:

$bin/nifi.sh set-single-user-credentials student0 cse4640student0

Start the Nifi server:

$bin/nifi.sh run

Then access Nifi from a web browser:

https://127.0.0.1:8443/nifi

See the Nifi user guide for notes about the username/password. Replace "127.0.0.1" with the IP address or hostname of your Nifi host.

AMQP Broker

When ingesting data through message brokers, you can use your own RabbitMQ instance on your local machine or a free instance created on CloudAMQP.com.

Simple program for receiving data from message brokers.

We provide a simple Python program that can be used to receive messages sent to an AMQP broker (using a fanout exchange), e.g.,

~$ python3 cs-e4640/tutorials/amqp/test_amqp_fanout_consumer.py --exchange amq.fanout
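
For reference, here is a minimal sketch of what such a fanout consumer does, using the pika library. This is not the course script itself; the use of the AMQPURL environment variable and the queue handling are assumptions that may differ from the provided code.

    # Minimal AMQP fanout consumer sketch (not the course script).
    # Assumes pika is installed (pip install pika) and AMQPURL points to the
    # broker, e.g. amqp://guest:guest@localhost for a local RabbitMQ.
    import os
    import pika

    url = os.environ.get("AMQPURL", "amqp://guest:guest@localhost")
    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()

    # Bind a temporary, exclusive queue to the fanout exchange so that every
    # message published to the exchange is delivered to this consumer.
    queue = channel.queue_declare(queue="", exclusive=True).method.queue
    channel.queue_bind(exchange="amq.fanout", queue=queue)

    def on_message(ch, method, properties, body):
        print("Received:", body[:200])

    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)
    print("Waiting for messages, press Ctrl+C to stop")
    channel.start_consuming()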

Google Storage

Google Storage is used as the data sink. You can use your own storage bucket or a shared bucket made available for the practice. You will need a service account credential to configure Nifi to access Google Storage.

If you use your own storage bucket, create a service account that Nifi can use.

Exercises

Define a flow for ingesting data into Google Storage

This example illustrates a scenario where you set up Nifi as a service which continuously checks file-based data sources (e.g., directories in file systems, SFTP, HTTP, ...) and ingests new files into cloud storage.

The flow includes the following processors:

  • ListFile: lists files in a directory. The property Input Directory specifies where input files will be scanned for ingestion.

  • FetchFile: fetches the content of the files listed by ListFile.

  • PutGCSObject: stores files into Google Storage. To use it, you need to define a GCPCredentialsControllerService, in which you provide the Google credential for accessing Google Storage. The following configuration is used with the Google Storage set up for you:

  • bucket = bdplabnifi (or your own bucket)

  • In GCPCredentialsControllerService: copy the shared service account credential (see below)

  • Then enable GCPCredentialsControllerService

The Gcloud service account for the practice will be shared. You can also use your own Google Storage bucket and configure a service account for it.

Testing:

  • Copy some files into the directory specified in the Input Directory property of ListFile and check whether the newly copied files are ingested into Google Storage.

Be careful with the files you put into the directory, to avoid ingesting wrong files into Google Storage.

If you use a shared bucket with a service account, you can use gcloud/gsutil or a small program to list the contents of the bucket. For example, first download the code for listing objects into storage_list_files.py and store the Google service account JSON in a file, e.g., google.json:

$export GOOGLE_APPLICATION_CREDENTIALS=google.json
$python3 storage_list_files.py bdplabnifi

See sample code at https://cloud.google.com/storage/docs/reference/libraries#client-libraries-install-python.
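
Based on that documentation, storage_list_files.py could look roughly like the following minimal sketch; the exact course version may differ.

    # Minimal sketch of listing objects in a bucket with the official
    # google-cloud-storage client (pip install google-cloud-storage).
    # Assumes GOOGLE_APPLICATION_CREDENTIALS points to the service account JSON.
    import sys
    from google.cloud import storage

    def list_files(bucket_name):
        client = storage.Client()
        for blob in client.list_blobs(bucket_name):
            print(blob.name)

    if __name__ == "__main__":
        list_files(sys.argv[1])  # e.g., python3 storage_list_files.py bdplabnifi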

Define a flow for ingesting data via AMQP

We should test this flow only with small CSV or JSON files. We use the following components:

  • ListFile: lists files in a directory. The property Input Directory specifies where input files will be scanned for ingestion.

  • FetchFile: fetches the content of the files listed by ListFile.

  • PublishAMQP: reads the content of a file and sends the whole content to RabbitMQ. For this component, the configuration is based on an existing RabbitMQ instance. If you use the pre-defined RabbitMQ, then use the following configuration:

     exchange name: amq.fanout
     routing key: mybdpnifi
     hostname: hawk.rmq.cloudamqp.com
     port: 5672
     virtual host: frlocnsr
     username: <see below>
     password: <see below>
    
    

The AMQP username/password for the practice will be shared.
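
To understand what PublishAMQP does, the following is a rough Python equivalent using the pika library. It only mirrors the configuration above for illustration; the placeholder username/password are assumptions and must be replaced with the shared credentials.

    # Rough Python equivalent of the PublishAMQP processor, for understanding
    # only: read one file and publish its whole content to the fanout exchange.
    # Replace the placeholder username/password with the shared credentials.
    import sys
    import pika

    credentials = pika.PlainCredentials("username", "password")
    params = pika.ConnectionParameters(host="hawk.rmq.cloudamqp.com", port=5672,
                                       virtual_host="frlocnsr",
                                       credentials=credentials)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    with open(sys.argv[1], "rb") as f:  # a small CSV or JSON file
        channel.basic_publish(exchange="amq.fanout",
                              routing_key="mybdpnifi",
                              body=f.read())
    connection.close()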

You can also quickly deploy a RabbitMQ Docker container for testing:

$docker run -it -p 5672:5672 rabbitmq:3

This will give you a local RabbitMQ with the default username/password "guest/guest".

You may have to create a queue and set the binding from the routing key to the queue. Check the RabbitMQ documentation for help.

Use the following program to check if the data has been sent to the message broker:

$export AMQPURL=**Get the link during the practice**
$python3 cs-e4640/tutorials/amqp/test_amqp_fanout_consumer.py --exchange amq.fanout

Note that the AMQP configuration for the Python program must match the AMQP broker set in Nifi. If you use the local RabbitMQ Docker container, then $export AMQPURL="amqp://guest:guest@localhost"

Capture changes in legacy databases and ingest them into a big data platform

This exercise illustrates how to capture only the changes from a database and ingest those changes into big data storage/databases.

Assume that you have a relational database, say MySQL in the following example. You can set it up with the following configuration:

  • Enable binary logging feature in MySQL (see https://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html and https://snapshooter.com/learn/mysql/enable-and-use-binary-log-mysql). For example,

     server-id =1
     log_bin                = /var/log/mysql/mysql-bin.log
     binlog_format = row
    

    Make sure you set this up correctly, otherwise the binary logging feature might not work; you can verify it by running SHOW VARIABLES LIKE 'log_bin'; in the MySQL client. In the practice session, we can give you access to a remote MySQL server; make sure you have the "mysql" client installed on your machine.

  • Define a database username for the test, such as cse4640 with password "bigdataplatforms".

  • Create a database under the selected username, e.g., create a database bdpdb:

     mysql> create database bdpdb;
     mysql> use bdpdb;
    
  • Then create a table like:

     CREATE TABLE myTable (
     	id INTEGER PRIMARY KEY,
     	country text,
     	duration_seconds INTEGER,
     	english_cname text ,
     	latitude float,
     	longitude float,
     	species text
     );
    

Note down the username, table name, MySQL hostname, and other connection information; you will need them for the Nifi configuration.

Now we will capture changes from a SQL database (assume this is a legacy database). The first step is to define the relevant processors that Nifi uses to communicate with SQL instances:

  1. Use a CaptureChangeMySQL processor with the following configuration based on the username, MySQL host, database, etc.

    MySQL Hosts:
    Username: "cse4640"
    Password: "bigdataplatforms"
    Database/Schema: bdpdb
    Table Name Pattern: myTable*
    
  2. PublishAMQP processor: similar to the previous exercise, we simply publish the whole captured change to an AMQP message broker.

  3. Start an AMQP consumer client to receive the changes:

    $export AMQPURL=**Get the link during the practice**
    $python3 cs-e4640/tutorials/amqp/test_amqp_fanout_consumer.py --exchange amq.fanout
  4. Start to insert some data into the selected table. For example,

    INSERT INTO myTable (country, duration_seconds, english_cname, id,  species, latitude, longitude) values ('United States',42,'Yellow-breasted Chat',408123,'virens',33.6651,-117.8434);
    

    For simple tests, just change the values in the INSERT statement to add new rows to the database and observe the captured changes; a small Python sketch for generating such inserts follows below.
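
If you prefer to generate test inserts programmatically rather than typing SQL by hand, here is a minimal sketch using the pymysql package; the package choice and the hostname placeholder are assumptions, and any MySQL client library works.

    # Minimal sketch: insert one test row to trigger a binlog change event.
    # Assumes pymysql is installed (pip install pymysql); replace the host
    # placeholder with the shared MySQL server.
    import pymysql

    conn = pymysql.connect(host="your-mysql-host", user="cse4640",
                           password="bigdataplatforms", database="bdpdb")
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO myTable (id, country, duration_seconds, english_cname,"
                " latitude, longitude, species) VALUES (%s, %s, %s, %s, %s, %s, %s)",
                (408124, "United States", 42, "Yellow-breasted Chat",
                 33.6651, -117.8434, "virens"),
            )
        conn.commit()  # the committed insert is what appears in the binary log
    finally:
        conn.close()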

You might encounter a known problem reported at https://issues.apache.org/jira/browse/NIFI-9323. In this case, you may need to disable the flow, clear the processor state, and then restart Nifi.

Conclusions

Now you have an overview of the capabilities of Apache Nifi. We suggest you define some simple dataflows of your own to get more practice.

After succeeding with the above steps, you can try different situations:

  • As in the first example, define ListFile, FetchFile and PutGCSObject to automatically store all the updates of a legacy database into Google Storage in CSV format.
  • Add other processors to handle the changes nicely.
  • Use Apache Kafka as the messaging system for ingestion (see the sketch below).
  • Ingest the changes into the right sink (database, storage).
  • Do it with a large-scale setting.
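
If you try the Kafka option, a minimal consumer sketch with the kafka-python package could look like the following; the topic name, broker address, and package choice are illustrative assumptions only.

    # Minimal sketch of a Kafka consumer for checking ingested messages.
    # Assumes kafka-python is installed (pip install kafka-python), a broker is
    # reachable at localhost:9092, and "nifi-ingest" is an example topic name.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer("nifi-ingest",
                             bootstrap_servers="localhost:9092",
                             auto_offset_reset="earliest")
    for message in consumer:
        print(message.topic, message.offset, message.value[:200])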

Furthermore, you can examine other ingestion tools to understand their underlying models and techniques.

Challenge

Write a flow that:

  1. Collects malware samples from Git or vendors' FTP servers
    • Less fun: replace the malware samples with images
  2. Processes the samples:
    • Get the MD5 hash
    • Get the binary name
    • Get the binary size
  3. Creates a CSV entry containing hash,name,size (see the sketch below)
  4. Merges all entries into a single file
  5. Stores the file in your own Google Storage bucket
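
If you want to prototype the per-sample processing outside Nifi before building the flow, here is a minimal Python sketch of steps 2-3; the script name and arguments are only for illustration.

    # Minimal sketch: compute the hash,name,size CSV entry for each input file,
    # mirroring what the flow should produce for every ingested sample.
    import csv
    import hashlib
    import os
    import sys

    def csv_entry(path):
        with open(path, "rb") as f:
            md5 = hashlib.md5(f.read()).hexdigest()
        return md5, os.path.basename(path), os.path.getsize(path)

    if __name__ == "__main__":
        writer = csv.writer(sys.stdout)
        for path in sys.argv[1:]:
            writer.writerow(csv_entry(path))  # e.g., python3 make_entries.py *.png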

Authors

  • Eljon Harlicaj
  • Linh Truong