
MapR-Streams + Spark Streams - Unknown Topic/Partition

Question asked by john.humphreys on Dec 6, 2017
Latest reply on Dec 7, 2017 by john.humphreys

Vinayak Meghraj and Community Manager: could you please help? I'm having a fairly urgent issue with this. Vinayak, I saw you discussing a linked issue that seems similar to this one.

 

I'm trying to explicitly provide offsets for my topic partitions to KafkaUtils.createDirectStream(), but I'm getting an error.

 

I'm attempting to provide the offsets explicitly because I seem to be hitting a MapR Streams bug where some of my topic partitions are losing their offset every now and then.  That issue sounds similar to this one: Restore Spark Streaming with MapR Streams.

 

In any case, I thought I would be able to "fix" the offsets by storing them externally and then starting up the stream with them, but I get this error:

 

org.apache.kafka.common.errors.UnknownTopicOrPartitionException: No such file or directory (2) Could not seek

 ...

2017-12-06 09:25:30,3393 ERROR StreamsListener fs/client/marlin/cc/listener/listenerimpl.cc:699 Thread: 45955 Seek called on unsubscribed partitions
2017-12-06 09:25:30,3394 ERROR StreamsListener fs/client/marlin/cc/listener/jni_listener.cc:626 Thread: 45955 Seek failed with err:2

 

Here's the relevant restore code:


  val offsets = Array(6138805199L, 6138062423L, 6137421118L, 6137236503L,
                      6141301709L, 6134792020L, 6134059231L, 6134289530L)
    .zipWithIndex
    .map { case (offset, partition) =>
      (new TopicPartition(Config.config.MINUTE_TOPIC, partition), offset)
    }
    .toMap

  ...
  val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicNames, kafkaParams, offsets)
  val batches = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
    consumerStrategy)


Does the error I'm getting imply that the topic partition is corrupted?

 

Note that I took the offsets from the maxoffset values returned by this REST API call, so I believe they are valid:

 

https://apihost:8443/rest/stream/topic/info?path=/nmr/eis/sysm/pmp/metrics/dev/maas/streams/internal&topic=minute

Sample results (first 2 partitions):

"timestamp": 1512569305693,
"timeofday": "2017-12-06 09:08:25.693 GMT-0500",
"status": "OK",
"total": 8,
"data": [
  {
    "partitionid": 0,
    "physicalsize": 272514883584,
    "logicalsize": 10990088085504,
    "maxoffset": 6138805199,
    "minoffsetacrossconsumers": 6012704502,
    "mintimestamp": "2017-11-13T02:59:33.006-0500",
    "maxtimestamp": "2017-12-06T09:02:52.881-0500",
    "mintimestampacrossconsumers": "1969-12-31T07:00:00.000-0500",
    "fid": "5505.726.968892",
    "master": "psclxp00004.nomura.com:5660",
    "servers": "psclxp00005.nomura.com:5660, psclxp00007.nomura.com:5660, psclxp00004.nomura.com:5660"
  },
  {
    "partitionid": 1,
    "physicalsize": 274408464384,
    "logicalsize": 10662451109888,
    "maxoffset": 6138062423,
    "minoffsetacrossconsumers": 6012364662,
    "mintimestamp": "2017-11-13T02:59:33.040-0500",
    "maxtimestamp": "2017-12-06T09:02:52.817-0500",
    "mintimestampacrossconsumers": "1969-12-31T07:00:00.000-0500",
    "fid": "2915.12939.6410016",
    "master": "psclxp00004.nomura.com:5660",
    "servers": "psclxp00004.nomura.com:5660, psclxp00006.nomura.com:5660, psclxp00005.nomura.com:5660"
  },
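
For reference, here is a minimal sketch of how the maxoffset values could be pulled out of a response like the one above and keyed by partition id. The names OffsetExtract and maxOffsets are hypothetical, and the regex is a shortcut that assumes every partition entry lists "partitionid" before "maxoffset"; it is not a proper JSON parser and not necessarily how the offsets in my job were collected.

```scala
// Sketch only: pairs each "partitionid" with the "maxoffset" that follows it.
// OffsetExtract and maxOffsets are illustrative names, not part of any MapR or Spark API.
object OffsetExtract {
  // (?s) lets '.' span newlines; '.*?' stops at the first "maxoffset" after each "partitionid".
  private val Entry = """(?s)"partitionid":\s*(\d+).*?"maxoffset":\s*(\d+)""".r

  /** Returns partition id -> max offset for every entry found in the response text. */
  def maxOffsets(json: String): Map[Int, Long] =
    Entry.findAllMatchIn(json)
      .map(m => m.group(1).toInt -> m.group(2).toLong)
      .toMap

  def main(args: Array[String]): Unit = {
    // Trimmed-down copy of the first two partition entries from the sample response.
    val sample =
      """{ "partitionid": 0, "physicalsize": 272514883584, "maxoffset": 6138805199 },
        |{ "partitionid": 1, "physicalsize": 274408464384, "maxoffset": 6138062423 }""".stripMargin
    println(maxOffsets(sample))
  }
}
```

The resulting map could then be turned into the TopicPartition-keyed map that the restore code passes to ConsumerStrategies.Subscribe.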
