
Getting wrong record even after seeking to offset while using MapR Streams

Question asked by aklankal on Dec 11, 2017
Latest reply on Dec 19, 2017 by jamesrgrinter

I have a Spark Streaming application that runs on a separate Mesos cluster outside of our MapR cluster and consumes messages from MapR Streams. The application was running fine until we upgraded our Mesos environment and resubmitted the Spark Streaming job (as a Marathon job). Since then I have been getting the exception below (note that enable.auto.commit=false - we store the offsets ourselves in a MapR-DB table). I have restarted the application many times and truncated the offsets table as well, but the error persists. I think the issue lies with MapR Streams storing/caching the offset. Is this true?

 

Exception

17/12/07 11:15:06 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 3, 10.8.187.230, executor 3): java.lang.AssertionError: assertion failed:
Got wrong record for spark-executor-group-aeon-seen stream-seen:topic-seen 8 even after seeking to offset 1
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka09.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
    at org.apache.spark.streaming.kafka09.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:226)
    at org.apache.spark.streaming.kafka09.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:192)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

 

Below is the environment setup:

Spark - 2.1.2
MapR - 5.2.0

MapR Streams settings:

    auto.offset.reset = earliest
    enable.auto.commit = false

We store the stream offsets in a MapR-DB table.
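For reference, a minimal sketch of how the kafkaParams map used in the code below could be assembled. Only the two offset settings above are our real values; the group id and the deserializer classes here are placeholders, not our actual configuration:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "group-aeon-seen");   // placeholder group id
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // from our setup
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // from our setup
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);   // assumed
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // assumed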

 

Code snippet

   

// imports (MapR's kafka09 port of the Spark direct stream API)
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka09.*;

Map<TopicPartition, Long> fromOffsets = getOffsetFromDB(prop);
JavaInputDStream<ConsumerRecord<String, String>> stream = null;

if (fromOffsets == null) {
    // no stored offsets yet: subscribe and let auto.offset.reset=earliest pick the start
    stream = KafkaUtils.createDirectStream(
            streamCtx,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
} else {
    logger.info("Inside else - fetching offsets from db");
    logger.info("offsets from db = " + fromOffsets.toString());
    // stored offsets found: assign the partitions explicitly and resume from them
    stream = KafkaUtils.createDirectStream(
            streamCtx,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Assign(
                    new ArrayList<>(fromOffsets.keySet()), kafkaParams, fromOffsets));
}

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> arg0) throws Exception {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) arg0.rdd()).offsetRanges();

        // mapToPair..
        // reduceByKey..
        // ...

        // persist the end offset of each partition to the MapR-DB offsets table
        for (OffsetRange o : offsetRanges) {
            Put put = new Put(rowKey.toString().getBytes()); // row key is the consumer group
            put.addColumn(fFamily, Bytes.toBytes(o.partition()), Bytes.toBytes(o.untilOffset()));
            hTable.put(put);
        }
    }
});
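getOffsetFromDB mirrors the write path in the loop above. This is not the exact implementation, just a simplified sketch; rowKey, fFamily, hTable, and topic are assumed to be fields defined elsewhere, and error handling is omitted:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, Long> getOffsetFromDB(Properties prop) throws IOException {
    Get get = new Get(rowKey.toString().getBytes()); // same row key as the write path
    Result result = hTable.get(get);
    if (result.isEmpty()) {
        return null; // no stored offsets yet; the caller falls back to Subscribe
    }
    Map<TopicPartition, Long> offsets = new HashMap<>();
    // one cell per partition: qualifier = partition id, value = until-offset
    result.getFamilyMap(fFamily).forEach((qualifier, value) ->
            offsets.put(new TopicPartition(topic, Bytes.toInt(qualifier)),
                        Bytes.toLong(value)));
    return offsets;
}

Since OffsetRange.untilOffset() is exclusive (i.e. the next offset to consume), the stored values can be passed straight back to Assign as starting offsets.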
