Saturday, October 10, 2015

SPARK SQL User Defined Functions (UDFs) for WSO2 Data Analytics Server

What are UDFs?

Spark SQL ships with a set of built-in functions that can be used in a Spark script without writing any extra code. Sometimes, however, these built-in functions do not satisfy a user's requirements. In such cases the user can write custom functions, called UDFs (User Defined Functions). UDFs operate on distributed DataFrames and work row by row, unless you are creating a user defined aggregation function. WSO2 DAS has an abstraction layer for generic Spark UDFs, which makes it convenient to introduce UDFs to the server.

Here I will describe, with an example, how to write a Spark SQL UDF in Java.

A simple UDF to convert a date into a given date format

Step 1: Create the POJO class

The following example shows a UDF POJO for converting a date in the format Thu Sep 24 09:35:56 IST 2015 into a date in the format yyyy-MM-dd. The name of the Spark UDF should be the name of the method defined in the class (in this example, dateStr); this name is used when invoking the UDF through Spark SQL. Here, dateStr("Thu Sep 24 09:35:56 IST 2015") returns the String “2015-09-24”. (POJO class name: AnalyticsUDF)
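
The original listing is not reproduced here, so the following is a minimal sketch of what such a POJO could look like. It assumes the input string follows the default java.util.Date.toString() pattern (EEE MMM dd HH:mm:ss zzz yyyy); the package name is taken from the configuration step below.

    package org.wso2.carbon.pc.spark.udfs;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;

    public class AnalyticsUDF {

        // Converts a date string such as "Thu Sep 24 09:35:56 IST 2015"
        // into the yyyy-MM-dd format, e.g. "2015-09-24".
        public String dateStr(String date) throws ParseException {
            SimpleDateFormat inputFormat =
                    new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.ENGLISH);
            SimpleDateFormat outputFormat = new SimpleDateFormat("yyyy-MM-dd");
            Date parsed = inputFormat.parse(date);
            return outputFormat.format(parsed);
        }
    }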

Step 2: Package the class as a jar

The custom UDF class you created should be bundled as a jar and added to the <DAS_HOME>/repository/components/lib directory.
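
As a rough example (the jar file name below is just a placeholder, and you can equally well use your usual Maven or Gradle build), the compiled class can be packaged with the JDK jar tool and copied into the lib directory:

    jar cvf analytics-spark-udf.jar org/wso2/carbon/pc/spark/udfs/AnalyticsUDF.class
    cp analytics-spark-udf.jar <DAS_HOME>/repository/components/lib/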

Step 3: Update the Spark UDF configuration file

Add the newly created custom UDF to the <DAS_HOME>/repository/conf/analytics/spark/spark-udf-config.xml file as shown in the example below.
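
The original snippet is not shown here; as a sketch, the entry registers the fully qualified class name of the UDF POJO under the custom UDF classes section. The element names below follow the default spark-udf-config.xml shipped with DAS, but check the existing entries in your file in case they differ between versions:

    <udf-configuration>
        <custom-udf-classes>
            <class-name>org.wso2.carbon.pc.spark.udfs.AnalyticsUDF</class-name>
        </custom-udf-classes>
    </udf-configuration>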

(Here, org.wso2.carbon.pc.spark.udfs is the package name of the class.)
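
After restarting the server so that the new jar and the updated configuration are picked up, the UDF can be invoked through the Spark console or an analytics script just like a built-in function, for example SELECT dateStr(dateField) FROM sampleTable (where sampleTable and dateField are placeholders for a table and column in your own schema).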


2 comments:

  1. I used the following Java class:

    package org.wso2.sparkudf;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TimeUDF {

        public static void main(String arg[]) {}

        public String time(Long timeStamp) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
            Date date = new Date(timeStamp.longValue());
            return sdf.format(date);
        }

        public long current_time(Integer param) {
            return System.currentTimeMillis();
        }
    }

    Then I created the appropriate jar file, saved it in the given path,
    and updated the correct .xml file, but it didn't work properly.

    Can you elaborate on this more?

  2. Please do share your configuration file. Did you try this UDF function (i.e. time) through the Spark console in DAS? If so, what was the error message you got? Can you please change the data type of the "timeStamp" parameter to long instead of using the wrapper class (Long) and check?
