Stream IoT device data from Azure IoT Hub into Databricks Delta Lake


IoT devices produce a lot of data very fast. Capturing data from all those devices, which could number in the millions, and managing it is the very first step in building a successful and effective IoT platform.

Like any other data solution, an IoT data platform can be built on-premises or in the cloud. I'm a huge fan of cloud-based solutions, especially PaaS offerings. After a bit of research I decided to go with Azure, since it has the most comprehensive and easy-to-use set of service offerings when it comes to IoT, and they are reasonably priced. In this post, I am going to show how to build the architecture displayed in the diagram below: connect your devices to Azure IoT Hub and then ingest records into Databricks Delta Lake as they stream in, using Spark Structured Streaming.

Solution Architecture

Setup Azure IoT Hub and Register a Device

The very first step is to set up Azure IoT Hub, register a device with it and test it by sending data across. This is very well explained by Microsoft here. Make sure you follow all the steps and you’re able to read the messages sent to IoT Hub at the end.

The only extra step we need to take is to add a new consumer group to the IoT Hub. Doing this means our Spark Streaming application will have its own offset, tracking where in the queue it last read the records coming from devices. By assigning a unique consumer group to each application that subscribes to IoT Hub, we can send the records coming from IoT devices to multiple destinations: for example, store them in Blob storage, send them to Azure Stream Analytics for real-time analytics, and write them to a Delta table in Databricks Delta Lake.

Navigate to IoT Hub page on the Azure portal and select your hub. Click on Built-in endpoints and add a new Consumer Group:

Add Consumer Group to IoT Hub

Databricks: Unified Analytics Platform & Delta Lake

Moving on to the next layer in our architecture, we're going to set up Databricks. Databricks offers a platform that unifies data engineering, data science and business analytics. It is basically a PaaS offering for Spark in the cloud, which speeds up data exploration and preparation.

Why Delta?

Delta Lake is a storage layer developed by Databricks to bring ACID transactions to big data workloads. It is a response to a limitation of existing big data storage formats like Parquet: they are immutable. To update a record within a Parquet file, you need to re-write the whole file. With Delta, you can easily write update statements at the record level. This is all we need to know about the Delta format for the purpose of what we want to build here; more about it here.

A very important consequence of this feature for IoT and streaming use cases is that we can query the data as it arrives, instead of having to wait for a partition to be updated (re-written).
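To make that concrete: once the Delta table we build later in this post exists, a record-level fix can be expressed directly in SQL. This is just a minimal sketch; the table and column names are the ones we create further down, and the filter values are made up:

%sql
-- correct a single bad reading in place; Delta rewrites only the affected data files
UPDATE delta_telemetry_data
SET Humidity = 52.1
WHERE Device_ID = 'my-test-device' AND Date_Enqueued = '2019-06-01'

Under the hood, Delta records the change in its transaction log, so concurrent readers keep seeing consistent snapshots.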

In this solution we will see how to set up Databricks, use Spark Streaming to subscribe to records coming in to Azure IoT Hub, and write them to a Delta table.

Setup Databricks

Navigate to the Azure Portal and click on Create a Resource -> Analytics -> Azure Databricks. This is where you create a workspace, from which you can access all your Databricks assets. Fill out the form that opens, make sure you select Standard for the pricing tier, and then hit Create:

Create Databricks Workspace

When the workspace is created, go to the Azure Databricks Workspace resource page and click on Launch Workspace. You will be taken to your workspace. Create a new cluster with the same properties you see in the picture below. You can ask for bigger nodes or enable autoscaling, but it's not needed for this tutorial:

Create Databricks Cluster

The next step is to create a notebook. Click on Home -> <Your Email Address> -> Create -> Notebook. Give it a name, select Scala as the default language of the notebook (you can switch languages per cell later using % magic commands), and select the cluster this notebook's commands will run on.

Structured Streaming from IoT Hub

Now that we have set up our notebook, we can start writing code.

Connect to IoT Hub and read the stream

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }

// To connect to an Event Hub, EntityPath is required as part of the connection string.
// Here, we assume that the connection string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("--IOT HUB CONNECTION STRING FROM AZURE PORTAL--")
  .setEventHubName("--IoT Hub Name--")
  .build
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup("delta")
  
val eventhubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

The code snippet above first creates a connection string pointing to the IoT Hub we created before. The only extra steps you need to take are to get the connection string from the Azure portal and replace it in ConnectionStringBuilder, and to change the name in .setEventHubName to the "Event Hub-compatible name" of your hub. Open the Azure portal and go to your IoT Hub's page, click on Built-in endpoints, copy the values you see below and paste them into the code snippet in the notebook:

IoT Hub Endpoint Details

What we get after those commands complete successfully is a streaming DataFrame with the following fields. The messages coming from our IoT device are in the "body" field:

Extract device data and create a Spark SQL Table

The next step is to extract the device data arriving in the body field of the DataFrame we built in the previous step, and build a DataFrame comprising the fields we want to store in our Delta Lake for later analytics:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._
val schema = (new StructType)
    .add("temperature", DoubleType)
    .add("humidity", DoubleType)
val df = eventhubs
  .select(
    $"enqueuedTime".as("Enqueued_Time"),
    $"systemProperties.iothub-connection-device-id".as("Device_ID"),
    from_json($"body".cast("string"), schema).as("telemetry_json"))
  .select("Enqueued_Time", "Device_ID", "telemetry_json.*")

The resulting DataFrame looks like:

Now we can create a table from our DataFrame and start writing SQL commands on it:

df.createOrReplaceTempView("device_telemetry_data")

Create the final DataFrame and write stream to Delta table

We’re almost there. We have the data we receive from our IoT device in a Spark SQL table, which enables us to transform it easily with SQL commands.

Tables in a Big Data ecosystem are supposed to be partitioned. I mean, they had better be, otherwise they'll cause all sorts of problems. The reason I extracted Enqueued_Time from the JSON was to be able to partition my table by date/hour. IoT devices produce a lot of data, and partitioning it by hour not only keeps each partition reasonably sized, it also enables certain types of analytics, for example when companies need to understand the performance of their devices at different times of the day or night.

val finalDF = spark.sql("""
  SELECT Date(Enqueued_Time) Date_Enqueued
       , Hour(Enqueued_Time) Hour_Enqueued, Enqueued_Time, Device_ID
       , temperature AS Temperature, humidity AS Humidity
  FROM device_telemetry_data""")

The resulting DataFrame has the following schema:

The final step is to write the stream to a Delta table:

finalDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .format("delta")
  .partitionBy("Date_Enqueued", "Hour_Enqueued")
  .table("delta_telemetry_data")

Let’s check the options passed to writeStream:

  • outputMode: Specifies how the records of a streaming DataFrame are written to the streaming sink. There are three modes:
    • Append: Only the new records are written to the sink
    • Complete: All records are written to the sink every time there is an update
    • Update: Only the updated records are output to the sink
  • option: checkpointLocation
    • This is needed to ensure fault tolerance. Basically, we specify a location to save all application progress information. This is especially important in the case of a driver failure; read more here.
  • format: The output sink where the result will be written, obviously "delta".
  • partitionBy: The column(s) we want our table to be partitioned by. We decided to partition our table hourly as explained above, so we pass in date and hour.
  • table: The name of the table.

If all the steps above have worked, you should be able to query your table and see the records inserted into the Delta table by running the following command:

%sql
SELECT * FROM delta_telemetry_data
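Since the table is partitioned by date and hour, time-based aggregations stay cheap. As a quick illustration using only the columns we defined above, an hourly roll-up per device could look like this:

%sql
-- average readings per device and hour; the partition columns keep the scan small
SELECT Date_Enqueued, Hour_Enqueued, Device_ID,
       AVG(Temperature) AS Avg_Temperature,
       AVG(Humidity) AS Avg_Humidity
FROM delta_telemetry_data
GROUP BY Date_Enqueued, Hour_Enqueued, Device_ID
ORDER BY Date_Enqueued, Hour_Enqueued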

And we're done! Now we have a table in our Delta Lake that holds our IoT devices' data. You can take it from here and do ML on the collected data, or mix it with other tables you might have in your Delta Lake. And definitely feel free to ask your questions below in the comments section.

From Monolithic Architecture to Microservices and Event-Driven Systems


I'm a massive fan of streaming and real-time data processing and solutions. I strongly believe a lot of use cases are going to be defined and implemented around fast, streaming data in the near future, especially in IoT and streaming analytics. With 5G rolling out soon, bringing super-fast bandwidth and wide geographical coverage, it'll be much easier to capture and move data from devices in different locations to analyse and act upon.

In this post I am going to write about the traditional architecture for system development and why we need a new model, how streaming helped Microservices evolve into event-driven systems, and the advantages of using Kafka as the central data pipeline across the organisation.

Monolithic Architecture

Monolithic architecture is the traditional design and development approach, where the application is built as one single unit.

A monolithic application is built from 3 parts:

  • A database, consisting of many tables usually in an RDBMS
  • A client-side UI, which is where users interact with the application
  • A server-side application that handles HTTP requests (by executing some domain-specific logic), retrieves data, and populates or updates the UI

Some of the limitations of Monolithic architecture are:

  • Changes to the application are extremely slow: all components are highly coupled, which means changes usually result in a complete overhaul of the application
  • There is one code base, so every small change results in a completely new release and version of the solution

Microservices Architecture

In a Microservices approach, applications and systems are broken into independent and modular components based on specific business capabilities. These capabilities are defined formally with business-oriented APIs, each of which implements a specific piece of business logic and function.

Since the functions the Microservices provide are independent of each other, the implementation of each component is completely hidden from other services as well.

This loose coupling minimizes the dependency between services and their consumers. They just need to know the format and type of output provided by the previous application in the chain of Microservices, and make sure their own output complies with what is expected by the next downstream service, communicating through lightweight protocols. In other words, each Microservice calls the one it depends on, gets the result of its operation on the data, and applies the next bit of logic before passing the result on to the next application.

Advantages of Microservices are:

  • Isolation and Resilience: If one service fails, another one can be spun up very quickly. The better approach is to have each layer in HA mode, to minimize downtime
  • Scalability: Each service needs minimal resources and therefore scales more easily
  • Autonomous Deployment: Upgrades and maintenance become very easy and effective through CI/CD
  • Relationship to Business: Each business unit owns their Microservices, as opposed to a giant, usually inefficient, IT department

Event-Driven Microservices

Microservices architecture was an evolution of monolithic architecture and came from the realization that the bigger and more complex systems get, the more inefficient they become and the higher their cost of maintenance will be.

When it comes to backend and data storage, each Microservice is expected to have its own space to work, independent of other Microservices it interacts with. There are 2 options to achieve this: 1) Separate databases for each Microservice, 2) Separate schema in the same data store per Microservice.

The first approach is more traditional where multiple instances of, for example, MySQL are created and used by applications. It provides more independent and resilient Microservices, if those instances of database engines run on separate physical servers. The second approach is more modern and is popular among companies with on-premise or cloud-based big data solutions. And it’s resilient from the backend point of view as well, since all big data solutions have some level of replication and high availability incorporated in them.

Modern Microservices are all about making systems event-driven: instead of making remote requests and waiting for the response (services and components calling each other and telling each other what to do), we can send notifications to related microservices when an event occurs.

These events are facts about the business. For example, an ATM or online transaction, a new log entry, or a customer registering for a new mobile plan. They are the data points collected by organizations that make up their datasets. The good thing is, we can store these events in the very same infrastructure that we use to broadcast them: Apache Kafka. The better thing is that we can even process them in the same infrastructure with stream processing applications. This means our applications and systems are linked via a central data pipeline that is capable of real-time data broadcast and processing, and all data sources are shared through it.

In this architecture, the data that is processed and made ready to be used by applications is kept in Kafka topics and Microservices listen to those topics as the data streams in. When an event lands in a topic, all Microservices that have subscribed to the topic receive the data in real time and act upon it: landing data in a topic is like a notification that goes out to related applications.

Stream Branching

When some Microservices need to work on a subset of the events in a Kafka topic instead of all of them, it is very inefficient to have them subscribe to the original topic and examine every record to find the ones they need to work on. Instead, we can have a streaming application branch the events in the original topic and redirect them to downstream topics based on their kind, as sketched below. And since stream processing with Kafka is extremely efficient and fast, we get much better performance end to end.
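As a rough sketch of what that branching could look like in KSQL (the stream and column names here are hypothetical, purely to illustrate the pattern):

-- split one input stream into per-type topics that downstream services subscribe to
CREATE STREAM payments_atm AS
  SELECT * FROM payments WHERE channel = 'ATM';

CREATE STREAM payments_online AS
  SELECT * FROM payments WHERE channel = 'ONLINE';

Each downstream Microservice then subscribes only to the branch it cares about, instead of filtering the full payments topic itself.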

The same principle applies to the Microservices' output as well. They read events from topics, do their thing, and write the results back to output topics based on the business logic. And this becomes the chain of Microservices and Kafka topics.

Advantages of Event-Driven Microservices

So far we discussed how Kafka can be used as a source of truth to hold source data, act as the processing engine that transforms, cleanses and branches data and makes it ready to be used by Microservices and applications. This streaming backbone comes with a few other advantages worth mentioning:

High speed

Kafka's main use cases are streaming and real-time scenarios, because Kafka is able to provide the millisecond response times those scenarios need. And that is the performance we get all across the organization by using Kafka as the Enterprise Service Bus backbone for our Microservices architecture.

Increased agility and expandability

Having this high performing streaming backbone simplifies development and deployment of new use cases. As a result, the whole organization becomes more agile and able to respond to change as well as expand and answer new questions more efficiently and quickly.

Less pressure on source systems

In this architecture we read data from source systems once and keep them in Kafka topics for different applications to read from. This means all subsequent calls for data are answered by Kafka, not the source systems. And therefore, we don’t interfere with data generators.

Potential for fully asynchronous and non-blocking solutions

Obviously, we were aiming for more independent and non-blocking applications from the beginning. Breaking down our application into Microservices means the components that build our solution can work at different paces. Also, we can deploy multiple instances of each Microservice to work on subsets of events in parallel.

Machine Learning and Event-Driven Microservices Architecture

We discussed that events form the datasets an organisation collects and stores. We also discussed why Kafka is the best place to store these events and how it enables more effective Microservices implementation.

At a high level, a machine learning solution consists of two different parts: model training and prediction. Training is the stage where historical data is used to learn the patterns within the data, and prediction is where the algorithm predicts what's going to happen based on newer data.

Kafka and KSQL make machine learning both easy and scalable. Writing SQL statements is probably the easiest way to filter, enrich and transform data, and with KSQL we can do that to events as they stream in. As for model training, we can set the retention period of the Kafka topics to a reasonable window and point the model at those topics to be trained.
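Purely as an illustration (the stream and column names are made up), such a KSQL step could look like this, with the output topic becoming the training set:

-- keep only the fields and rows the model cares about
CREATE STREAM transactions_for_training AS
  SELECT account_id, amount, merchant
  FROM transactions
  WHERE amount > 0;

The retention of the topic backing transactions_for_training can then be set to cover however much history the model should be trained on.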

And finally, the trained models can be embedded in stream processing applications and deployed as a new Microservice.

What we get from this approach is an ML model and application that receives events as they stream in and spits out predictions in real time. You can read more about ML in the world of event-driven Microservices here: https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning

Conclusion

Companies have already started to move away from monolithic architecture because of its high cost of maintenance and upgrades. With the Microservices approach, applications are split into small components which are less heavyweight and focus on specific pieces of business logic. Event-driven architecture took Microservices to the next level and enabled them to respond to incoming events with more agility and flexibility. With Kafka as the backbone of event-driven systems, organisations are able to detect, process and respond to events, and even predict the next events, in real time. Apache Kafka is much more than a messaging system now, and that's what progressive companies across the world have realised. It can be used as a message bus, an event processing engine and even a fully ACID-compliant database; see more here: https://www.youtube.com/watch?v=v2RJQELoM6Y

Resources:

https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
https://www.bmc.com/blogs/microservices-architecture/
https://www.confluent.io/blog/build-deploy-scalable-machine-learning-production-apache-kafka/

Use Streaming Analytics to Identify and Visualise Fraudulent ATM Transactions in Real-Time


Storing and analysing large amounts of data is no longer what differentiates successful companies in the decision-making process. Today's world is about how fast decision makers are provided with the right information, so they can make the right decision before it's too late.

Streaming Analytics is correctly referred to as Perishable Insights, “which must be acted on within a given timeframe or else the chance to influence business outcomes will pass.” [Forbes]

The thing is, many companies see implementing a streaming analytics platform as a very challenging and costly project. That is mainly because, when talking to advisors or consultants, they are usually presented with a very complex architecture that needs a lot of effort and money to set up and get going.

That is not the reality of streaming analytics. When the proper set of technologies and tools is used, such a platform can be set up very quickly and effectively. Below is a diagram showing the architecture of the platform that I have found to work perfectly:

Looks beautiful and simple, doesn’t it? We’re going to use this architecture and build a solution that identifies fraudulent ATM transactions in real time. In 3 steps:

Step 1: Build and analyse streams of data

Everyone who has worked or is working in the field of Big Data has heard of Kafka. It's the backbone of any streaming application and solution, and it was invented because in most streaming cases the speed at which data is generated at the source is a lot faster than the speed at which it is consumed at the destination. Kafka topics therefore act as a buffer, holding the records until they are read by the consumers.

Like any other streaming solution, we're going to use Kafka here as well. But not just any edition of it: we'll be using Confluent Kafka. Confluent has not only put great add-ons around Kafka and made it a lot easier to deploy and manage, they are also the pioneers in stream processing. Furthermore, it scales very efficiently and is able to work with very fast and very big data without any hiccups.

The most interesting component in Confluent platform for me is KSQL. It provides SQL on top of streams of data. And that sounds like heaven for someone like me who has spent most of his professional life writing and optimising SQL queries. And I love the logo too!

ksql

For the first part of this solution, I followed this blog post, created the streams and processed them with KSQL. The steps I took were:

  • Start the Confluent platform:

./bin/confluent start

  • Create the source topic:

./bin/kafka-topics --topic atm_txns --zookeeper localhost:2181 --create --partitions 1 --replication-factor 1

  • Follow the blog post to create and process the streams. Just as a reference, your final stream should look like this:

CREATE STREAM ATM_POSSIBLE_FRAUD  \
    WITH (PARTITIONS=1) AS \
SELECT  TIMESTAMPTOSTRING(T1.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS T1_TIMESTAMP,  TIMESTAMPTOSTRING(T2.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS T2_TIMESTAMP, \
        GEO_DISTANCE(T1.location->lat, T1.location->lon, T2.location->lat, T2.location->lon, 'KM') AS DISTANCE_BETWEEN_TXN_KM, \
        (T2.ROWTIME - T1.ROWTIME) AS MILLISECONDS_DIFFERENCE,  \
        (CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 AS MINUTES_DIFFERENCE,  \
        GEO_DISTANCE(T1.location->lat, T1.location->lon, T2.location->lat, T2.location->lon, 'KM') / ((CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 / 60) AS KMH_REQUIRED, \
        T1.ACCOUNT_ID AS ACCOUNT_ID, \
        T1.TRANSACTION_ID, T2.TRANSACTION_ID, \
        T1.AMOUNT, T2.AMOUNT, \
        T1.ATM, T2.ATM, \
        T1.location->lat AS T1_LAT, \
        T1.location->lon AS T1_LON, \
        T2.location->lat AS T2_LAT, \
        T2.location->lon AS T2_LON \
FROM   ATM_TXNS T1 \
       INNER JOIN ATM_TXNS_02 T2 \
        WITHIN (0 MINUTES, 10 MINUTES) \
        ON T1.ACCOUNT_ID = T2.ACCOUNT_ID \
WHERE   T1.TRANSACTION_ID != T2.TRANSACTION_ID \
  AND   (T1.location->lat != T2.location->lat OR \
         T1.location->lon != T2.location->lon) \
  AND   T2.ROWTIME != T1.ROWTIME;

Step 2: Ingest streams of data into data store in real-time

The next layer we need to implement in this architecture is ingestion and storage. There are different tools in the market that are able to ingest data with some level of real-time capability, like NiFi, StreamSets and maybe Talend. For the storage, depending on your on-premises or cloud preference, HDFS or object storage are the options.

The number one factor I always consider when suggesting a solution to my clients is integrity and homogeneity across all layers of the purpose-built solution. And when it comes to streaming, where performance is the number one factor, I can't think of a solution more reliable or faster than MemSQL. If you're curious to know how fast the database is, watch this video. And be prepared for your mind to be blown!

Another reason I love MemSQL for streaming use cases is how well it integrates with Kafka through MemSQL Pipelines. Take the following steps to set up MemSQL and integrate it with your Confluent platform:

1- Install MemSQL on the environment of your choice: https://docs.memsql.com/guides/latest/install-memsql/

2- Fire off MemSQL CLI and create a new database:

create database streaming_demo_database;

3- Create a new table for the records you receive from Confluent:

CREATE TABLE atm_possible_fraud (
  INGESTION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  MESSAGE_FROM_KAFKA JSON,
  T1_TIMESTAMP AS MESSAGE_FROM_KAFKA::$T1_TIMESTAMP PERSISTED DATETIME,
  T2_TIMESTAMP AS MESSAGE_FROM_KAFKA::$T2_TIMESTAMP PERSISTED DATETIME,
  DISTANCE_BETWEEN_TXN_KM AS MESSAGE_FROM_KAFKA::$DISTANCE_BETWEEN_TXN_KM PERSISTED DOUBLE,
  MILLISECONDS_DIFFERENCE AS MESSAGE_FROM_KAFKA::$MILLISECONDS_DIFFERENCE PERSISTED DOUBLE,
  MINUTES_DIFFERENCE AS MESSAGE_FROM_KAFKA::$MINUTES_DIFFERENCE PERSISTED DOUBLE,
  KMH_REQUIRED AS MESSAGE_FROM_KAFKA::$KMH_REQUIRED PERSISTED DOUBLE,
  ACCOUNT_ID AS MESSAGE_FROM_KAFKA::$ACCOUNT_ID PERSISTED CHAR(100),
  T1_TRANSACTION_ID AS MESSAGE_FROM_KAFKA::$T1_TRANSACTION_ID PERSISTED CHAR(100),
  T2_TRANSACTION_ID AS MESSAGE_FROM_KAFKA::$T2_TRANSACTION_ID PERSISTED CHAR(100),
  T1_AMOUNT AS MESSAGE_FROM_KAFKA::$T1_AMOUNT PERSISTED DOUBLE,
  T2_AMOUNT AS MESSAGE_FROM_KAFKA::$T2_AMOUNT PERSISTED DOUBLE,
  T1_ATM AS MESSAGE_FROM_KAFKA::$T1_ATM PERSISTED CHAR(100),
  T2_ATM AS MESSAGE_FROM_KAFKA::$T2_ATM PERSISTED CHAR(100),
  T1_LAT AS MESSAGE_FROM_KAFKA::$T1_LAT PERSISTED DOUBLE,
  T1_LON AS MESSAGE_FROM_KAFKA::$T1_LON PERSISTED DOUBLE,
  T2_LAT AS MESSAGE_FROM_KAFKA::$T2_LAT PERSISTED DOUBLE,
  T2_LON AS MESSAGE_FROM_KAFKA::$T2_LON PERSISTED DOUBLE
);

A couple of points about this create table script:

  • The first column, INGESTION_TIME, is populated automatically when every record is ingested
  • The second column, MESSAGE_FROM_KAFKA, holds the records received from Confluent topics in JSON format
  • The rest are persisted computed columns that are computed and populated as each JSON record lands in the table. This is another cool feature of MemSQL that makes it incredibly easy to parse JSON data without any additional script or code.

4- Create an index on the INGESTION_TIME column. This is needed when we get to building our real-time visualisation with Zoomdata:

CREATE INDEX inserttime_index ON atm_possible_fraud (Ingestion_Time);

5- Create a pipeline that reads data from Confluent topics and inserts into MemSQL in real-time:

CREATE PIPELINE `atm_possible_fraud` AS LOAD DATA KAFKA '[IP_ADDRESS]:9092/ATM_POSSIBLE_FRAUD' INTO TABLE `atm_possible_fraud` (MESSAGE_FROM_KAFKA);

6- Test and start the pipeline:

TEST PIPELINE atm_possible_fraud;

START PIPELINE atm_possible_fraud;

And we're done. Now you can start ingesting data into Confluent Kafka topics and it will be replicated into your MemSQL table in real time. On your Confluent server, make sure your Confluent platform is running:

./bin/confluent status

Then go to the folder where you downloaded gess and run the following commands:

./gess.sh start

nc -v -u -l 6900 | [CONFLUENT_DIRECTORY]/bin/kafka-console-producer --broker-list localhost:9092 --topic atm_txns

And start querying your MemSQL table. You should see records appear there as they are produced into the Confluent Kafka topic.
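For example, a quick sanity check against the computed columns we defined earlier could look like this (the values you see will depend on what gess generates):

-- latest suspicious transaction pairs, most recent first
SELECT Ingestion_Time, Account_ID, Minutes_Difference, Distance_Between_Txn_KM, KMH_Required
FROM atm_possible_fraud
ORDER BY Ingestion_Time DESC
LIMIT 10;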

Step 3: Visualise your data in real-time

There are many visualisation tools out there in the market, some of which claim they can visualise your data in real time. But none of them can truly achieve that. Why? Because they all need to take custody of the data to be able to visualise it: each and every record needs to be moved to the server where the visualisation engine runs, processed there, and visualised to the users in the form of graphs and dashboards.

There is one visualisation tool that is different from every other tool in the market, in that it pushes the query processing down to the source where the data is stored, and that is Zoomdata. Zoomdata doesn't move data. It doesn't take custody of the data. How does it work? Glad you asked.

Zoomdata's smart query engine takes the query that is meant to grab the data for the dashboard, applies its knowledge of the underlying data store and the metadata about the tables involved in the visualisation, and breaks the query into many smaller queries called micro-queries. The idea is that instead of sending one big query down to the source and waiting for the result to come back, it makes more sense to send those smaller queries down and sharpen the visualisation with the result of each micro-query.

Another very important point about Zoomdata is that it is truly self-service. It’s not like Tableau which claims to be self service but then there are certified Tableau developers. If a visualisation tool is simple enough and really is self-service, you shouldn’t need a certification to learn everything about it.

To create a dashboard pointing to our MemSQL table, follow these steps:

1- Open Zoomdata UI and login as admin.

2- Click on Settings in the top menu and then select Sources. From the installed connectors, click on the MemSQL logo and start configuring the data source:

3- Give your data source a name and click Next. This is the page where you give the details and credentials to connect to your MemSQL cluster. Fill it out and then click Next.

4- On the next page you'll see the list of tables that exist in the data source you created, in our case the MemSQL database. Select "atm_possible_fraud", and the list of columns and a sample of the top 10 rows of the table will be loaded. Remember to toggle CACHING off, since we are building a real-time dashboard. And then click Next:

Click Next.

5- The next page has the list of columns and their data types. Zoomdata infers the data types for each column from the data source and you can change them if they don’t match the type of analysis you want to do or if they are not correct. Read more about this tab here.

Click Next.

6- The next tab is where you define how often you would like to refresh the cached data. We don't need to make any changes here since we're not caching any data. Click Save & Next.

7- Now we're on the Charts tab. This tab is used to define the default behaviour of each visualisation type. This means you can define which columns are used to render each visualisation when the dashboard opens for the first time (Zoomdata is 100% self-service, meaning that users can change the dashboards at run time without the need for IT or highly technical resources).

Another very important and interesting feature of your visualisation dashboard will be defined in this tab as well: the Time Bar. Zoomdata follows a streaming architecture, which enables it to connect in “Live Mode” to any data source capable of real-time data. The technology behind this feature is called Data DVR.

In Live Mode, Zoomdata visualisations immediately reflect changes in the source data as data streams in. The live stream can be paused, rewound and replayed, essentially treating it as a Data DVR. Follow these steps to set it up:

  • Select Ingestion Time from the Time Attribute drop-down. That's the column we created an index on in our MemSQL table, remember? This will be the column driving our time bar, and it makes sense to choose it: our real-time dashboard needs to be able to fetch values from our real-time data source based on this column very fast.
  • Click on the Enable Live Mode checkbox.
  • Select 1 for Refresh Rate and 1 second for Delay By. The idea is that Zoomdata will look for records added to the data source with a 1-second delay. Future versions of Zoomdata will be able to support millisecond delays.

Click Finish.

You will be redirected to the source page and you’ll see on the top that your new data source is created. Click on New Chart and Dashboard and start building live visualisations:

I leave finishing the visualisation to you. Below is a short video showing the incredible performance this streaming solution can provide. On the left side of the video I kick off my script that publishes records to Confluent Kafka topics, and it takes less than 3 seconds from that point until the visualisation is updated.

Streaming Analytics solution with Confluent, MemSQL and Zoomdata

Our solution was so simple and easy to implement that I could summarise it in one blog post. And at the same time, it’s capable of providing incredible performance running on 3 servers only (one Confluent, one MemSQL and one Zoomdata).

Want to know more about any of these products? Get in touch and let’s have a chat.

UDFs in KSQL: DateAdd

KSQL, the SQL engine for streaming data, is a very powerful tool that helps a great deal in streaming analytics use cases. It comes with a set of functions that can be used to transform, filter or aggregate data, and the good thing is that you can extend it easily by implementing and adding your own UDF (User Defined Function) or UDAF (User Defined Aggregate Function). Let's see how we can do that by adding a simple function to KSQL.

The UDF I want to implement here is DATEADD. If you're familiar with SQL, you have definitely used it: it takes in a date, adds or subtracts a specified amount to or from a specific part of the datetime, and spits out a new datetime.

To implement a User Defined Function (UDF or UDAF) you need to code your function in Java and then import the jar file into your KSQL server. You can read about the full process here; I'll point out a couple of things that I believe you should pay attention to:

  • Make sure you set @UdfDescription and @Udf in your java code properly
  • Change the versions in pom.xml according to your environment. For example:
<confluent.version>5.1.0</confluent.version>
  • Pay attention to the data types you can use in your Java code. You can use only the following types as parameters or return values of your function:
    Java Type    KSQL Type
    int          INTEGER
    Integer      INTEGER
    boolean      BOOLEAN
    Boolean      BOOLEAN
    long         LONG
    Long         LONG
    double       DOUBLE
    Double       DOUBLE
    String       VARCHAR
    List         Array
    Map          MAP

The Code

As I said above, we need to implement our UDF in Java. And let me start talking about the code by saying that I'm not a Java developer. I can code in it, with lots of help from Google, but I'm certainly not the best at optimising code or applying best practices. So please be gentle:

package com.thebipalace.ksql.udfdateadd;
  
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

import java.util.Calendar;
import java.util.Date;

@UdfDescription(name = "DATEADD", description = "Get previous or future period for a given date")

public class DateAdd {

    @Udf(description = "Get previous or future period for a given date")
    public long dateAdd(final long date, final String period, final int amount) {
        Calendar cal = Calendar.getInstance();
        Date currentDate = new Date(date);
        cal.setTime(currentDate);

        // print current date
        //System.out.println("The current date is : " + cal.getTime());

        Character periodChar = period.toCharArray()[0];
        switch(periodChar){
            case 'Y': cal.add(Calendar.YEAR, amount);
                break;
            case 'M': cal.add(Calendar.MONTH, amount);
                break;
            case 'D': cal.add(Calendar.DAY_OF_MONTH, amount);
                break;
            case 'H': cal.add(Calendar.HOUR, amount);
                break;
            case 'N': cal.add(Calendar.MINUTE, amount);
                break;
            case 'S': cal.add(Calendar.SECOND, amount);
                break;
        }
        return cal.getTime().getTime();
    }
}

It’s a very simple function that takes 3 parameters:

  • date: with a type of long (representing the number of milliseconds passed since 1/Jan/1970), since KSQL UDFs don't accept Date data types. You can use KSQL's TIMESTAMPTOSTRING to convert long or BIGINT values representing dates into readable formats.
  • period: with a data type of String. This is the part of the date you want to add to or subtract from. As you can see in the code, the options are Year, Month, Day, Hour, Minute and Second.
  • amount: of type int, the amount of time you want to move the date back or forth, e.g. 1 month or 23 days

The function spits out a new long representation of the date/time that results from applying the given amount of periods to the input date.

Deployment

To be able to start using the UDF with KSQL you need to deploy it to your KSQL cluster. The steps are listed in the link I mentioned above; this is basically what you need to do:

  • Compile your code by running following command in the root directory of your Java project:
mvn clean package
  • Take the jar file with the "_with-dependencies" postfix to the server where KSQL is running and copy it to "<pathtoconfluent>/etc/ksql/ext". Make sure ksql-server.properties points to this location, for example:
ksql.extension.dir=/home/centos/kafka/confluent-5.0.0/etc/ksql/ext/
  • And restart KSQL Server:
<path-to-confluent>/bin/confluent stop ksql-server
<path-to-confluent>/bin/confluent start ksql-server

Then fire off the KSQL CLI:

LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql

And list the functions. DATEADD should be there:

LIST FUNCTIONS;

And there you go. Your new UDF is ready to be used.

Usage

Our new UDF is ready to be used. Just use it like any other function in your KSQL queries; here's an example:

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(DATEADD(ROWTIME, 'M', 1), 'yyyy-MM-dd HH:mm:ss') from orders_raw;

This function could be useful for period to period comparisons. For example, you’re running a marketing campaign and you want to compare the number of hits on your website from this year with last year when you were running the same campaign.

Or in sales: a month-to-month comparison of how well your sales are going, in real time.
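As a rough sketch of how such a comparison key could be built, reusing the orders_raw stream from the usage example above:

-- each event tagged with its own month and the same month last year
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM') AS current_period,
       TIMESTAMPTOSTRING(DATEADD(ROWTIME, 'Y', -1), 'yyyy-MM') AS same_period_last_year
FROM orders_raw;

Passing a negative amount works because the Java code simply hands it to Calendar.add, which subtracts when the amount is negative.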

Hope this is useful for some of you out there. As always, feel free to reach out if you have any questions or feedback.

AWS Glue Part 3: Automate Data Onboarding for Your AWS Data Lake

When it comes to building data lakes on AWS s3, it makes even more sense to use Spark. Why? Because you can take advantage of Glue and build ETL jobs that generate and execute Spark code for you, serverless. It means you won't need to worry about building and maintaining EMR clusters, or scaling them up and down based on which job runs when. Glue takes care of all of it for you.

AWS Glue

In part one and part two of my posts on AWS Glue, we saw how to create crawlers to catalogue our data and then how to develop ETL jobs to transform them. Here we’ll see how we can use Glue to automate onboarding new datasets into data lakes.

On-board New Data Sources Using Glue

On-boarding new data sources can be automated using Terraform and AWS Glue. By on-boarding I mean having them traversed and catalogued, converting the data to formats that are more efficient to query with engines like Athena, and creating tables for the transformed data.

Below is the list of what needs to be implemented. Note that Terraform doesn't fully support AWS Glue yet, so some steps need to be implemented manually. See here for more information.

1- Create the s3 folder structure using Terraform (resource "aws_s3_bucket_object"). There are 2 folder structures that need to be created:

    a- The structure that matches the pattern at which data lands, for example: s3://my_bucket/raw_data/data_source_name/table_name/. You can create multiple folders here, one per table that you're onboarding.

    b- The structure to store data after it is transformed: s3://my_bucket/processed_data/data_source_name/table_name/.

2- Create a new database for the source being on-boarded using Terraform. You can create this database in Glue (Terraform resource "aws_glue_catalog_database") or in Athena (resource "aws_athena_database"). I couldn't see any difference when I tried both options.

3- Create a new Crawler for the new data source (Terraform doesn't support Glue Crawlers yet, so do this step manually until this issue is closed). This is the crawler responsible for inferring the structure of the data landing in s3, cataloguing it and creating tables in Athena.

    a- The crawler should point to the raw data location for the source and write to the database created for it. In the example above, it should point to s3://my_bucket/raw_data/data_source_name/.

    b- The crawler will create one table per subfolder under the location it points to in s3, in the Athena database (which will be used as the source in ETL jobs later). In other words, we'll need multiple folders under the source folder in s3, but only one crawler in Glue.

    c- Prefix the table names to indicate the table type, in this case raw, e.g. "raw_".

Note that tables created by this crawler are only for storing metadata. They won't be used by users or data engineers to query data; we'll create another set of tables for that in step 5.

4- Create a new Glue ETL job using Terraform.

    a- Specify the schedule according to the frequency at which data lands in s3.

    b- The ETL job will read the data in the raw folder, convert it to Parquet (or any other columnar format like ORC), and store it in the processed folder.

5- Create a second Crawler to catalogue the transformed data (again, you need to do this manually for now).

    a- Its schedule should match that of the ETL job in step 4. This is to make sure data processed and transformed by the ETL job is available for queries as soon as possible.

    b- It will create a table in Athena in the database where the source table is.

    c- Prefix the tables' names with "processed_".

 

By following the steps above, we have a self-evolving data on-boarding process that we can take from one environment to another in a matter of minutes. A very obvious use case would be to move from non-prod to prod after each source/table is tested and verified, just by pointing our Terraform scripts at the new environment.

Hope this post helps, and please do not hesitate to give me feedback via comments.

AWS Glue Part 2: ETL your data and query the result in Athena

In part one of my posts on AWS Glue, we saw how Crawlers could be used to traverse data in s3 and catalogue them in AWS Athena.

Glue is a serverless service that can be used to create, schedule and run ETL jobs. In this post we'll create an ETL job using Glue, execute the job and then see the final result in Athena. We'll go through the details of the generated code in a later post.

For the purpose of this tutorial I am going to use Glue to flatten the JSON returned by calling the Jira API. It's a long and complex JSON response; you can see what it looks like here. We had to do this recently at work and it took 2 analysts 2 days to understand the structure and list out all the fields. Using Glue, it'll take 15 minutes!

Note that if your JSON file contains arrays and you want to be able to flatten the data in those arrays, you can use jq to get rid of the arrays and keep all the data in plain JSON format. More about jq here.

Let’s get started:

1. Navigate to AWS Glue console and click on Jobs under ETL in the left hand pane

2. Click on Add job button to kick off Add job wizard

3. Fill out the job properties. Most of them are self-explanatory:

a. Provide a name.

b. A role that has full Glue access, as well as access to the s3 buckets this job is going to read data from and write results to, and where it saves the Spark script it generates.

c. Specify whether you're going to use the Glue interface to develop the basics of your job, have it run an existing script that is already pushed to s3, or start writing the Spark code from scratch.

In this example we'll select option 1, to have Glue generate the script for us. We get the option to edit it later, if need be.

d. Specify the s3 buckets where your script will be saved for future use and where temporary data will be stored:

etl_job_properties

4. Select where your source data is. This section lists the tables in Athena databases that the Glue role has access to. We’ll use the table we created in part one:

etl choose source

5. Next step? You guessed it right: choosing the target for your ETL job. I want to store the result of my job as a new table, convert my JSON to Parquet (since it's faster and less expensive for Athena to query data stored in columnar format) and specify where I want the result to be stored in s3:

etl choose target

6. Here's the exciting part. Glue matches all the columns in the source table to columns in the target table it's going to create for us. This is where we can see what our JSON file actually looks like and flatten it by taking the columns we're interested in out of their respective JSON structs:

a. Expand fields, issuetype and project:

etl map source to dest

b. Remove all the unwanted columns by clicking on the cross button next to them on the Target side. We can add the ones we want in our flattened output one by one by clicking on Add column at the top right, and then map columns in the source to the new ones we just created:

etl map source to dest 2

7. Click Finish

8. The next page you'll see is Glue's script editor. Here you can review the Spark script generated for you and either run it as it is or make changes to it. For now we're going to run it as it is. Click on the Run job button. You'll be asked to provide job parameters; put in 10 for the number of concurrent DPUs and click on Run job:

etl run job

Wait for the job to finish and head to the location in s3 where you stored the result. You’ll see a new file created there for you:

etl result s3

Now that we have our data transformed and converted to Parquet, it's time to make it available for SQL queries. If you went through my first post on Glue, you'd know the answer is to use Crawlers to create the table in Athena. Follow those steps, create a crawler, and have your table available to be queried using SQL. I have done that, and this is what my result looks like for what we did together in this post:

etl_athena
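The query behind that screenshot is nothing more exotic than a plain SELECT against the new table. Just as an illustration, with hypothetical table and column names (yours will match the mappings you chose in step 6):

-- count issues per project and issue type in the flattened Jira data
SELECT project_name, issuetype_name, COUNT(*) AS issue_count
FROM jira_issues_processed
GROUP BY project_name, issuetype_name
ORDER BY issue_count DESC;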

Easy, right? You don't have to worry about provisioning servers, installing the right software and versions on them, and then competing with other applications to acquire resources. That is the power of the serverless services offered by cloud providers, which I personally find very useful as well as time- and cost-saving.

 

AWS Glue Part 1: Discover and Catalogue Data Stored in s3

AWS Glue

Glue is a fully managed extract, transform, and load (ETL) service offered by Amazon Web Services. Glue discovers your data (stored in S3 or other databases) and stores the associated metadata (e.g. table definition and schema) in the Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

Once your ETL job is ready, you can schedule it to run on Glue’s fully managed, scale-out Apache Spark environment. It provides a flexible scheduler with dependency resolution, job monitoring, and alerting.

Glue provides out-of-the-box integration with Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and any Apache Hive Metastore-compatible application.

Discover Data Using Crawlers

AWS Glue is able to traverse data stores using Crawlers and populate data catalogues with one or more metadata tables. These tables could be used by ETL jobs later as source or target.

Below are the steps to add a crawler to analyse and catalogue data in an s3 bucket:

1. Sign in to the AWS Management Console and open the AWS Glue console. Choose the Crawlers tab.

2. Choose Add crawler; it'll launch the Add crawler wizard. Follow the wizard:

a. Specify a name and description for your crawler.

b. Add a data store. Here you have options to specify an s3 bucket or a JDBC connection. After selecting s3, select the "Specified path in my account" option and click the folder icon next to "Include path" to select where the data to be crawled is:

Crawler Add Data Source

c. You can add another data source, in case you want to join data from 2 different places together:

Crawler Add Another Datasource

d. Choose an IAM role that has permissions to work with Glue. This role should have full access to run Glue jobs as well as access to the s3 buckets it reads data from and stores script to:

Crawler Choose IAM Role

e. Create a schedule for your Crawler. You can have it run on demand or choose one of the options in the drop-down:

Crawler Schedule

f. The next step is to choose the location where the output from your crawler will be stored. This is a database in Athena, and you can prefix the names of the tables created by your crawler so they are easily distinguishable from other tables in the database:

Crawler Configure Output

g. Review your crawler’s settings and click on Finish. You’ll be redirected to the main Crawlers page, where your crawler is listed.

h. Click on “Run it now?”:

Crawlers Main 2

When the crawler finishes running, go to the Athena console and check that your table is there:

Athena Source Table

Examine the table's DDL. It's an external table pointing to the location in s3 that your Crawler "crawled". Start writing queries on it: it's the first table you created using Glue crawlers. First of many. 🙂
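For reference, the DDL generated by the crawler is standard Hive-style external table syntax. Below is a rough, hypothetical sketch; your database, table, columns, SerDe and s3 path will be whatever the crawler inferred from your data:

-- external table: Glue/Athena store only the metadata, the data itself stays in s3
CREATE EXTERNAL TABLE my_database.raw_my_table (
  id string,
  created_date string,
  amount double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my_bucket/raw_data/my_table/'
TBLPROPERTIES ('classification' = 'json');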