
S3A AWSS3IOException from Flink's BucketingSink to S3

I have a Flink app with high parallelism (400) running on AWS EMR. It uses Flink v1.5.2, reads from Kafka, and writes to S3 using BucketingSink, with the RocksDB state backend for checkpointing. The destination path uses the "s3a://" prefix. The job is a streaming app that runs continuously. At any given time, each worker may be writing to a part file in S3, so all workers combined could be writing to as many as 400 files at once (due to the parallelism of 400).
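As a rough illustration of why 400 workers can mean up to 400 concurrently open files, here is a minimal Java sketch of the part-file naming that the paths in the exception appear to follow. It assumes BucketingSink's default "_" prefix and ".in-progress"/".pending" suffixes. The part prefix "file", the ".gz" suffix, and the class and method names are hypothetical, inferred from the path in the log rather than taken from the job's actual configuration.

```java
import java.util.HashSet;
import java.util.Set;

public class PartFileNames {
    // Assumed BucketingSink-style defaults: "_" prefix for in-progress/pending files,
    // ".in-progress" and ".pending" suffixes. The part prefix "file" and suffix ".gz"
    // used below are inferred from the exception's paths (hypothetical).
    static final String PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    /** Part-file name for one subtask and rolling counter, e.g. "file-10-1.gz". */
    static String partName(String partPrefix, int subtaskIndex, int partCounter, String partSuffix) {
        return partPrefix + "-" + subtaskIndex + "-" + partCounter + partSuffix;
    }

    /** Path of the file while the subtask is still writing to it. */
    static String inProgressPath(String bucket, String part) {
        return bucket + "/" + PREFIX + part + IN_PROGRESS_SUFFIX;
    }

    /** Path the file is renamed to when the subtask rolls to a new part file. */
    static String pendingPath(String bucket, String part) {
        return bucket + "/" + PREFIX + part + PENDING_SUFFIX;
    }

    public static void main(String[] args) {
        String bucket = "bucket/2018-09-01/05";
        String part = partName("file", 10, 1, ".gz");
        // Rolling renames in-progress -> pending; on S3A a rename is a copy followed by a delete.
        System.out.println(inProgressPath(bucket, part) + " -> " + pendingPath(bucket, part));

        // Each subtask index produces a distinct path, so 400 subtasks writing
        // into the same bucket hold 400 distinct in-progress files.
        Set<String> open = new HashSet<>();
        for (int subtask = 0; subtask < 400; subtask++) {
            open.add(inProgressPath(bucket, partName("file", subtask, 1, ".gz")));
        }
        System.out.println(open.size() + " concurrent in-progress files");
    }
}
```

Because S3 has no native rename, every roll from in-progress to pending is a server-side copy plus a delete, which is the copyFile call that fails in the trace.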

After a few days, one of the workers will fail with the exception:

    org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)

This seems to occur at random when the BucketingSink rolls over to a new part file: the trace shows the failure in closeCurrentPartFile, while renaming the previous in-progress file to pending. The odd thing is that, although the timing is random, it happens consistently across separate job executions. When it occurs, it affects only 1 of the parallel Flink workers, not all of them. Also, when this occurs, the Flink job transitions into a FAILING state but does not restart and recover from the last successful checkpoint.
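One way to read "random but consistent" is that a rare per-request failure becomes close to certain over days of continuous rolling at this parallelism. The sketch below computes 1 - (1 - p)^n for n renames. The per-rename failure probability p and the rolls-per-day figure are hypothetical placeholders, not measured values, and treating requests as independent is itself an assumption.

```java
public class RenameFailureOdds {
    /** Probability that at least one of n independent operations fails, each failing with probability p. */
    static double atLeastOneFailure(double p, long n) {
        // 1 - (1 - p)^n, computed via log1p/expm1 to stay accurate when p is tiny.
        return -Math.expm1(n * Math.log1p(-p));
    }

    public static void main(String[] args) {
        double p = 1e-6;                    // hypothetical per-rename chance of an S3 InternalError
        int subtasks = 400;                 // parallelism from the job description
        int rollsPerSubtaskPerDay = 1_000;  // hypothetical part-file rolls per subtask per day
        for (int days = 1; days <= 7; days++) {
            long renames = (long) subtasks * rollsPerSubtaskPerDay * days;
            System.out.printf("day %d: %,d renames, P(at least one failure) = %.3f%n",
                    days, renames, atLeastOneFailure(p, renames));
        }
    }
}
```

Under these made-up numbers the chance of at least one failed rename grows steadily with uptime, which would match a failure that shows up in a single subtask after a few days rather than immediately.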

What causes this, and how can it be resolved? Also, how can the job be configured to restart and recover from the last successful checkpoint instead of staying in the FAILING state?