
Flink with MapR shading issues

Question asked by ani.desh1512 on Jun 25, 2017
Latest reply on Jun 27, 2017 by ani.desh1512

I am trying to use Flink (1.3.0) with MapR (5.2.1). Basically, I'll be running Flink with a MapR client and using MapR-FS (maprfs) to store Flink's recovery state and checkpoints. Accordingly, I built Flink for MapR with Maven 3.1 as follows:

mvn clean install -DskipTests -Dscala.version=2.10.6 -Pvendor-repos,mapr -Dhadoop.version=2.7.0-mapr-1703 -Dzookeeper.version=3.4.5-mapr-1604

I then added /opt/mapr/lib/* to the Flink classpath, added a Datadog metrics reporter entry to the configuration and, to test the configuration, started the Flink service with:

./bin/jobmanager.sh start local

In the JobManager logs, I see the following error:

ERROR org.apache.flink.runtime.metrics.MetricRegistry               - Could not instantiate metrics reporter dghttp. Metrics might not be exposed/reported.
java.lang.IllegalStateException: Failed contacting Datadog to validate API key
        at org.apache.flink.metrics.datadog.DatadogHttpClient.validateApiKey(DatadogHttpClient.java:73)
        at org.apache.flink.metrics.datadog.DatadogHttpClient.<init>(DatadogHttpClient.java:61)
        at org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:104)
        at org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:129)
        at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:188)
        at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1921)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2322)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2053)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2052)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2139)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2117)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2117)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2172)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2117)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$10.call(JobManager.scala:1992)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$10.call(JobManager.scala:1990)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1990)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
        at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1949)
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
        at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
        at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
        at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
        at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1062)
        at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
        at org.apache.flink.shaded.okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:268)
        at org.apache.flink.shaded.okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:238)
        at org.apache.flink.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:149)
        at org.apache.flink.shaded.okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:192)
        at org.apache.flink.shaded.okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
        at org.apache.flink.shaded.okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)

This error disappears when I remove the MapR libs from the Flink classpath.
I encounter a similar error (SSLHandshakeException, PKIX path building failed) when I use the aws-sdk (1.11.123) jar in my code and submit that job to Flink.
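A PKIX "unable to find valid certification path" failure means the JVM could not chain the server's certificate to any CA in its effective trust store. So one way to narrow this down is to check which trust store the JVM is actually using, and how many CAs it holds, with and without the MapR libs. The sketch below is only a diagnostic under that assumption; the class name TrustStoreReport is hypothetical. It reads the standard javax.net.ssl.trustStore property and asks the default TrustManagerFactory how many CA certificates it accepts.

```java
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

// Hypothetical diagnostic class: reports the JVM's effective TLS trust store.
public class TrustStoreReport {

    /** Returns the number of CA certificates accepted by the JVM's default trust manager. */
    static int countTrustedIssuers() throws Exception {
        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        // A null KeyStore makes the factory load the default trust store,
        // honouring javax.net.ssl.trustStore if it is set at this moment.
        tmf.init((KeyStore) null);
        int count = 0;
        for (TrustManager tm : tmf.getTrustManagers()) {
            if (tm instanceof X509TrustManager) {
                X509Certificate[] issuers = ((X509TrustManager) tm).getAcceptedIssuers();
                count += issuers.length;
            }
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("javax.net.ssl.trustStore = "
            + System.getProperty("javax.net.ssl.trustStore", "<not set, JVM default cacerts>"));
        System.out.println("trusted CA certificates  = " + countTrustedIssuers());
    }
}
```

Run standalone, this only reflects properties passed on the command line. If some library on the classpath changes the trust store at runtime, the same two calls would have to run inside the affected JVM (for example from the job's main method), once with and once without /opt/mapr/lib/* on the classpath, to show a difference.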

I think the shaded libs are causing this error. Am I right in assuming that?

I was also able to pinpoint the particular jar that might be causing this issue: json-20080701.jar.
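To check which jar(s) on the classpath actually supply a class such as org.json.JSONException, and whether it is present more than once (for example, both in json-20080701.jar and inside another jar), one can ask the class loader for every copy of the class file. A minimal sketch; the class name WhichJar is hypothetical, and it should be run with the same classpath Flink uses.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: lists every classpath location that provides a class.
public class WhichJar {

    /**
     * Returns all locations providing the given class. More than one result
     * means the class is duplicated and the first entry on the classpath wins.
     */
    static List<URL> locate(String className) throws IOException {
        String resource = className.replace('.', '/') + ".class";
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = WhichJar.class.getClassLoader();
        }
        return Collections.list(cl.getResources(resource));
    }

    public static void main(String[] args) throws IOException {
        String[] names = args.length > 0 ? args : new String[] {"org.json.JSONException"};
        for (String name : names) {
            List<URL> hits = locate(name);
            System.out.println(name + ": " + hits.size() + " location(s)");
            for (URL url : hits) {
                System.out.println("  " + url);
            }
        }
    }
}
```

For example, `java -cp ".:/opt/mapr/lib/*" WhichJar org.json.JSONException org.json.JSONObject` would print each jar that contains those classes.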

If I do not include this JSON jar, I get the following error when I start the JobManager:

java.io.IOException: failure to login: java.lang.NoClassDefFoundError: org/json/JSONException

So I cannot use the maprfs file scheme with Flink without this jar.
Is there any workaround for this?
