
Re: Handling burst I/O when using tumbling/sliding windows


Sorry for getting back so late, and thanks for the improved document :) I think I now get your idea.

Are you now trying (or have you already done it?) to implement a custom window assigner that would work as in [Figure 3] of your document?

I think that indeed should be possible and relatively easy to do without the need for API changes.


On 1 Oct 2018, at 17:48, Rong Rong <walterddr@xxxxxxxxx> wrote:

Hi Piotrek,

Thanks for the quick response. To follow up with the questions:
Re 1). Yes it is causing network I/O issues on Kafka itself.

Re 2a). Actually, I thought about it last weekend and I think there's a
workaround: we duplicate the key extraction logic directly in our window
assigner. Since the element record is passed in, it should be OK to create
a customized window assigner that computes a key-based offset by extracting
the key from the record. This was the main part of my change: letting
WindowAssignerContext provide the current key information extracted from
the KeyedStateBackend.
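A minimal sketch of the workaround described above, assuming the key can be recomputed from the element inside the assigner. The class KeyedWindowOffsets, its methods and the "slots" parameter are hypothetical (not a Flink API). windowStart uses the same window-start arithmetic as Flink's TimeWindow.getWindowStartWithOffset, written with Math.floorMod so negative timestamps are handled. In a real custom WindowAssigner, assignWindows(element, timestamp, context) would extract the key from the element and call these two helpers.

```java
import java.util.Objects;

/**
 * Hypothetical helpers (not part of Flink): derive a per-key window offset from a key
 * that the custom window assigner extracts from the record itself.
 */
public final class KeyedWindowOffsets {

    private KeyedWindowOffsets() {}

    /** Maps a key to one of {@code slots} evenly spaced offsets in [0, windowSize). */
    public static long offsetForKey(Object key, long windowSize, int slots) {
        if (windowSize <= 0 || slots <= 0 || windowSize % slots != 0) {
            throw new IllegalArgumentException("windowSize must be a positive multiple of slots");
        }
        int slot = Math.floorMod(Objects.hashCode(key), slots);
        return slot * (windowSize / slots);
    }

    /** Start of the tumbling window that contains timestamp, aligned to the given offset. */
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        long timestamp = 1_538_000_123_000L; // arbitrary event time in ms; it is never modified
        for (String key : new String[] {"user-a", "user-b", "user-c"}) {
            long offset = offsetForKey(key, fiveMinutes, 5);
            long start = windowStart(timestamp, offset, fiveMinutes);
            System.out.printf("%s: offset=%d ms, window=[%d, %d)%n",
                    key, offset, start, start + fiveMinutes);
        }
    }
}
```

Hashing the key into a small number of slots keeps the offsets deterministic per key, so all records of one key always land in consistently aligned windows.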

Re 2b). Thanks for the explanation, we will try to profile it! We've seen
some weird behavior before when the network buffers in Flink fill up,
although it is very rare and hard to reproduce consistently.

Re 3) Regarding the event time offset, I might not have explained my
idea clearly. I added some more details to the doc; please kindly take a
look. In a nutshell, window offsets do not change the event time of records
at all. We simply change how the window assigner assigns records to windows
with different offsets.
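To illustrate the point that offsets only move window boundaries, here is a hedged, Flink-free sketch of sliding-window assignment with an offset. The class OffsetSlidingAssignment is hypothetical; its loop follows the same structure as Flink's SlidingEventTimeWindows assignment (latest aligned start, then step back by the slide while the window still contains the timestamp). The record's timestamp is an input and is never modified.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (plain Java, no Flink dependency) of sliding-window assignment
 * with an offset. Only the window boundaries depend on the offset; the record's
 * event timestamp is passed in and never changed.
 */
public final class OffsetSlidingAssignment {

    /** A half-open window [start, end). */
    public static final class Window {
        public final long start;
        public final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Returns every window of length size (advancing by slide) that contains timestamp. */
    public static List<Window> assign(long timestamp, long size, long slide, long offset) {
        List<Window> windows = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the offset.
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 300_000L;      // 5-minute windows
        long slide = 60_000L;      // sliding every minute
        long timestamp = 125_000L; // the same event time is used for both offsets
        System.out.println("offset 0:      " + assign(timestamp, size, slide, 0L));
        System.out.println("offset 30_000: " + assign(timestamp, size, slide, 30_000L));
    }
}
```

Both offsets produce five windows for the same record, but with the 30-second offset every window ends 30 seconds earlier, so those windows fire at a different watermark while the record's event time stays the same.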


On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:


Thanks for the response again :)

Re 1). Do you mean that these extra bursts of external network I/O are
disturbing other systems reading from/writing to Kafka? Or Kafka itself?

Re 2a) Yes, it should be relatively simple; however, every new brick makes
the overall component more complicated, which has long-term consequences
for maintenance, refactoring, adding new features, or just reading the
code.

Re 2b) With setup of:

WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink

RateLimitingOperator would just slow down data processing via the standard
back pressure mechanism. Since Flink by default allocates 10% of the memory
to network buffers, we could partially rely on them to absorb some smaller
bursts without blocking the whole pipeline. Essentially,
RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
record emission from the WindowOperator. So yes, there would still be a
batch emission of data in the WindowOperator itself, but it would be
stretched out in wall-clock time because of the downstream back pressure
caused by the RateLimitingOperator.
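A minimal sketch of the rate check a RateLimitingOperator(maxSize = 0) could perform, assuming a simple "one record per fixed interval" pacing rule. The class BlockingRateLimiter is hypothetical (RateLimitingOperator is only an idea in this thread, not an existing Flink operator). Each record reserves the next free emission slot and the caller blocks until then; inside a Flink task, that blocking is what would fill the upstream buffers and back-pressure the WindowOperator.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of the rate check inside a RateLimitingOperator(maxSize = 0):
 * no buffering, each record waits for the next free emission slot. Blocking the task
 * thread here is what would propagate back pressure upstream.
 */
public final class BlockingRateLimiter {

    private final long intervalNanos;
    private long nextFreeNanos = Long.MIN_VALUE;

    public BlockingRateLimiter(double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        this.intervalNanos = (long) (1_000_000_000L / recordsPerSecond);
    }

    /** Reserves the next slot at or after nowNanos and returns how long to wait (0 if none). */
    public long reserve(long nowNanos) {
        long slot = Math.max(nowNanos, nextFreeNanos);
        nextFreeNanos = slot + intervalNanos;
        return slot - nowNanos;
    }

    /** Blocks the calling thread until the next record may be emitted. */
    public void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingRateLimiter limiter = new BlockingRateLimiter(1_000); // 1 record per ms
        long begin = System.nanoTime();
        for (int i = 0; i < 50; i++) { // a burst of 50 records from one window firing
            limiter.acquire();
        }
        long elapsedMs = (System.nanoTime() - begin) / 1_000_000;
        System.out.println("burst of 50 drained in ~" + elapsedMs + " ms");
    }
}
```

The burst still leaves the window at the boundary, but at 1,000 records/s the 50 records take roughly 50 ms to pass the limiter, which is the "stretched out in wall-clock time" behavior described above.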

By the way, with your proposal, what event time would the delayed data be
emitted with? If the event time of the produced records changes depending
on whether window offsets are used, this can cause quite a lot of semantic
problems and side effects for the downstream operators.


On 28 Sep 2018, at 15:18, Rong Rong <walterddr@xxxxxxxxx> wrote:

Hi Piotrek,

Thanks for getting back to me so quickly. Let me explain.

Re 1). As I explained in the doc, we are using a basic Kafka-in, Kafka-out
system with the same number of partitions on both sides. The bursts degrade
the performance of external network I/O.
It is definitely possible to configure more resources (e.g. a larger
count) for the output to handle the burst, but it can also be resolved by
some sort of internal smoothing (either through rate limiting, as you
suggested, or through dynamic offsets).

Re 2a). Yes, I agree, and I think I understand your concern. However, it is
one simple API addition with default fallbacks that are fully
backward-compatible (or, if I missed some corner cases, I think it can be
made fully compatible).

Re 2b). Yes, there could be many potential issues that cause data bursts.
However, putting aside the scenarios caused by the nature of the stream
(data skew, bursts), which affect both input and output, we specifically
want to address the case where a smooth input *deterministically* results
in bursty output. What we are proposing here is essentially the case of
users' custom operators. However, we can't do that unless there's an API to
control the offset.

Regarding rate limiting and skew, I think I missed one key point of yours,
and you are right. If we introduce a *new rate-limiting operator* (with
size > 0), it will:
- cause extra state usage within the container (moving all the
components out of the window operator and storing them in the rate limit
buffer at window boundaries)
- not cause the data skew problem: the data skew problem I mentioned arises
if some data are buffered in the window operator state longer than other
data; then some panes can potentially handle more late arrivals than others.

However, if it is possible to get rid of the extra memory usage, we will
definitely benchmark the rate-limit approach. Can you be more specific on
how setting up the rate-limit operator (size = 0) can resolve the burst
issue? If I understand correctly, even with back pressure the watermark
will eventually advance, and once it crosses the window boundary, a batch
of messages will still be emitted from the window operator at the same time.


On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:


Re 1. Can you be more specific? What system are you using, what's
happening, and how does it break?

While delaying window firing is probably the most cost-effective solution
for this particular problem, it has some disadvantages:
a) it puts even more logic into an already complicated component
b) it does not solve potentially similar problems. I can easily imagine the
same issue happening in scenarios other than "interval based operators", such as:
      - input sources faster than output sinks
      - data skew
      - data bursts
      - users' custom operators causing data bursts
      - users' custom operators being prone to bursts (maybe something
like AsyncOperator or something else that works with an external
system)
so the problem might not necessarily be limited to the sinks.

As far as I recall, some users have reported similar issues.

Regarding potential drawbacks of rate limiting, I didn't understand this part:

However the problem is similar to delay triggers which can provide
degraded performance for skew sensitive downstream service, such as
feature extraction results to deep learning model.

The way I imagine it, a RateLimitingOperator could take two parameters:
a rate limit and a buffer size limit.

With buffer size = 0, it would immediately cause back pressure if the rate
is exceeded.
With buffer size > 0, it would first buffer events in state and cause back
pressure only when the buffer reaches its maximum size.
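A hedged sketch of the two-parameter idea (rate limit plus buffer size limit). BufferedRateLimiter and its Action enum are hypothetical names, and time is passed in explicitly so the decision logic is deterministic; a real operator would keep the buffer in Flink state and drain it from a timer, neither of which is modeled here. With a buffer size of 0 the second record inside one interval gets BACK_PRESSURE immediately; with a larger buffer, records are buffered first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: a rate limit plus a buffer size limit. Pacing is at most one
 * record per interval; no credit accumulates while idle.
 */
public final class BufferedRateLimiter<T> {

    public enum Action { EMIT, BUFFER, BACK_PRESSURE }

    private final long intervalNanos;
    private final int maxBufferSize;
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private long nextFreeNanos = Long.MIN_VALUE;

    public BufferedRateLimiter(double recordsPerSecond, int maxBufferSize) {
        if (recordsPerSecond <= 0 || maxBufferSize < 0) {
            throw new IllegalArgumentException("invalid rate or buffer size");
        }
        this.intervalNanos = (long) (1_000_000_000L / recordsPerSecond);
        this.maxBufferSize = maxBufferSize;
    }

    /** Emits buffered records, oldest first, while the rate allows it. */
    public void flush(long nowNanos, Consumer<T> out) {
        while (!buffer.isEmpty() && nowNanos >= nextFreeNanos) {
            out.accept(buffer.poll());
            nextFreeNanos = nowNanos + intervalNanos;
        }
    }

    /** Handles one record; BACK_PRESSURE means it was NOT accepted and the caller must wait. */
    public Action offer(T record, long nowNanos, Consumer<T> out) {
        flush(nowNanos, out);
        if (buffer.isEmpty() && nowNanos >= nextFreeNanos) {
            out.accept(record);
            nextFreeNanos = nowNanos + intervalNanos;
            return Action.EMIT;
        }
        if (buffer.size() < maxBufferSize) {
            buffer.add(record);
            return Action.BUFFER;
        }
        return Action.BACK_PRESSURE;
    }

    public int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        BufferedRateLimiter<String> limiter = new BufferedRateLimiter<>(1_000, 2); // 1/ms, buffer 2
        for (String r : new String[] {"a", "b", "c", "d"}) {
            System.out.println(r + " -> " + limiter.offer(r, 0L, emitted::add));
        }
        limiter.flush(1_000_000L, emitted::add); // 1 ms later, one buffered record may go out
        System.out.println("emitted " + emitted + ", still buffered " + limiter.buffered());
    }
}
```

Running main prints EMIT for "a", BUFFER for "b" and "c", and BACK_PRESSURE for "d"; after the flush one millisecond later, "a" and "b" have been emitted and one record is still buffered.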

For the case with the WindowOperator, if windows are evicted and removed from
the state, using buffer size > 0 wouldn't increase state usage; it
would only move the state from the WindowOperator to the RateLimitingOperator.


On 27 Sep 2018, at 17:28, Rong Rong <walterddr@xxxxxxxxx> wrote:

Hi Piotrek,

Yes, to be clearer:
1) the network I/O issue I am referring to is between Flink and the
sink. We did not see issues between operators.
2) yes, we've considered rate-limiting sink functions as well; this is
mentioned in the doc, along with some of the pros and cons we identified.

So far this kind of problem seems to occur only in the WindowOperator, but
it can probably occur in any aligned, interval-based operator.


On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski wrote:



Thanks for the proposal. Could you provide more
background/explanation/motivation for why you need such a feature? What do you
mean by “network I/O” degradation?

On their own, burst writes shouldn't cause problems within Flink. If they
do, we might want to fix the original underlying problem, and if they are
causing problems in external systems, we might also think about other
approaches to fix/handle the problem (write rate limiting?), which
might be more general and not fix only bursts originating from the
WindowOperator.
I'm not saying that your proposal is bad or anything, but I would just like
to have more context :)


On 26 Sep 2018, at 19:21, Rong Rong <walterddr@xxxxxxxxx> wrote:

Hi Dev,

I was wondering if there's any previous discussion regarding how to handle
burst network I/O when deploying Flink applications with window operators.

We've recently seen some significant network I/O degradation when we
use sliding windows to perform rolling aggregations. The pattern is
periodic: output connections get no traffic for a period of time, then a
burst at window boundaries (in our case every 5 minutes).
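A back-of-the-envelope model of that periodic pattern (illustrative arithmetic, not measured data). Assume, hypothetically, N keys whose windows all fire when the watermark passes the window end. With aligned windows every key fires in the same instant; mapping keys onto a few offset slots (here simply key % slots, an illustrative choice) spreads the firings across the period. BoundaryBurst is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model: numKeys keyed windows of length windowMs fire when the
 * watermark passes their end. Counts firings per one-minute bucket within one period.
 */
public final class BoundaryBurst {

    public static Map<Long, Integer> firingsPerMinute(int numKeys, long windowMs, int offsetSlots) {
        if (offsetSlots <= 0 || windowMs % offsetSlots != 0) {
            throw new IllegalArgumentException("windowMs must be a positive multiple of offsetSlots");
        }
        Map<Long, Integer> counts = new TreeMap<>();
        long slotWidth = windowMs / offsetSlots;
        for (int key = 0; key < numKeys; key++) {
            // Illustrative key -> offset mapping; windows with this offset end (and fire)
            // at offset + k * windowMs, i.e. at "offset" within each period.
            long offset = (key % offsetSlots) * slotWidth;
            counts.merge(offset / 60_000L, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        System.out.println("aligned: " + firingsPerMinute(10_000, fiveMinutes, 1));
        System.out.println("5 slots: " + firingsPerMinute(10_000, fiveMinutes, 5));
    }
}
```

In this model the aligned case puts all 10,000 firings into minute 0 of every 5-minute period, while 5 offset slots yield 2,000 firings in each minute, which is the smoothing the proposal aims for.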

We have drafted a doc


on how we propose to handle it by smoothing the output traffic spikes.
Please kindly take a look; any comments and suggestions are highly appreciated.