UDFs in KSQL: DateAdd

KSQL, the SQL engine for streaming data, is a very powerful tool that helps a great deal in Streaming Analytics use cases. It comes with a set of functions that can be used to transform, filter or aggregate data, and the good thing is that you can easily extend it by implementing and adding your own UDF (User Defined Function) or UDAF (User Defined Aggregate Function). Let’s see how we can do so and add a simple function to KSQL.

The UDF I want to implement here is DATEADD. If you’re familiar with SQL, you have almost certainly used it: it takes in a date, adds or subtracts a given number from a specific part of the datetime (year, month, day and so on), and spits out a new datetime.

To implement a User Defined Function (UDF or UDAF) you need to code your function in Java and then import the jar file into your KSQL server. You can read about the full process here; I’ll point out a couple of things that I believe you should pay attention to:

  • Make sure you set @UdfDescription and @Udf in your Java code properly
  • Change the versions in pom.xml according to your environment. For example:
<confluent.version>5.1.0</confluent.version>
  • Pay attention to the data types you can use in your Java code. You can use only the following types as parameters or return values of your function:
Java Type    KSQL Type
int          INTEGER
Integer      INTEGER
boolean      BOOLEAN
Boolean      BOOLEAN
long         BIGINT
Long         BIGINT
double       DOUBLE
Double       DOUBLE
String       VARCHAR
List         ARRAY
Map          MAP

The Code

As I said above, we need to implement our UDF in Java. Let me start talking about the code by saying that I’m not a Java developer. I can code in it, with lots of help from Google, but I’m certainly not the best code optimiser or applier of best practices. So please be gentle:

package com.thebipalace.ksql.udfdateadd;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

import java.util.Calendar;

@UdfDescription(name = "DATEADD", description = "Get previous or future period for a given date")
public class DateAdd {

    @Udf(description = "Get previous or future period for a given date")
    public long dateAdd(final long date, final String period, final int amount) {
        // date is in epoch milliseconds; the Calendar uses the server's default time zone
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(date);

        // The first character of period selects which field to shift
        switch (period.charAt(0)) {
            case 'Y': cal.add(Calendar.YEAR, amount);
                break;
            case 'M': cal.add(Calendar.MONTH, amount);
                break;
            case 'D': cal.add(Calendar.DAY_OF_MONTH, amount);
                break;
            case 'H': cal.add(Calendar.HOUR_OF_DAY, amount);
                break;
            case 'N': cal.add(Calendar.MINUTE, amount);
                break;
            case 'S': cal.add(Calendar.SECOND, amount);
                break;
            default:
                // Unrecognised period: the date is returned unchanged
                break;
        }
        return cal.getTimeInMillis();
    }
}

It’s a very simple function that takes 3 parameters:

  • date: of type long, representing the number of milliseconds passed since 1/Jan/1970. A long is used because KSQL UDFs don’t accept Date data types. You can use KSQL’s TIMESTAMPTOSTRING to convert long or BIGINT values representing dates into readable formats.
  • period: of type String. This is the part of the date you want to add to or subtract from. As you can see in the code, the options are Y (Year), M (Month), D (Day), H (Hour), N (Minute) and S (Second).
  • amount: of type int, the amount of time you want to move date back or forth, e.g. 1 month or 23 days. Use a negative value to move back.

It returns a new long representing the date/time that results from applying amount periods to date.
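
To get a feel for how the arithmetic behaves before deploying anything, here is a minimal standalone sketch. It copies the Calendar logic for years, months and days into a plain class, so it runs without the KSQL jars. The class name DateAddDemo, the format helper and the sample timestamp are all made up for illustration, and I’ve pinned the time zone to UTC (an assumption, the UDF itself uses the server’s default time zone) so the results are reproducible:

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo class, not part of the UDF jar.
public class DateAddDemo {

    // Assumption: UTC is used here only to make the results reproducible.
    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    // Standalone copy of the UDF's arithmetic (year, month and day only).
    public static long dateAdd(final long date, final String period, final int amount) {
        Calendar cal = new GregorianCalendar(UTC);
        cal.setTimeInMillis(date);
        switch (period.charAt(0)) {
            case 'Y': cal.add(Calendar.YEAR, amount); break;
            case 'M': cal.add(Calendar.MONTH, amount); break;
            case 'D': cal.add(Calendar.DAY_OF_MONTH, amount); break;
            default: break; // unrecognised period: date is left unchanged
        }
        return cal.getTimeInMillis();
    }

    // Formats epoch milliseconds as yyyy-MM-dd HH:mm:ss in UTC.
    public static String format(final long millis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
        fmt.setTimeZone(UTC);
        return fmt.format(new Date(millis));
    }

    public static void main(String[] args) {
        long start = 1548928800000L; // 2019-01-31 10:00:00 UTC

        System.out.println(format(start));                     // 2019-01-31 10:00:00
        System.out.println(format(dateAdd(start, "D", 23)));   // 2019-02-23 10:00:00
        System.out.println(format(dateAdd(start, "Y", -1)));   // 2018-01-31 10:00:00
        // February 2019 has no 31st, so Calendar falls back to the last day of the month
        System.out.println(format(dateAdd(start, "M", 1)));    // 2019-02-28 10:00:00
    }
}
```

The last line is the one to keep in mind for month to month comparisons: adding a month to the 31st of January lands on the 28th of February, because Calendar.add clamps the day to the end of the shorter month.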

Deployment

To be able to start using the UDF with KSQL you need to deploy it to your KSQL cluster. The steps are listed in the link I mentioned above; this is basically what you need to do:

  • Compile your code by running the following command in the root directory of your Java project:
mvn clean package
  • Take the jar file with the “_with-dependencies” postfix to the server where KSQL is running and copy it to “<pathtoconfluent>/etc/ksql/ext”. Make sure ksql-server.properties points to this location, for example:
ksql.extension.dir=/home/centos/kafka/confluent-5.0.0/etc/ksql/ext/
  • And restart KSQL Server:
<path-to-confluent>/bin/confluent stop ksql-server
<path-to-confluent>/bin/confluent start ksql-server

Then fire up the KSQL CLI:

LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql

And list the functions. DATEADD should be there:

LIST FUNCTIONS;

And there you go. Your new UDF is ready to be used.

Usage

Use it like any other function in your KSQL queries. Here’s an example that shows each record’s timestamp next to the same timestamp shifted one month forward:

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(DATEADD(ROWTIME, 'M', 1), 'yyyy-MM-dd HH:mm:ss') from orders_raw;

This function can be useful for period to period comparisons. For example, you’re running a marketing campaign and you want to compare the number of hits on your website this year with last year, when you were running the same campaign.

Or in sales: a month to month comparison of how well your sales are going, in real time.

Hope this is useful for some of you out there. As always, feel free to reach out if you have any questions, comments or feedback.
