Question

Kafka connection issue.

Hello all,

We are working on establishing the kafka connection in pega 7.3.1 version. When I am testing with the test connection in the kafka dataset I am able to connect.

But while I am ran the dataflow when the external team publishing data on the kafka topic I could not able to process data to the kafka dataset.

I am seeing following errors in the logs:

Caused by: com.pega.dsm.dnode.api.dataflow.StageException: Exception in stage: ProductUpdates_TS_Test

at com.pega.dsm.dnode.api.dataflow.StageException.create(StageException.java:39) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageOutputSubscriber.onError(DataFlowStage.java:486) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageInputSubscriber.onError(DataFlowStage.java:354) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:302) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.stream.DataSubscriber.onError(DataSubscriber.java:60) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:302) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:180) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.access$000(KafkaBrowseOperation.java:50) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation$1.emit(KafkaBrowseOperation.java:99) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:335) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:53) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$2.emit(DataObservableImpl.java:158) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:335) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:53) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.dataflow.DataFlow$2.run(DataFlow.java:405) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.dataflow.DataFlow$2.run(DataFlow.java:399) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext$1.run(DataFlowThreadContext.java:151) ~[dnode-7.3.1.jar:?]

... 10 more

Caused by: com.pega.dsm.dnode.api.ExceptionWithInputRecord: java.lang.IllegalStateException: Cannot parse json

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:180) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.access$000(KafkaBrowseOperation.java:50) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation$1.emit(KafkaBrowseOperation.java:99) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:335) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:53) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$2.emit(DataObservableImpl.java:158) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:335) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:53) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.dataflow.DataFlow$2.run(DataFlow.java:405) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.api.dataflow.DataFlow$2.run(DataFlow.java:399) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext$1.run(DataFlowThreadContext.java:151) ~[dnode-7.3.1.jar:?]

... 10 more

Caused by: java.lang.IllegalStateException: Cannot parse json

at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:369) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:198) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.convertRecordToClipboardPage(KafkaBrowseOperation.java:218) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:167) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.access$000(KafkaBrowseOperation.java:50) ~[dnode-7.3.1.jar:?]

at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation$1.emit(KafkaBrowseOperation.java:99) ~[dnode-7.3.1.jar:?]

Thanks.

***Edited by Moderator: Lochan to move post from Pega Academy to PSC***

Comments

Keep up to date on this post and subscribe to comments

Pega
September 25, 2019 - 2:58am

Hi,

Can you please attach the data flow configuration.

From the above logs I have observed that exception is coming at ' ProductUpdates_TS_Test' component.

What is that component? Is it a strategy or Dataflow.

I have also observed 'Caused by: java.lang.IllegalStateException: Cannot parse json' exception.

What is the data that you are trying to push to kafka dataset?

September 25, 2019 - 10:46am

Hello,

Thanks for the response.

We are using Dataflow rule to load data. I have attached the dataflow configuration and the Json format. please help us to resolve this issue.

Thanks,

Sharath.

 

September 25, 2019 - 2:19pm

Looks like the ClipboardPageJsonConverter cannot handle the data used.  Later on we completely rewrote the deserializer, I wonder the outcome of a newer Pega release.  Do you have a Pega 8.x system to test with the same data?