
Strange Behavior With MapR Streams

Question asked by PETER.EDIKE on Apr 26, 2018
Latest reply on May 15, 2018 by maprcommunity

Hello everyone,

I have observed some strange behavior with MapR Streams. I have the following Java code:

// Consumer configuration
Map<String, Object> properties = new HashMap<>();
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "transaction_profile_group");
properties.put("spark.kafka.poll.time", 2000);
logger.info("Done loading application properties");

// Build the direct stream over the subscribed topics
JavaStreamingContext javaStreamingContext = applicationConfiguration.javaStreamingContext();
JavaInputDStream<ConsumerRecord<String, String>> directKafkaStream =
        KafkaUtils.createDirectStream(
                javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, properties)
        );

// Map each consumer record to a TransactionLeg
JavaDStream<TransactionLeg> transactionLegJavaDStream = directKafkaStream.map(
        (Function<ConsumerRecord<String, String>, TransactionLeg>) record -> {
            logger.info("received stream from " + record.topic());
            TransactionLeg transactionLeg = new TransactionLeg();
            transactionLeg.setTransactionLegName(record.topic());
            transactionLeg.setTransactionMessage(record.value());
            return transactionLeg;
        });

// Process each record on the executors
if (transactionLegJavaDStream != null) {
    transactionLegJavaDStream.foreachRDD((JavaRDD<TransactionLeg> transactionsJavaRDD) -> {
        transactionsJavaRDD.foreach(processTransaction);
    });
}

javaStreamingContext.start();
javaStreamingContext.awaitTermination();
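
For context, the snippet relies on a few objects that are created elsewhere in the application (logger, applicationConfiguration, processTransaction, and the topics collection). Roughly, and purely as an illustrative sketch with placeholder names, values, and batch interval (not the actual wiring code), they look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Illustrative sketch only -- placeholder definitions for objects referenced above.

// processTransaction: a serializable function applied to every TransactionLeg
// (the domain POJO used in the snippet above) on the executors via rdd.foreach(...)
VoidFunction<TransactionLeg> processTransaction = transactionLeg -> {
    // per-record business logic goes here
};

// applicationConfiguration.javaStreamingContext() returns something along these lines;
// the 100 ms batch interval is only a guess based on the "Added jobs for time ..." log lines below
SparkConf sparkConf = new SparkConf().setAppName("transaction-profile-streaming");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.milliseconds(100));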

 

There are about three topics configured.
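
For reference, the topics collection passed to ConsumerStrategies.Subscribe above is built along these lines. The stream path and topic name are the ones that appear in the container logs below; the remaining entries are omitted here, so treat this as an illustration rather than the exact list:

import java.util.Arrays;
import java.util.Collection;

// Illustration only: MapR Streams topics are addressed as "/stream/path:topic_name".
// The first entry matches the stream:topic that shows up in the logs below;
// the other configured topics are left out.
Collection<String> topics = Arrays.asList(
        "/iswdata/streams/products:superswitch_transactions"
        // , "/iswdata/streams/products:<other_topic>"  // placeholders for the remaining topics
);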

 

However, when we run the Spark Streaming job on a MapR cluster, we keep seeing the following in the container logs:

2018-04-26 16:40:22,693 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754921700 ms
2018-04-26 16:40:23,696 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754921800 ms
2018-04-26 16:40:24,505 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-0 to 5438002 to compensate
2018-04-26 16:40:24,521 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754921900 ms
2018-04-26 16:40:24,527 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438136 to compensate
2018-04-26 16:40:24,532 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922000 ms
2018-04-26 16:40:24,536 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438323 to compensate
2018-04-26 16:40:24,540 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922100 ms
2018-04-26 16:40:25,513 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-0 to 5438335 to compensate
2018-04-26 16:40:25,540 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922200 ms
2018-04-26 16:40:25,549 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438657 to compensate
2018-04-26 16:40:25,554 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922300 ms
2018-04-26 16:40:26,561 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922400 ms
2018-04-26 16:40:26,659 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-0 to 5438669 to compensate
2018-04-26 16:40:26,665 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922500 ms
2018-04-26 16:40:26,670 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438720 to compensate
2018-04-26 16:40:26,678 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922600 ms
2018-04-26 16:40:26,684 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438808 to compensate
2018-04-26 16:40:26,689 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922700 ms
2018-04-26 16:40:26,692 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438968 to compensate
2018-04-26 16:40:26,696 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922800 ms
2018-04-26 16:40:26,698 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5438993 to compensate
2018-04-26 16:40:26,702 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754922900 ms
2018-04-26 16:40:27,745 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754923000 ms
2018-04-26 16:40:27,852 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-0 to 5439002 to compensate
2018-04-26 16:40:27,858 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754923100 ms
2018-04-26 16:40:27,866 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5439053 to compensate
2018-04-26 16:40:27,884 INFO [JobGenerator] scheduler.JobScheduler: Added jobs for time 1524754923200 ms
2018-04-26 16:40:27,888 INFO [JobGenerator] kafka09.DirectKafkaInputDStream: poll(0) returned messages, seeking /iswdata/streams/products:superswitch_transactions-1 to 5439265 to compensate

 

I am completely at a loss as to why none of my messages are being processed. The job has been running for hours without a single message being processed successfully. What am I doing wrong?
