
Re: [DISCUSS][TABLE] How to handle empty delete for UpsertSource

Hi all,
Thanks a lot for your replies, @Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>,
@Fabian Hueske <fhueske@xxxxxxxxxx> and @Xingcan Cui <xingcanc@xxxxxxxxx>.

Option 3 sends empty deletes to downstream operators, but the downstream
operators do not seem to know how to deal with them.
Assume that the upsert source forwards deletes by default, and let's see
what problems we run into.

Suppose we have two keyed upsert tables. One is category (key: cate_id),
the other is item (key: item_id).
cate_id,  cate_v
(delete  a, 1)

item_id, cate_id, item_v
(add   2, a, 1)
(add   2, a, 2)
(delete 1, a, 1)
(add 1, a, 2)

case 1: source

> insert into xxx
> select * from item;

In this case, the deletes are forwarded and the data is removed from the
sink table. There seems to be no problem here.

case 2: join

> insert into xxx
> select * from item join category on item.cate_id = category.cate_id;

As in case 1, would the user want to delete all items of category 'a'?
Should the record (delete  a, 1) from category be filtered out by the join,
or be joined with all items from the item table?
If it is filtered out, that conflicts semantically with case 1.
If it is not filtered out, we have to store delete messages in the join
state, since data from the item table may arrive later and need to be
joined. How to deal with these deletes in state is also a problem. I think
the join logic would become very complicated.

case 3: aggregate

> insert into xxx
> select sum(item_v) from item

What's the result of sum, 4 or 3?
If the answer is 4, how can the downstream group_by tell whether a delete is
an empty delete or a retract delete? PS: the changelog from the item table
to group_by is: add(1), retract(1), add(2), empty delete(1), add(2). We
could probably add a new message type to solve the problem, but that would
definitely bring large changes, and I don't think now is the time to do it.
If the answer is 3, the result is definitely wrong.
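
To make the two candidate results concrete, here is a minimal Python sketch
(not Flink code). It replays the changelog listed above through a sum. The
tuple layout and the is_empty flag are assumptions made up for illustration;
the flag is exactly the information a real group_by would not have.

```python
# Changelog from the item table to group_by, as listed in this mail.
# Each entry is (kind, value, is_empty). The is_empty flag is hypothetical:
# it marks what the aggregate would need to know but is never told.
changelog = [
    ("add", 1, False),
    ("retract", 1, False),
    ("add", 2, False),
    ("delete", 1, True),   # empty delete for item_id = 1
    ("add", 2, False),
]


def naive_sum(messages):
    """Treat every non-add message as a retraction of its value."""
    total = 0
    for kind, value, _is_empty in messages:
        total += value if kind == "add" else -value
    return total


def sum_ignoring_empty_deletes(messages):
    """Same sum, but skip deletes that are known to be empty."""
    total = 0
    for kind, value, is_empty in messages:
        if kind == "delete" and is_empty:
            continue
        total += value if kind == "add" else -value
    return total


print(naive_sum(changelog))                   # 3
print(sum_ignoring_empty_deletes(changelog))  # 4
```

The second function only gets 4 because it is handed the flag, which is the
extra message type mentioned above.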

case 4: Filter

> insert into xxx
> select item_id, item_v from item where item_v < 2 and item_id = 1

For the item with item_id=1, if Filter also generates empty deletes, the
sink cannot distinguish these two kinds of deletes. The question is:
should we filter empty deletes in the sink?
If yes, that conflicts semantically with case 1.
If no, the deletes generated by Filter cannot be filtered out, which can
put devastating pressure on the physical database.
Furthermore, I don't think we can rely on best effort. Consider the
anti-spamming scenario: users probably just want the top 1% of the data
from the result, so the remaining 99% will all become delete messages after
the Filter. This would be a disaster for the storage, especially when most
of the incoming keys are new ones and cannot be absorbed by a best-effort
cache. We cannot release a version of Flink that risks bringing a disaster
to our users, even if the change is small.
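
A rough Python sketch of the anti-spamming scenario, assuming a Filter that
turns every upsert failing the predicate into a delete (the second approach
discussed for FLINK-9528). The function name, the message tuples, the 1000
keys and the score threshold are all made up for illustration.

```python
def upsert_aware_filter(messages, predicate):
    """Forward passing upserts; convert failing upserts into deletes."""
    for kind, key, value in messages:
        if kind == "delete" or not predicate(value):
            yield ("delete", key)
        else:
            yield ("upsert", key, value)


# 1000 distinct new keys with scores 0..999; keep only the top 1%.
messages = [("upsert", f"k{i}", i) for i in range(1000)]
out = list(upsert_aware_filter(messages, lambda score: score >= 990))

deletes = sum(1 for m in out if m[0] == "delete")
upserts = len(out) - deletes
print(upserts, deletes)  # 10 990
```

Every key here is new, so all 990 deletes are empty deletes that would hit
the sink unless something downstream removes them.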

Thus, I don't think we should allow the source to output empty deletes. It
would not only add a new feature but also introduce a new concept: that a
dynamic table may contain empty deletes. Once a dynamic table contains
empty deletes, we need to support processing them in all Table operators.
We also have to define clear semantics for them.
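
For comparison, dropping empty deletes in the source (option 1) only needs
the set of live keys as state. A minimal Python sketch under that
assumption; the class and method names are hypothetical and this is not the
actual UpsertSource implementation.

```python
class KeyTrackingUpsertSource:
    """Hypothetical source wrapper that drops deletes for unseen keys."""

    def __init__(self):
        self.live_keys = set()  # only keys are stored, not full rows

    def process(self, message):
        kind, key = message[0], message[1]
        if kind == "delete":
            if key not in self.live_keys:
                return None  # empty delete: swallow it
            self.live_keys.discard(key)
        else:
            self.live_keys.add(key)
        return message


# The item messages from the example above: (kind, item_id, cate_id, item_v).
item_messages = [
    ("add", 2, "a", 1),
    ("add", 2, "a", 2),
    ("delete", 1, "a", 1),  # empty delete, item_id 1 was never added
    ("add", 1, "a", 2),
]

source = KeyTrackingUpsertSource()
forwarded = [m for m in map(source.process, item_messages) if m is not None]
print(forwarded)
```

With this, downstream operators only ever see the three add messages, and
sum(item_v) over the latest value per key is 4.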

Thanks again for all your replies and patience.
Best, Hequn

On Tue, Aug 21, 2018 at 10:17 PM Xingcan Cui <xingcanc@xxxxxxxxx> wrote:

> Hi Hequn,
> Thanks for this discussion.
> Personally, I’m also in favor of option 3. There are two reasons for that:
> (1) A proctime-based upsert table source does not guarantee the records’
> order, which means empty delete messages may not really be "empty". Simply
> discarding them may cause semantics problems.
> (2) Materializing the table in the source doesn't sound like an efficient
> solution, especially considering the downstream operators may also need to
> materialize the immediate tables many times.
> Therefore, why not choose a "lazy strategy", i.e., just forward the
> messages and let the operators that are sensitive to empty deletes
> handle them.
> As for the filtering problem, maybe the best approach would be to cache
> all the keys that meet the criteria and send a retract message when it
> changes.
> BTW, recently, I’m getting a more and more intense feeling that maybe we
> should merge the retract message and upsert message into a unified “update
> message”. (Append Stream VS Update Stream).
> Best,
> Xingcan
> > On Aug 20, 2018, at 7:51 PM, Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
> >
> > Hi,
> >
> > Thanks for bringing up this issue here.
> >
> > I’m not sure whether sometimes swallowing empty deletes could be a
> > problem or always swallowing/forwarding them is better. I guess for most
> > use cases it doesn't matter. Maybe the best for now would be to always
> > forward them, since if they are a problem, user could handle them somehow,
> > either in custom sink wrapper or in system that’s downstream from Flink.
> > Also maybe we could have this configurable in the future.
> >
> > However this thing seems to me like a much lower priority compared to
> > performance implications. Forcing upsert source to always keep all of the
> > keys on the state is not only costly, but in many cases it can be a blocker
> > from executing a query at all. Not only for the UpsertSource -> Calc ->
> > UpsertSink, but also for example in the future for joins or ORDER BY
> > (especially with LIMIT) as well.
> >
> > I would apply same reasoning to FLINK-9528.
> >
> > Piotrek
> >
> >> On 19 Aug 2018, at 08:21, Hequn Cheng <chenghequn@xxxxxxxxx> wrote:
> >>
> >> Hi all,
> >>
> >> Currently, I am working on FLINK-8577 Implement proctime DataStream to
> >> Table upsert conversion
> >> <https://issues.apache.org/jira/browse/FLINK-8577>.
> >> And a design doc can be found here
> >> <https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing>.
> >> It received many valuable suggestions. Many thanks to all of you.
> >> However there are some problems I think may need more discussion.
> >>
> >> *Terms*
> >>
> >>  1. *Upsert Stream:* A stream that includes a key definition and will be
> >>  updated. Message types include insert, update and delete.
> >>  2. *Upsert Source:* A source that ingests an Upsert Stream.
> >>  3. *Empty Delete:* For a specific key, the first message is a delete
> >>  message.
> >>
> >> *Problem to be discussed*
> >> How to handle empty deletes for UpsertSource?
> >>
> >> *Ways to solve the problem*
> >>
> >>  1. Throw away empty delete messages in the UpsertSource (personally in
> >>  favor of this option)
> >>     - advantages
> >>        - This makes sense in semantics. An empty table + delete message is
> >>        still an empty table. Losing deletes does not affect the final
> >>        results.
> >>        - At present, the operators or functions in Flink are assumed to
> >>        process the add message first and then the delete. If empty deletes
> >>        are thrown away in the source, the downstream operators do not need
> >>        to consider them.
> >>     - disadvantages
> >>        - Maintaining the state in the source is expensive, especially for
> >>        some simple SQL like: UpsertSource -> Calc -> UpsertSink.
> >>  2. Throw away empty delete messages when the source generates
> >>  retractions, otherwise pass empty delete messages down
> >>     - advantages
> >>        - Downstream operators do not need to consider empty delete
> >>        messages when the source generates retractions.
> >>        - Performance is better since the source doesn't have to maintain
> >>        state if it doesn't generate retractions.
> >>     - disadvantages
> >>        - Judging whether the downstream operator will receive empty
> >>        delete messages is complicated. We must consider not only the
> >>        source but also the operators that follow it. Take join as an
> >>        example: for the SQL upsert_source -> upsert_join, the join
> >>        receives empty deletes, while in upsert_source -> group_by ->
> >>        upsert_join it doesn't, since the empty deletes are ingested by
> >>        group_by.
> >>        - The semantics of how to process empty deletes are not clear.
> >>        It may be difficult for users to understand, because sometimes
> >>        empty deletes are passed down and sometimes they are not.
> >>  3. Pass empty delete messages down always
> >>     - advantages
> >>        - Performance is better since the source doesn't have to maintain
> >>        state if it doesn't generate retractions.
> >>     - disadvantages
> >>        - All table operators and functions in Flink need to consider empty
> >>        deletes.
> >>
> >> *Another related problem*
> >> Another related problem is FLINK-9528 Incorrect results: Filter does not
> >> treat Upsert messages correctly
> >> <https://issues.apache.org/jira/browse/FLINK-9528> which I think should
> >> be considered together.
> >> The problem in FLINK-9528 is, for sql like upsert_source -> filter ->
> >> upsert_sink, when the data of a key changes from non-filtering to
> >> filtering, the filter only removes the upsert message such that the
> >> previous version remains in the result.
> >>
> >>  1. One way to solve the problem is to make UpsertSource generate
> >>  retractions.
> >>  2. Another way is to make a filter aware of the update semantics
> >>  (retract or upsert) and convert the upsert message into a delete
> >>  message if the predicate evaluates to false.
> >>
> >> The second way will also generate many empty delete messages. To avoid
> >> too many empty deletes, the solution is to maintain a filter state at
> >> the sink to prevent the empty deletes from causing devastating pressure
> >> on the physical database. However, if UpsertSource can also output empty
> >> deletes, these empty deletes will be more difficult to control. We don't
> >> know where these deletes come from, or whether they should be filtered
> >> out. The ambiguity of the semantics of processing empty deletes makes
> >> the user unable to judge whether there will be empty deletes output.
> >>
> >> *My personal opinion*
> >> From my point of view, I think the first option (throw away empty delete
> >> messages in the UpsertSource) is the best, not only because the
> >> semantics are clearer but also because the processing logic of the
> >> entire table layer can be simpler and thus more efficient. Furthermore,
> >> the performance loss is acceptable (we can even store only the key in
> >> state when the source doesn't generate retractions).
> >>
> >> Any suggestions are greatly appreciated!
> >>
> >> Best, Hequn
> >
> >