Saturday, October 10, 2015

SPARK SQL User Defined Functions (UDFs) for WSO2 Data Analytics Server

What are UDFs?

Spark SQL provides a set of built-in functions that can be used in a Spark script without writing any extra code. Sometimes, however, these built-in functions do not satisfy a user's requirement. In such cases the user can write custom functions, called UDFs (user defined functions). UDFs operate on distributed data frames and work row by row, unless you are creating a user defined aggregation function. WSO2 DAS has an abstraction layer for generic Spark UDFs which makes it convenient to introduce UDFs to the server.

Here I will describe, with an example, how to write a Spark SQL UDF in Java.

Simple UDF to convert the date into the given date format

Step 1: Create the POJO class

The following example shows a UDF POJO for converting a date such as Thu Sep 24 09:35:56 IST 2015 into the yyyy-MM-dd format. The name of the Spark UDF should be the name of the method defined in the class (in this example it is dateStr); this name is used when invoking the UDF through Spark SQL. Here dateStr("Thu Sep 24 09:35:56 IST 2015") returns the String “2015-09-24”. (POJO class name: AnalyticsUDF)

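As a minimal sketch (assuming the input always follows the Thu Sep 24 09:35:56 IST 2015 pattern; the date patterns below are my assumption based on the example above), the POJO could look like this:

    package org.wso2.carbon.pc.spark.udfs;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;

    public class AnalyticsUDF {

        // The method name (dateStr) becomes the UDF name used in Spark SQL queries.
        public String dateStr(String date) throws ParseException {
            // Input such as "Thu Sep 24 09:35:56 IST 2015"
            SimpleDateFormat inputFormat =
                    new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.ENGLISH);
            SimpleDateFormat outputFormat = new SimpleDateFormat("yyyy-MM-dd");
            Date parsed = inputFormat.parse(date);
            return outputFormat.format(parsed);   // e.g. "2015-09-24"
        }
    }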

Step 2: Package the class as a jar

The custom UDF class you created should be bundled as a jar and added to the <DAS_HOME>/repository/components/lib directory.

Step 3: Update the Spark UDF configuration file

Add the newly created custom UDF to the <DAS_HOME>/repository/conf/analytics/spark/spark-udf-config.xml file as shown in the example below.

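As a hedged sketch (the exact element names may differ slightly between DAS versions), the entry is essentially the fully qualified class name of the UDF POJO:

    <udf-configuration>
        <custom-udf-classes>
            <class-name>org.wso2.carbon.pc.spark.udfs.AnalyticsUDF</class-name>
        </custom-udf-classes>
    </udf-configuration>

After restarting the server, the dateStr UDF can be called from a Spark script just like any built-in function.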

(Here org.wso2.carbon.pc.spark.udfs is the package name of the class).


Saturday, October 3, 2015

BPMN data publisher in WSO2 Business Process Server

The purpose of this blog post is to demonstrate how to publish completed BPMN process and task instances to the WSO2 Data Analytics Server (DAS). Before moving into the configuration of the Business Process Server (BPS) to publish data, I will first give a brief introduction to this new feature.

The execution data of process instances has to be captured in order to analyze the behavior of processes. To facilitate this, the execution data of BPMN process instances is published to WSO2 DAS, where it can then be analyzed and presented.

This data publishing component can run independently of the process runtime and access instance data using the REST API, the Java API, or directly from the database. Data streams, the data items included in each stream, the polling frequency, the publishing endpoints, etc. should be as configurable as possible.

The proposed solution mainly consists of two parts. The first step is to access the completed process and task instance data using the Activiti engine Java API. The next step is to publish the instances retrieved in step one to WSO2 Data Analytics Server (DAS) through a data publisher.

WSO2 Business Process Server

 

Business Process Management (BPM) is a key technology and a systematic approach to increasing productivity and re-energizing businesses, making an organization's workflow more effective, more efficient and more capable of adapting to an ever-changing environment. WSO2 Business Process Server (BPS) enables developers to easily deploy, manage and monitor business processes and process instances written using the WS-BPEL and BPMN standards, through a complete web-based graphical user interface. Here, a business process is an activity or set of activities that accomplishes a specific organizational goal.
 

Business Process Model and Notation (BPMN)

 

BPMN is a graphical notation that describes the logic of the steps in a business process. The notation has been especially designed to coordinate the sequence of processes and the messages that flow between participants in different activities. BPMN provides a common language that allows all parties involved to communicate processes clearly, completely and efficiently. In this way, BPMN defines the notation and semantics of a Business Process Diagram (BPD).

A BPD is a diagram based on the flowchart technique, designed to present a graphical sequence of all the activities that take place during a process. It also includes all the information relevant for analysis. A BPD is designed for use by process analysts who design, control and manage processes, and it consists of a series of graphical elements that are grouped into categories.

Accessing instances using the Activiti engine Java API


The primary component to deal with when designing and developing BPMN processes with Activiti is the Activiti Engine. The Activiti engine is the entry point for deploying new process definitions, starting new process instances, querying for process instances and user tasks that are running or already completed, and so on. In the first part, we need to capture all the completed process and task instances, and for that we use the built-in HistoryService interface provided in the Java API.

The HistoryService provides an interface to query for completed process and task instances and to obtain information about them, such as the process definition key, process instance id, start time, end time, assignee of the task, owner, claim time and so on. This service mainly exposes query capabilities for accessing this historic data. Before diving into the configuration of the server, let's look at how the historic data about process and task instances is stored inside the Activiti engine database.

Historic process instances are stored in the database table ACT_HI_PROCINST and historic activities (including user tasks) are stored in the ACT_HI_ACTINST table. A historic process instance record is created in the ACT_HI_PROCINST table as soon as a new process instance is started, so a query on historic process instances will also return process instances that are still running and therefore do not have an end time yet. When the process instance enters its first user task, a record is created in the historic activity table, and when that user task is completed, the record is updated with the completion time as the end time of the task instance.

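As an illustration (not the exact BPS implementation), completed process and task instances can be queried through the HistoryService roughly as follows:

    import java.util.List;

    import org.activiti.engine.HistoryService;
    import org.activiti.engine.ProcessEngine;
    import org.activiti.engine.ProcessEngines;
    import org.activiti.engine.history.HistoricProcessInstance;
    import org.activiti.engine.history.HistoricTaskInstance;

    public class CompletedInstanceReader {

        public void readCompletedInstances() {
            ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
            HistoryService historyService = processEngine.getHistoryService();

            // finished() restricts the query to instances that already have an end time.
            List<HistoricProcessInstance> processInstances =
                    historyService.createHistoricProcessInstanceQuery().finished().list();
            List<HistoricTaskInstance> taskInstances =
                    historyService.createHistoricTaskInstanceQuery().finished().list();

            for (HistoricProcessInstance process : processInstances) {
                System.out.println(process.getId() + " ended at " + process.getEndTime());
            }
            for (HistoricTaskInstance task : taskInstances) {
                System.out.println(task.getId() + " assigned to " + task.getAssignee());
            }
        }
    }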

Data Agent

The DAS data agent is used to collect data from WSO2 service-hosting products (such as WSO2 Business Process Server (BPS), WSO2 ESB, WSO2 Application Server (AS), WSO2 Data Services Server (DSS), etc.) and send it to the WSO2 DAS server. The DAS server receives data events via its Thrift API. Thrift uses a binary protocol and enables fast data transmission between the publishing products and the DAS server. The data agent can be configured up front through stream definitions, so that the data to be extracted from the publishing product is pre-defined.
 

Data Publisher


A data publisher sends data for a predefined set of data fields, specified in the stream definition, to DAS. This fixed set of data fields is sent through a data bridge, and custom key-value pairs can also be sent with data events. In this project the Thrift data publisher is used, and it sends events to the data bridge via Thrift over the TCP transport. When the server starts, the data receiver of the data bridge exposes two Thrift ports: the “Thrift SSL port” should be used for secure data transmission and the “Thrift port” for non-secure data transfer. To access the secure port and send data, connect to the server using the following URL format.

ssl://<IP address of the server>:<Thrift SSL port>

E.g. ssl://10.100.7.75:7711

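As a rough sketch (assuming the WSO2 databridge agent's DataPublisher class; constructor details may vary between agent versions), connecting to DAS over Thrift could look like this:

    import org.wso2.carbon.databridge.agent.DataPublisher;

    public class PublisherConnection {

        public static DataPublisher connect() throws Exception {
            // The client trust store must contain DAS's certificate before connecting over SSL.
            System.setProperty("javax.net.ssl.trustStore", "/path/to/client-truststore.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

            // Receiver URL uses the Thrift port; the auth URL uses the Thrift SSL port.
            // Default admin/admin credentials are used here purely for illustration.
            return new DataPublisher("tcp://10.100.7.75:7611", "ssl://10.100.7.75:7711",
                    "admin", "admin");
        }
    }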
Data Streams

 

In the stream concept, the data sender has to agree on the set of data types it wishes to send in each data event. When a message is sent to the data receiver, the set of types being sent is communicated along with the message, defining the stream. This is known as the stream definition. Each stream definition is uniquely identified by the pair of stream name and version. A data stream can be defined in two ways: one is using Java code, and the other is defining the stream as a Java string in the format of a JSON object. This project uses the Java-based stream definition; a sketch of it is shown below.

(Just as tables are created before data is stored in a database, streams must be defined before events are sent. A stream is a description of what the data being sent looks like, similar to a database schema.)

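As a sketch of the Java-based approach (using the databridge commons StreamDefinition class; this is an illustration rather than the project's exact code), the process instance stream listed below could be declared as:

    import org.wso2.carbon.databridge.commons.AttributeType;
    import org.wso2.carbon.databridge.commons.StreamDefinition;
    import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;

    public class ProcessInstanceStreamDefinition {

        public static StreamDefinition build() throws MalformedStreamDefinitionException {
            StreamDefinition stream =
                    new StreamDefinition("BPMN_Process_Instance_Data_Publish", "1.0.0");
            stream.setDescription("BPMN process instances data");
            stream.addPayloadData("processDefinitionId", AttributeType.STRING);
            stream.addPayloadData("processInstanceId", AttributeType.STRING);
            stream.addPayloadData("startActivityId", AttributeType.STRING);
            stream.addPayloadData("startUserId", AttributeType.STRING);
            stream.addPayloadData("startTime", AttributeType.STRING);
            stream.addPayloadData("endTime", AttributeType.STRING);
            stream.addPayloadData("duration", AttributeType.LONG);
            stream.addPayloadData("tenantId", AttributeType.STRING);
            return stream;
        }
    }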
Before sending events, I need to define the stream definitions for the data. The data bridge can use various stream definition stores, and how often the streams need to be defined varies according to the store used. The following two generic streams are used.
    process.instance.stream:
    {
      "name": "BPMN_Process_Instance_Data_Publish",
      "version": "1.0.0",
      "nickName": "",
      "description": "BPMN process instances data",
      "payloadData": [
        {
          "name": "processDefinitionId",
          "type": "STRING"
        },
        {
          "name": "processInstanceId",
          "type": "STRING"
        },
        {
          "name": "startActivityId",
          "type": "STRING"
        },
        {
          "name": "startUserId",
          "type": "STRING"
        },
        {
          "name": "startTime",
          "type": "STRING"
        },
        {
          "name": "endTime",
          "type": "STRING"
        },
        {
          "name": "duration",
          "type": "LONG"
        },
        {
          "name": "tenantId",
          "type": "STRING"
        }
      ]
    }

    task.instance.stream:
    {
      "name": "BPMN_Task_Instance_Data_Publish",
      "version": "1.0.0",
      "nickName": "",
      "description": "BPMN user tasks data",
      "payloadData": [
        {
          "name": "taskDefinitionKey",
          "type": "STRING"
        },
        {
          "name": "taskInstanceId",
          "type": "STRING"
        },
        {
          "name": "processInstanceId",
          "type": "STRING"
        },
        {
          "name": "createTime",
          "type": "STRING"
        },
        {
          "name": "startTime",
          "type": "STRING"
        },
        {
          "name": "endTime",
          "type": "STRING"
        },
        {
          "name": "duration",
          "type": "LONG"
        },
        {
          "name": "assignee",
          "type": "STRING"
        }
      ]
    }
     

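As a hedged illustration (the payload values are samples only, and the databridge Event API is assumed), publishing a single completed process instance event that matches the process.instance.stream definition above could look like this:

    import org.wso2.carbon.databridge.agent.DataPublisher;
    import org.wso2.carbon.databridge.commons.Event;

    public class ProcessInstanceEventPublisher {

        // Stream id format is <stream name>:<version>.
        private static final String STREAM_ID = "BPMN_Process_Instance_Data_Publish:1.0.0";

        public static void publishSample(DataPublisher dataPublisher) {
            // Illustrative values; the order follows the payloadData list above.
            Object[] payload = new Object[]{
                    "myProcess:1:127",                 // processDefinitionId
                    "1001",                            // processInstanceId
                    "startEvent1",                     // startActivityId
                    "admin",                           // startUserId
                    "Thu Sep 24 09:35:56 IST 2015",    // startTime
                    "Thu Sep 24 09:40:12 IST 2015",    // endTime
                    256000L,                           // duration (LONG)
                    "-1234"                            // tenantId
            };
            Event event = new Event(STREAM_ID, System.currentTimeMillis(), null, null, payload);
            dataPublisher.publish(event);
        }
    }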

      Configuring the BPMN Data Publisher to publish data to WSO2 DAS

       Step 1

      In the BPS management console, first select the Configure tab and then go to BPMN Data Publisher. You can now see the configuration UI for the publisher, as shown below. Fill in the Thrift API configuration with the username, password and Thrift URL of the correct DAS (Data Analytics Server) instance.




      Step 2

      To enable the data publisher to publish events to DAS, change the value of the property dataPublishingEnabled to true. (This configuration is included in the activiti.xml file, which you can find in the <BPS_HOME>/repository/conf folder.)
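      As a rough illustration (activiti.xml is a Spring configuration file, so the setting appears as a bean property; the exact enclosing bean may differ between BPS versions), the entry looks something like:

          <!-- inside the process engine / data publisher configuration bean in activiti.xml -->
          <property name="dataPublishingEnabled" value="true"/>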

      Step 3

      After completing the first two steps, restart the BPS server instance. (To avoid a port conflict with DAS, change the port offset of BPS in the carbon.xml file.) On the DAS side, you then have to persist the stream definitions for both the task and process instance streams separately. You can see the following output when clicking the Streams menu item in the DAS UI console. To store the incoming data from the BPS server, you need to create two event receivers to capture the process and task instance data.




      How to configure the event receivers? Click Here.

      Step 4

      You can see the event flow between the event receivers and the persistent event store by clicking the menu item called Flow in the DAS UI console, as shown below.
        
      Step 5

      Finally, the following two figures show sample process and task instances published to the Data Analytics Server through the BPMN data publisher.