From Monolithic Architecture to Microservices and Event-Driven Systems

I’m a massive fan of streaming and real-time data processing. I strongly believe many use cases will be defined and implemented around fast, streaming data in the near future, especially in IoT and streaming analytics. With 5G rolling out soon, bringing superfast bandwidth and wide geographical coverage, it will be much easier to capture and move data from devices in different locations so it can be analysed and acted upon.

In this post I am going to write about the traditional architecture for system development and why we need a new model, how streaming helped Microservices evolve into event-driven systems, and the advantages of using Kafka as the central data pipeline across the organisation.

Monolithic Architecture

Monolithic architecture is the traditional design and development approach, where an application is built as one single, self-contained unit.

A monolithic application typically consists of three parts:

  • A database, consisting of many tables usually in an RDBMS
  • A client-side UI, which is where users interact with the application
  • A server-side application that handles HTTP requests (by executing some domain-specific logic), retrieves data, and populates or updates the UI

Some of the limitations of Monolithic architecture are:

  • Changes to the application are extremely slow: all components are tightly coupled, which means even a small change usually results in a complete overhaul of the application
  • There is one code base, so every small change results in a completely new release and version of the solution

Microservices Architecture

In a Microservices approach, applications and systems are broken into independent, modular components based on specific business capabilities. These capabilities are defined formally with business-oriented APIs, each of which implements a specific piece of business logic and function.

Since the function each Microservice provides is independent of the others, the implementation of each component is completely hidden from other services as well.

This loose coupling minimises the dependency between services and their consumers. Each service only needs to know the format and type of output provided by the previous application in the chain of Microservices, and to make sure its own output complies with what is expected by the next downstream service, communicating through lightweight protocols. In other words, each Microservice calls the one it depends on, gets the result of its operation on the data, and applies the next bit of logic before passing the result on to the next application.

Advantages of Microservices are:

  • Isolation and Resilience: if one service fails, another instance can be spun up very quickly. Better still, each layer can run in HA mode to minimise downtime
  • Scalability: each service needs minimal resources and therefore scales more easily
  • Autonomous Deployment: upgrades and maintenance become easy and effective through CI/CD
  • Relationship to Business: each business unit owns its Microservices, as opposed to a single, usually inefficient, central IT department

Event-Driven Microservices

Microservices architecture was an evolution of monolithic architecture, born from the realisation that the bigger and more complex systems get, the more inefficient they become and the higher their cost of maintenance will be.

When it comes to the backend and data storage, each Microservice is expected to have its own space to work in, independent of the other Microservices it interacts with. There are two options to achieve this: 1) a separate database for each Microservice, or 2) a separate schema per Microservice in the same data store.

The first approach is more traditional: multiple instances of, for example, MySQL are created and used by applications. It gives you more independent and resilient Microservices, provided those database instances run on separate physical servers. The second approach is more modern and is popular among companies with on-premises or cloud-based big data solutions. It is also resilient from the backend point of view, since all big data solutions have some level of replication and high availability built in.

Modern Microservices are all about making systems event-driven: instead of making remote requests and waiting for responses (services and components calling each other and telling each other what to do), we send notifications to the related Microservices when an event occurs.

These events are facts about the business: an ATM or online transaction, a new log entry, or a customer registering for a new mobile plan. They are the data points collected by organisations that make up their datasets. The good thing is that we can store these events in the very same infrastructure we use to broadcast them: Apache Kafka. Even better, we can process them in that same infrastructure with stream processing applications. This means our applications and systems are linked via a central data pipeline that is capable of broadcasting and processing data in real time, and all data sources are shared through it.

In this architecture, the data that has been processed and made ready for applications is kept in Kafka topics, and Microservices listen to those topics as the data streams in. When an event lands in a topic, all Microservices subscribed to that topic receive it in real time and act upon it: an event landing in a topic acts as a notification that goes out to the related applications.
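
As a rough sketch of what such a subscriber can look like (the payments topic, consumer group name and plain string serialisation here are hypothetical placeholders, not part of any specific design), a Microservice can be little more than a Kafka consumer loop built on the standard Java client:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PaymentNotificationService {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "payment-notification-service");   // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribing to the topic is the only coupling this service has with the rest of the system
            consumer.subscribe(Collections.singletonList("payments"));   // hypothetical topic
            while (true) {
                // Every event that lands in the topic arrives here, effectively as a notification
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Reacting to event %s = %s%n", record.key(), record.value());
                }
            }
        }
    }
}

Note that the service never calls another service directly; it only reacts to events as they arrive.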

Stream Branching

Some Microservices need to work on only a subset of the events in a Kafka topic rather than all of them, and it is very inefficient to have them subscribe to the original topic and examine every record to find the ones they need. Instead, we can have a streaming application branch the events in the original topic and redirect them to downstream topics based on their kind. And since stream processing with Kafka is extremely efficient and fast, we get much better performance end to end.

The same principle applies to the Microservices’ output as well: they read events from topics, do their work, and write the results back to output topics according to the business logic. This becomes the chain of Microservices and Kafka topics, as in the sketch below.
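
Here is a minimal Kafka Streams sketch of such a branching application, assuming a hypothetical transactions topic whose string values contain a channel marker (a real application would parse the payload properly and choose appropriate Serdes):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class TransactionBranchingApp {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // All raw events land in one topic; we redirect them by kind so that each
        // downstream Microservice only subscribes to the events it cares about.
        KStream<String, String> transactions = builder.stream("transactions");   // hypothetical topic

        transactions
                .filter((key, value) -> value.contains("ATM"))
                .to("transactions-atm");

        transactions
                .filter((key, value) -> value.contains("ONLINE"))
                .to("transactions-online");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-branching-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}

The output topics then become the inputs of the next Microservices in the chain, which is exactly the read-process-write pattern described above.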

Advantages of Event-Driven Microservices

So far we have discussed how Kafka can be used as a source of truth that holds the source data and acts as the processing engine that transforms, cleanses and branches data, making it ready to be used by Microservices and applications. This streaming backbone comes with a few other advantages worth mentioning:

High speed

Kafka is used mainly for streaming and real-time use cases because it can deliver the millisecond response times those scenarios need. That is the performance we get across the whole organisation when Kafka is the Enterprise Service Bus backbone of our Microservices architecture.

Increased agility and expandability

Having this high-performing streaming backbone simplifies the development and deployment of new use cases. As a result, the whole organisation becomes more agile, able to respond to change and to expand and answer new questions more efficiently and quickly.

Less pressure on source systems

In this architecture we read data from source systems once and keep it in Kafka topics for different applications to read from. All subsequent requests for the data are answered by Kafka, not the source systems, so we don’t put extra load on the systems that generate the data.

Potential for fully asynchronous and non-blocking solutions

Obviously, we were aiming for more independent and non-blocking applications from the beginning. Breaking our application down into Microservices means the components that make up our solution can work at different paces. We can also deploy multiple instances of each Microservice to work on subsets of events in parallel.

Machine Learning and Event-Driven Microservices Architecture

We discussed that events form the datasets an organisation collects and stores. We also discussed why Kafka is the best place to store these events and how it enables more effective Microservices implementation.

At a high level, a machine learning workflow consists of two parts: model training and prediction. Training is the stage where historical data is used to learn the patterns within the data, and prediction is where the algorithm predicts what is going to happen based on new data.

Kafka and KSQL make machine learning both easy and scalable. Writing SQL statements is probably the easiest way to filter, enrich and transform data, and with KSQL we can do that to events as they stream in. As for model training, we can set the retention period of a Kafka topic to a reasonable window and point the training job at that topic.
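
For example (the stream and column names below are purely illustrative), a single continuous KSQL query can carve out a cleaned-up stream that both the training job and the live model read from:

CREATE STREAM atm_transactions AS
  SELECT account_id, amount, event_time
  FROM transactions
  WHERE channel = 'ATM';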

And finally, the trained models can be embedded in stream processing applications and deployed as a new Microservice.
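
As a minimal sketch of that last step, assuming a hypothetical FraudModel wrapper around whatever model you have trained offline (the topic names are placeholders too), scoring is just another mapping stage in a Kafka Streams topology:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class FraudScoringTopology {

    /** Hypothetical wrapper around a model trained offline on historical events. */
    public interface FraudModel {
        double score(String transactionJson);
    }

    public static void build(StreamsBuilder builder, FraudModel model) {
        KStream<String, String> transactions = builder.stream("transactions");   // hypothetical topic

        // Each incoming event is scored by the embedded model and the prediction
        // is published as a new event for downstream services to react to.
        transactions
                .mapValues(value -> String.format("{\"score\": %.3f}", model.score(value)))
                .to("transaction-scores");
    }
}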

What we get from this approach is an ML application that receives events as they stream in and emits predictions in real time. You can read more about ML in the world of event-driven Microservices here: https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning

Conclusion

Companies have already started to move away from monolithic architecture because of its high cost of maintenance and upgrades. With the Microservices approach, applications are split into small components that are less heavyweight and focus on specific pieces of business logic. Event-driven architecture took Microservices to the next level and enabled them to respond to incoming events with more agility and flexibility. With Kafka as the backbone of event-driven systems, organisations are able to detect, process and respond to events, and even predict the next events, in real time. Apache Kafka is much more than a messaging system now, and that is what progressive companies across the world have realised. It can be used as a message bus, an event processing engine and even a fully ACID-compliant database; see more here: https://www.youtube.com/watch?v=v2RJQELoM6Y

Resources:

https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
https://www.bmc.com/blogs/microservices-architecture/
https://www.confluent.io/blog/build-deploy-scalable-machine-learning-production-apache-kafka/

UDFs in KSQL: DateAdd

KSQL, the SQL engine for streaming data, is a very powerful tool that helps a great deal in streaming analytics use cases. It comes with a set of functions that can be used to transform, filter or aggregate data, and the good thing is that you can extend it easily by implementing and adding your own UDF (User Defined Function) or UDAF (User Defined Aggregate Function). Let’s see how we can add a simple function to KSQL.

The UDF I want to implement here is DATEADD. If you’re familiar with SQL, you have definitely used it: it takes a date, adds or subtracts a given number of a given datetime part (days, months, and so on), and returns a new datetime.

To implement a User Defined Function (UDF or UDAF) you need to code your function in Java and then import the jar file into your KSQL server. You can read about the full process here; below are a couple of things I believe you should pay attention to:

  • Make sure you set @UdfDescription and @Udf in your Java code properly
  • Change the versions in pom.xml according to your environment. For example:
<confluent.version>5.1.0</confluent.version>
  • Pay attention to the data types you can use in your Java code. Only the following types can be used as parameters or return values of your function:
Java Type    KSQL Type
int          INTEGER
Integer      INTEGER
boolean      BOOLEAN
Boolean      BOOLEAN
long         LONG
Long         LONG
double       DOUBLE
Double       DOUBLE
String       VARCHAR
List         ARRAY
Map          MAP

The Code

As I said above, we need to implement our UDF in Java. Let me start by saying that I’m not a Java developer: I can code in it, with lots of help from Google, but I’m certainly not the best at optimising code or applying best practices. So please be gentle:

package com.thebipalace.ksql.udfdateadd;
  
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

import java.util.Calendar;
import java.util.Date;

@UdfDescription(name = "DATEADD", description = "Get previous or future period for a given date")
public class DateAdd {

    @Udf(description = "Get previous or future period for a given date")
    public long dateAdd(final long date, final String period, final int amount) {
        Calendar cal = Calendar.getInstance();
        Date currentDate = new Date(date);
        cal.setTime(currentDate);

        // The first character of the period parameter selects which part of the date to adjust
        Character periodChar = period.toCharArray()[0];
        switch(periodChar){
            case 'Y': cal.add(Calendar.YEAR, amount);
                break;
            case 'M': cal.add(Calendar.MONTH, amount);
                break;
            case 'D': cal.add(Calendar.DAY_OF_MONTH, amount);
                break;
            case 'H': cal.add(Calendar.HOUR, amount);
                break;
            case 'N': cal.add(Calendar.MINUTE, amount);
                break;
            case 'S': cal.add(Calendar.SECOND, amount);
                break;
        }
        // Return the adjusted date as epoch milliseconds
        return cal.getTime().getTime();
    }
}

It’s a very simple function that takes three parameters:

  • date: of type long (representing the number of milliseconds passed since 1 Jan 1970), since KSQL UDFs don’t accept Date data types. You can use KSQL’s TIMESTAMPTOSTRING to convert long or BIGINT values representing dates into readable formats.
  • period: of type String. This is the part of the date you want to add to or subtract from. As you can see in the code, the options are Year, Month, Day, Hour, Minute and Second.
  • amount: of type int, the number of periods you want to move the date back or forth, e.g. 1 month or 23 days

It returns a new long representation of the date/time that results from applying the given amount of periods to date.

Deployment

To start using the UDF with KSQL you need to deploy it to your KSQL cluster. The steps are listed in the link I mentioned above; this is basically what you need to do:

  • Compile your code by running following command in the root directory of your Java project:
mvn clean package
  • Take the jar file with the “_with-dependencies” postfix to the server where KSQL is running and copy it to “<path-to-confluent>/etc/ksql/ext”. Make sure “ksql-server.properties” points to this location, for example:
ksql.extension.dir=/home/centos/kafka/confluent-5.0.0/etc/ksql/ext/
  • And restart KSQL Server:
<path-to-confluent>/bin/confluent stop ksql-server
<path-to-confluent>/bin/confluent start ksql-server

Then fire up the KSQL CLI:

LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql

And list the functions. DATEADD should be there:

LIST FUNCTIONS;

And there you go. Your new UDF is ready to be used.

Usage

Our new UDF is ready to be used. Just use it like any other function in your KSQL queries; here’s an example:

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(DATEADD(ROWTIME, 'M', 1), 'yyyy-MM-dd HH:mm:ss') from orders_raw;

This function could be useful for period-to-period comparisons. For example, you’re running a marketing campaign and want to compare the number of hits on your website this year with last year, when you were running the same campaign.

Or in sales: a month-to-month comparison of how well your sales are going, in real time.

Hope this is useful for some of you out there. As always, feel free to reach out if you have any questions, comments or feedback.

How to import spark.implicits._ in Spark 2.2: error “value toDS is not a member of org.apache.spark.rdd.RDD”

I wrote about how to import implicits in Spark 1.6 more than two years ago. Things have changed in Spark 2.2: the first thing you need to do when coding in Spark 2.2 is to set up a SparkSession object. SparkSession is the entry point for programming Spark with the Dataset and DataFrame APIs.

As in Spark 1.6, spark.implicits are required to be able to use Spark’s API for Datasets and DataFrames in version 2.2. And as in 1.6, an instance of SparkContext is needed in Spark 2.2 before you can import spark.implicits. Since each instance of SparkSession comes with an instance of SparkContext associated with it, all you have to do is create a SparkSession object and you’re set.

I have seen other posts that mention bits and pieces of how to do it. Here is the full code, which works just fine and which you can tweak based on your requirements:

import org.apache.spark.sql._
import org.apache.log4j._

object sparkSQLWithCaseClass {

  // Note: an age field is included here because the SQL query below filters on it;
  // the input CSV is expected to have ID, name and age columns.
  case class Person(ID: Int, name: String, age: Int)

  def mapper(l: String): Person = {
    val fields = l.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt)
  }

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // SparkSession is the entry point; it carries the SparkContext we need
    val spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

    val lines = spark.sparkContext.textFile("../../people.csv")
    val people = lines.map(mapper)

    // The import needs an instance of SparkSession in scope (here: spark)
    import spark.implicits._

    val schemaPeople = people.toDS()
    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    val t = spark.sql("select * from people where age >= 13")
    val res = t.collect()
    res.foreach(println)

    spark.stop()
  }
}

Spark Error “java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE” in Spark 1.6

RDDs are the building blocks of Spark and a big part of what makes it so powerful: they are kept in memory for fast processing. RDDs are broken down into partitions (blocks) of data, each a logical piece of the distributed dataset.

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of the block to 2 GB.

In brief, this error means that a block in the resulting RDD is larger than 2 GB: https://issues.apache.org/jira/browse/SPARK-1476

One way to work around this issue is to increase the application’s parallelism. We can define the default number of partitions in RDDs returned by operations such as join and reduceByKey by adjusting

spark.default.parallelism

What this configuration parameter does is basically define how many blocks (partitions) our dataset, in this case an RDD, is divided into.

As you have probably realised by now, we need to set spark.default.parallelism to a higher value when processing large datasets. That way we make sure the size of individual data blocks does not exceed the 2 GB limit.
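
As a rough illustration only (the right number of partitions depends entirely on your data volume and cluster), the property can be passed at submit time, in the same style as the other configuration options shown later in this post:

spark-submit \
--conf spark.default.parallelism=1000 \
...

Calling repartition on the offending RDD with a higher partition count achieves the same thing for a single step.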

Spark Error CoarseGrainedExecutorBackend Driver disassociated! Shutting down: Spark Memory & memoryOverhead

Another common error we saw in YARN application logs was this:

17/08/31 15:58:07 WARN CoarseGrainedExecutorBackend: An unknown (datanode-022:43969) driver disconnected.

17/08/31 15:58:07 ERROR CoarseGrainedExecutorBackend: Driver 10.1.1.111:43969 disassociated! Shutting down.

Googling this error suggests increasing spark.yarn.driver.memoryOverhead, spark.yarn.executor.memoryOverhead, or both. That has apparently worked for a lot of people, or at least for those who understood how these properties work.

What you need to consider here is that memoryOverhead is allocated out of the total amount of memory available to the driver or executor, which is controlled by spark.driver.memory and spark.executor.memory.

What this means is that if you are increasing the executor’s or driver’s memoryOverhead, double check whether enough memory is left for the driver and executor themselves. In our case, the user was allocating all the memory available to the driver as memoryOverhead, which meant there was none left for other driver operations:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=8g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=8g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \

You can clearly see what I meant in the paragraph above. Instead of doing this, the user should have increased the executor and driver memory in line with the increase in memory overhead:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=16g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=16g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \

 

Spark Error: Failed to Send RPC to Datanode

This past week we had quite a few issues with users not being able to run Spark jobs in YARN cluster mode. In particular, a team that was on a tight schedule kept getting errors like this:

java.io.IOException: Failed to send RPC 8277242275361198650 to datanode-055: java.nio.channels.ClosedChannelException

Mostly accompanied by error messages like:

org.apache.spark.SparkException: Error sending message [message = Heartbeat(9,[Lscala.Tuple2;@e47ba81,BlockManagerId(9, datanode-50 , 43381))]

ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times

These errors basically mean the connection between the Spark driver and the executors is broken, usually because an executor has been killed. This can happen for a number of reasons:

1- We realised this happens a lot more often when our cluster is too busy and has hit maximum usage. Executors are accepted by DataNodes, but they fail to acquire enough memory on the DataNode and are therefore killed.

2- Metaspace attempts to grow beyond the executor (JVM) memory limits, resulting in the loss of executors. The best way to stop this error from appearing is to set the properties below when launching spark-shell or submitting the application with spark-submit:

spark.driver.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

spark.executor.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

Please note that depending on your project and code, you may need to increase the values mentioned above.

3- The network is slow for whatever reason. In our case, this was caused by a DNS change that turned off caching. This case can be fixed by adjusting spark.executor.heartbeatInterval and spark.network.timeout. The default values for these two parameters are 10s and 120s respectively. You can adjust them based on your network; the only point to consider is that the latter property, spark.network.timeout, should be greater than the former.
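
For example, and only to illustrate the relationship between the two values (tune them to your own network), they can be passed as configuration at submit time:

--conf spark.executor.heartbeatInterval=30s \
--conf spark.network.timeout=600s \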

If none of the above helps your situation, then it is something you need to take to your cluster’s administrators. There could be something wrong with the DataNodes where the executors are sent that the admins are not aware of.

Happy coding!

YARN Capacity Scheduler: Queue Priority

Capacity Scheduler is designed to run Hadoop jobs in a shared, multi-tenant cluster in a friendly manner. Its main strength is that it guarantees specific capacity to certain groups of users by supporting multiple queues and allowing users to submit their jobs to their dedicated queues. Each queue is given a fraction of the total cluster capacity (RAM and CPU), and all jobs submitted to a queue have access to the capacity dedicated to that queue.

Queue priority in Capacity Scheduler is implemented by assigning higher or lower capacity to the queues that should have higher or lower priority. Another way of enforcing this arrangement is to set the maximum percentage of cluster resources each queue can use. Therefore, to give a queue lower priority, we limit the amount of resources it can use.
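
As a sketch of what that looks like in capacity-scheduler.xml (the queue names and percentages below are made up for illustration), the higher-priority queue simply gets a larger guaranteed capacity and a higher cap:

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>default,analytics,adhoc</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.default.capacity</name>
  <value>20</value>
</property>
<property>
  <!-- Higher-priority queue: more guaranteed capacity, higher cap -->
  <name>yarn.scheduler.capacity.root.analytics.capacity</name>
  <value>60</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.analytics.maximum-capacity</name>
  <value>90</value>
</property>
<property>
  <!-- Lower-priority queue: limited share of the cluster -->
  <name>yarn.scheduler.capacity.root.adhoc.capacity</name>
  <value>20</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.adhoc.maximum-capacity</name>
  <value>30</value>
</property>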

Doing so for the default queue is a bit tricky, as all the jobs submitted to the platform go through the default queue and get their Application Master container created there. That is a very small container that controls application execution and requests resources for the YARN job submitted to the cluster. Having said that, we can use another setting in our platform that allows assigning higher priority to more important applications, by setting mapred.capacity-scheduler.queue.<queue-name>.supports-priority.

To see the settings of each queue in the cluster, navigate to the Resource Manager’s web UI and click Scheduler in the left menu. Then click the arrow on the left-hand side of each queue to expand its settings. The two most important settings to check are Absolute Capacity (the queue capacity as a percentage) and Absolute Max Capacity (the maximum queue capacity as a percentage, expressed as a float; this limits the elasticity of applications in the queue):

[Screenshot: Capacity Scheduler queue settings in the Resource Manager web UI]