MapR Streams - Cursor updates failed due to checkAndPut condition.

Question asked by john.humphreys on Dec 6, 2017
Latest reply on Dec 8, 2017 by cathy

 

I'm trying to manually set/reset the offset of a partition of a MapR Streams topic using the Java API. It works for new consumer groups, but for existing ones I get the following error when I try to commit the new offset.

 

Why does this error occur and what does it mean?  Is there a way around it?

Position pre-seek: 241200001
Position post-seek: 5880289075
** Starting to commit offset.
Committed partition: 241200001
Application done.
2017-12-06 16:28:44,9762 ERROR StreamsListener fs/client/marlin/cc/listener/listenerimpl.cc:2631 Thread: 25726 Cursor updates failed due to checkAndPut condition for key \x03ctminute:00000001p007Gmaas.digestion.minute on fid 2192.19176.49966934 assignSeq 1 prevCommitOffset -1 curCommitOff 5880289075


  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig);
  //TopicPartition partitionX = new TopicPartition(topic, 7);
  TopicPartition partitionY = new TopicPartition(topic, 7);
  consumer.assign(Arrays.asList(partitionY/*, partitionX*/));
 
  Map<TopicPartition, OffsetAndMetadata> m = new HashMap<>();
  m.put(partitionY, new OffsetAndMetadata(5880289075L));

  System.out.println("Position pre-seek: " + consumer.position(partitionY));
  consumer.seek(partitionY, 5880289075L);
  System.out.println("Position post-seek: " + consumer.position(partitionY));

  System.out.println("** Starting to commit offset.");
  consumer.commitAsync(m, new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        System.out.println("** Offset commit complete;");
        if (exception != null) {
            System.out.println("Exception: " + exception.getMessage());
        }
        if (offsets != null) {
            System.out.println("Offsets: " + offsets);
        }
    }
  });

  // commitAsync() does not block, so this may still print the previously committed offset.
  System.out.println("Committed partition: " + consumer.committed(partitionY).offset());
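
For readers unfamiliar with the term in the error message: a check-and-put is, in general, a conditional write that is applied only when the currently stored value matches an expected one. The sketch below illustrates only that general idea using a plain in-memory map. It is not MapR's implementation, and the class name, key format, and `checkAndPut` helper are hypothetical. Whether the listener uses `prevCommitOffset` as the expected value is an assumption that nothing in this post confirms; the log merely shows `prevCommitOffset -1` while the consumer reported 241200001 before the seek.

```java
import java.util.concurrent.ConcurrentHashMap;

public class CheckAndPutSketch {
    // Hypothetical stand-in for a stored cursor table: key -> committed offset.
    static final ConcurrentHashMap<String, Long> cursors = new ConcurrentHashMap<>();

    // Writes newOffset only if the stored value equals expectedPrev.
    // Returns true if the write was applied, false if the condition failed.
    static boolean checkAndPut(String key, long expectedPrev, long newOffset) {
        return cursors.replace(key, expectedPrev, newOffset);
    }

    public static void main(String[] args) {
        String key = "group:topic:7"; // hypothetical key, not MapR's real key layout
        cursors.put(key, 241200001L); // an existing group already has a stored cursor

        // The writer believes there was no previous commit (-1): condition fails.
        boolean stale = checkAndPut(key, -1L, 5880289075L);
        System.out.println("applied with expectedPrev=-1: " + stale);

        // The writer expects the value that is actually stored: condition holds.
        boolean fresh = checkAndPut(key, 241200001L, 5880289075L);
        System.out.println("applied with expectedPrev=241200001: " + fresh);
    }
}
```

In this toy model the first call returns false and leaves the stored offset untouched, and the second returns true. If the real listener behaves similarly (again, an assumption), that would be consistent with the commit working for new consumer groups, which have no stored cursor yet, and failing for existing ones.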