Question

Error reading JSON message from Kafka topic

Hello everyone,

I am working on 7.4. For my use case, I am streaming JSON messages to a Kafka topic that is configured in the external infrastructure (ECP). The handshake is successful with the step below:

1. I am able to stream messages to the Kafka topic. I store the JSON message in a single string property, say .pyNote, and stream it to Kafka successfully using the DataSet-Execute method.

I am now trying to retrieve the message, and I tried the two options below:

A. The DataSet-Execute method on step page ReadEventsFromKafka of the work class, with the operation set to "browse" and the results stored in "OperationResult" of class Code-Pega-List. However, I get this error while reading from the topic: "com.pega.pegarules.pub.PRRuntimeException: Exception during data set execution"

B. The DataFlow-Execute method on step page ReadEventsFromKafka of the work class, with the operation set to "Start", using the steps below:

--> Call the data flow ReadKafkaMessages.

--> Define the data set name in the first shape, then define the custom activity (ReadMsg) in the next shape, using the same step page ReadEventsFromKafka. However, I am not able to read the message, and the tracer shows the error below.

Note: SharedKafka2_MatrixONBEvents is configured as both consumer and producer.

Can you please help?

m.pega.dsm.dnode.api.dataflow.StageException: Exception in stage: SharedKafka2_MatrixONBEvents
    at com.pega.dsm.dnode.api.dataflow.StageException.create(StageException.java:39)
    at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageOutputSubscriber.onError(DataFlowStage.java:512)
    at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageInputSubscriber.onError(DataFlowStage.java:380)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:305)
    at com.pega.dsm.dnode.api.stream.DataSubscriber.onError(DataSubscriber.java:60)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:305)
    at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:175)
    at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.access$000(KafkaBrowseOperation.java:50)
    at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation$1.emit(KafkaBrowseOperation.java:100)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl$3.emit(DataObservableImpl.java:161)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338)
    at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40)
    at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:417)
    at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:411)
    at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52)
    at com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext$1.run(DataFlowThreadContext.java:161)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:44)
    at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:41)
    at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52)
    at com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:109)
Caused by: com.pega.dsm.dnode.api.ExceptionWithInputRecord: java.lang.IllegalStateException: Cannot parse json
    ... 22 more
Caused by: java.lang.IllegalStateException: Cannot parse json
    at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:411)
    at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:246)
    at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.convertRecordToClipboardPage(KafkaBrowseOperation.java:217)
    at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:163)
    ... 21 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'test': was expecting 'null', 'true', 'false' or NaN
 at [Source: (com.pega.dsm.dnode.util.ClipboardPageJsonConverter$ByteBufferInputStream); line: 1, column: 6]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:673)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3502)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2837)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchTrue(UTF8StreamJsonParser.java:2771)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
    at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:287)
    ... 24 more
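
Note on the innermost cause: the JSON parser stopped on a bare token 'test' at line 1, column 6, while the browse operation was converting a record to a clipboard page. That suggests at least one record on the topic is plain text rather than a JSON document, possibly an earlier test message. The sketch below is not Pega code and does not use the Jackson parser from the trace. It is a minimal illustration in Python's standard json module of how such payloads differ, and the function name classify_payload and the sample payloads are hypothetical.

```python
import json


def classify_payload(raw: bytes) -> str:
    """Return 'object' if raw is a JSON object, otherwise a short reason."""
    try:
        value = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Plain text such as b"test message" ends up here.
        return f"invalid: {exc.__class__.__name__}"
    if not isinstance(value, dict):
        # Valid JSON, but not an object with named fields.
        return f"not an object: {type(value).__name__}"
    return "object"


# Hypothetical payloads standing in for records read from the topic.
samples = [b'{"pyNote": "hello"}', b"test message", b'"just a string"']
results = [classify_payload(s) for s in samples]
```

Running a check like this against the raw records, read with any Kafka client outside Pega, would show whether the topic still holds non-JSON messages that the browse operation cannot convert.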

***Edited by Moderator: Pallavi to update platform capability tags***

Comments

March 8, 2019 - 9:31pm

Check this link. You should use a real-time data flow rule to consume a Kafka data set: https://community.pega.com/knowledgebase/articles/kafka-data-sets-decision-management

September 11, 2019 - 5:01am

I have used the approach above, but it did not resolve the problem. Can you please suggest something else?

October 17, 2019 - 1:37pm
Response to Nikhil_Garge

Hi Nikhil,

I have a data set configured with JSON, and when I run it, I get results in "OperationResult" on the clipboard by default.

But when I configure a data flow with the data set as the producer/input and an activity as the output to store the records in the database, the values are not stored in the database. Please let me know if you are aware of a solution.