From Monolithic Architecture to Microservices and Event-Driven Systems


I'm a massive fan of streaming and real-time data processing and solutions. I strongly believe a lot of use cases are going to be defined and implemented around fast, streaming data in the near future, especially in IoT and streaming analytics. With 5G rolling out soon, bringing superfast bandwidth and wide geographical coverage, it'll be much easier to capture and move data from devices in different locations to analyse and act upon.

In this post I am going to write about the traditional architecture for system development and why we need a new model, how streaming helped Microservices evolve into event-driven systems, and the advantages of using Kafka as the central data pipeline across the organisation.

Monolithic Architecture

Monolithic architecture is the traditional design and development approach, where the application is built as one single unit.

A monolithic application is built in three parts:

  • A database, consisting of many tables, usually in an RDBMS
  • A client-side UI, which is where users interact with the application
  • A server-side application that handles HTTP requests (by executing some domain-specific logic), retrieves data, and populates or updates the UI

Some of the limitations of Monolithic architecture are:

  • Changes to the application are extremely slow: all components are tightly coupled, which means changes usually result in a complete overhaul of the application
  • There is one code base, so every small change results in a completely new release and version of the solution

Microservices Architecture

In a Microservices approach, applications and systems are broken into independent, modular components based on specific business capabilities. These capabilities are defined formally with business-oriented APIs, each of which implements a specific piece of business logic and function.

Since the functions the Microservices provide are independent of each other, the implementation of each component is completely hidden from other services as well.

This loose coupling minimizes the dependency between services and their consumers. They just need to know the format and type of the output provided by the previous application in the chain of Microservices, and make sure their own output complies with what is expected by the next downstream service, communicating through lightweight protocols. In other words, each Microservice calls the one it depends on, gets the result of its operation on the data, and applies the next bit of logic before passing the result on to the next application.

Advantages of Microservices are:

  • Isolation and resilience: if one service fails, another one can be spun up very quickly. The better approach is to have each layer in HA mode, to minimize the downtime
  • Scalability: each service needs minimal resources and therefore scales more easily
  • Autonomous deployment: upgrades and maintenance become very easy and effective through CI/CD
  • Relationship to the business: each business unit owns its Microservices, as opposed to a giant, usually inefficient, IT department

Event-Driven Microservices

Microservices architecture was an evolution of monolithic architecture and came from the realization that the bigger and more complex systems get, the more inefficient they become and the higher their cost of maintenance will be.

When it comes to the backend and data storage, each Microservice is expected to have its own space to work in, independent of the other Microservices it interacts with. There are two options to achieve this: 1) a separate database for each Microservice, or 2) a separate schema per Microservice in the same data store.

The first approach is more traditional, where multiple instances of, for example, MySQL are created and used by applications. It provides more independent and resilient Microservices, provided those database engine instances run on separate physical servers. The second approach is more modern and is popular among companies with on-premise or cloud-based big data solutions. It is resilient from the backend point of view as well, since all big data solutions have some level of replication and high availability built into them.

Modern Microservices are all about making systems event-driven: instead of making remote requests and waiting for the response (services and components calling each other and telling each other what to do), we send notifications to the related Microservices when an event occurs.

These events are facts about the business: an ATM or online transaction, a new log entry, or a customer registering for a new mobile plan. They are the data points collected by organisations that make up their datasets. The good thing is, we can store these events in the very same infrastructure that we use to broadcast them: Apache Kafka. Even better, we can process them in the same infrastructure with stream processing applications. This means our applications and systems are linked via this central data pipeline, which is capable of real-time data broadcast and processing, and all data sources are shared via this pipeline.

In this architecture, the data that is processed and made ready to be used by applications is kept in Kafka topics, and Microservices listen to those topics as the data streams in. When an event lands in a topic, all Microservices that have subscribed to that topic receive the data in real time and act upon it: landing data in a topic is like a notification that goes out to the related applications.
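
To make this concrete, here is a minimal sketch of such a subscriber written in Scala against Kafka's standard Java client. The topic name, consumer group and broker address are made up for illustration; a real Microservice would also handle proper deserialisation, offsets and error handling.

import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object PaymentNotificationService extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "payment-notification-service")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("transactions"))

  // Every event that lands in the topic acts as a notification to this Microservice
  while (true) {
    val records = consumer.poll(Duration.ofMillis(500))
    records.asScala.foreach { record =>
      println(s"Acting on event ${record.key}: ${record.value}")
    }
  }
}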

Stream Branching

In cases where some Microservices need to work on only a subset of the events in a Kafka topic, it is very inefficient to have them subscribe to the original topic and examine every record to find the ones they need to work on. Instead, we can have a streaming application branch out the events in the original topic and redirect them to subsequent topics based on their kind. And since stream processing with Kafka is extremely efficient and fast, we get much better performance end to end.

The same principle applies to the Microservices' output as well: they read events from topics, do their thing, and write the results back to output topics based on the business logic. This becomes the chain of Microservices and Kafka topics.
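
As a rough sketch of what such a branching application can look like with the Kafka Streams Scala DSL (the topic names and the "kind" check on the message payload are invented for this example):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

object TransactionBrancher extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-brancher")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()

  // Read every event from the shared input topic
  val transactions = builder.stream[String, String]("transactions")

  // Branch by kind so each downstream Microservice only subscribes to the topic it needs
  val branches = transactions.branch(
    (_, value) => value.contains("ATM"),
    (_, value) => value.contains("ONLINE"),
    (_, _) => true // everything else
  )

  branches(0).to("transactions-atm")
  branches(1).to("transactions-online")
  branches(2).to("transactions-other")

  new KafkaStreams(builder.build(), props).start()
}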

Advantages of Event-Driven Microservices

So far we have discussed how Kafka can be used as the source of truth that holds source data, and as the processing engine that transforms, cleanses and branches data and makes it ready to be used by Microservices and applications. This streaming backbone comes with a few other advantages worth mentioning:

High speed

The main use cases where Kafka shines are streaming and real-time use cases, because Kafka is able to provide the millisecond response times those scenarios need. And that is the performance we get all across the organisation by using Kafka as the Enterprise Service Bus backbone for our Microservices architecture.

Increased agility and expandability

Having this high-performing streaming backbone simplifies the development and deployment of new use cases. As a result, the whole organisation becomes more agile: it can respond to change, expand, and answer new questions more efficiently and quickly.

Less pressure on source systems

In this architecture we read data from source systems once and keep it in Kafka topics for different applications to read from. This means all subsequent calls for the data are answered by Kafka, not the source systems, and therefore we don't interfere with the data generators.

Potential for fully asynchronous and non-blocking solutions

Obviously, we were aiming for more independent and non-blocking applications from the beginning. Breaking down our application into Microservices means the components that make up our solution can work at different paces. We can also deploy multiple instances of each Microservice to work on subsets of events in parallel.

Machine Learning and Event-Driven Microservices Architecture

We discussed that events form the datasets an organisation collects and stores. We also discussed why Kafka is the best place to store these events and how it enables more effective Microservices implementation.

At a high level, a machine learning solution consists of two different parts: model training and prediction. Training is the stage where historical data is used to learn the patterns within the data, and prediction is where the algorithm predicts what's going to happen based on newly arriving data.

Kafka and KSQL make machine learning both easy and scalable. Writing SQL statements is probably the easiest way to filter, enrich and transform data, and with KSQL we can do that to the events as they stream in. As for model training, we can set the retention period of the Kafka topics to a reasonable length and point the training at those topics.

And finally, the trained models can be embedded in stream processing applications and deployed as a new Microservice.
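
As an illustration only, such a scoring Microservice could look roughly like the sketch below. FraudModel is a hypothetical stand-in for a model you trained offline on the topic's historical events with whatever framework you prefer, and the topic names are made up as well; it reuses the same Kafka Streams Scala DSL boilerplate as the branching sketch above.

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

// Placeholder for a model trained elsewhere and loaded into the application
object FraudModel {
  def score(transaction: String): Double =
    if (transaction.contains("ATM")) 0.1 else 0.9 // dummy scoring logic
}

object FraudScoringService extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-scoring-service")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()
  builder.stream[String, String]("transactions")
    .mapValues(tx => FraudModel.score(tx).toString) // predict on each event as it streams in
    .to("fraud-predictions")

  new KafkaStreams(builder.build(), props).start()
}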

What we get from the approach described above is an ML model and application that receives events as they stream in and spits out predictions in real time. You can read more about ML in the world of event-driven Microservices here: https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning

Conclusion

Companies have already started to move away from monolithic architecture because of its high cost of maintenance and upgrades. With the Microservices approach, applications are split into small components which are less heavyweight and focus on specific pieces of business logic. Event-driven architecture took Microservices to the next level and enabled them to respond to incoming events with more agility and flexibility. With Kafka as the backbone of event-driven systems, organisations are able to detect, process and respond to events, and even predict the next events, in real time. Apache Kafka is much more than a messaging system now, and that's what progressive companies across the world have realised. It can be used as a message bus, an event processing engine and even a fully ACID-compliant database; see more here: https://www.youtube.com/watch?v=v2RJQELoM6Y

Resources:

https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
https://www.bmc.com/blogs/microservices-architecture/
https://www.confluent.io/blog/build-deploy-scalable-machine-learning-production-apache-kafka/

AWS Glue Part 3: Automate Data Onboarding for Your AWS Data Lake

When it comes to building data lakes in AWS s3, it makes even more sense to use Spark. Why? Because you can take advantage of Glue and build ETL jobs that generate and execute Spark for you, serverless. It means you won't need to worry about building and maintaining EMR clusters, or scaling them up and down depending on which jobs run when. Glue takes care of all of it for you.

AWS Glue

In part one and part two of my posts on AWS Glue, we saw how to create crawlers to catalogue our data and then how to develop ETL jobs to transform it. Here we'll see how we can use Glue to automate onboarding new datasets into data lakes.

On-board New Data Sources Using Glue

Onboarding new data sources can be automated using Terraform and AWS Glue. By onboarding I mean having them traversed and catalogued, converting the data to types that are more efficient to query with engines like Athena, and creating tables for the transformed data.

Below is the list of what needs to be implemented. Note that Terraform doesn't fully support AWS Glue yet, so some steps need to be implemented manually. See here for more information.

1- Create the s3 folder structure using Terraform (resource "aws_s3_bucket_object"). There are 2 folder structures that need to be created:

   a- The structure that matches the pattern at which data lands, for example: s3://my_bucket/raw_data/data_source_name/table_name/. You can create multiple folders here, one per table that you're onboarding.

   b- The structure to store data after it is transformed: s3://my_bucket/processed_data/data_source_name/table_name/.

2- Create a new database for the source being onboarded using Terraform. You can create this database in Glue (Terraform resource "aws_glue_catalog_database") or in Athena (resource "aws_athena_database"). I couldn't see any difference when I tried both options.

3- Create a new Crawler for the new data source (Terraform doesn't support Glue Crawlers yet, so do this step manually until this issue is closed). This is the crawler responsible for inferring the structure of the data landing in s3, cataloguing it and creating tables in Athena.

   a- The crawler should point to the source's raw folder and write to the database created in step 2. In the example above, it should point to s3://my_bucket/raw_data/data_source_name/

   b- The crawler will create one table per subfolder under the path it points to in s3, in the Athena database (these tables will be used as sources in ETL jobs later). In other words, we'll need multiple folders in the source folder in s3, but only one crawler in Glue.

   c- Prefix the table names to specify the table type, in this case raw, e.g. "raw_".

Note that the tables created by this crawler are only for storing metadata. They won't be used by users or data engineers to query data; we'll create another set of tables for that in step 5.

4- Create a new Glue ETL job using Terraform.

   a- Specify a schedule according to the frequency at which data lands in s3.

   b- The ETL job will read the data in the raw folder, convert it to Parquet (or any other columnar format, like ORC), and store it in the processed folder. See the Spark sketch after this list for roughly what such a job does.

5- Create a new Crawler to catalogue the transformed data (again, you need to do this manually for now).

   a- Its schedule should match that of the ETL job in step 4. This is to make sure data processed and transformed by the ETL job is available for queries as soon as possible.

   b- It will create a table in Athena, in the same database as the source table.

   c- Prefix the table's name: "processed_".
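
For context, what the ETL job in step 4 ends up doing is conceptually close to the plain Spark sketch below. The script Glue actually generates uses its own GlueContext and DynamicFrame APIs instead, and the bucket and paths here simply follow the hypothetical folder structure from step 1.

import org.apache.spark.sql.SparkSession

object RawToProcessed {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RawToProcessed").getOrCreate()

    // Read the raw JSON landed under the structure created in step 1a
    val raw = spark.read.json("s3://my_bucket/raw_data/data_source_name/table_name/")

    // Columnar output is cheaper and faster for Athena to scan (step 1b location)
    raw.write
      .mode("overwrite")
      .parquet("s3://my_bucket/processed_data/data_source_name/table_name/")

    spark.stop()
  }
}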

 

By following the steps above, we have a self-evolving data onboarding process that we can take from one environment to another in a matter of minutes. A very obvious use case would be to move from non-prod to prod after each source/table is tested and verified, just by pointing our Terraform scripts at the new environment.

Hope this post helps, and please do not hesitate to give me feedback via the comments.

AWS Glue Part 2: ETL your data and query the result in Athena

In part one of my posts on AWS Glue, we saw how Crawlers can be used to traverse data in s3 and catalogue it in AWS Athena.

Glue is a serverless service that can be used to create, schedule and run ETL jobs. In this post we'll create an ETL job using Glue, execute the job and then see the final result in Athena. We'll go through the details of the generated code in a later post.

For the purpose of this tutorial I am going to use Glue to flatten the JSON returned by calling the Jira API. It's a long and complex JSON response; you can see what it looks like here. We had to do this recently at work and it took 2 analysts 2 days to understand the structure and list out all the fields. Using Glue, it'll take 15 minutes!

Note that if your JSON file contains arrays and you want to be able to flatten the data inside them, you can use jq to get rid of the arrays and have all the data as plain JSON objects. More about jq here.

Let’s get started:

1. Navigate to the AWS Glue console and click on Jobs under ETL in the left-hand pane

2. Click on the Add job button to kick off the Add job wizard

3. Fill in the job properties. Most of them are self-explanatory:

a. Provide a name.

b. Choose a role that has full Glue access as well as access to the s3 buckets this job is going to read data from, write results to and save the generated Spark script in.

c. Specify whether you're going to use the Glue interface to develop the basics of your job, have it run an existing script that is already pushed to s3, or start writing the Spark code from scratch.

In this example we'll select the first option, to have Glue generate the script for us. We get the option to edit it later, if need be.

d. Specify the s3 buckets where your script will be saved for future use and where temporary data will be stored:

etl_job_properties

4. Select where your source data is. This section lists the tables in Athena databases that the Glue role has access to. We’ll use the table we created in part one:

etl choose source

5. Next step? You guessed it right: choosing the target for your ETL job. I want to store the result of my job as a new table, convert my JSON to Parquet (since it's faster and less expensive for Athena to query data stored in columnar format) and specify where I want the result to be stored in s3:

etl choose target

6. Here's the exciting part. Glue maps all the columns in the source table to columns in the target table it's going to create for us. This is where we can see what our JSON file actually looks like and flatten it by pulling the columns we're interested in out of their respective JSON structs (see the plain-Spark sketch after these steps for a rough idea of what this flattening does):

a. Expand fields, issuetype and project:

etl map source to dest

b. Remove all the unwanted columns by clicking on the cross button next to them on the Target side. We can add the ones we want to have in our flattened output one by one, by clicking on Add column at the top right and then mapping columns in the source to the new ones we just created:

etl map source to dest 2

7. Click Finish

8. The next page you'll see is Glue's script editor. Here you can review the Spark script generated for you and either run it as it is or make changes to it. For now we're going to run it as it is. Click on the Run job button. You'll be asked to provide job parameters; put in 10 for the number of concurrent DPUs and click on Run job:

etl run job
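
To give a feel for what the flattening in step 6 amounts to, here is a rough plain-Spark equivalent. The script Glue actually generates uses its DynamicFrame and ApplyMapping APIs, and the field names and paths below are just examples based on the Jira issue structure mentioned above, not a complete mapping.

import org.apache.spark.sql.SparkSession

object FlattenJiraIssues {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FlattenJiraIssues").getOrCreate()
    import spark.implicits._

    // Hypothetical location of the raw Jira JSON; Spark infers the nested schema
    val issues = spark.read.json("s3://my_bucket/raw_data/jira/issues/")

    // Pull nested struct fields up into top-level columns
    val flattened = issues.select(
      $"key".as("issue_key"),
      $"fields.issuetype.name".as("issue_type"),
      $"fields.project.key".as("project_key")
    )

    // Store the flattened result as Parquet for Athena to query
    flattened.write.mode("overwrite").parquet("s3://my_bucket/processed_data/jira/issues/")
    spark.stop()
  }
}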

Wait for the job to finish and head to the location in s3 where you stored the result. You’ll see a new file created there for you:

etl result s3

Now that we have our data transformed and converted to Parquet, it's time to make it available for SQL queries. If you went through my first post on Glue, you'll know the answer is to use Crawlers to create the table in Athena. Follow those steps, create a crawler and have your table available to be queried using SQL. I have done that, and this is what my result looks like for what we did together in this document:

etl_athena

Easy, right? You don't have to worry about provisioning servers, having the right software and versions installed on them, and then competing with other applications to acquire resources. That is the power of the serverless services offered by cloud providers, which I personally find very useful and time- and cost-saving.

AWS Glue Part 1: Discover and Catalogue Data Stored in s3

AWS Glue

Glue is a fully managed extract, transform, and load (ETL) service offered by Amazon Web Services. Glue discovers your data (stored in S3 or other databases) and stores the associated metadata (e.g. table definition and schema) in the Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

Once your ETL job is ready, you can schedule it to run on Glue’s fully managed, scale-out Apache Spark environment. It provides a flexible scheduler with dependency resolution, job monitoring, and alerting.

Glue provides out-of-the-box integration with Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and any Apache Hive Metastore-compatible application.

Discover Data Using Crawlers

AWS Glue is able to traverse data stores using Crawlers and populate data catalogues with one or more metadata tables. These tables could be used by ETL jobs later as source or target.

Below are the steps to add a crawler to analyse and catalogue data in an s3 bucket:

1. Sign in to the AWS Management Console and open the AWS Glue console. Choose the Crawlers tab.

2. Choose Add crawler; it'll launch the Add crawler wizard. Follow the wizard:

a. Specify a name and description for your crawler.

b. Add a data store. Here you have options to specify an s3 bucket or a JDBC connection. After selecting s3, select the option "Specified path in my account" and click the folder icon next to "Include path" to select where the data to be crawled is:

Crawler Add Data Source

c. You can add another data source, in case you want to join data from 2 different places together:

Crawler Add Another Datasource

d. Choose an IAM role that has permissions to work with Glue. This role should have full access to run Glue jobs as well as access to the s3 buckets it reads data from and stores scripts in:

Crawler Choose IAM Role

e. Create a schedule for your Crawler. You can have it run on demand or choose one of the options in the drop-down:

Crawler Schedule

f. The next step is to choose the location where the output from your crawler will be stored. This is a database in Athena, and you can prefix the names of the tables created by your crawler so they are easily distinguishable from other tables in the database:

Crawler Configure Output

g. Review your crawler’s settings and click on Finish. You’ll be redirected to the main Crawlers page, where your crawler is listed.

h. Click on “Run it now?”:

Crawlers Main 2

When the crawler has finished running, go to the Athena console and check that your table is there:

Athena Source Table

Examine the table's DDL: it's an external table pointing to the location in s3 where your Crawler "crawled". Then start writing queries on it. It's the first table you created using Glue crawlers. First of many. 🙂

Airflow & Celery on Redis: when Airflow picks up old task instances

This is going to be a quick post on Airflow. We realized that in one of our environments, the Airflow scheduler picks up old task instances that were already a success (whether marked as success or completed successfully). You can verify this is actually your issue by SSHing into your Airflow workers and running:

ps -ef | grep airflow

And check the DAG Run IDs: most of them are for old runs.

This happens when Celery's backend, in our case Redis, has old (or duplicate) keys for task runs. So the solution is to clear the Celery queue. Here are the steps to do it when Celery runs on Redis:

1- Stop Airflow Scheduler:

sudo initctl status airflow-scheduler

sudo initctl stop airflow-scheduler

2- Stop webserver:

sudo initctl status airflow-webserver

sudo initctl stop airflow-webserver

3- Stop Celery Flower:

cd /var/lib/airflow/bin

./airflow.sh flower status

./airflow.sh flower stop

4- Stop workers:

cd /var/lib/airflow/bin

./airflow.sh worker status

./airflow.sh worker stop

Now SSH into the server where Redis is running, type "redis-cli" and press Enter to get into the Redis CLI. Follow the steps below to flush the Redis DB:

  1. INFO keyspace — List keyspaces

    a. You should get only 1 result back

  2. SELECT 0 — Select Database
  3. config get dir —  Get database file location to take backup
  4. Copy file “xxxx.db” from above location to your home directory
  5. FLUSHDB — Flush database

Now you can start all Airflow services:

1- Scheduler commands

sudo initctl start airflow-scheduler

sudo initctl status airflow-scheduler

2- Webserver commands

sudo initctl start airflow-webserver

sudo initctl status airflow-webserver

3- Flower commands

cd /var/lib/airflow/prd/bin

nohup ./airflow.sh flower start &

./airflow.sh flower status

4- Worker commands

cd /var/lib/airflow/prd/bin

nohup ./airflow.sh worker start &

./airflow.sh worker status

 

Go back to Airflow and validate all DAGs are starting and completing successfully.

And live happily ever after! 🙂

How to import spark.implicits._ in Spark 2.2: error “value toDS is not a member of org.apache.spark.rdd.RDD”

I wrote about how to import implicits in Spark 1.6 more than 2 years ago. But things have changed in Spark 2.2: the first thing you need to do when coding in Spark 2.2 is to set up a SparkSession object. SparkSession is the entry point to programming Spark with the Dataset and DataFrame APIs.

Like in version 1.6, the implicits are required to be able to use Spark's API for Datasets and DataFrames in version 2.2. The difference is where they come from: in 1.6 they were imported from a SQLContext instance, while in Spark 2.2 they live on the SparkSession. Since each instance of SparkSession comes with an instance of SparkContext associated with it as well, all you have to do is create a SparkSession object, import spark.implicits._ from it, and you're set.

I have seen other posts that mention bits and pieces of how to do it. Here is the full code, which works just fine and which you can tweak based on your requirements:

import org.apache.spark.sql._
import org.apache.log4j._

object sparkSQLWithCaseClass {

  // The source file is assumed to have three comma-separated columns: id, name, age
  case class Person(ID: Int, name: String, age: Int)

  def mapper(l: String): Person = {
    val fields = l.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

    val lines = spark.sparkContext.textFile("../../people.csv")
    val people = lines.map(mapper)

    // The implicits can only be imported once a concrete SparkSession instance exists
    import spark.implicits._

    val schemaPeople = people.toDS()
    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    val t = spark.sql("select * from people where age >= 13")
    val res = t.collect()
    res.foreach(println)

    spark.stop()
  }
}

Spark Error “java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE” in Spark 1.6

RDDs are the building blocks of Spark and what makes it so powerful: they are stored in memory for fast processing. RDDs are broken down into partitions (blocks) of data, logical pieces of a distributed dataset.

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of the block to 2 GB.

In brief, this error means that the block size for the resulting RDD is larger than 2GB: https://issues.apache.org/jira/browse/SPARK-1476

One way to work around this issue is to increase the application's parallelism. We can define the default number of partitions in the RDDs returned by operations like join and reduceByKey by adjusting

spark.default.parallelism

What this configuration parameter does is essentially define how many blocks of data our dataset, in this case an RDD, is going to be divided into.

As you have probably realized by now, we need to set spark.default.parallelism to a higher value when processing large datasets. This way we can make sure the size of the data blocks does not exceed the 2 GB limit.
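
As a minimal sketch (the paths and the partition count are placeholders you would tune for your own data volume and cluster), the setting can go into the SparkConf, or you can override the partition count on the specific wide operation:

import org.apache.spark.{SparkConf, SparkContext}

object LargeJoinJob {
  def main(args: Array[String]): Unit = {
    // Raise the default shuffle parallelism so no single partition (block) approaches 2 GB
    val conf = new SparkConf()
      .setAppName("LargeJoinJob")
      .set("spark.default.parallelism", "2000")

    val sc = new SparkContext(conf)

    val a = sc.textFile("hdfs:///data/a").map(line => (line.split(",")(0), line))
    val b = sc.textFile("hdfs:///data/b").map(line => (line.split(",")(0), line))

    // Or override the number of partitions for this particular join
    val joined = a.join(b, 2000)
    println(joined.count())

    sc.stop()
  }
}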