StreamingFileSink causing AmazonS3Exception


Hi there,
I'm trying to persist events coming from a Kinesis Stream to S3 using the new StreamingFileSink.
    final StreamingFileSink<TripEvent> bulkFormatSink = StreamingFileSink
        .forBulkFormat(
            new Path(...),
            ParquetAvroWriters.forSpecificRecord(TripEvent.class)
        )
        .withBucketAssigner(...)
        .build();
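
I've left out the actual path and bucket assigner above. For reference, the assigner slot can be filled with something as simple as Flink's built-in DateTimeBucketAssigner; the format string below is only an example, not what my job actually uses:

    // Example only: buckets records into one directory per hour of processing time.
    final DateTimeBucketAssigner<TripEvent> hourlyAssigner =
        new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");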
I'm ingesting 50-60k events/sec into a Kinesis stream with 64 shards. Flink is deployed on an EMR cluster with 4 m5.xlarge nodes and configured to use three nodes with two slots each. The job itself runs with a parallelism of 6, and checkpoints are triggered every 15 minutes. This works, but I occasionally see checkpoints fail (roughly every 10 hours), caused by an AmazonS3Exception (see the attached job manager log for the full trace):
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: D2BDE02E02977189; S3 Extended Request ID: +elMrRg5E4VHqCeqzRqeFAeVbjkFanwFIDF+lUACCXLGlO989SzSoyuqZMEAjEBn+siC3s++48A=), S3 Extended Request ID: +elMrRg5E4VHqCeqzRqeFAeVbjkFanwFIDF+lUACCXLGlO989SzSoyuqZMEAjEBn+siC3s++48A=
       at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
       at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
       at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
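
For completeness, here is roughly how the sink is wired into the job. The stream name, deserialization schema, and region below are placeholders rather than my real values, but the parallelism and checkpoint interval match the setup described above:

    // FlinkKinesisConsumer and ConsumerConfigConstants come from the flink-connector-kinesis dependency.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(6);                     // parallelism of 6, matching the 3 x 2 task slots
    env.enableCheckpointing(15 * 60 * 1000L);  // checkpoint every 15 minutes

    final Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1");  // placeholder region

    env
        .addSource(new FlinkKinesisConsumer<>(
            "trip-events",                         // placeholder stream name
            new TripEventDeserializationSchema(),  // placeholder DeserializationSchema<TripEvent>
            consumerConfig))
        .addSink(bulkFormatSink);

    env.execute("kinesis-to-s3");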
As you can see in the attached screenshot, a couple of minutes before the exception occurs, around the time the checkpoint is triggered, the number of records read from the Kinesis stream drops substantially relative to the number of records ingested into the stream. Eventually the checkpoint fails, but since HA is configured for the cluster, the job recovers from the failure.
My initial thought was that the cluster is overloaded, but average CPU utilization is around 20%, the network isn't saturated either, and scaling the cluster to larger nodes showed similar behavior.
Is this a known issue? It would be great if someone could give me some pointers on how to debug this further.
Thanks, Steffen
Amazon Web Services EMEA SARL
38 avenue John F. Kennedy, L-1855 Luxembourg
Registered office: L-1855 Luxembourg
Registered with the Luxembourg Trade and Companies Register under R.C.S. B186284

Amazon Web Services EMEA SARL, Niederlassung Deutschland
Marcel-Breuer-Str. 12, D-80807 Muenchen
Registered office of the branch: Munich
Registered with the commercial register of the Munich Local Court under HRB 242240, VAT ID DE317013094


Attachment: s3-exception-ganglia.png
Attachment: s3-exception-cloudwatch.png
Attachment: s3-exception-job-manager.log