package [Link].
json;
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
import [Link];
/** Some issue is there in this code, not working **/
public class Consumer {
private static Scanner in;
public static void main(String[] args) {
//String topicName = argv[0];
//String groupId = argv[1];
String topicName = "binod"; // Kafka topic name
String groupId = "group_binod_test";
in = new Scanner([Link]);
ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
[Link]();
String line = "";
while () {
line = [Link]();
}
[Link]().wakeup();
[Link]("Stopping consumer .....");
[Link]();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private KafkaConsumer<String,JsonNode> kafkaConsumer;
public ConsumerThread(String topicName, String groupId){
[Link] = topicName;
[Link] = groupId;
}
public void run() {
Properties configProperties = new Properties();
[Link](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "1kafka-
server-ip-address:9092");
[Link](ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"[Link]");
[Link](ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"[Link]");
[Link](ConsumerConfig.GROUP_ID_CONFIG, groupId);
[Link](ConsumerConfig.CLIENT_ID_CONFIG, "simple");
//Figure out where to start processing messages from
kafkaConsumer = new KafkaConsumer<String, JsonNode>(configProperties);
[Link](topicName);
ObjectMapper mapper = new ObjectMapper();
//Start processing messages
try {
while (true) {
ConsumerRecords<String, JsonNode> records =
[Link](Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
ConsumerRecords<String, JsonNode> records =
(ConsumerRecords<String, JsonNode>) [Link](100);
for (ConsumerRecord<String, JsonNode> record : records) {
JsonNode jsonNode = [Link]();
[Link]([Link](jsonNode,[Link]));
}
}
}catch(WakeupException ex){
[Link]("Exception caught " + [Link]());
} catch (JsonProcessingException e) {
[Link]();
} finally{
[Link]();
[Link]("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,JsonNode> getKafkaConsumer(){
return [Link];
}
}
}