Apache Flink Queryable State Client - Actor Not Found Exception

Question asked by Velumani on Jan 3, 2018
Latest reply on Jan 12, 2018 by cathy

Hi,

I am running a Flink job that uses the Queryable State feature of Apache Flink (1.3.2). Querying the state works in local mode, but when I run the job in cluster mode (YARN session), the query fails with an akka.actor.ActorNotFound exception.
Could someone help me understand what is missing?
Exception Trace
Query failed because of the following Exception:
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
        at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
        at java.lang.Thread.run(Thread.java:745)
Client Creation Snippet

    // Point the client at the JobManager using the host and port passed in.
    Configuration config = new Configuration();
    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

    // Create the high-availability services from the same configuration.
    final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
        .createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

    // Build the queryable state client from the configuration and HA services.
    this.client = new QueryableStateClient(config, highAvailabilityServices);
  }
