
Spark Streaming - MapR Streams - Losing partitions!?

Question asked by john.humphreys on Oct 17, 2017
Latest reply on Jan 25, 2018 by john.humphreys

My Issue

 

I have a Spark Streaming job reading from a MapR Streams topic (Kafka Direct approach, code below) with 24 partitions and millions of messages coming through.  It works totally fine for days, but at some point I can see that I'm missing data in my charts (e.g. I'll have 47 points out of 60 for an hour).
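
For context, our consumer config looks roughly like the sketch below; the group id is an illustrative stand-in rather than our real value.  Note that with auto.offset.reset set to earliest, any partition whose committed offset goes missing would restart from 0, which matches what we see:

  // Roughly our consumer config (group id is an illustrative stand-in).
  val kafkaParams = Map[String, Object](
    "group.id" -> "metrics-streaming-group",             // illustrative
    "enable.auto.commit" -> (false: java.lang.Boolean),  // we commit manually (see below)
    "auto.offset.reset" -> "earliest",                   // a lost offset restarts at 0
    "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
    "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer])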

 

When I stop the streaming job, I see that a few partitions at the very end of the topic (e.g. partitions 20-23) have reset their offset to 0.  So it seems like they were acting up even though they were reporting correct offsets in Spark (I print the offsets on every cycle), and then they lose their offsets entirely after that.

 

Partitions 1-19 in this case would be totally fine and would have committed/saved their offsets.  The number of partitions affected changes every time it happens, but they're always the last partitions (some partition x through 23).
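
To confirm which partitions actually lost their committed offsets, I figure something like this sketch with the plain Kafka consumer API (which MapR Streams supports) should work; the topic path here is illustrative, not our real one:

  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition
  import scala.collection.JavaConverters._

  // Print the committed offset for each of the 24 partitions (None = no offset stored).
  val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
  (0 until 24).foreach { p =>
    val committed = consumer.committed(new TopicPartition("/streams/metrics:raw", p))
    println(s"partition $p committed = ${Option(committed).map(_.offset)}")
  }
  consumer.close()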

 

This is blocking a new product release for us, so it's pretty urgent and I greatly appreciate any help!

 

Environment

  • MapR 5.2.0
  • Spark 2.1.0 from MEP 3.0

 

Errors/Logs

I consistently log the offsets on every batch and they look fine even while we're seeing the missing data.  The driver and executors produce no interesting WARN or ERROR logs when this happens (at least not in the last interval; I'll keep watching in case they occasionally do).
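
For reference, the per-batch offset logging is essentially this (simplified from our job):

  // Log every partition's offset range at the start of each batch.
  batches.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach(r =>
      println(s"${r.topic} partition=${r.partition} from=${r.fromOffset} until=${r.untilOffset}"))
  }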

 

Architecture

We use pretty large batches (10-minute intervals with millions of messages each, so it's easy to follow).  I know that's a large batch interval for a streaming app, but our processing has a lot of per-cycle overhead, so it's easier to do fewer cycles with more records, and streaming is preferable for monitoring, health checks, and avoiding scheduling concerns.  Again, the job seems quite stable aside from this Streams bug losing data.
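
Concretely, the context is set up with the 10-minute interval roughly like so (sparkConf is our usual app config):

  import org.apache.spark.streaming.{Minutes, StreamingContext}

  // 10-minute batches: fewer cycles, more records per cycle.
  val ssc = new StreamingContext(sparkConf, Minutes(10))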

 

Offset Commit Code


  import org.apache.kafka.clients.consumer.OffsetCommitCallback
  import org.apache.spark.streaming.kafka010._

  val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
    topicNames, kafkaParams)
  val batches = KafkaUtils.createDirectStream[String, String](
    ssc, LocationStrategies.PreferConsistent, consumerStrategy)
...

  batches.foreachRDD(records => {
    val offsetRanges: Array[OffsetRange] = records.asInstanceOf[HasOffsetRanges].offsetRanges
...

    // Collapse each range to a zero-length range at its end offset before committing.
    val endRanges = offsetRanges.map(range =>
      OffsetRange(range.topic, range.partition, range.untilOffset, range.untilOffset))
    batches.asInstanceOf[CanCommitOffsets].commitAsync(endRanges, new OffsetCommitCallback {...})
  })
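
For what it's worth, my reading of the spark-streaming-kafka-0-10 source is that commitAsync already commits each range's untilOffset, so the map to end-only ranges should be redundant and passing the original ranges ought to be equivalent; a sketch:

  import java.util.{Map => JMap}
  import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
  import org.apache.kafka.common.TopicPartition

  // Equivalent commit: commitAsync stores range.untilOffset for each partition.
  batches.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
                            exception: Exception): Unit = {
      if (exception != null) println(s"offset commit failed: ${exception.getMessage}")
    }
  })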

Outcomes