End-to-End Data Pipeline for Monitoring Real-Time Blockchain/Cryptocurrency Data Using Apache APIs: NiFi, Kafka, Spark, Cassandra, and a Real-Time Dashboard Using Tableau.


In this blog post, we will learn how to build a real-time analytics dashboard in Tableau using Apache NiFi, Spark Streaming, Kafka, and Cassandra.

Spark Streaming is widely used in real-time data processing, especially with Apache Kafka. A typical scenario involves NiFi as a producer application writing to a Kafka topic. The Spark application then subscribes to the topic and consumes records. These records can be further processed downstream using operations like filter, written into Cassandra for persistent storage, and integrated with Tableau for a real-time dashboard.
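At a glance, the flow looks like this:

Nomics API → NiFi (InvokeHTTP → SplitJson → PublishKafka) → Kafka topic → Spark Structured Streaming → Cassandra → Tableau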

This solution also fits scenarios where businesses are struggling to make sense of the data they collect about customer habits and preferences in order to make smarter business decisions.




Below is a quick introduction to, and the required configuration of, the tools I have used in this blog:

1) Apache NiFi: NiFi is open-source software for automating and managing the flow of data between systems. It is a powerful and reliable system for processing and distributing data, and it provides a web-based user interface for creating, monitoring, and controlling data flows. Its highly configurable data flow process can modify data at runtime, and it is easily extensible through the development of custom components.

To set up Apache NiFi, visit http://nifi.apache.org/ to download it.



# Set JAVA_HOME in nifi-env.sh file.

# Start the Nifi
[root@node1 bin]# /opt/nifi-1.11.4/bin/nifi.sh start

# Edit nifi.properties under /opt/nifi-1.11.4/conf for custom port
nifi.web.http.port=9001    # Change as per your requirement

# Open the NiFi UI at http://node1:{port}/nifi

Drag and drop the three processors (InvokeHTTP, SplitJson, and PublishKafka) and configure the parameters below.

InvokeHTTP →
Remote URL: https://api.nomics.com/v1/currencies/ticker?key={YOUR_KEY}&ids={YOUR_DESIRED_COINS}&interval=1d,7d,30d

SplitJson →
JsonPathExpression : $.*    # Splits the single response record into multiple records, one per coin (see the example after this list)

PublishKafka →
       Kafka Brokers : node1:9092,node2:9092,node3:9092
       Topic Name : crypto-topic
       Max Request Size : 5 MB
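For illustration (this is a made-up, abbreviated payload, not actual Nomics output), the API returns one JSON array covering every requested coin, and SplitJson with $.* emits one flowfile per array element, so each coin lands in Kafka as a separate record:

# Input flowfile (single JSON array):
[{"id":"BTC","price":"9550.12"},{"id":"ETH","price":"212.23"}]

# Output flowfiles after SplitJson ($.*):
{"id":"BTC","price":"9550.12"}
{"id":"ETH","price":"212.23"}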


2) Apache Kafka: Apache Kafka is an open-source distributed streaming platform useful for building real-time data pipelines and stream processing applications. I have used Confluent Kafka, a data streaming platform based on Apache Kafka: a full-scale streaming platform capable not only of publish-and-subscribe, but also of storing and processing data within the stream. Confluent is a more complete distribution of Apache Kafka.
Setup Confluent Kafka: visit https://docs.confluent.io/current/installation/installing_cp/rhel-centos.html and follow the instructions.
# Create a topic

[root@node1 ~]# kafka-topics --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 3 --partitions 3 --topic crypto-topic

## Topic details:

[root@node1 ~]# kafka-topics --describe --zookeeper localhost:2181 --topic crypto-topic
Topic: crypto-topic     PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: crypto-topic     Partition: 0    Leader: 1       Replicas: 3,2,1 Isr: 1
        Topic: crypto-topic     Partition: 1    Leader: 1       Replicas: 1,3,2 Isr: 1
        Topic: crypto-topic     Partition: 2    Leader: 1       Replicas: 2,1,3 Isr: 1


3) Apache Spark: Apache Spark is an open-source, flexible, in-memory framework which serves as an alternative to MapReduce for handling batch, real-time analytics, and data processing workloads. It provides native bindings for the Java, Scala, Python, and R programming languages, and supports SQL, streaming data, machine learning, and graph processing.
[root@node1 opt]# wget https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
[root@node1 opt]# tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz
[root@node1 opt]# ln -s /opt/spark-2.4.4-bin-hadoop2.7 /opt/spark   # symlink so the /opt/spark paths below resolve
[root@node1 ~]# cat > /opt/spark/conf/slaves                # List every server that will act as a worker (slave)
node1
node2
node3
## Start the 3 nodes Spark Cluster
[root@node1 ~]# /opt/spark/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.out
node1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node1.out
node2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out





4) Apache Cassandra: Cassandra is the open-source, active-everywhere NoSQL database that powers the internet's largest applications, with the benefits of open source and direct access to the engineers who support the largest Cassandra deployments. I have used DataStax Cassandra, which is scale-out data infrastructure for enterprises that need to handle any workload in any cloud. Built on the foundation of Apache Cassandra, DataStax Enterprise adds an operational reliability, monitoring, and security layer hardened by the largest internet apps and the Fortune 100.
Setup DataStax Cassandra: https://downloads.datastax.com/#enterprise
[root@node1 opt]# wget https://downloads.datastax.com/enterprise/dse-6.8.tar.gz
[root@node1 opt]# tar -zxvf dse-6.8.tar.gz

# Start the Cassandra
[root@node3 opt]# /opt/dse-6.8.0/bin/dse cassandra -R

# Create keyspace & table

[root@node3 bin]# ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 6.8.0 | DSE 6.8.0 | CQL spec 3.4.5 | DSE protocol v2]
Use HELP for help.
cqlsh> CREATE KEYSPACE cryptoks
WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};

cqlsh> CREATE TABLE cryptoks.cryptousd (
    rank text PRIMARY KEY,
    name text,
    market_cap text,
    price text,
    circulating_supply text,
    max_supply text,
    high text,
    price_timestamp text,
    price_change_1d text,
    volume_1d text,
    volume_change_1d text,
    market_cap_change_1d text,
    price_change_7d text,
    volume_7d text,
    volume_change_7d text,
    market_cap_change_7d text,
    price_change_30d text,
    volume_30d text,
    volume_change_30d text,
    market_cap_change_30d text);

NOTE: I have used text for all columns; please change the data types as per your requirements.
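As a quick sanity check, you can describe the new table from the same cqlsh session before wiring up Spark:

cqlsh> DESCRIBE TABLE cryptoks.cryptousd;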



5) Tableau: Tableau is a powerful and fast-growing data visualization tool used in the business intelligence industry. It helps simplify raw data into an easily understandable format, and data analysis is very fast with Tableau: the visualizations it creates take the form of dashboards and worksheets. (To let Tableau read the Cassandra tables, you can use an ODBC connection, for example via the DataStax ODBC driver for Cassandra.)





Now our platform is ready and it's time to play!




Start the processors and you will see activity in the In, Read/Write, Out, and Tasks/Time counters. Then open a Kafka consumer console to verify the incoming traffic from NiFi.




## Real-time data is now coming into the Kafka topic from NiFi

[root@node1 ~]# kafka-console-consumer --bootstrap-server localhost:9092 --topic crypto-topic
{"id":"ETH","currency":"ETH","symbol":"ETH","name":"Ethereum","logo_url":"https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/eth.svg","rank":"2","price":"212.23360664","price_date":"2020-05-09T00:00:00Z","price_timestamp":"2020-05-09T13:59:00Z","market_cap":"23527378222","circulating_supply":"110856045","1d":{"price_change":"-0.00148155","price_change_pct":"-0.0000","volume":"17928137260.83","volume_change":"-2140901901.17","volume_change_pct":"-0.1067","market_cap_change":"2764406.65","market_cap_change_pct":"0.0001"},"7d":{"price_change":"-0.78461067","price_change_pct":"-0.0037","volume":"127772060660.33","volume_change":"-17089839955.15","volume_change_pct":"-0.1180","market_cap_change":"-66629458.38","market_cap_change_pct":"-0.0028"},"30d":{"price_change":"49.51167869","price_change_pct":"0.3043","volume":"557873724388.82","volume_change":"97606332444.58","volume_change_pct":"0.2121","market_cap_change":"5555252216.45","market_cap_change_pct":"0.3091"},"high":"1395.34621699","high_timestamp":"2018-01-13T00:00:00Z"}


-----------output omitted ----------------------------------------------

## Now we will execute the PySpark script which I have created; it can be downloaded from my GitHub account: https://github.com/sumitbagga05/realtime/blob/master/crypto.py

I have segregated the major parts of the script into sections below so that you get an idea of what each part does.
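Before diving into those sections, here is a minimal sketch of the part of the script that reads from Kafka and builds the columns_df DataFrame used below. This is an abbreviated illustration, not the actual script (the schema shows only a few fields and the 1d block; the 7d and 30d blocks follow the same pattern), so refer to the GitHub link above for the full version:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("crypto").getOrCreate()

# Schema matching the JSON shown in the consumer output above (abbreviated)
change_schema = StructType([
    StructField("price_change", StringType()),
    StructField("volume", StringType()),
])
schema = StructType([
    StructField("rank", StringType()),
    StructField("name", StringType()),
    StructField("market_cap", StringType()),
    StructField("price", StringType()),
    StructField("1d", change_schema),
])

# Subscribe to the topic that NiFi is publishing to
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
    .option("subscribe", "crypto-topic") \
    .load()

# Kafka values arrive as bytes: cast to string, parse the JSON,
# and flatten the nested "1d" block into flat Cassandra-style columns
columns_df = raw_df.selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("d")) \
    .select(col("d.rank"), col("d.name"), col("d.market_cap"), col("d.price"),
            col("d").getField("1d").getField("price_change").alias("price_change_1d"),
            col("d").getField("1d").getField("volume").alias("volume_1d"))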
##################################################################################
# To view the real-time DataFrame on the console
##################################################################################

columns_df.writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "true")\
        .format("console") \
        .start()

Real-Time Output in Console:



###################################################################################
## User-defined function for moving the data into Cassandra
###################################################################################

def writeToCassandra(writeDF, epochId):
    # Invoked by foreachBatch once per micro-batch; appends the batch to Cassandra
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="cryptousd", keyspace="cryptoks") \
        .mode('append') \
        .save()

## Write each streaming micro-batch to Cassandra
df3 = columns_df.writeStream \
                .trigger(processingTime="5 seconds") \
                .outputMode("update") \
                .foreachBatch(writeToCassandra) \
                .start()
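One thing to keep in mind: start() returns immediately, so the driver has to block or the job will exit before any micro-batch is processed. A minimal sketch of the tail of the script, plus the submit command I would expect for this Spark 2.4 / Scala 2.11 build (the package versions are assumptions, so adjust them to match your environment):

# Block until one of the streaming queries terminates
spark.streams.awaitAnyTermination()

# Submit with the Kafka source and Cassandra connector packages, e.g.:
# spark-submit \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,com.datastax.spark:spark-cassandra-connector_2.11:2.4.3 \
#   --conf spark.cassandra.connection.host=node1 \
#   crypto.py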




Real-Time Output in Tableau



Summary



In this post, you learned how to use the following:
  • NiFi as a producer to Kafka topics.
  • Spark Structured Streaming to ingest messages from Kafka using the Kafka API.
  • Spark Structured Streaming to persist data to the Cassandra database, keeping it continuously available for rapid SQL analysis.
  • A real-time dashboard overview in Tableau.
