
UnsatisfiedLinkError when trying to consume MapR Streams in Flink 1.4.0

Question asked by Velumani on Jan 23, 2018
Latest reply on Jan 24, 2018 by cathy

Hi,

I am trying to consume messages from MapR Streams in a Flink job. This worked with Flink 1.3.2, but after upgrading to Flink 1.4.0 I get an UnsatisfiedLinkError.

I believe there is a version mismatch among the Kafka jars. Please help me resolve this.
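
One way to test the version-mismatch suspicion is to check which jar each class named in the exception trace is actually loaded from. The following is only a minimal sketch: the class name ClasspathCheck is made up for illustration, and the three class names it looks up are copied from the trace in this post. An UnsatisfiedLinkError on a native method generally means the JVM did not find a matching native implementation, so the sketch also prints java.library.path.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flink or MapR.
class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a note if that is unknown. */
    static String locationOf(String className) {
        try {
            // initialize=false: load the class without running static initializers,
            // so no native library is loaded as a side effect of this check.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown source)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the exception trace in this post.
        String[] names = {
            "org.apache.kafka.clients.consumer.KafkaConsumer",
            "com.mapr.streams.impl.listener.MarlinListener",
            "com.mapr.fs.jni.MarlinJniListener"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
        // Directories the JVM searches for native libraries.
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
    }
}
```

For the output to be meaningful, this would need to run with the same classpath as the job. If the Kafka and MapR classes resolve to jars from different MapR releases, that would be consistent with the mismatch theory, though it would not prove it.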

 

pom.xml

<dependencies>

  <!-- MapR Dependencies -->
  <dependency>
    <groupId>com.mapr.streams</groupId>
    <artifactId>mapr-streams</artifactId>
    <version>5.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0-mapr-1703-streams-5.2.0</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.4.0</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.4.0</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>jline</groupId>
    <artifactId>jline</artifactId>
    <version>2.14.2</version>
  </dependency>

  <!-- Flink Connector Kafka | exclude Kafka implementation to use MapR -->
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.4.0</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.7</version>
  </dependency>

</dependencies>

 

Code
Properties properties = new Properties();
properties.setProperty("group.id", "flink_consumer");
properties.setProperty("bootstrap.servers", "");

FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);

DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);

 

Exception Trace

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:215)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at com.mapr.demos.EventCountSliding.main(EventCountSliding.java:60)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error occurred while instantiating com.mapr.streams.impl.listener.MarlinListener.
==> java.lang.UnsatisfiedLinkError: com.mapr.fs.jni.MarlinJniListener.OpenListener(Ljava/lang/String;Ljava/lang/String;IZZJJIIILjava/lang/String;Ljava/lang/String;JZ)J.
at org.apache.kafka.clients.mapr.GenericHFactory.getImplementorInstance(GenericHFactory.java:41)
at org.apache.kafka.clients.consumer.KafkaConsumer.initializeConsumer(KafkaConsumer.java:616)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1590)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.kafka.clients.mapr.GenericHFactory.getImplementorInstance(GenericHFactory.java:38)
... 11 more
Caused by: java.lang.UnsatisfiedLinkError: com.mapr.fs.jni.MarlinJniListener.OpenListener(Ljava/lang/String;Ljava/lang/String;IZZJJIIILjava/lang/String;Ljava/lang/String;JZ)J
at com.mapr.fs.jni.MarlinJniListener.OpenListener(Native Method)
at com.mapr.streams.impl.listener.MarlinListenerImpl.<init>(MarlinListenerImpl.java:156)
at com.mapr.streams.impl.listener.MarlinListener.<init>(MarlinListener.java:58)
