0% found this document useful (0 votes)
12 views2 pages

Kafka JSON Consumer Example Code

This Java code implements a Kafka consumer that listens to a specified topic and processes JSON messages. It creates a separate thread for consuming messages and uses Jackson for JSON deserialization. There are some issues in the code, such as incorrect handling of ConsumerRecords and potential misconfiguration of the Kafka consumer properties.

Uploaded by

sshankaran333
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
12 views2 pages

Kafka JSON Consumer Example Code

This Java code implements a Kafka consumer that listens to a specified topic and processes JSON messages. It creates a separate thread for consuming messages and uses Jackson for JSON deserialization. There are some issues in the code, such as incorrect handling of ConsumerRecords and potential misconfiguration of the Kafka consumer properties.

Uploaded by

sshankaran333
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd

package [Link].

json;

import [Link];
import [Link];
import [Link];

import [Link];
import [Link];
import [Link];
import [Link];
import [Link];

import [Link];
import [Link];
import [Link];

/** Some issue is there in this code, not working **/

public class Consumer {

private static Scanner in;

public static void main(String[] args) {

//String topicName = argv[0];


//String groupId = argv[1];
String topicName = "binod"; // Kafka topic name
String groupId = "group_binod_test";

in = new Scanner([Link]);

ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);


[Link]();
String line = "";
while (![Link]("exit")) {
line = [Link]();
}
[Link]().wakeup();
[Link]("Stopping consumer .....");
[Link]();
}

private static class ConsumerThread extends Thread{


private String topicName;
private String groupId;
private KafkaConsumer<String,JsonNode> kafkaConsumer;

public ConsumerThread(String topicName, String groupId){


[Link] = topicName;
[Link] = groupId;
}

public void run() {


Properties configProperties = new Properties();
[Link](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "1kafka-
server-ip-address:9092");
[Link](ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"[Link]");
[Link](ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"[Link]");
[Link](ConsumerConfig.GROUP_ID_CONFIG, groupId);
[Link](ConsumerConfig.CLIENT_ID_CONFIG, "simple");

//Figure out where to start processing messages from


kafkaConsumer = new KafkaConsumer<String, JsonNode>(configProperties);
[Link](topicName);
ObjectMapper mapper = new ObjectMapper();

//Start processing messages


try {
while (true) {

ConsumerRecords<String, JsonNode> records =


[Link](Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {

ConsumerRecords<String, JsonNode> records =


(ConsumerRecords<String, JsonNode>) [Link](100);
for (ConsumerRecord<String, JsonNode> record : records) {
JsonNode jsonNode = [Link]();

[Link]([Link](jsonNode,[Link]));
}
}
}catch(WakeupException ex){
[Link]("Exception caught " + [Link]());
} catch (JsonProcessingException e) {
[Link]();
} finally{
[Link]();
[Link]("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,JsonNode> getKafkaConsumer(){
return [Link];
}
}
}

You might also like