
Message present in topic but not in Rdd

Question asked by onkar on Dec 1, 2017
Latest reply on May 14, 2018 by amigo23

Hi,

We have a monitoring Spark application (Spark DStream) that monitors other Spark applications running on a MapR cluster. Whenever an application starts, it sends a message to an error topic. The monitoring job reads records from this error topic and sends an email if any record is received. In this code we are facing the following issue:

val kafkaParams = Map[String, String](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "dummy",
  ConsumerConfig.GROUP_ID_CONFIG -> streamConfiguration.groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> streamConfiguration.offsetReset,
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
)

// Subscribe to topics and create a DStream
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
  streamConfiguration.setOfTopics, kafkaParams)
val messagesDStream = KafkaUtils.createDirectStream[String, String](
  sparkStreamingContext, LocationStrategies.PreferConsistent, consumerStrategy)

messagesDStream.foreachRDD { rdd: RDD[ConsumerRecord[String, String]] =>
  log.error(rdd.count()) // Shows the correct value, i.e. 1 for a single record
  log.error(rdd.count()) // Shows 0
}

1. As shown in the code snippet above, when the monitoring job reads messages as a DStream and iterates over it, the first call to count() on the RDD returns the correct number of records (1 in our case), but the same call on the very next line returns 0. Does that imply the RDD is lost after the first action, i.e. the first count()? This seems very strange, and we are not sure whether we are missing something.

 

2. When the next record (say X) arrives on the DStream, it is not logged and count() returns 0. Only after a second record (say Y) arrives does count() return 1, and the record that is printed is the previous one (X).
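
For point 1, one check that could help narrow this down is to run both count() calls against a cached copy of the batch, so the second action cannot go back to the topic. The following is only a minimal sketch of that check, not code from our job: the object CountTwice and the helper countTwice are hypothetical names, and it assumes plain Spark 2.x RDD behavior.

```scala
import org.apache.spark.rdd.RDD

object CountTwice {
  // Hypothetical helper: caches the given RDD, runs count() twice against
  // the cached data, and releases the cache afterwards.
  def countTwice[T](rdd: RDD[T]): (Long, Long) = {
    val cached = rdd.cache()
    try {
      val first = cached.count()   // first action, materializes the cache
      val second = cached.count()  // second action, served from the cache
      (first, second)
    } finally {
      cached.unpersist()
    }
  }
}
```

Inside foreachRDD this could be called as CountTwice.countTwice(rdd.map(r => (r.key(), r.value()))). Copying key and value out first is a deliberate choice, because ConsumerRecord is not serializable. If the two numbers agree here but differ in the original snippet, that would suggest the second action is recomputing the batch from the topic rather than the RDD being lost.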

 

 

Spark submit command:

/opt/mapr/spark/spark-2.1.0/bin/spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.submit.waitAppCompletion=false --num-executors 1 --name Monitor --driver-cores 2 --driver-memory 2g --executor-cores 2 --executor-memory 2g /home/mapr/spark/monitoring/jar/app.jar set_of_topics=/data:ErrA error_topic=/data:Err batch_interval=10 max_messages_per_partition=5000

 

 

Update: We are using MapR 5.2.2 and Spark 2.1.0, and we can see some bug fixes in the latest release, MapR 6.0 (Spark 2.1.0-1710 Release Notes). Could this be related to the "late arriving message" or "offset issue" that was fixed in MapR 6.0? Please advise.
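
To tell whether this is an offset problem, it may help to log the offset ranges of every batch and compare them with what count() reports. The sketch below is an assumption-laden example rather than code from our job: OffsetDiagnostics, expectedRecords and describeBatch are hypothetical names, and the import uses the upstream Spark package org.apache.spark.streaming.kafka010, which the MapR build may expose under a different package name.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
// Assumption: upstream Spark package name; a MapR build may use a different one.
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object OffsetDiagnostics {
  // Number of records a batch should contain according to its offset ranges.
  def expectedRecords(ranges: Seq[OffsetRange]): Long =
    ranges.map(r => r.untilOffset - r.fromOffset).sum

  // Hypothetical helper: call it first inside foreachRDD, on the RDD handed
  // over by the direct stream, because the cast fails after a transformation.
  def describeBatch(rdd: RDD[ConsumerRecord[String, String]]): String = {
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val perPartition = ranges.map { r =>
      s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset})"
    }
    s"expected=${expectedRecords(ranges)} ranges=${perPartition.mkString(", ")}"
  }
}
```

Logging describeBatch(rdd) next to rdd.count() in each batch would show whether record X's offset falls in the batch in which it arrived or only in the batch that also contains Y.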
