
Re: Cannot configure akka.ask.timeout

Hi Alex,

The hard-coded values I’ve found are at [1] and [2].

We ran into a similar issue (listing a large number of HDFS files). We ended up with a newer version of HDFSFileInput that lists files concurrently. Another hack we used was to list the files on the client side and pass them to the JobManager via serialization (not recommended, though, as it doesn’t follow the Flink framework’s mechanism).

You can also try listing S3 files concurrently, or paste your sample code here.
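
As a rough sketch of what "listing concurrently" can look like (this is not our patched input format; it uses java.nio on local directories as a stand-in for the HDFS or S3 client, and the names ConcurrentLister and listFiles are made up for illustration):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: lists several directories in parallel instead of one by one.
public class ConcurrentLister {

    // Returns the regular files directly under each directory, using one task per directory.
    public static List<Path> listFiles(List<Path> dirs, int parallelism)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<List<Path>>> pending = new ArrayList<>();
            for (Path dir : dirs) {
                Callable<List<Path>> task = () -> listOne(dir);
                pending.add(pool.submit(task));
            }
            // Collect results in submission order; get() rethrows any listing failure.
            List<Path> all = new ArrayList<>();
            for (Future<List<Path>> result : pending) {
                all.addAll(result.get());
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    // Assumption: a local java.nio listing stands in for the remote file system call.
    private static List<Path> listOne(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }
}
```

With a remote store, the gain comes from overlapping the per-directory (or per-prefix) round trips, so it only helps if the input is spread over many directories or prefixes.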

[2] https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117

On Dec 13, 2018, at 1:09 AM, Alex Vinnik <alvinnik.g@xxxxxxxxx> wrote:


The job submission timeout is caused by listing too many files in S3 during the env.readFile call that creates the input DataSet. Is there a way NOT to list S3 files during job submission? It seems like that would help mitigate the timeout problem.

Which hardcoded value were you referring to?


On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <alvinnik.g@xxxxxxxxx> wrote:
Hi Qi,

Thanks for looking into this. Here is the ticket: https://issues.apache.org/jira/browse/FLINK-11143


On Tue, Dec 11, 2018 at 8:47 PM qi luo <luoqi.bd@xxxxxxxxx> wrote:
Hi Alex and Lukas,

This error is controlled by another RPC timeout (which is hard-coded and not affected by “akka.ask.timeout”). Could you open a JIRA issue so I can propose a fix for that?


On Dec 12, 2018, at 7:07 AM, Alex Vinnik <alvinnik.g@xxxxxxxxx> wrote:

Hi there,

I ran into the same problem running a batch job with Flink 1.6.1/1.6.2.

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

akka.ask.timeout: 600s

But it looks like it is not honored. Any suggestions on what can be done?


On 2018/07/13 10:24:16, Lukas Kircher <l...@xxxxxxxxx> wrote:
> Hello,
> I have problems setting configuration parameters for Akka in Flink 1.5.0. When I run a job I get the exception listed below which states that Akka timed out after 10000ms. I tried to increase the timeout by following the Flink configuration documentation. Specifically I did the following:
> 1) Passed a configuration to the Flink execution environment with `akka.ask.timeout` set to a higher value. I started this in Intellij.
> 2) Passed program arguments via the run configuration in Intellij, e.g. `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local standalone cluster via start-cluster.sh. The setting is reflected in Flink's web interface.
> However - despite explicit configuration the default setting seems to be used. The exception below states in each case that akka ask timed out after 10000ms.
> As my problem seems very basic I do not include an SSCCE for now but I can try to build one if this helps figuring out the issue.
> ------
> [...]
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> [...]
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:745)
> [...]
> ------
> Best regards and thanks for your help,
> Lukas