End-to-End Data Pipeline for Monitoring Real-Time Blockchain Cryptocurrency Data Using Apache NiFi, Kafka, Spark, and Cassandra, with a Real-Time Dashboard in Tableau
In this blog post, we will learn how to build a real-time analytics dashboard in Tableau using Apache NiFi, Kafka, Spark Streaming, and Cassandra.
Spark Streaming is widely used for real-time data processing, especially with Apache Kafka. A typical scenario involves NiFi acting as a producer application that writes to a Kafka topic. The Spark application then subscribes to the topic and consumes the records. These records can be processed further downstream using operations such as filters, and are then written into Cassandra for persistent storage, which Tableau connects to for a real-time dashboard.
This solution also fits scenarios where businesses are struggling to make sense of the data they collect on customer habits and preferences in order to make smarter business decisions.
Below is a quick introduction to the tools I have used in this blog, along with the configuration each one requires:
1) Apache NiFi: Apache NiFi is open-source software for automating and managing the flow of data between systems. It is a powerful and reliable system for processing and distributing data. It provides a web-based user interface for creating, monitoring, and controlling data flows, and its highly configurable data flows can modify data at runtime. It is easily extensible through the development of custom components.
# Set JAVA_HOME in the nifi-env.sh file.
# Edit nifi.properties under /opt/nifi-1.11.4/conf to set a custom HTTP port
# (change 9001 as per your requirement)
nifi.web.http.port=9001
# Start NiFi
[root@node1 bin]# /opt/nifi-1.11.4/bin/nifi.sh start
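Before opening the browser, you can confirm that NiFi is actually listening on the port set in nifi.web.http.port. The following is a minimal Python sketch, not part of NiFi; `port_is_open` is a hypothetical helper, and node1/9001 are simply the host and port used in this setup. NiFi can take a while to finish starting, so it is worth retrying for a minute or two before treating a closed port as an error.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection resolves the host name and tries each address
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names
        return False
```

For example, call `port_is_open("node1", 9001)`. If it keeps returning False, check with netstat on node1 whether the port is in the listening state and whether a firewall is blocking it.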
Drag and drop the three processors (InvokeHTTP, SplitJson, and PublishKafka) onto the canvas, connect them in that order, and configure the parameters below.
InvokeHTTP:
    Remote URL: https://api.nomics.com/v1/currencies/ticker?key={YOUR_KEY}&ids={YOUR_DESIRED_COINS}&interval=1d,7d,30d
SplitJson:
    JsonPath Expression: $.*   // Splits the single response into one record per coin
PublishKafka:
    Kafka Brokers: node1:9092,node2:9092,node3:9092
    Topic Name: crypto-topic
    Max Request Size: 5 MB
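To make the InvokeHTTP and SplitJson steps concrete, here is a plain-Python sketch of what they do to the data; it is an illustration, not NiFi code. `build_ticker_url` and `split_json` are hypothetical names, YOUR_KEY and the coin ids are placeholders, and the split mimics the JsonPath expression `$.*`, which selects every child of the root (each element of a top-level array, or each member value of a top-level object).

```python
from __future__ import annotations

import json
from urllib.parse import urlencode

NOMICS_TICKER = "https://api.nomics.com/v1/currencies/ticker"


def build_ticker_url(api_key: str, coin_ids: list[str]) -> str:
    """Build a Remote URL with the same query parameters as the InvokeHTTP config."""
    params = {
        "key": api_key,
        "ids": ",".join(coin_ids),
        "interval": "1d,7d,30d",
    }
    # safe="," keeps the comma-separated lists as-is instead of encoding them as %2C
    return f"{NOMICS_TICKER}?{urlencode(params, safe=',')}"


def split_json(body: str) -> list[str]:
    """Mimic SplitJson with JsonPath `$.*`: every child of the JSON root
    becomes its own message."""
    root = json.loads(body)
    if isinstance(root, list):
        children = root
    elif isinstance(root, dict):
        children = list(root.values())
    else:
        raise ValueError("$.* needs a JSON array or object at the root")
    return [json.dumps(child) for child in children]


url = build_ticker_url("YOUR_KEY", ["BTC", "ETH"])
print(url)
# https://api.nomics.com/v1/currencies/ticker?key=YOUR_KEY&ids=BTC,ETH&interval=1d,7d,30d
```

Each string returned by `split_json` corresponds to one FlowFile, which PublishKafka then sends as one message to crypto-topic.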
2) Apache Kafka: Apache Kafka is an open-source distributed streaming platform used to build real-time data pipelines and stream processing applications. I have used Confluent Kafka (Confluent Platform), a more complete distribution based on Apache Kafka: a full-scale streaming platform capable not only of publish-and-subscribe but also of storing and processing data within the stream.
Set up Confluent Kafka: visit https://docs.confluent.io/current/installation/installing_cp/rhel-centos.html and follow the instructions.
# Create a topic
[root@node1 ~]# kafka-topics --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 3 --partitions 3 --topic crypto-topic
## Topic details:
[root@node1 ~]# kafka-topics --describe --zookeeper localhost:2181 --topic crypto-topic
Topic: crypto-topic  PartitionCount: 3  ReplicationFactor: 3  Configs:
    Topic: crypto-topic  Partition: 0  Leader: 1  Replicas: 3,2,1  Isr: 1
    Topic: crypto-topic  Partition: 1  Leader: 1  Replicas: 1,3,2  Isr: 1
    Topic: crypto-topic  Partition: 2  Leader: 1  Replicas: 2,1,3  Isr: 1
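In this output, Replicas lists the brokers assigned to each partition and Isr lists the ones currently in sync; a partition is fully replicated only when the two sets match. Below is a small Python sketch that parses output in the layout above and flags under-replicated partitions. `under_replicated` is a hypothetical helper, and its regular expression assumes the `Key: value` format shown.

```python
from __future__ import annotations

import re

# Matches "Key: value" pairs such as "Partition: 0" or "Replicas: 3,2,1"
PAIR = re.compile(r"(\w+):\s*(\S+)")


def under_replicated(describe_output: str) -> list[int]:
    """Return the ids of partitions whose in-sync replicas (Isr)
    are fewer than their assigned replicas (Replicas)."""
    flagged = []
    for line in describe_output.splitlines():
        fields = dict(PAIR.findall(line))
        if "Partition" not in fields:
            continue  # topic summary line (PartitionCount, ReplicationFactor, ...)
        replicas = set(fields["Replicas"].split(","))
        isr = set(fields["Isr"].split(","))
        if isr < replicas:  # proper subset: at least one replica is lagging
            flagged.append(int(fields["Partition"]))
    return flagged


# Output copied from the kafka-topics --describe command above
OUTPUT = """\
Topic: crypto-topic\tPartitionCount: 3\tReplicationFactor: 3\tConfigs:
\tTopic: crypto-topic\tPartition: 0\tLeader: 1\tReplicas: 3,2,1\tIsr: 1
\tTopic: crypto-topic\tPartition: 1\tLeader: 1\tReplicas: 1,3,2\tIsr: 1
\tTopic: crypto-topic\tPartition: 2\tLeader: 1\tReplicas: 2,1,3\tIsr: 1
"""

print(under_replicated(OUTPUT))  # [0, 1, 2]
```

Run against the output above, it flags all three partitions, because each Isr lists only broker 1 while Replicas lists brokers 1, 2, and 3. That suggests brokers 2 and 3 were not in sync when the command ran, so they are worth checking before relying on a replication factor of 3.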
3) Apache Spark: Apache Spark is an open-source, flexible in-memory framework that serves as an alternative to MapReduce for handling batch, real-time analytics, and data processing workloads. It provides native bindings for the Java, Scala, Python, and R programming languages, and supports SQL, streaming data, machine learning, and graph processing.
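[root@node1 opt]# wget https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
[root@node1 opt]# tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz
# The commands below use /opt/spark, e.g. a symlink to the extracted directory
[root@node1 opt]# ln -s /opt/spark-2.4.4-bin-hadoop2.7 /opt/spark
# List every server that should act as a worker (slave), one per line, then press Ctrl+D
[root@node1 ~]# cat > /opt/spark/conf/slaves
node1
node2
node3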
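## Start the 3-node Spark cluster
## (Spark must be installed at the same path on every node, and start-all.sh uses password-less SSH from node1 to start the workers listed in conf/slaves)
[root@node1 ~]# /opt/spark/sbin/start-all.sh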
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.out
node1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node1.out
node2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out
4) Apache Cassandra: Apache Cassandra is an open-source, active-everywhere (masterless) NoSQL database that powers some of the internet's largest applications. I have used DataStax Enterprise (DSE), scale-out data infrastructure for enterprises that need to handle any workload in any cloud. Built on the foundation of Apache Cassandra, DataStax Enterprise adds an operational reliability, monitoring, and security layer hardened by large internet applications and Fortune 100 companies.
Set up DataStax Cassandra: https://downloads.datastax.com/#enterprise
[root@node1 opt]# wget https://downloads.datastax.com/enterprise/dse-6.8.tar.gz
[root@node1 opt]# tar -zxvf dse-6.8.tar.gz
# Start Cassandra (-R allows it to run as the root user)
[root@node3 opt]# /opt/dse-6.8.0/bin/dse cassandra -R
# Create the keyspace & table
[root@node3 bin]# ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 6.8.0 | DSE 6.8.0 | CQL spec 3.4.5 | DSE protocol v2]
Use HELP for help.
cqlsh> CREATE KEYSPACE cryptoks
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> CREATE TABLE cryptoks.cryptousd (
    rank text PRIMARY KEY,
    name text,
    market_cap text,
    price text,
    circulating_supply text,
    max_supply text,
    high text,
    price_timestamp text,
    price_change_1d text,
    volume_1d text,
    volume_change_1d text,
    market_cap_change_1d text,
    price_change_7d text,
    volume_7d text,
    volume_change_7d text,
    market_cap_change_7d text,
    price_change_30d text,
    volume_30d text,
    volume_change_30d text,
    market_cap_change_30d text);
NOTE: I have used text for all columns; please change the data types as per your requirement.
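Why the data types matter: text values compare character by character, so a text rank of "10" sorts before "2", and prices stored as text have to be converted to numbers before they can be summed or averaged. Here is a plain-Python sketch of that pitfall and of converting one row's text values into typed ones. The `INT_COLUMNS`/`TEXT_COLUMNS` choices and the `to_typed` helper are hypothetical; in CQL the counterparts would be types such as `int` and `decimal`.

```python
from __future__ import annotations

from decimal import Decimal

# Text columns compare character by character, so "10" sorts before "2"
ranks_as_text = sorted(["2", "10", "1"])              # ['1', '10', '2']
ranks_as_numbers = sorted(["2", "10", "1"], key=int)  # ['1', '2', '10']

# Hypothetical typing choices for the cryptousd columns
INT_COLUMNS = {"rank"}
TEXT_COLUMNS = {"name", "price_timestamp"}


def to_typed(row: dict[str, str | None]) -> dict[str, object]:
    """Convert the text values of one cryptousd row to int/Decimal,
    leaving names, timestamps and missing values unchanged."""
    typed: dict[str, object] = {}
    for column, value in row.items():
        if value is None or column in TEXT_COLUMNS:
            typed[column] = value
        elif column in INT_COLUMNS:
            typed[column] = int(value)
        else:
            # Decimal keeps the exact digits of values like "212.23360664"
            typed[column] = Decimal(value)
    return typed


print(to_typed({"rank": "2", "name": "Ethereum", "price": "212.23360664"}))
```

Decimal is used here rather than float so that long price strings keep their exact digits.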
5) Tableau: Tableau is a powerful, fast-growing data visualization tool used in the business intelligence industry. It helps turn raw data into an easily understandable format. Data analysis with Tableau is fast, and the visualizations it creates take the form of worksheets and dashboards.
Set up Tableau: https://www.tableau.com/products/trial
Now our platform is ready, and it's time to play!
Start the processors; the In, Read/Write, Out, and Tasks/Time statistics on each processor will start showing activity. Then open a Kafka console consumer to verify the incoming traffic from NiFi.
## Now, we will execute the PySpark script, which I have created and which can be downloaded from my GitHub account: https://github.com/sumitbagga05/realtime/blob/master/crypto.py
## Real-time data is now coming into the Kafka topic from NiFi
[root@node1 ~]# kafka-console-consumer --bootstrap-server localhost:9092 --topic crypto-topic
{"id":"ETH","currency":"ETH","symbol":"ETH","name":"Ethereum","logo_url":"https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/eth.svg","rank":"2","price":"212.23360664","price_date":"2020-05-09T00:00:00Z","price_timestamp":"2020-05-09T13:59:00Z","market_cap":"23527378222","circulating_supply":"110856045","1d":{"price_change":"-0.00148155","price_change_pct":"-0.0000","volume":"17928137260.83","volume_change":"-2140901901.17","volume_change_pct":"-0.1067","market_cap_change":"2764406.65","market_cap_change_pct":"0.0001"},"7d":{"price_change":"-0.78461067","price_change_pct":"-0.0037","volume":"127772060660.33","volume_change":"-17089839955.15","volume_change_pct":"-0.1180","market_cap_change":"-66629458.38","market_cap_change_pct":"-0.0028"},"30d":{"price_change":"49.51167869","price_change_pct":"0.3043","volume":"557873724388.82","volume_change":"97606332444.58","volume_change_pct":"0.2121","market_cap_change":"5555252216.45","market_cap_change_pct":"0.3091"},"high":"1395.34621699","high_timestamp":"2018-01-13T00:00:00Z"}
-----------output omitted ----------------------------------------------
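Each Kafka message is one nested JSON object like the one above, while the cryptousd table is flat, so the PySpark script has to map the nested fields onto the table's columns before writing. The exact code is in crypto.py on GitHub and is not reproduced here; the following is only a plain-Python sketch of one possible mapping, assuming the `<field>_<interval>` column naming from the CREATE TABLE statement. `flatten_ticker` is a hypothetical name, and fields missing from a message (such as max_supply in the sample above) come back as None.

```python
from __future__ import annotations

import json

# Column layout taken from the CREATE TABLE cryptousd statement
TOP_LEVEL = ["rank", "name", "market_cap", "price", "circulating_supply",
             "max_supply", "high", "price_timestamp"]
PER_INTERVAL = ["price_change", "volume", "volume_change", "market_cap_change"]
INTERVALS = ["1d", "7d", "30d"]


def flatten_ticker(message: str) -> dict[str, str | None]:
    """Map one nested ticker message onto the 20 flat cryptousd columns.
    Fields absent from the message come back as None."""
    record = json.loads(message)
    row = {column: record.get(column) for column in TOP_LEVEL}
    for interval in INTERVALS:
        stats = record.get(interval) or {}
        for field in PER_INTERVAL:
            # e.g. record["7d"]["volume_change"] -> row["volume_change_7d"]
            row[f"{field}_{interval}"] = stats.get(field)
    return row
```

Fields such as id, logo_url, and the *_pct values are simply ignored, because the table has no columns for them.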
I have split the main parts of the script into the sections below so that you can get an idea of how it works.
##################################################################################
# To view the real-time DataFrame on the console
##################################################################################
# columns_df is the parsed streaming DataFrame built earlier in crypto.py
console_query = columns_df.writeStream \
    .trigger(processingTime="5 seconds") \
    .outputMode("append") \
    .option("truncate", "true") \
    .format("console") \
    .start()
Real-Time Output in Console:
###################################################################################
## foreachBatch function for writing each micro-batch into Cassandra
###################################################################################
# Requires the Spark Cassandra Connector on the classpath and
# spark.cassandra.connection.host pointing at the Cassandra node
def writeToCassandra(writeDF, epochId):
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="cryptousd", keyspace="cryptoks") \
        .mode("append") \
        .save()

## Write the stream to Cassandra as a micro-batch every 5 seconds
df3 = columns_df.writeStream \
    .trigger(processingTime="5 seconds") \
    .outputMode("update") \
    .foreachBatch(writeToCassandra) \
    .start()

# Keep the driver running while the streaming query is active
df3.awaitTermination()
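foreachBatch passes each micro-batch to writeToCassandra together with an epoch (batch) id, and the Spark documentation notes that foreachBatch provides only at-least-once write guarantees by default, so a batch may be written again after a failure. With this table design that replay is harmless: rank is the primary key, and a Cassandra write for an existing key is an upsert that overwrites the row, so the table keeps one row per rank. The plain-Python simulation below illustrates the idea; the dictionary stands in for the cryptousd table, `write_batch` is a hypothetical stand-in for writeToCassandra, and the prices are illustrative values only. It is not Spark or Cassandra code.

```python
from __future__ import annotations

# Stand-in for the cryptousd table: primary key (rank) -> row
table: dict[str, dict[str, str]] = {}


def write_batch(batch: list[dict[str, str]], epoch_id: int) -> None:
    """Hypothetical stand-in for writeToCassandra. Every row is stored under
    its primary key, so writing a row whose rank already exists overwrites it,
    the way a Cassandra write (an upsert) does. epoch_id is not needed for that."""
    for row in batch:
        table[row["rank"]] = row


# Illustrative values only
batch_0 = [{"rank": "1", "name": "Bitcoin", "price": "100.0"},
           {"rank": "2", "name": "Ethereum", "price": "200.0"}]
batch_1 = [{"rank": "2", "name": "Ethereum", "price": "201.5"}]

write_batch(batch_0, epoch_id=0)
write_batch(batch_1, epoch_id=1)
write_batch(batch_1, epoch_id=1)  # epoch 1 replayed, e.g. after a failure

print(len(table), table["2"]["price"])  # 2 201.5
```

If a sink were not idempotent like this, the epoch id could be recorded and checked to skip batches that were already written.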
Real-Time Output in Tableau
Summary
In this post, you learned how to use the following:
- NiFi as a producer to Kafka topics.
- Spark Structured Streaming to ingest messages from Kafka using the Kafka API.
- Spark Structured Streaming to persist the data to a Cassandra database, where it is continuously available for analysis.
- Tableau to build a real-time dashboard.
Truly remarkable.....
Thanks Brijesh for your kind words :)
Sumit, it is definitely a good article. It reminded me of my days introducing Apache Metron as a SOC platform.
Glad to know it, sir :) And thanks for the motivation.
Hi Sumit, it is a really good article. What are the system requirements for this pipeline? I'm trying it on an Ubuntu 16.04 machine with 6GB of RAM. My NiFi is running, but I couldn't open the UI on that port; it has been loading for more than an hour. Can you please give some input?
NiFi doesn't require a high-end server. It seems to be an issue with the port. Please run the netstat command to check whether that port is in the listening/open state, and also check whether the firewall is blocking it.
Paste the error here...