Storing and analysing large amounts of data is no longer what differentiates successful companies in their decision making. Today’s world is about how fast decision makers are provided with the right information so they can make the right decision before it’s too late.
Streaming analytics is aptly described as delivering perishable insights, “which must be acted on within a given timeframe or else the chance to influence business outcomes will pass.” [Forbes]
The thing is, many companies see implementing a streaming analytics platform as a very challenging and costly project, mainly because when they talk to advisors or consultants, they are usually presented with a very complex architecture that takes a lot of effort and money to set up and get going.
That is not the reality of streaming analytics. With the proper set of technologies and tools, such a platform can be set up quickly and effectively. Below is a diagram of the architecture that I have found works perfectly:
Looks beautiful and simple, doesn’t it? We’re going to use this architecture to build a solution that identifies fraudulent ATM transactions in real time, in three steps:
Step 1: Build and analyse streams of data
Everyone who has worked or is working in the field of Big Data has heard of Kafka. It’s the backbone of any streaming application and solution, and it exists because in most streaming cases data is generated at the source much faster than it is consumed at the destination. Kafka topics therefore act as a buffer, holding the records until they are read by the consumers.
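The buffering role a topic plays can be sketched with a plain in-memory queue. This is only an illustration of the decoupling idea, not Kafka itself, which adds durability, partitioning and replication on top:

```python
from queue import Queue

# A Kafka topic decouples a fast producer from a slower consumer.
# This toy queue only illustrates the buffering idea.
topic_buffer = Queue()

def produce(records):
    """Fast producer: append records as soon as they are generated."""
    for r in records:
        topic_buffer.put(r)

def consume(batch_size):
    """Slower consumer: read records at its own pace, in batches."""
    batch = []
    while not topic_buffer.empty() and len(batch) < batch_size:
        batch.append(topic_buffer.get())
    return batch

produce([f"txn-{i}" for i in range(5)])
first_batch = consume(batch_size=2)  # the consumer lags; unread records stay buffered
```

The remaining three records stay in the buffer until the consumer catches up, which is exactly the behaviour a topic gives you at scale.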
Like any other streaming solution, we’re going to use Kafka here as well, but not just any edition of it: we’ll be using Confluent Kafka. Confluent has not only built great add-ons around Kafka and made it much easier to deploy and manage; they are also pioneers in stream processing. Furthermore, it scales very efficiently and handles very fast, very big data without any hiccups.
The most interesting component in Confluent platform for me is KSQL. It provides SQL on top of streams of data. And that sounds like heaven for someone like me who has spent most of his professional life writing and optimising SQL queries. And I love the logo too!
For the first part of this solution, I followed this blog post and created the streams and processed them with KSQL. The steps I took were:
- Download and set up Confluent platform: https://www.confluent.io/download-cp/
- Start your Confluent platform (at the time of writing, the Confluent CLI’s “confluent start” command does this)
- Download and set up gess: https://github.com/rmoff/gess
- Create the Topic using Confluent’s “kafka-topics” command:
./bin/kafka-topics --topic atm_txns --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1
- Follow the blog post to create and process the stream. Just for reference, your final stream should look like this:
CREATE STREAM ATM_POSSIBLE_FRAUD \
WITH (PARTITIONS=1) AS \
SELECT TIMESTAMPTOSTRING(T1.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS T1_TIMESTAMP, TIMESTAMPTOSTRING(T2.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS T2_TIMESTAMP, \
GEO_DISTANCE(T1.location->lat, T1.location->lon, T2.location->lat, T2.location->lon, 'KM') AS DISTANCE_BETWEEN_TXN_KM, \
(T2.ROWTIME - T1.ROWTIME) AS MILLISECONDS_DIFFERENCE, \
(CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 AS MINUTES_DIFFERENCE, \
GEO_DISTANCE(T1.location->lat, T1.location->lon, T2.location->lat, T2.location->lon, 'KM') / ((CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 / 60) AS KMH_REQUIRED, \
T1.ACCOUNT_ID AS ACCOUNT_ID, \
T1.TRANSACTION_ID, T2.TRANSACTION_ID, \
T1.AMOUNT, T2.AMOUNT, \
T1.ATM, T2.ATM, \
T1.location->lat AS T1_LAT, \
T1.location->lon AS T1_LON, \
T2.location->lat AS T2_LAT, \
T2.location->lon AS T2_LON \
FROM ATM_TXNS T1 \
INNER JOIN ATM_TXNS_02 T2 \
WITHIN (0 MINUTES, 10 MINUTES) \
ON T1.ACCOUNT_ID = T2.ACCOUNT_ID \
WHERE T1.TRANSACTION_ID != T2.TRANSACTION_ID \
AND (T1.location->lat != T2.location->lat OR \
T1.location->lon != T2.location->lon) \
AND T2.ROWTIME != T1.ROWTIME;
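The fraud rule in the stream above boils down to simple arithmetic: the distance between the two transactions divided by the time between them gives the speed the cardholder would have needed to travel. A minimal sketch in Python, where the haversine formula stands in for KSQL’s GEO_DISTANCE and the function names are my own:

```python
from math import radians, sin, cos, asin, sqrt

def geo_distance_km(lat1, lon1, lat2, lon2):
    """Great-circle distance via the haversine formula (approximates GEO_DISTANCE)."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))  # 6371 km = mean Earth radius

def kmh_required(t1_ms, t2_ms, lat1, lon1, lat2, lon2):
    """Speed needed to make both transactions, mirroring the KMH_REQUIRED column."""
    hours = (t2_ms - t1_ms) / 1000 / 60 / 60
    return geo_distance_km(lat1, lon1, lat2, lon2) / hours

# Two transactions on the same account, 10 minutes apart, London and Paris:
speed = kmh_required(0, 10 * 60 * 1000, 51.5074, -0.1278, 48.8566, 2.3522)
suspicious = speed > 100  # no cardholder travels over 2,000 km/h between ATMs
```

The windowed join in the KSQL statement simply pairs up transactions on the same account within ten minutes of each other so this calculation can run on every pair.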
Step 2: Ingest streams of data into data store in real-time
The next layer we need in this architecture is ingestion and storage. There are various tools on the market that can ingest data in near real time, such as NiFi, StreamSets and perhaps Talend. For storage, depending on an on-premise or cloud preference, HDFS or object storage are the options.
The number one factor I always consider when suggesting a solution to my clients is integrity and homogeneity across all layers of the purpose-built solution. And when it comes to streaming, where performance is the number one factor, I can’t think of a more reliable and faster solution than MemSQL. If you’re curious how fast the database is, watch this video. And be prepared for your mind to be blown!
Another reason I love MemSQL for streaming use cases is how well it integrates with Kafka through MemSQL Pipelines. Take the following steps to set up MemSQL and integrate it with your Confluent platform:
1- Install MemSQL on the environment of your choice: https://docs.memsql.com/guides/latest/install-memsql/
2- Fire off MemSQL CLI and create a new database:
create database streaming_demo_database;
3- Create a new table for the records you receive from Confluent:
CREATE TABLE atm_possible_fraud (
  INGESTION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
, MESSAGE_FROM_KAFKA JSON
, T1_TIMESTAMP AS MESSAGE_FROM_KAFKA::$T1_TIMESTAMP PERSISTED DATETIME
, T2_TIMESTAMP AS MESSAGE_FROM_KAFKA::$T2_TIMESTAMP PERSISTED DATETIME
, DISTANCE_BETWEEN_TXN_KM AS MESSAGE_FROM_KAFKA::$DISTANCE_BETWEEN_TXN_KM PERSISTED DOUBLE
, MILLISECONDS_DIFFERENCE AS MESSAGE_FROM_KAFKA::$MILLISECONDS_DIFFERENCE PERSISTED DOUBLE
, MINUTES_DIFFERENCE AS MESSAGE_FROM_KAFKA::$MINUTES_DIFFERENCE PERSISTED DOUBLE
, KMH_REQUIRED AS MESSAGE_FROM_KAFKA::$KMH_REQUIRED PERSISTED DOUBLE
, ACCOUNT_ID AS MESSAGE_FROM_KAFKA::$ACCOUNT_ID PERSISTED CHAR(100)
, T1_TRANSACTION_ID AS MESSAGE_FROM_KAFKA::$T1_TRANSACTION_ID PERSISTED CHAR(100)
, T2_TRANSACTION_ID AS MESSAGE_FROM_KAFKA::$T2_TRANSACTION_ID PERSISTED CHAR(100)
, T1_AMOUNT AS MESSAGE_FROM_KAFKA::$T1_AMOUNT PERSISTED DOUBLE
, T2_AMOUNT AS MESSAGE_FROM_KAFKA::$T2_AMOUNT PERSISTED DOUBLE
, T1_ATM AS MESSAGE_FROM_KAFKA::$T1_ATM PERSISTED CHAR(100)
, T2_ATM AS MESSAGE_FROM_KAFKA::$T2_ATM PERSISTED CHAR(100)
, T1_LAT AS MESSAGE_FROM_KAFKA::$T1_LAT PERSISTED DOUBLE
, T1_LON AS MESSAGE_FROM_KAFKA::$T1_LON PERSISTED DOUBLE
, T2_LAT AS MESSAGE_FROM_KAFKA::$T2_LAT PERSISTED DOUBLE
, T2_LON AS MESSAGE_FROM_KAFKA::$T2_LON PERSISTED DOUBLE
);
A couple of points about this create table script:
- The first column, INGESTION_TIME, is populated automatically when every record is ingested
- The second column, MESSAGE_FROM_KAFKA, holds the records received from Confluent topics in JSON format
- The rest are persisted computed columns, which are computed and populated as each JSON record lands in the table. This is another cool feature of MemSQL: it makes it incredibly easy to parse JSON data without the need for any additional script or coding.
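What those persisted computed columns do at insert time can be mimicked in a few lines of Python, shown here purely to illustrate the extraction (the field names match the stream above; the values are made up):

```python
import json

# A message as it might arrive from the ATM_POSSIBLE_FRAUD topic (values illustrative).
message_from_kafka = json.dumps({
    "T1_TIMESTAMP": "2018-09-27 11:51:55",
    "ACCOUNT_ID": "a335",
    "KMH_REQUIRED": 2061.5,
})

# MemSQL's `col AS MESSAGE_FROM_KAFKA::$FIELD PERSISTED <type>` extracts and
# casts each field once, when the row lands; the equivalent by hand:
record = json.loads(message_from_kafka)
account_id = str(record["ACCOUNT_ID"])        # PERSISTED CHAR(100)
kmh_required = float(record["KMH_REQUIRED"])  # PERSISTED DOUBLE
```

Because the extraction happens once at ingestion, queries against the typed columns never pay the cost of re-parsing the JSON.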
4- Create an index on the INGESTION_TIME column. This is needed when we get to building our real-time visualisation with Zoomdata:
CREATE INDEX inserttime_index ON atm_possible_fraud (Ingestion_Time);
5- Create a pipeline that reads data from Confluent topics and inserts into MemSQL in real-time:
CREATE PIPELINE `atm_possible_fraud` AS LOAD DATA KAFKA '[IP_ADDRESS]:9092/ATM_POSSIBLE_FRAUD' INTO TABLE `atm_possible_fraud` (MESSAGE_FROM_KAFKA);
6- Test and start the pipeline:
TEST PIPELINE atm_possible_fraud;
START PIPELINE atm_possible_fraud;
And we’re done. Now you can start ingesting data into Confluent Kafka topics and the records will be replicated to your MemSQL table in real time. On your Confluent server, make sure your Confluent platform is up and running, as in step 1.
Then go to the folder where you downloaded gess, start it, and run the following command to pipe its UDP output into a Kafka producer:
nc -v -u -l 6900 | [CONFLUENT_DIRECTORY]/bin/kafka-console-producer --broker-list localhost:9092 --topic atm_txns
And start querying your MemSQL table. You should see records ingested in there as they are produced into Confluent Kafka topic.
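Each event gess emits is a small JSON document that the producer above forwards into the topic, one per line. A sample record shaped like what the KSQL stream expects, with field names taken from the queries above and invented values (check the gess repo for the exact schema):

```python
import json

# An ATM transaction event shaped like those the KSQL stream reads.
# Field names come from the queries above; the values are invented.
sample_txn = {
    "account_id": "a335",
    "transaction_id": "txn-0001",
    "atm": "High Street ATM",          # illustrative ATM name
    "amount": 50,
    "location": {"lat": 51.5074, "lon": -0.1278},
}

# kafka-console-producer expects one JSON document per line on stdin:
line_for_producer = json.dumps(sample_txn)
```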
Step 3: Visualise your data in real-time
There are so many visualisation tools out there in the market, some of which claim they can visualise your data in real time. But none of them can truly achieve that. Why? Because they all need to take custody of the data to visualise it: each and every record needs to be moved to the server where the visualisation engine runs, processed there, and presented to users as graphs and dashboards.
There is one visualisation tool that differs from every other tool in the market in that it pushes the query processing down to the source where the data is stored, and that is Zoomdata. Zoomdata doesn’t move data. It doesn’t take custody of the data. How does it work? Glad you asked.
Zoomdata’s smart query engine takes the query that is meant to fetch the data for the dashboard, applies its knowledge of the underlying data store and the metadata of the tables involved in the visualisation, and breaks the query into many smaller queries called micro-queries. The idea is that instead of sending one big query down to the source and waiting for the result to come back, it makes more sense to send those smaller queries down and sharpen the visualisation with the result of each micro-query.
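The micro-query idea can be sketched as splitting one large time-range scan into smaller windows, each producing a partial result that progressively refines the chart. This is a simplified illustration of the concept, not Zoomdata’s actual engine:

```python
from datetime import datetime, timedelta

def micro_queries(start, end, window_minutes):
    """Split one big time-range query into smaller, per-window queries."""
    windows = []
    cursor = start
    step = timedelta(minutes=window_minutes)
    while cursor < end:
        # Each (from, to) pair becomes its own small query against the source.
        windows.append((cursor, min(cursor + step, end)))
        cursor += step
    return windows

# One hour of data, visualised incrementally via 15-minute micro-queries:
parts = micro_queries(datetime(2018, 9, 27, 10, 0), datetime(2018, 9, 27, 11, 0), 15)
```

As each window’s result arrives, the chart is updated, so the user sees an approximate picture almost immediately instead of staring at a spinner until one monolithic query completes.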
Another very important point about Zoomdata is that it is truly self-service. It’s not like Tableau, which claims to be self-service yet has certified Tableau developers. If a visualisation tool is simple enough and truly self-service, you shouldn’t need a certification to learn everything about it.
To create a dashboard pointing to our MemSQL table:
1- Open Zoomdata UI and login as admin.
2- Click on Settings in the top menu and then select Sources. From the installed connectors, click on the MemSQL logo and start configuring the data source:
3- Give your data source a name and click Next. This is the page where you enter the details and credentials to connect to your MemSQL cluster. Fill it in and then click Next.
4- On the next page you’ll see the list of tables that exist in the data source you created, in our case the MemSQL database. Select “atm_possible_fraud”, and the list of columns and a sample of the top 10 rows of the table will be loaded. Remember to toggle CACHING off, since we are building a real-time dashboard. Then click Next:
5- The next page lists the columns and their data types. Zoomdata infers the data type of each column from the data source, and you can change them if they are incorrect or don’t match the type of analysis you want to do. Read more about this tab here.
6- The next tab is where you define how often you’d like to refresh the cached data. We don’t need to make any changes here since we’re not caching any data. Click Save & Next.
7- Now we’re on the Charts tab. This tab defines the default behaviour of each visualisation type: you can define which columns are used to render each visualisation when the dashboard opens for the first time. (Zoomdata is 100% self-service, meaning users can change the dashboards at runtime without needing IT or highly technical resources.)
Another important and interesting feature of your dashboard is defined in this tab as well: the Time Bar. Zoomdata follows a streaming architecture, which enables it to connect in “Live Mode” to any data source capable of serving real-time data. The technology behind this feature is called Data DVR.
In Live Mode, Zoomdata visualisations immediately reflect changes in the source data as data streams in. The live stream can be paused, rewound and replayed, essentially treating it as a Data DVR. Follow these steps to set it up:
- Select Ingestion Time from the Time Attribute drop-down. That’s the column we created an index on in our MemSQL table, remember? This is the column driving our time bar, and it makes sense to choose it: our real-time dashboard needs to fetch values from our real-time data source based on this column very quickly.
- Click on Enable Live Mode checkbox.
- Select 1 for Refresh Rate and 1 second for Delay By. The idea is that Zoomdata will look for records added to the data source with a 1-second delay. Future versions of Zoomdata will support millisecond delays.
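Conceptually, a live-mode refresh with a delay amounts to polling for rows whose ingestion time falls between the last watermark and “now minus the delay”, so that late-arriving rows inside the delay window aren’t missed. A rough sketch of the idea, not Zoomdata’s internals:

```python
def poll_new_rows(rows, last_seen, now, delay_seconds=1):
    """Return rows ingested after the watermark but at least `delay_seconds` old,
    plus the new watermark; times here are plain numbers for simplicity."""
    cutoff = now - delay_seconds
    fresh = [r for r in rows if last_seen < r["ingestion_time"] <= cutoff]
    new_watermark = max((r["ingestion_time"] for r in fresh), default=last_seen)
    return fresh, new_watermark

# Four rows in the table; the poll at t=13 with a 1-second delay picks up
# everything ingested after t=10 and at or before t=12:
rows = [{"ingestion_time": t} for t in (10, 11, 12, 13)]
fresh, watermark = poll_new_rows(rows, last_seen=10, now=13, delay_seconds=1)
```

This is also why the index on INGESTION_TIME matters: every refresh filters on that column, and it has to be fast at a one-second cadence.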
You will be redirected to the sources page, and at the top you’ll see that your new data source has been created. Click on New Chart and Dashboard and start building live visualisations:
I’ll leave finishing the visualisation to you. Below is a short video showing the incredible performance this streaming solution provides. On the left side of the video I kick off my script that publishes records to Confluent Kafka topics, and it takes less than 3 seconds from that point until the visualisation updates.
Our solution was so simple and easy to implement that I could summarise it in one blog post, and at the same time it delivers incredible performance running on only three servers (one Confluent, one MemSQL and one Zoomdata).
Want to know more about any of these products? Get in touch and let’s have a chat.