
Re: StreamingFileSink causing AmazonS3Exception


Oh this is timely!

I hope I can save you some pain, Kostas! (cc-ing flink dev to get feedback there on what I believe to be a confirmed bug)


I was just about to open up a flink issue for this after digging (really) deep and figuring out the issue over the weekend.

The problem arises from how Flink hands input streams to the S3AccessHelper. If you turn on debug logging for S3, you will eventually see this stack trace:

2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
  at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
  at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
  at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
  at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

From this, you can see that, for some reason, AWS fails to write a multi-part chunk and then tries to reset the input stream in order to retry, but fails because the InputStream is not mark-able.

That exception is swallowed (it seems like it should be raised up to the client, but isn't, for an unknown reason). The S3 client then tries to repeat the request using its built-in retry logic; however, because the InputStream is already consumed
and has no more bytes to write, the request never fills the content-length that the S3 PUT request is expecting. Eventually, after it hits the max number of retries, it fails and you get the error above.
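The "Resetting to invalid mark" part of this can be reproduced in isolation: once a BufferedInputStream has been read past its mark read-limit, reset() throws exactly that message. A minimal sketch (the 16-byte/512-byte sizes are just illustrative, not what the SDK uses):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

// Minimal repro of the "Resetting to invalid mark" failure mode: once more
// bytes than the mark read-limit have been consumed, the buffered stream can
// no longer rewind -- which is what the SDK's retry path runs into above.
public class MarkResetDemo {
    // Returns null if reset() succeeds, otherwise the exception message.
    static String tryReset() throws IOException {
        byte[] data = new byte[1024];
        BufferedInputStream in =
            new BufferedInputStream(new ByteArrayInputStream(data), 16);
        in.mark(16);              // read-limit of 16 bytes
        in.read(new byte[512]);   // consume well past the read-limit
        try {
            in.reset();           // the SDK does this before retrying a part
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tryReset()); // prints "Resetting to invalid mark"
    }
}
```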

I just started running a fix for this (which is a hack not the real solution) here: https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6


However, I found that just using the documented property didn't appear to work, and I had to wrap the InputStream in a BufferedInputStream for it to work.
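The essence of that workaround is just giving the SDK's mark()/reset() retry dance a buffer large enough to rewind a whole part. A hypothetical sketch (the helper name and the 8MB bound are mine, not from the gist):

```java
import java.io.BufferedInputStream;
import java.io.InputStream;

// Hypothetical sketch of the hack: wrap the part's InputStream so mark()
// stays valid even if the SDK consumes every byte before failing and
// retrying. PART_SIZE_LIMIT is an assumed upper bound on a part's size.
public class MarkableStreams {
    static final int PART_SIZE_LIMIT = 8 * 1024 * 1024; // assumed max part size

    static InputStream makeRetrySafe(InputStream raw) {
        // A BufferedInputStream sized to the whole part can buffer all of it,
        // so a reset() after full consumption still succeeds.
        return new BufferedInputStream(raw, PART_SIZE_LIMIT);
    }
}
```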

I think the real fix is either to:

1. Use the BufferedInputStream but make it configurable
2. Refactor S3AccessHelper to add another signature that takes a File object, and change RefCountedFSOutputStream to also be able to give out a reference to the underlying file.

I can pretty easily do this work, but I would be curious which direction the maintainers would prefer.
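To illustrate why option 2 sidesteps the problem entirely: with a File in hand, each retry attempt can open a fresh stream, so a half-consumed stream from a failed attempt can never poison the next one. A hypothetical sketch (names are illustrative, not the actual Flink API; the uploader lambda stands in for the real S3 uploadPart call):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of the file-backed approach: no mark()/reset() needed,
// because every attempt reopens the part file from the beginning.
public class FileBackedRetry {
    interface PartUploader {
        void upload(InputStream part) throws IOException;
    }

    // Retries up to maxAttempts, reopening the part file for each attempt.
    // Returns the attempt number that succeeded.
    static int uploadWithRetry(File partFile, PartUploader uploader,
                               int maxAttempts) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (InputStream in = new FileInputStream(partFile)) {
                uploader.upload(in);   // fresh, fully rewound stream each time
                return attempt;
            } catch (IOException e) {
                last = e;
            }
        }
        throw last;
    }
}
```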

Thanks,

Addison!

On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <k.kloudas@xxxxxxxxxxxxxxxxx> wrote:
Hi Steffen, 

Thanks for reporting this.

Internally, Flink does not keep any open connections to S3. It only buffers data internally until
it reaches a min-size limit (by default 5MB) and then uploads it as one part of
an MPU in one go. Given this, I will have to dig a bit deeper to see why a connection would time out.

If you are willing to dig into the code, all interactions with S3 pass through the S3AccessHelper 
class and its implementation, the HadoopS3AccessHelper. For the buffering and uploading logic, 
you could have a look at the S3RecoverableWriter and the S3RecoverableFsDataOutputStream.

I will keep looking into it. In the meantime, if you find anything let us know.

Cheers,
Kostas