Kafka custom serializer/deserializer implementation

By default, the Kafka implementation serializes and deserializes ClipboardPages to and from JSON strings.

This document describes how to implement a custom Java class and use it in your Kafka data set so that you can apply your own serialization logic and data formats.

The PegaSerde interface

You will have to create a Java class that implements the PegaSerde interface located in the package com.pega.platform.kafka.serde:

/**
 * The interface for wrapping a serializer and deserializer for {@link com.pega.pegarules.pub.clipboard.ClipboardPage} objects.
 *
 * A class that implements this interface is expected to have a constructor with no parameter.
 */
public interface PegaSerde {

    /**
     * Configure this class, which will configure the underlying serializer and deserializer.
     *
     * @param tools PublicAPI
     * @param configs configs in key/value pairs
     */
    void configure(PublicAPI tools, Map<String, ?> configs);

    /**
     * Convert {@link com.pega.pegarules.pub.clipboard.ClipboardPage} into a byte array.
     *
     * @param tools PublicAPI associated with the request
     * @param clipboardPage page to be serialized into bytes
     * @return serialized bytes
     */
    byte[] serialize(PublicAPI tools, ClipboardPage clipboardPage);

    /**
     * Deserialize a byte array into a {@link com.pega.pegarules.pub.clipboard.ClipboardPage} object.
     *
     * @param tools PublicAPI associated with the request
     * @param data serialized bytes
     * @return deserialized typed data
     */
    ClipboardPage deserialize(PublicAPI tools, byte[] data);
}

Configure

void configure(PublicAPI tools, Map<String, ?> configs);

The configure method is called with a map of key/value pairs that you can use to pass configuration options to your class. This lets you parameterize the initialization of your class while still keeping the required public, zero-argument constructor.
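As an illustration of that pattern, the sketch below reads options out of a configs map and falls back to defaults. It is only a sketch: the class name DelimiterOptions and the option keys "delimiter" and "charset" are hypothetical and are not keys that Pega defines. Only the Map<String, ?> shape comes from the interface above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical holder for options parsed from the configure(...) map.
final class DelimiterOptions {
    final String delimiter;
    final Charset charset;

    private DelimiterOptions(String delimiter, Charset charset) {
        this.delimiter = delimiter;
        this.charset = charset;
    }

    // Reads the assumed "delimiter" and "charset" keys, falling back to defaults.
    static DelimiterOptions from(Map<String, ?> configs) {
        String delimiter = stringOrDefault(configs, "delimiter", ",");
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("Option 'delimiter' must not be empty");
        }
        String charsetName = stringOrDefault(configs, "charset", StandardCharsets.UTF_8.name());
        // Charset.forName throws on an unknown name, so a bad option fails at configure time.
        return new DelimiterOptions(delimiter, Charset.forName(charsetName));
    }

    private static String stringOrDefault(Map<String, ?> configs, String key, String fallback) {
        Object value = configs.get(key);
        return value == null ? fallback : value.toString();
    }
}
```

Inside your own configure method you could call DelimiterOptions.from(configs) and keep the result in a field, so that an invalid option surfaces when the data set is configured rather than on the first record.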

Serialize

byte[] serialize(PublicAPI tools, ClipboardPage clipboardPage);

The serialize method converts a ClipboardPage into a byte array in your data format; the byte array is then passed to the Kafka producer. For example, if you are sending JSON, create a JSON string from the ClipboardPage and convert that string to a byte array so that Kafka consumers can read it as JSON.

Deserialize

ClipboardPage deserialize(PublicAPI tools, byte[] data);

The deserialize method converts a byte array in your data format, received from the Kafka consumer, into a ClipboardPage. For example, if you are receiving JSON, convert the byte array into a String or JSON object and use it to create a ClipboardPage.
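The string-to-bytes step in both directions can be sketched as follows. This is a simplified illustration and not the default Pega JSON implementation: the helper FlatJsonBytes is hypothetical, and it uses a plain Map<String, String> as a stand-in for the properties you would read from a ClipboardPage. The point it shows is that both directions should name the same charset explicitly.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper: flat string properties to UTF-8 encoded JSON text and back.
final class FlatJsonBytes {

    // Builds a flat JSON object from the properties and encodes it as UTF-8.
    static byte[] toJsonBytes(Map<String, String> properties) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append(quote(entry.getKey())).append(':').append(quote(entry.getValue()));
        }
        return json.append('}').toString().getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the bytes back to JSON text; parsing is left to a JSON library.
    static String toJsonText(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Wraps a value in double quotes and escapes characters JSON does not allow raw.
    private static String quote(String raw) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : raw.toCharArray()) {
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.append('"').toString();
    }
}
```

In a real deserialize method you would hand the decoded text to the JSON parser of your choice and copy the parsed values onto a ClipboardPage.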

The PegaSerde contract

All PegaSerde implementations must adhere to the following:

  • The implementation must have a public constructor with no parameters.
  • The implementation must implement the PegaSerde interface (located in com.pega.platform.kafka.serde).
  • Methods should fail hard, by throwing an exception, instead of returning null. Returning null also results in an exception, but one without any serialization-specific context.
  • The serialize and deserialize methods must be implemented in a thread-safe manner.

Additionally, note that the following invariants apply:

  • One PegaSerde instance is created per data set
  • The configure method is called once, with a map that contains the following keys:
    • "classname", containing the name of the class on which the Kafka data set is created
    • "topicname", containing the topic name that is configured in the Kafka data set
    • Any additional configuration options that are configured in the Kafka data set (see Configure Kafka data set section below)
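One way to satisfy the fail-hard and thread-safety rules together is sketched below. The class ContractSketch is hypothetical and deliberately avoids Pega types so that it stands alone: it takes a plain String as a stand-in for a ClipboardPage. The "topicname" key is the one listed above; everything else is an assumption made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical codec illustrating the contract; it does not use Pega types.
final class ContractSketch {

    // Written once in configure(...), read-only afterwards; volatile for safe publication.
    private volatile String topicName;

    void configure(Map<String, ?> configs) {
        Object topic = configs.get("topicname");
        if (topic == null) {
            throw new IllegalArgumentException("Missing required config key 'topicname'");
        }
        topicName = topic.toString();
    }

    // Uses only local variables and the read-only field, so concurrent calls do not interfere.
    byte[] serialize(String record) {
        String topic = topicName;
        if (topic == null) {
            throw new IllegalStateException("configure(...) was not called before serialize");
        }
        if (record == null) {
            // Fail hard with context instead of returning null.
            throw new IllegalArgumentException("Cannot serialize a null record for topic " + topic);
        }
        return record.getBytes(StandardCharsets.UTF_8);
    }
}
```

Because one instance serves the whole data set, keeping per-record state in local variables rather than fields is usually the simplest way to stay thread-safe.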

Example PegaSerde implementation

See below for a trivial example of a PegaSerde implementation. It implements a very simple CSV data format as follows: "key1,value1,key2,value2"

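import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.pega.pegarules.pub.clipboard.ClipboardPage;
import com.pega.pegarules.pub.runtime.PublicAPI;
import com.pega.platform.kafka.serde.PegaSerde;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class CsvPegaSerde implements PegaSerde {

    // Class of the pages to create, taken from the "classname" config key.
    private String className;

    @Override
    public void configure(PublicAPI tools, Map<String, ?> configs) {
        className = configs.get("classname").toString();
    }

    @Override
    public byte[] serialize(PublicAPI tools, ClipboardPage clipboardPage) {
        String customerId = clipboardPage.getString("CustomerId");
        String customerAge = clipboardPage.getString("CustomerAge");
        String value = Joiner.on(",").join("customerid", customerId, "customerage", customerAge);
        // Use an explicit charset so the bytes do not depend on the platform default.
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public ClipboardPage deserialize(PublicAPI tools, byte[] data) {
        String value = new String(data, StandardCharsets.UTF_8);
        List<String> values = Splitter.on(",").splitToList(value);
        ClipboardPage page = tools.createPage(className, "");
        // Tokens alternate between property name and property value.
        for (int i = 0; i < values.size(); i += 2) {
            page.getProperty(values.get(i)).setValue(values.get(i + 1));
        }
        return page;
    }
}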
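The example above trusts its input: a record with an odd number of tokens would fail with an IndexOutOfBoundsException that says nothing about the record itself. The sketch below shows one possible way to validate the token count first and fail with a descriptive message. It is an illustration only: the helper CsvPairs is hypothetical, it uses String.split instead of Guava so that it runs without extra dependencies, and like the original format it does not support commas inside values.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that parses "key1,value1,key2,value2" into ordered pairs.
final class CsvPairs {

    static Map<String, String> parse(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        // A limit of -1 keeps trailing empty strings, so "a,1,b," yields an empty value for b.
        String[] tokens = text.split(",", -1);
        if (tokens.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "Expected key,value pairs but got " + tokens.length + " token(s): " + text);
        }
        Map<String, String> pairs = new LinkedHashMap<>();
        for (int i = 0; i < tokens.length; i += 2) {
            pairs.put(tokens[i], tokens[i + 1]);
        }
        return pairs;
    }
}
```

In a deserialize method, you could iterate over the returned map and set each entry on the ClipboardPage, so that malformed records are rejected before any page is created.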

Creating ClipboardPages

Internally, records are represented by ClipboardPage objects, each of which holds a collection of ClipboardProperty objects. The data flow source therefore has to deserialize data into ClipboardPages for ingestion by the shapes that follow it, and the destination has to serialize ClipboardPage objects into byte arrays to be consumed by Kafka.

You create a ClipboardPage object through the Pega PublicAPI (also known as tools) by invoking the createPage(String classname, String pageName) method:

ClipboardPage page = tools.createPage(className, "");

Values can be added to the ClipboardPage by setting them on a named property, which will be created if it does not exist:

page.getProperty("property_name").setValue("property_value");

For more ways of using ClipboardPages, refer to the API documentation.

Creating and adding your class to Pega Platform

Use the following steps to create a Java project with your PegaSerde implementation and add it to your Pega Platform application.

  1. Create a new Java project or use an existing one
  2. Add prpublic.jar as a dependency to your project
    • You can get prpublic.jar from the location prweb/WEB-INF/lib on your Tomcat server as described in this article
  3. Create a new Java class that implements the com.pega.platform.kafka.serde.PegaSerde interface
    • If you get the exception "class file for com.pega.ibm.icu.math.BigDecimal not found", also add pricu2jdk.jar using the same method as in step 2
  4. Build your project jar as a "fat" jar to ensure all dependencies are included
  5. Import your jar into Pega Platform by going to 'Configure > Application > Distribution > Import' and following the steps in the wizard. Leave the Codeset name and version at their defaults (see screenshot below)
  6. In your prbootstrap.properties file, set the parameter com.pega.pegarules.bootstrap.codeset.version.Customer=06-01-01 as described in this article
  7. Restart your system
  8. Configure your Kafka data set as described in the section below.

Configure Kafka data set

To configure your Kafka data set to use your PegaSerde implementation:

  1. Create a regular data set of type Kafka
  2. Configure the 'Message Format' section to use the Custom message format
  3. Specify the fully qualified class name of your Java class
  4. Optionally, specify any additional configuration options to be passed to the configure method of your PegaSerde implementation

Metrics

When the detailed metrics on a data flow run are enabled (see Data Flow Run with Detailed Metrics), the serialization and deserialization latency times will be shown on the output and source shape statistics respectively. See the screenshot below.

Troubleshooting

Creating the Kafka data set:

Class x cannot be found on the classpath.

This means that the class that you are trying to use as the PegaSerde implementation cannot be found. This can be caused by:

  • The fully qualified class name has been typed incorrectly. It consists of the package name and the class name separated by '.', for example: com.pega.dsm.kafka.impl.serde.JsonSerde
  • The jar containing the PegaSerde implementation has not been added to the classpath. Refer to the section of this document that describes how to do this.

Class x does not have a public, zero argument, constructor.

The PegaSerde implementation class is instantiated through the Java reflection API. For this to work, the class needs to have a public constructor with no parameters.

Failed to instantiate class.

This exception can have several causes, all of which come down to a problem creating the class or running the code in its constructor.

Refer to the logs for the underlying exception that prevented the class from being instantiated.

Class x is not of type PegaSerde.

When you see this message on the Kafka data set rule form, the PegaSerde implementation class was instantiated successfully but cannot be cast to the PegaSerde type. Make sure that you implement the correct interface as described in this document, and refer to the logs for a more detailed description of the underlying problem.
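To see how the four messages above relate to each other, it can help to look at what reflective instantiation involves in plain Java. The sketch below is not Pega's actual loading code: the class SerdeLoader and its exception choices are hypothetical, and it only mirrors the order of checks that the messages suggest (find the class, find a public zero-argument constructor, instantiate, then check the type).

```java
import java.lang.reflect.Constructor;

// Hypothetical loader that mirrors the failure modes described above.
final class SerdeLoader {

    static <T> T instantiate(String className, Class<T> expectedType) {
        Class<?> loaded;
        try {
            loaded = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Class " + className + " cannot be found on the classpath.", e);
        }
        Constructor<?> constructor;
        try {
            // getConstructor() only returns public constructors.
            constructor = loaded.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + className + " does not have a public, zero argument, constructor.", e);
        }
        Object instance;
        try {
            instance = constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers abstract classes, inaccessible classes, and exceptions thrown by the constructor.
            throw new IllegalStateException("Failed to instantiate class " + className + ".", e);
        }
        if (!expectedType.isInstance(instance)) {
            throw new IllegalArgumentException(
                    "Class " + className + " is not of type " + expectedType.getSimpleName() + ".");
        }
        return expectedType.cast(instance);
    }
}
```

Running a check like this against your own class in a unit test, with PegaSerde.class as the expected type, is one way to catch constructor and interface problems before you import the jar.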

Serialization and deserialization exceptions

Serializer/Deserializer returned null value.

This means that the PegaSerde implementation returned a null value while (de)serializing. Implementations should always fail hard instead; see the section on the PegaSerde contract.

Other (de)serialization exceptions

Usually, this points to an exception occurring inside of the (de)serialization implementation methods and can be caused by problems with the code or the input record(s).

Refer to the logs for more information.

Comments

May 7, 2019 - 4:05am

Thanks for sharing this. Can you please help me with the Pega version where this was introduced?

Pega
May 20, 2019 - 1:33am
Response to Vipin.Agrawal

Hi Vipin, thank you for your question. This functionality is available from Pega Platform 8.2 onwards.

August 1, 2019 - 9:31pm

Hi, I'm trying to locate the class com.pega.platform.kafka.serde and I couldn't find it. Can you tell me what the jar file name is and where I can find it?

Also, this guide says to build a fat jar. The Apache Kafka jar files are already present. Do I need to bundle them with the jar file as well?

Pega
August 2, 2019 - 3:27am
Response to DIVAKARJEY12

You'll need prpublic.jar. This post describes where to find it: https://community1.pega.com/community/pega-product-support/question/where-can-i-fing-prpublicjar-file-server

With regard to bundling the jar, if any dependencies are already present you do not need to include them. You can use your own discretion here as you would otherwise.

Let me know if you have any more questions.