End-to-End Data Pipeline for Monitoring Real-Time Blockchain/Cryptocurrency Data Using Apache NiFi, Kafka, Spark, and Cassandra, with a Real-Time Dashboard in Tableau.

In this blog post, we will learn how to build a real-time analytics dashboard in Tableau using Apache NiFi, Kafka, Spark Streaming, and Cassandra.

Spark Streaming is widely used in real-time data processing, especially with Apache Kafka. A typical scenario involves NiFi acting as the producer application that writes to a Kafka topic. A Spark application then subscribes to the topic and consumes the records. These records can be processed further downstream with operations such as filters, written to Cassandra for persistent storage, and integrated with Tableau for a real-time dashboard.
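
As a quick illustration of the consumer side, here is a minimal, hedged sketch (assuming PySpark with the spark-sql-kafka connector on the classpath; the broker and topic names match the configuration used later in this post, and the JSON parsing and Cassandra write are covered in the script sections below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("crypto-stream").getOrCreate()

# Subscribe to the topic that the NiFi flow publishes to
raw_df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092")
          .option("subscribe", "crypto-topic")
          .load())

# Kafka delivers the payload as bytes; cast the value to a string for downstream parsing
records = raw_df.selectExpr("CAST(value AS STRING) AS json")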

This solution also fits scenarios where businesses are struggling to make sense of the data they collect about customer habits and preferences in order to make smarter business decisions.




Below is a quick introduction to, and the configuration required for, each of the tools I have used in this blog:

1) Apache NiFi: NiFi is open-source software for automating and managing the flow of data between systems. It is a powerful and reliable system for processing and distributing data, and it provides a web-based user interface for creating, monitoring, and controlling data flows. Its highly configurable and modifiable data flow process can modify data at runtime, and it is easily extensible through the development of custom components.

To set up Apache NiFi, visit http://nifi.apache.org/ to download it.



# Set JAVA_HOME in nifi-env.sh file.

# Start the Nifi
[root@node1 bin]# /opt/nifi-1.11.4/bin/nifi.sh start

# Edit nifi.properties under /opt/nifi-1.11.4/conf for custom port
nifi.web.http.port = 9001    # Change as per your requirement

# Open the NiFi UI at http://node1:{port}/nifi

Drag and drop the three processors (InvokeHTTP, SplitJson, and PublishKafka) and configure the parameters below.

InvokeHTTP →
Remote URL: https://api.nomics.com/v1/currencies/ticker?key={YOUR_KEY}&ids={YOUR_DESIRED_COINS}&interval=1d,7d,30d

SplitJson →
JsonPathExpression : $.*    # Splits the single JSON array into one record per element (see the sketch after these settings)

PublishKafka →
       Kafka Brokers : node1:9092,node2:9092,node3:9092
       Topic Name : crypto-topic
       Max Request Size : 5 MB
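
For intuition: the nomics endpoint returns a single JSON array with one object per coin, and SplitJson with $.* emits each array element as its own flowfile, so every Kafka message ends up being one coin record. A rough plain-Python analogy (illustrative only, with a shortened stand-in payload):

import json

# Shortened stand-in for the API response: a JSON array with one object per coin
response_body = '[{"currency": "ETH"}, {"currency": "BTC"}]'

# SplitJson with $.* behaves roughly like this: one output record per array element
for record in json.loads(response_body):
    print(json.dumps(record))   # each record becomes its own flowfile / Kafka message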


2)  Apache Kafka: Apache Kafka is an open-source distributed streaming platform that is useful for building real-time data pipelines and stream-processing applications. I have used Confluent Kafka, a data streaming platform based on Apache Kafka: a full-scale streaming platform capable not only of publish-and-subscribe, but also of storing and processing data within the stream. Confluent is a more complete distribution of Apache Kafka.
Set up Confluent Kafka: visit https://docs.confluent.io/current/installation/installing_cp/rhel-centos.html and follow the instructions.
# Create a topic

[root@node1 ~]# kafka-topics --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 3 --partitions 3 --topic crypto-topic

## Topic details:

[root@node1 ~]# kafka-topics --describe --zookeeper localhost:2181 --topic crypto-topic
Topic: crypto-topic     PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: crypto-topic     Partition: 0    Leader: 1       Replicas: 3,2,1 Isr: 1
        Topic: crypto-topic     Partition: 1    Leader: 1       Replicas: 1,3,2 Isr: 1
        Topic: crypto-topic     Partition: 2    Leader: 1       Replicas: 2,1,3 Isr: 1


3) Apache Spark: Apache Spark is an open-source, flexible, in-memory framework that serves as an alternative to MapReduce for handling batch, real-time analytics, and data processing workloads. It provides native bindings for the Java, Scala, Python, and R programming languages, and supports SQL, streaming data, machine learning, and graph processing.
[root@node1 opt]# wget https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
[root@node1 opt]# tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz
[root@node1 ~]# cat > /opt/spark/conf/slaves                # Enter the hostnames of the servers that will act as slaves
node1
node2
node3
## Start the 3-node Spark cluster
[root@node1 ~]# /opt/spark/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.out
node1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node1.out
node2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out





4) Apache Cassandra: Apache Cassandra is the open-source, active-everywhere NoSQL database that powers the internet's largest applications, offering the benefits of open source with direct access to the engineers who support the largest Cassandra deployments. I have used DataStax Cassandra, which is scale-out data infrastructure for enterprises that need to handle any workload in any cloud. Built on the foundation of Apache Cassandra, DataStax Enterprise adds an operational reliability, monitoring, and security layer hardened by the largest internet apps and the Fortune 100.
Set up DataStax Cassandra: download it from https://downloads.datastax.com/#enterprise
[root@node1 opt]# wget https://downloads.datastax.com/enterprise/dse-6.8.tar.gz
[root@node1 opt]# tar -zxvf dse-6.8.tar.gz

# Start the Cassandra
[root@node3 opt]# /opt/dse-6.8.0/bin/dse cassandra -R

# Create the keyspace & table

[root@node3 bin]# ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 6.8.0 | DSE 6.8.0 | CQL spec 3.4.5 | DSE protocol v2]
Use HELP for help.
cqlsh> CREATE KEYSPACE cryptoks
WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};

cqlsh> CREATE TABLE cryptousd (
    rank text PRIMARY KEY,
    name text,
    market_cap text,
    price text,
    circulating_supply text,
    max_supply text,
    high text,
    price_timestamp text,
    price_change_1d text,
    volume_1d text,
    volume_change_1d text,
    market_cap_change_1d text,
    price_change_7d text,
    volume_7d text,
    volume_change_7d text,
    market_cap_change_7d text,
    price_change_30d text,
    volume_30d text,
    volume_change_30d text,
    market_cap_change_30d text);

NOTE: I have used text for all columns; please change the data types as per your requirements.
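
Once the pipeline is running (later in this post), you can sanity-check the table from cqlsh, or from Python with the DataStax cassandra-driver package. A hedged sketch, assuming the driver is installed and the node is reachable on the default CQL port 9042:

from cassandra.cluster import Cluster

cluster = Cluster(['node3'])              # contact point; adjust to your cluster
session = cluster.connect('cryptoks')     # keyspace created above

for row in session.execute("SELECT rank, name, price FROM cryptousd LIMIT 5"):
    print(row.rank, row.name, row.price)

cluster.shutdown()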



5) Tableau: Tableau is a powerful and fast-growing data visualization tool used in the business intelligence industry. It helps simplify raw data into an easily understandable format. Data analysis is very fast with Tableau, and the visualizations it creates take the form of dashboards and worksheets.





Now our platform is ready, and it’s time to play!




Start the processors and you will see activity under In, Read/Write, Out, and Tasks/Time. Open a Kafka consumer console to verify the incoming traffic from NiFi.




## Real-time data is now flowing into the Kafka topic from NiFi

[root@node1 ~]# kafka-console-consumer --bootstrap-server localhost:9092 --topic crypto-topic
{"id":"ETH","currency":"ETH","symbol":"ETH","name":"Ethereum","logo_url":"https://s3.us-east-2.amazonaws.com/nomics-api/static/images/currencies/eth.svg","rank":"2","price":"212.23360664","price_date":"2020-05-09T00:00:00Z","price_timestamp":"2020-05-09T13:59:00Z","market_cap":"23527378222","circulating_supply":"110856045","1d":{"price_change":"-0.00148155","price_change_pct":"-0.0000","volume":"17928137260.83","volume_change":"-2140901901.17","volume_change_pct":"-0.1067","market_cap_change":"2764406.65","market_cap_change_pct":"0.0001"},"7d":{"price_change":"-0.78461067","price_change_pct":"-0.0037","volume":"127772060660.33","volume_change":"-17089839955.15","volume_change_pct":"-0.1180","market_cap_change":"-66629458.38","market_cap_change_pct":"-0.0028"},"30d":{"price_change":"49.51167869","price_change_pct":"0.3043","volume":"557873724388.82","volume_change":"97606332444.58","volume_change_pct":"0.2121","market_cap_change":"5555252216.45","market_cap_change_pct":"0.3091"},"high":"1395.34621699","high_timestamp":"2018-01-13T00:00:00Z"}


-----------output omitted ----------------------------------------------

## Now we will execute the PySpark script I have created, which can be downloaded from my GitHub account: https://github.com/sumitbagga05/realtime/blob/master/crypto.py

I have segregated the major parts of the script into sections so that you get an idea of what each one does.
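
Before those sections, here is a hedged sketch (not the actual script) of how the streaming DataFrame columns_df used below might be built from the raw Kafka stream; only a subset of fields is shown here, and the full schema is defined in the script on GitHub:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("crypto-stream").getOrCreate()

# Partial schema; field names follow the sample record shown above
schema = StructType([
    StructField("rank", StringType()),
    StructField("name", StringType()),
    StructField("price", StringType()),
    StructField("market_cap", StringType()),
    StructField("circulating_supply", StringType()),
    StructField("price_timestamp", StringType()),
])

raw_df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092")
          .option("subscribe", "crypto-topic")
          .load())

# Parse each Kafka message (a JSON string) into named columns
columns_df = (raw_df
              .selectExpr("CAST(value AS STRING) AS json")
              .select(from_json(col("json"), schema).alias("data"))
              .select("data.*"))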
##################################################################################
# To view the real-time dataframe on the console
##################################################################################

columns_df.writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "true")\
        .format("console") \
        .start()

Real-time output in the console:



###################################################################################
## User-defined function for moving the data into Cassandra
###################################################################################

def writeToCassandra(writeDF, epochId):
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="cryptousd", keyspace="cryptoks") \
        .mode('append') \
        .save()

## Write each streaming micro-batch to Cassandra using foreachBatch
df3 = columns_df.writeStream \
                .trigger(processingTime="5 seconds") \
                .outputMode("update") \
                .foreachBatch(writeToCassandra) \
                .start()
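
One hedged operational note: when crypto.py runs as a standalone job, the spark-sql-kafka and spark-cassandra-connector packages need to be on the classpath (for example via spark-submit --packages), and the driver has to be kept alive so the streaming queries keep running:

# Block the driver until the streaming query terminates;
# without this, a standalone script would exit right after start().
df3.awaitTermination()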




Real-Time Output in Tableau



Summary



In this post, you learned how to use the following:
  • NiFi as a producer to Kafka topics.
  • Spark Structured Streaming to ingest messages from Kafka using the Kafka API.
  • Spark Structured Streaming to persist data to the Cassandra database for continuously and rapidly available SQL analysis.
  • A real-time dashboard overview in Tableau.

Real-Time Fraud Analytics Using Azure Event Hubs and Azure Stream Analytics.





Today we will walk through an end-to-end illustration of how to use Azure Stream Analytics and store its output in Blob Storage/Data Lake Storage Gen2. Blog 2 will focus on the visualisation part.
This tutorial uses the example of real-time fraud detection based on phone-call data. The approach also works for other types of fraud detection, such as credit card fraud or identity theft.

Scenario: Telecommunications and SIM fraud detection in real time.

Company XYZ has a huge amount of incoming call data. They want to detect fraudulent calls in real time and inform the user or block the specific number.
To detect this type of fraud, the company needs to examine incoming phone records and look for specific patterns: in this case, calls made around the same time in different countries/regions. Any phone records that fall into this category are written to storage for subsequent analysis.

Prerequisites:

In this tutorial, you'll simulate phone-call data by using a client app that generates sample phone call metadata. Some of the records that the app produces look like fraudulent calls.
·         An Azure account.
·         The call-event generator app, TelcoGenerator.zip, which can be downloaded from the Microsoft Download Center.

Create an Azure event hub to ingest events

To analyse a data stream, you first ingest it into Azure using Azure Event Hubs, which can ingest millions of events per second and then store the event information.

Create a Namespace and Event hub

You first create an event hub namespace, then add an event hub to that namespace.
Azure Portal → Create a resource → All services → Event Hubs (under Analytics) → Select Add

Leave the other values at their defaults.
Click the new namespace, and in the namespace pane, click Event Hub.



Grant access to the event hub and get a connection string
In the event hub pane, click Shared access policies and then click + Add.




After the policy has been deployed, click it in the list of shared access policies.
Find the box labeled CONNECTION STRING-PRIMARY KEY and click the copy button next to the connection string.
The connection string looks like this:
Endpoint=sb://infrasolution-eh-ns.servicebus.windows.net/;SharedAccessKeyName=asa-policy;SharedAccessKey=iuykGVOF3yOLRYodNDURCpBJSfoqKIirVdxf9w04ia4=;EntityPath=ev-frauddetection
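
As an optional sanity check before running the generator (a hedged sketch assuming the azure-eventhub Python package; the tutorial itself uses the TelcoGenerator app as the sender), you can push a test event using the namespace connection string and event hub name:

from azure.eventhub import EventHubProducerClient, EventData

# Placeholder values; use your own namespace connection string and event hub name
producer = EventHubProducerClient.from_connection_string(
    conn_str="Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=asa-policy;SharedAccessKey=<your-key>",
    eventhub_name="ev-frauddetection",
)

batch = producer.create_batch()
batch.add(EventData('{"message": "test event"}'))
producer.send_batch(batch)
producer.close()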

Configure and start the event generator application

 Configure the TelcoGenerator app

Edit “telcodatagen.exe.config”. The <appSettings> section will look like the following example. (For clarity, the lines are wrapped and some characters have been removed from the authorization token.)

  <appSettings>
    <!-- Service Bus specific app setings for messaging connections -->
    <add key="EventHubName" value="ev-frauddetection"/>
    <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://infrasolution-eh-ns.servicebus.windows.net/;SharedAccessKeyName=asa-policy;SharedAccessKey=iuykGVOF3yOLRYodNDURCpBJSfoqKIirVdxf9w04ia4="/>
  </appSettings>

Start the application
telcodatagen.exe 1000 0.2 2
The parameters are:
·         Number of CDRs per hour.
·         SIM Card Fraud Probability: How often, as a percentage of all calls, that the app should simulate a fraudulent call. The value 0.2 means that about 20% of the call records will look fraudulent.
·         Duration in hours. The number of hours that the app should run. You can also stop the app any time by pressing Ctrl+C at the command line.


Create a Stream Analytics job to manage streaming data
In the Azure portal, click Create a resource > Internet of Things > Stream Analytics job.

Configure job Input & Output
In the dashboard or the All resources pane, find and select the asa-frauddetection-job Stream Analytics job.


Run a test connection and Test Query



After configuring the input and output, you will see the kind of output shown below in the Overview section.
Go to the storage account and check the container. You will see a file whose size keeps increasing.

Check the size; it has now increased to 260 KB.