
Re: Limit on number of files to read for Dataset


Hi Darshan,

This looks like a file system configuration issue to me.
Flink supports different file systems for S3 and there are also a few tuning knobs.

Did you have a look at the docs for file system configuration [1]?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/filesystems.html
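The "Timeout waiting for connection from pool" message in the trace quoted below is what a bounded HTTP connection pool reports when every connection is leased and none is returned within the wait time. The following is only a toy model of that behaviour in plain Java, not the AWS SDK or EMRFS implementation; the class names, the pool size of 2 and the 100 ms wait are made up for illustration. On EMR, the EMRFS pool size is, to my knowledge, controlled by fs.s3.maxConnections in the emrfs-site classification, but please check that against the EMR documentation for your release.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model of a bounded connection pool (hypothetical, for illustration only).
final class BoundedPool {
    private final Semaphore permits;
    private final long leaseTimeoutMillis;

    BoundedPool(int maxConnections, long leaseTimeoutMillis) {
        this.permits = new Semaphore(maxConnections);
        this.leaseTimeoutMillis = leaseTimeoutMillis;
    }

    // Waits up to the lease timeout for a free slot; returns false on timeout.
    boolean tryLease() {
        try {
            return permits.tryAcquire(leaseTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return false;
        }
    }

    // Returns a slot to the pool, e.g. when an input stream is closed.
    void release() {
        permits.release();
    }
}

final class PoolDemo {
    public static void main(String[] args) {
        BoundedPool pool = new BoundedPool(2, 100);
        pool.tryLease(); // first open stream
        pool.tryLease(); // second open stream, the pool is now exhausted
        if (!pool.tryLease()) {
            // The third request waited for the full timeout without getting a slot.
            System.out.println("Timeout waiting for connection from pool");
        }
        pool.release();  // closing one stream frees a slot
        System.out.println("lease after release: " + pool.tryLease());
    }
}
```

If this model matches what happens in the job, there are two levers: allow more connections in the pool, or keep fewer S3 streams open at the same time, for example by lowering the parallelism of the sources.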

2018-08-14 20:45 GMT+02:00 Darshan Singh <darshan.meel@xxxxxxxxx>:
Thanks for the details. I got it working. I have roughly one directory per month and I am running on 12-15 months of data, so I created a dataset for each month and did a union.

However, when I run the job I get an HTTP timeout error. I am reading more than 120K files in total across all months.

I am using S3 and EMR for this, and the Flink version is 1.4.2. When I run on 6 months of data, it works fine.

Below is part of the error:

Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)

Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
Hi,

Flink InputFormats generate their InputSplits sequentially on the JobManager (JM).
These splits are stored on the heap of the JM process and handed out lazily to SourceTasks as they request them.
Split assignment is done by an InputSplitAssigner, which can be customized. FileInputFormats typically use a LocatableInputSplitAssigner, which tries to assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).
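The lazy, locality-aware hand-out described above can be modelled in a few lines of plain Java. This is a simplified sketch and not Flink's LocatableInputSplitAssigner: the classes Split and SimpleLocalityAssigner, the host names and the file paths are all hypothetical. It only shows the idea that every split exists in memory up front and that each request scans for a local split first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a file input split: one file plus the hosts that store it.
final class Split {
    final String path;
    final List<String> hosts;

    Split(String path, List<String> hosts) {
        this.path = path;
        this.hosts = hosts;
    }
}

// Simplified assigner: all splits are created up front and kept in memory,
// and tasks pull them one at a time.
final class SimpleLocalityAssigner {
    private final List<Split> remaining;

    SimpleLocalityAssigner(List<Split> allSplits) {
        this.remaining = new ArrayList<>(allSplits);
    }

    // Returns a split stored on the requesting host if one is left, otherwise
    // the first remaining split, or null once everything has been handed out.
    synchronized Split getNextSplit(String requestingHost) {
        for (Iterator<Split> it = remaining.iterator(); it.hasNext(); ) {
            Split s = it.next();
            if (s.hosts.contains(requestingHost)) {
                it.remove();
                return s;
            }
        }
        return remaining.isEmpty() ? null : remaining.remove(0);
    }
}

final class SplitAssignmentDemo {
    public static void main(String[] args) {
        SimpleLocalityAssigner assigner = new SimpleLocalityAssigner(Arrays.asList(
                new Split("month-01/part-0.gz", Arrays.asList("host-a")),
                new Split("month-01/part-1.gz", Arrays.asList("host-b")),
                new Split("month-02/part-0.gz", Arrays.asList("host-a"))));

        // A task on host-b gets the split that is local to it.
        System.out.println(assigner.getNextSplit("host-b").path);
        // A task on a host with no local data falls back to the first remaining split.
        System.out.println(assigner.getNextSplit("host-c").path);
    }
}
```

Note that this naive version scans the whole list on every request, so assignment cost grows with the number of outstanding splits. That is the kind of overhead point 3 refers to, and a custom assigner could avoid it, for example by indexing splits per host.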

Best, Fabian

2018-08-14 8:19 GMT+02:00 Jörn Franke <jornfranke@xxxxxxxxx>:
It causes more overhead (processes etc.), which might make it slower. Furthermore, if the files are stored on HDFS, the bottleneck is the NameNode, which will have to answer millions of requests.
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/

On 13. Aug 2018, at 21:01, Darshan Singh <darshan.meel@xxxxxxxxx> wrote:

Hi Guys,

Is there a limit on the number of files a Flink DataSet can read? My question is whether there will be any issues if I have, say, millions of files to read to create a single dataset.

Thanks